From 8b2a5c42f7e91179345c706283c95c2bdf35fb4a Mon Sep 17 00:00:00 2001 From: benoitc Date: Wed, 11 Aug 2010 10:27:42 +0200 Subject: [PATCH] improve gevent worker. --- gunicorn/workers/async.py | 19 +++++---- gunicorn/workers/ggevent.py | 81 ++++++++++++++++++++++++------------- 2 files changed, 64 insertions(+), 36 deletions(-) diff --git a/gunicorn/workers/async.py b/gunicorn/workers/async.py index 2445febe..fe1dd6a8 100644 --- a/gunicorn/workers/async.py +++ b/gunicorn/workers/async.py @@ -28,13 +28,18 @@ class AsyncWorker(base.Worker): def handle(self, client, addr): try: parser = http.RequestParser(client) + try: - while True: - req = None - with self.timeout_ctx(): - req = parser.next() - if not req: - break + if self.cfg.keepalive > 0: + while True: + req = None + with self.timeout_ctx(): + req = parser.next() + if not req: + break + self.handle_request(req, client, addr) + else: + req = parser.next() self.handle_request(req, client, addr) except StopIteration: pass @@ -43,7 +48,7 @@ class AsyncWorker(base.Worker): self.log.exception("Socket error processing request.") else: if e[0] == errno.ECONNRESET: - self.log.warn("Ignoring connection reset") + self.log.debug("Ignoring connection reset") else: self.log.debug("Ignoring EPIPE") except Exception, e: diff --git a/gunicorn/workers/ggevent.py b/gunicorn/workers/ggevent.py index b3570501..d32728cf 100644 --- a/gunicorn/workers/ggevent.py +++ b/gunicorn/workers/ggevent.py @@ -5,12 +5,16 @@ from __future__ import with_statement +import errno import os +import socket +import sys import gevent from gevent import monkey monkey.noisy = False from gevent import greenlet +from gevent import core from gevent.pool import Pool from gevent import pywsgi, wsgi @@ -31,6 +35,12 @@ BASE_WSGI_ENV = { } class GeventWorker(AsyncWorker): + + def __init__(self, *args, **kwargs): + super(GeventWorker, self).__init__(*args, **kwargs) + self._accept_event = None + self.pool = Pool(self.worker_connections) + @classmethod def setup(cls): @@ -39,12 +49,39 @@ class GeventWorker(AsyncWorker): def timeout_ctx(self): return gevent.Timeout(self.cfg.keepalive, False) + + def acceptor(self): + if self._accept_event is None: + self._accept_event = core.read_event(self.socket.fileno(), + self._do_accept, persist=True) + def _acceptor(self, event): + self.pool._semaphore.rawlink(self._acceptor) + if self._accept_event is None: + if not self.alive: + return + self._accept_event = core.read_event(self.socket.fileno(), + self._do_accept, persist=True) + + def _do_accept(self, event, _evtype): + try: + try: + conn, addr = self.socket.accept() + except socket.error, e: + if err[0] == errno.EAGAIN: + sys.exc_clear() + return + raise + + self.pool.spawn(self.handle, conn, addr) + except: + self.log.exception("Unexpected error in acceptor. Sepuku.") + os._exit(4) + + def run(self): self.socket.setblocking(1) - - pool = Pool(self.worker_connections) - acceptor = gevent.spawn(self.acceptor, pool) + acceptor = gevent.spawn(self.acceptor) try: while self.alive: @@ -52,44 +89,29 @@ class GeventWorker(AsyncWorker): if self.ppid != os.getppid(): self.log.info("Parent changed, shutting down: %s" % self) - gevent.kill(acceptor) break gevent.sleep(0.1) - pool.join(timeout=self.timeout) except KeyboardInterrupt: pass + + # stop accepting: + if self._accept_event is not None: + self._accept_event.cancel() + + # end + gevent.kill(acceptor) + self.pool.join(timeout=self.timeout) + self.pool.kill(block=True, timeout=1) + def init_process(self): #gevent doesn't reinitialize dns for us after forking #here's the workaround gevent.core.dns_shutdown(fail_requests=1) gevent.core.dns_init() super(GeventWorker, self).init_process() + self._accept_event = None - def acceptor(self, pool): - gevent.getcurrent() - while self.alive: - try: - conn, addr = self.socket.accept() - gt = pool.spawn(self.handle, conn, addr) - gt._conn = conn - gt.link(self.cleanup) - conn, addr, gt = None, None, None - except greenlet.GreenletExit: - return - except: - self.log.exception("Unexpected error in acceptor. Sepuku.") - os._exit(4) - - def cleanup(self, gt): - try: - gt.join() - except greenlet.GreenletExit: - pass - except Exception: - self.log.exception("Unhandled exception in worker.") - finally: - gt._conn.close() class GeventBaseWorker(Worker): """\ @@ -129,6 +151,7 @@ class GeventBaseWorker(Worker): break gevent.sleep(0.1) self.pool.join(timeout=self.timeout) + self.pool.kill(block=True, timeout=1) except KeyboardInterrupt: pass