add refactor gevent support like we did on eventlet. In the future it

may be better to use Event object. While i'm here move the monkey
patching in its own function used on config so we make sure to patch
only one time and prevent some ugly hack like reinit gevent each time we
spawn (it's better to use patched os.fork from gevent once time).
This commit is contained in:
benoitc 2010-04-22 14:38:49 +02:00
parent 7f36bc1bbd
commit 8184eb493c
5 changed files with 60 additions and 23 deletions

View File

@ -7,7 +7,7 @@
import collections import collections
import errno import errno
from gunicorn.async.base import ALREADY_HANDLED from gunicorn.workers.async import ALREADY_HANDLED
import socket import socket
import gevent import gevent
from gevent.pool import Pool from gevent.pool import Pool
@ -133,6 +133,7 @@ def app(environ, start_response):
data = open(os.path.join( data = open(os.path.join(
os.path.dirname(__file__), os.path.dirname(__file__),
'websocket.html')).read() 'websocket.html')).read()
data = data % environ
start_response('200 OK', [('Content-Type', 'text/html'), start_response('200 OK', [('Content-Type', 'text/html'),
('Content-Length', len(data))]) ('Content-Length', len(data))])
return [data] return [data]

View File

@ -85,8 +85,10 @@ class Config(object):
@property @property
def worker_class(self): def worker_class(self):
uri = self.cfg.get('workerclass', None) or 'egg:gunicorn#sync' uri = self.cfg.get('workerclass', None) or 'egg:gunicorn#sync'
print uri worker_class = util.load_worker_class(uri)
return util.load_worker_class(uri) if hasattr(worker_class, "setup"):
worker_class.setup()
return worker_class
@property @property
def workers(self): def workers(self):

View File

@ -105,6 +105,7 @@ class Worker(object):
self.alive = False self.alive = False
def handle_exit(self, sig, frame): def handle_exit(self, sig, frame):
self.alive = False
sys.exit(0) sys.exit(0)
def handle_winch(self, sig, fname): def handle_winch(self, sig, fname):

View File

@ -28,16 +28,13 @@ eventlet.debug.hub_exceptions(True)
class EventletWorker(AsyncWorker): class EventletWorker(AsyncWorker):
def __init__(self, *args, **kwargs): @classmethod
super(EventletWorker, self).__init__(*args, **kwargs) def setup(cls):
import eventlet
if eventlet.version_info < (0,9,7): if eventlet.version_info < (0,9,7):
raise RuntimeError("You need eventlet >= 0.9.7") raise RuntimeError("You need eventlet >= 0.9.7")
def init_process(self):
eventlet.monkey_patch(all=False, socket=True, select=True) eventlet.monkey_patch(all=False, socket=True, select=True)
self.socket = greenio.GreenSocket(self.socket)
super(EventletWorker, self).init_process()
def run(self): def run(self):
self.socket.setblocking(1) self.socket.setblocking(1)

View File

@ -11,26 +11,62 @@ import os
import gevent import gevent
from gevent import monkey from gevent import monkey
from gevent import socket from gevent import socket
from gevent.greenlet import Greenlet from gevent import greenlet
from gevent.pool import Pool from gevent.pool import Pool
from gunicorn import util from gunicorn import util
from gunicorn.workers.async import AsyncWorker from gunicorn import arbiter
from gunicorn.workers.async import AsyncWorker, ALREADY_HANDLED
from gunicorn.http.tee import UnexpectedEOF
class GEventWorker(KeepaliveWorker): class GEventWorker(AsyncWorker):
def init_process(self): @classmethod
def setup(cls):
from gevent import monkey
monkey.patch_all() monkey.patch_all()
super(GEventWorker, self).init_process()
def run(self): def run(self):
raise NotImplementedError() self.socket.setblocking(1)
def accept(self): pool = Pool(self.worker_connections)
acceptor = gevent.spawn(self.acceptor, pool)
try: try:
client, addr = self.socket.accept() while True:
self.pool.spawn(self.handle, client, addr) self.notify()
except socket.error, e:
if e[0] in (errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNABORTED): if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self)
gevent.kill(acceptor)
break
gevent.sleep(0.1)
pool.join(timeout=self.timeout)
except KeyboardInterrupt:
pass
def acceptor(self, pool):
server_gt = gevent.getcurrent()
while self.alive:
try:
conn, addr = self.socket.accept()
gt = pool.spawn(self.handle, conn, addr)
gt._conn = conn
gt.link(self.cleanup)
conn, addr, gt = None, None, None
except greenlet.GreenletExit:
return return
raise
def cleanup(self, gt):
try:
try:
gt.join()
finally:
gt._conn.close()
except greenlet.GreenletExit:
pass
except Exception:
self.log.exception("Unhandled exception in worker.")