diff --git a/gunicorn/workers/ggevent.py b/gunicorn/workers/ggevent.py index a70495ed..ee89d4e7 100644 --- a/gunicorn/workers/ggevent.py +++ b/gunicorn/workers/ggevent.py @@ -13,9 +13,8 @@ import sys import gevent from gevent import monkey monkey.noisy = False -from gevent import core -from gevent import greenlet from gevent.pool import Pool +from gevent.server import StreamServer from gevent import pywsgi, wsgi import gunicorn @@ -34,17 +33,24 @@ BASE_WSGI_ENV = { 'wsgi.run_once': False } + +class GGeventServer(StreamServer): + def __init__(self, listener, handle, spawn='defaul', worker=None): + StreamServer.__init__(self, listener, spawn=spawn) + self.handle_func = handle + self.worker = worker + + def stop(self): + super(GGeventServer, self).stop() + self.worker.alive = False + + def handle(self, sock, addr): + self.handle_func(sock, addr) + class GeventWorker(AsyncWorker): - min_delay = 0.01 - max_delay = 1 - def __init__(self, *args, **kwargs): super(GeventWorker, self).__init__(*args, **kwargs) - self.pool = None - self._accept_event = None - self._acceptor_timer = None - self.delay = self.min_delay @classmethod def setup(cls): @@ -53,90 +59,34 @@ class GeventWorker(AsyncWorker): def timeout_ctx(self): return gevent.Timeout(self.cfg.keepalive, False) - - def acceptor(self): - if self._accept_event is None: - self._accept_event = core.read_event(self.socket.fileno(), self._do_accept, persist=True) - - - def _acceptor(self, event): - if self._accept_event is None: - if not self.alive: - return - - # create a read event - self._accept_event = core.read_event(self.socket.fileno(), self._do_accept, persist=True) - - def _do_accept(self, event, _evtype): - try: - try: - conn, addr = self.socket.accept() - except socket.error, e: - if e[0] == errno.EAGAIN: - sys.exc_clear() - return - raise - - self.delay = self.min_delay - self.pool.spawn(self.handle, conn, addr) - except socket.error, e: - if e[0] not in (errno.EBADF, errno.EINVAL, errno.ENOTSOCK): - self.alive = False - return - except: - self.log.exception("Unexpected error in acceptor. Sepuku.") - self.stop() - return - - if self.delay >= 0: - self.stop_acceptor() - self._start_accepting_timer = core.timer(self.delay, - self.acceptor) - self.delay = min(self.max_delay, self.delay*2) - - def stop_acceptor(self): - if self._accept_event is not None: - self._accept_event.cancel() - self._accept_event = None - - if self._acceptor_timer is not None: - self._acceptor_timer.cancel() - self.acceptor_timer = None - - def stop(self): - self.stop_acceptor() - self.pool.join(timeout=self.timeout) - self.pool.kill(block=True, timeout=1) - self.alive = False def run(self): self.socket.setblocking(1) - # start to accept - self.acceptor() - - # enter in the main loop + pool = Pool(self.worker_connections) + server = GGeventServer(self.socket, self.handle, spawn=pool, + worker=self) + + server.start() + try: while self.alive: - gevent.spawn(self.notify) + self.notify() if self.ppid != os.getppid(): self.log.info("Parent changed, shutting down: %s" % self) break gevent.sleep(self.timeout) except: pass - self.stop() + + with gevent.Timeout(self.timeout, False): + gevent.spawn(server.stop).join() def init_process(self): #gevent doesn't reinitialize dns for us after forking #here's the workaround gevent.core.dns_shutdown(fail_requests=1) gevent.core.dns_init() - - # init the pool - self.pool = Pool(self.worker_connections) - self.pool._semaphore.rawlink(self._acceptor) - super(GeventWorker, self).init_process() class GeventBaseWorker(Worker): @@ -176,11 +126,11 @@ class GeventBaseWorker(Worker): server.stop() break gevent.sleep(0.1) - self.pool.join(timeout=self.timeout) - self.pool.kill(block=True, timeout=1) except KeyboardInterrupt: pass - + + with gevent.Timeout(self.timeout, False): + gevent.spawn(server.stop).join() class WSGIHandler(wsgi.WSGIHandler): def log_request(self, *args):