diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py index f53ef047..e50f2a0d 100644 --- a/gunicorn/fdevents.py +++ b/gunicorn/fdevents.py @@ -8,6 +8,7 @@ allows you to register an fd, and retrieve events on it. """ import select +import sys from .util import fd_, close_on_exec @@ -131,10 +132,13 @@ if hasattr(select, 'kqueue'): self.kq = select.kqueue() close_on_exec(self.kq.fileno()) self.events = [] + self.max_ev = 0 def addfd(self, fd, mode, repeat=True): fd = fd_(fd) + self.max_ev += 1 + if mode == 'r': kmode = select.KQ_FILTER_READ else: @@ -148,12 +152,17 @@ if hasattr(select, 'kqueue'): if not repeat: flags |= select.KQ_EV_ONESHOT - ev = select.kevent(fd_(fd), kmode, flags) - self.kq.control([ev], 0) + ev = select.kevent(fd, kmode, flags) + self.kq.control([ev], 0, 0) def delfd(self, fd, mode): fd = fd_(fd) + self.max_ev -= 1 + + if fd < 0: + return + if mode == 'r': kmode = select.KQ_FILTER_READ else: @@ -166,7 +175,7 @@ if hasattr(select, 'kqueue'): def _wait(self, nsec=0): if len(self.events) == 0: try: - events = self.kq.control(None, 0, nsec) + events = self.kq.control(None, self.max_ev, nsec) except select.error as e: if e.args[0] != errno.EINTR: raise @@ -328,7 +337,7 @@ if hasattr(select, "epoll"): self.poll.close() -if hasattr(select, "poll") or hasattr(select, "epoll"): +if hasattr(select, "poll") or hasattr(select, "devpoll"): class _PollerBase(object): @@ -466,8 +475,8 @@ if hasattr(select, "poll") or hasattr(select, "epoll"): # choose the best implementation depending on the platform. -if 'KqueuePoller' in globals(): - DefaultPoller = KqueuePoller +if 'KQueuePoller' in globals(): + DefaultPoller = KQueuePoller elif 'EpollPoller' in globals(): DefaultPoller = EpollPoller elif 'DevpollPoller' in globals(): diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 6336e6de..f34c7867 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -52,8 +52,8 @@ class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super(ThreadWorker, self).__init__(*args, **kwargs) # initialise the pool - self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) - self.poller = fdevents.DefaultPoller() + self.tpool = None + self.poller = None self.futures = set() self._heap = [] self.keepalived = {} @@ -64,9 +64,15 @@ class ThreadWorker(base.Worker): fs.addr = addr self.futures.add(fs) + def init_process(self): + self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) + self.poller = fdevents.DefaultPoller() + super(ThreadWorker, self).init_process() + def run(self): + # init listeners, add them to the event loop for s in self.sockets: - s.setblocking(0) + s.setblocking(False) self.poller.addfd(s, 'r') listeners = dict([(s.fileno(), s) for s in self.sockets]) @@ -170,13 +176,14 @@ class ThreadWorker(base.Worker): break else: # remove the socket from the poller - self.poller.delfd(conn.sock.fileno(), 'r') + self.poller.delfd(conn.sock, 'r') # close the socket util.close(conn.sock) # shutdown the pool self.tpool.shutdown(False) + self.poller.close() # wait for the workers futures.wait(self.futures, timeout=self.cfg.graceful_timeout)