From 8184eb493cd027881a4f96cf64636e4e027e5547 Mon Sep 17 00:00:00 2001 From: benoitc Date: Thu, 22 Apr 2010 14:38:49 +0200 Subject: [PATCH] add refactor gevent support like we did on eventlet. In the future it may be better to use Event object. While i'm here move the monkey patching in its own function used on config so we make sure to patch only one time and prevent some ugly hack like reinit gevent each time we spawn (it's better to use patched os.fork from gevent once time). --- examples/gevent_websocket.py | 3 +- gunicorn/config.py | 6 ++-- gunicorn/workers/base.py | 1 + gunicorn/workers/geventlet.py | 11 +++---- gunicorn/workers/ggevent.py | 62 +++++++++++++++++++++++++++-------- 5 files changed, 60 insertions(+), 23 deletions(-) diff --git a/examples/gevent_websocket.py b/examples/gevent_websocket.py index 32eb86d9..d150df55 100644 --- a/examples/gevent_websocket.py +++ b/examples/gevent_websocket.py @@ -7,7 +7,7 @@ import collections import errno -from gunicorn.async.base import ALREADY_HANDLED +from gunicorn.workers.async import ALREADY_HANDLED import socket import gevent from gevent.pool import Pool @@ -133,6 +133,7 @@ def app(environ, start_response): data = open(os.path.join( os.path.dirname(__file__), 'websocket.html')).read() + data = data % environ start_response('200 OK', [('Content-Type', 'text/html'), ('Content-Length', len(data))]) return [data] diff --git a/gunicorn/config.py b/gunicorn/config.py index 58b3ce11..26febd4b 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -85,8 +85,10 @@ class Config(object): @property def worker_class(self): uri = self.cfg.get('workerclass', None) or 'egg:gunicorn#sync' - print uri - return util.load_worker_class(uri) + worker_class = util.load_worker_class(uri) + if hasattr(worker_class, "setup"): + worker_class.setup() + return worker_class @property def workers(self): diff --git a/gunicorn/workers/base.py b/gunicorn/workers/base.py index b33e8b6b..cf88d583 100644 --- a/gunicorn/workers/base.py +++ b/gunicorn/workers/base.py @@ -105,6 +105,7 @@ class Worker(object): self.alive = False def handle_exit(self, sig, frame): + self.alive = False sys.exit(0) def handle_winch(self, sig, fname): diff --git a/gunicorn/workers/geventlet.py b/gunicorn/workers/geventlet.py index 51ca9c8f..6f51f3d8 100644 --- a/gunicorn/workers/geventlet.py +++ b/gunicorn/workers/geventlet.py @@ -28,16 +28,13 @@ eventlet.debug.hub_exceptions(True) class EventletWorker(AsyncWorker): - def __init__(self, *args, **kwargs): - super(EventletWorker, self).__init__(*args, **kwargs) + @classmethod + def setup(cls): + import eventlet if eventlet.version_info < (0,9,7): raise RuntimeError("You need eventlet >= 0.9.7") - - def init_process(self): eventlet.monkey_patch(all=False, socket=True, select=True) - self.socket = greenio.GreenSocket(self.socket) - super(EventletWorker, self).init_process() - + def run(self): self.socket.setblocking(1) diff --git a/gunicorn/workers/ggevent.py b/gunicorn/workers/ggevent.py index b0eaa6d3..310a6ddf 100644 --- a/gunicorn/workers/ggevent.py +++ b/gunicorn/workers/ggevent.py @@ -11,26 +11,62 @@ import os import gevent from gevent import monkey from gevent import socket -from gevent.greenlet import Greenlet +from gevent import greenlet from gevent.pool import Pool from gunicorn import util -from gunicorn.workers.async import AsyncWorker +from gunicorn import arbiter +from gunicorn.workers.async import AsyncWorker, ALREADY_HANDLED +from gunicorn.http.tee import UnexpectedEOF -class GEventWorker(KeepaliveWorker): - - def init_process(self): +class GEventWorker(AsyncWorker): + + @classmethod + def setup(cls): + from gevent import monkey monkey.patch_all() - super(GEventWorker, self).init_process() def run(self): - raise NotImplementedError() + self.socket.setblocking(1) - def accept(self): + pool = Pool(self.worker_connections) + acceptor = gevent.spawn(self.acceptor, pool) + try: - client, addr = self.socket.accept() - self.pool.spawn(self.handle, client, addr) - except socket.error, e: - if e[0] in (errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNABORTED): + while True: + self.notify() + + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s" % self) + gevent.kill(acceptor) + break + gevent.sleep(0.1) + + pool.join(timeout=self.timeout) + except KeyboardInterrupt: + pass + + def acceptor(self, pool): + server_gt = gevent.getcurrent() + while self.alive: + try: + conn, addr = self.socket.accept() + gt = pool.spawn(self.handle, conn, addr) + gt._conn = conn + gt.link(self.cleanup) + conn, addr, gt = None, None, None + except greenlet.GreenletExit: return - raise + + + + def cleanup(self, gt): + try: + try: + gt.join() + finally: + gt._conn.close() + except greenlet.GreenletExit: + pass + except Exception: + self.log.exception("Unhandled exception in worker.")