From 67800292e05956cbd4e3b852011495b1da3a5537 Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 13 May 2014 13:57:34 +0200 Subject: [PATCH] fix kqueue poller. this change initialise the event loop after the process has forked so we make sure to inherit from the file descriptor. Also fix the number of events we are waiting for. The python implementation requires a positive number. --- gunicorn/fdevents.py | 21 +++++++++++++++------ gunicorn/workers/gthread.py | 15 +++++++++++---- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py index f53ef047..e50f2a0d 100644 --- a/gunicorn/fdevents.py +++ b/gunicorn/fdevents.py @@ -8,6 +8,7 @@ allows you to register an fd, and retrieve events on it. """ import select +import sys from .util import fd_, close_on_exec @@ -131,10 +132,13 @@ if hasattr(select, 'kqueue'): self.kq = select.kqueue() close_on_exec(self.kq.fileno()) self.events = [] + self.max_ev = 0 def addfd(self, fd, mode, repeat=True): fd = fd_(fd) + self.max_ev += 1 + if mode == 'r': kmode = select.KQ_FILTER_READ else: @@ -148,12 +152,17 @@ if hasattr(select, 'kqueue'): if not repeat: flags |= select.KQ_EV_ONESHOT - ev = select.kevent(fd_(fd), kmode, flags) - self.kq.control([ev], 0) + ev = select.kevent(fd, kmode, flags) + self.kq.control([ev], 0, 0) def delfd(self, fd, mode): fd = fd_(fd) + self.max_ev -= 1 + + if fd < 0: + return + if mode == 'r': kmode = select.KQ_FILTER_READ else: @@ -166,7 +175,7 @@ if hasattr(select, 'kqueue'): def _wait(self, nsec=0): if len(self.events) == 0: try: - events = self.kq.control(None, 0, nsec) + events = self.kq.control(None, self.max_ev, nsec) except select.error as e: if e.args[0] != errno.EINTR: raise @@ -328,7 +337,7 @@ if hasattr(select, "epoll"): self.poll.close() -if hasattr(select, "poll") or hasattr(select, "epoll"): +if hasattr(select, "poll") or hasattr(select, "devpoll"): class _PollerBase(object): @@ -466,8 +475,8 @@ if hasattr(select, "poll") or hasattr(select, "epoll"): # choose the best implementation depending on the platform. -if 'KqueuePoller' in globals(): - DefaultPoller = KqueuePoller +if 'KQueuePoller' in globals(): + DefaultPoller = KQueuePoller elif 'EpollPoller' in globals(): DefaultPoller = EpollPoller elif 'DevpollPoller' in globals(): diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 6336e6de..f34c7867 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -52,8 +52,8 @@ class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super(ThreadWorker, self).__init__(*args, **kwargs) # initialise the pool - self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) - self.poller = fdevents.DefaultPoller() + self.tpool = None + self.poller = None self.futures = set() self._heap = [] self.keepalived = {} @@ -64,9 +64,15 @@ class ThreadWorker(base.Worker): fs.addr = addr self.futures.add(fs) + def init_process(self): + self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) + self.poller = fdevents.DefaultPoller() + super(ThreadWorker, self).init_process() + def run(self): + # init listeners, add them to the event loop for s in self.sockets: - s.setblocking(0) + s.setblocking(False) self.poller.addfd(s, 'r') listeners = dict([(s.fileno(), s) for s in self.sockets]) @@ -170,13 +176,14 @@ class ThreadWorker(base.Worker): break else: # remove the socket from the poller - self.poller.delfd(conn.sock.fileno(), 'r') + self.poller.delfd(conn.sock, 'r') # close the socket util.close(conn.sock) # shutdown the pool self.tpool.shutdown(False) + self.poller.close() # wait for the workers futures.wait(self.futures, timeout=self.cfg.graceful_timeout)