diff --git a/gunicorn/workers/async.py b/gunicorn/workers/async.py index 09341e00..2e9e2890 100644 --- a/gunicorn/workers/async.py +++ b/gunicorn/workers/async.py @@ -28,22 +28,19 @@ class AsyncWorker(base.Worker): def handle(self, client, addr): try: parser = http.RequestParser(client) - try: - if self.cfg.keepalive > 0: - while True: - req = None - with self.timeout_ctx(): - req = parser.next() - if not req: - break - self.handle_request(req, client, addr) - else: - req = parser.next() + while True: + req = None + with self.timeout_ctx(): + req = parser.next() + + if not req: + break self.handle_request(req, client, addr) + except StopIteration: pass - except socket.error, e: + except socket.error, e: if e[0] not in (errno.EPIPE, errno.ECONNRESET): self.log.exception("Socket error processing request.") else: diff --git a/gunicorn/workers/ggevent.py b/gunicorn/workers/ggevent.py index 9eeea6ce..b4b67cc6 100644 --- a/gunicorn/workers/ggevent.py +++ b/gunicorn/workers/ggevent.py @@ -13,6 +13,7 @@ import sys import gevent from gevent import monkey monkey.noisy = False +from gevent import core from gevent import greenlet from gevent.pool import Pool from gevent import pywsgi, wsgi @@ -35,7 +36,16 @@ BASE_WSGI_ENV = { class GeventWorker(AsyncWorker): - + min_delay = 0.01 + max_delay = 1 + + def __init__(self, *args, **kwargs): + super(GeventWorker, self).__init__(*args, **kwargs) + self.pool = None + self._accept_event = None + self._acceptor_timer = None + self.delay = self.min_delay + @classmethod def setup(cls): from gevent import monkey @@ -44,61 +54,91 @@ class GeventWorker(AsyncWorker): def timeout_ctx(self): return gevent.Timeout(self.cfg.keepalive, False) + def acceptor(self): + if self._accept_event is None: + self._accept_event = core.read_event(self.socket.fileno(), self._do_accept, persist=True) + + def _acceptor(self, event): - if self.alive: + if self._accept_event is None: + if not self.alive: + return + + # create a read event + self._accept_event = core.read_event(self.socket.fileno(), self._do_accept, persist=True) + + def _do_accept(self, event, _evtype): + try: + try: + conn, addr = self.socket.accept() + except socket.error, e: + if err[0] == errno.EAGAIN: + sys.exc_clear() + return + raise + + self.delay = self.min_delay + self.pool.spawn(self.handle, conn, addr) + except socket.error, e: + if e[0] not in (errno.EBADF, errno.EINVAL, errno.ENOTSOCK): + self.alive = False + return + except: + self.log.exception("Unexpected error in acceptor. Sepuku.") + self.stop() return - def acceptor(self, pool): - while self.alive: - try: - try: - conn, addr = self.socket.accept() - except socket.error, e: - if err[0] == errno.EAGAIN: - sys.exc_clear() - return - raise + if self.delay >= 0: + self.stop_acceptor() + self._start_accepting_timer = core.timer(self.delay, + self.acceptor) + self.delay = min(self.max_delay, self.delay*2) - pool.spawn(self.handle, conn, addr) - except: - raise - self.log.exception("Unexpected error in acceptor. Sepuku.") - self.alive = False + def stop_acceptor(self): + if self._accept_event is not None: + self._accept_event.cancel() + self._accept_event = None + if self._acceptor_timer is not None: + self._acceptor_timer.cancel() + self.acceptor_timer = None + + def stop(self): + self.stop_acceptor() + self.pool.join(timeout=self.timeout) + self.pool.kill(block=True, timeout=1) + self.alive = False def run(self): self.socket.setblocking(1) - pool = Pool(self.worker_connections) - pool._semaphore.rawlink(self._acceptor) - acceptor = gevent.spawn(self.acceptor, pool) - + + # start to accept + self.acceptor() + + # enter in the main loop try: while self.alive: - self.notify() - + gevent.spawn(self.notify) if self.ppid != os.getppid(): self.log.info("Parent changed, shutting down: %s" % self) break - gevent.sleep(0.1) - except KeyboardInterrupt: + gevent.sleep(self.timeout) + except: pass - - - # end - gevent.kill(acceptor) - pool.join(timeout=self.timeout) - pool.kill(block=True, timeout=1) + self.stop() def init_process(self): #gevent doesn't reinitialize dns for us after forking #here's the workaround gevent.core.dns_shutdown(fail_requests=1) gevent.core.dns_init() - + + # init the pool + self.pool = Pool(self.worker_connections) + self.pool._semaphore.rawlink(self._acceptor) + super(GeventWorker, self).init_process() - - class GeventBaseWorker(Worker): """\ This base class is used for the two variants of workers that use