dramatically improve the gevent worker.

This commit is contained in:
benoitc 2010-08-12 10:44:06 +02:00
parent d7d6fa382b
commit d2c10a95e4

View File

@ -13,9 +13,8 @@ import sys
import gevent import gevent
from gevent import monkey from gevent import monkey
monkey.noisy = False monkey.noisy = False
from gevent import core
from gevent import greenlet
from gevent.pool import Pool from gevent.pool import Pool
from gevent.server import StreamServer
from gevent import pywsgi, wsgi from gevent import pywsgi, wsgi
import gunicorn import gunicorn
@ -34,17 +33,24 @@ BASE_WSGI_ENV = {
'wsgi.run_once': False 'wsgi.run_once': False
} }
class GGeventServer(StreamServer):
def __init__(self, listener, handle, spawn='defaul', worker=None):
StreamServer.__init__(self, listener, spawn=spawn)
self.handle_func = handle
self.worker = worker
def stop(self):
super(GGeventServer, self).stop()
self.worker.alive = False
def handle(self, sock, addr):
self.handle_func(sock, addr)
class GeventWorker(AsyncWorker): class GeventWorker(AsyncWorker):
min_delay = 0.01
max_delay = 1
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(GeventWorker, self).__init__(*args, **kwargs) super(GeventWorker, self).__init__(*args, **kwargs)
self.pool = None
self._accept_event = None
self._acceptor_timer = None
self.delay = self.min_delay
@classmethod @classmethod
def setup(cls): def setup(cls):
@ -53,90 +59,34 @@ class GeventWorker(AsyncWorker):
def timeout_ctx(self): def timeout_ctx(self):
return gevent.Timeout(self.cfg.keepalive, False) return gevent.Timeout(self.cfg.keepalive, False)
def acceptor(self):
if self._accept_event is None:
self._accept_event = core.read_event(self.socket.fileno(), self._do_accept, persist=True)
def _acceptor(self, event):
if self._accept_event is None:
if not self.alive:
return
# create a read event
self._accept_event = core.read_event(self.socket.fileno(), self._do_accept, persist=True)
def _do_accept(self, event, _evtype):
try:
try:
conn, addr = self.socket.accept()
except socket.error, e:
if e[0] == errno.EAGAIN:
sys.exc_clear()
return
raise
self.delay = self.min_delay
self.pool.spawn(self.handle, conn, addr)
except socket.error, e:
if e[0] not in (errno.EBADF, errno.EINVAL, errno.ENOTSOCK):
self.alive = False
return
except:
self.log.exception("Unexpected error in acceptor. Sepuku.")
self.stop()
return
if self.delay >= 0:
self.stop_acceptor()
self._start_accepting_timer = core.timer(self.delay,
self.acceptor)
self.delay = min(self.max_delay, self.delay*2)
def stop_acceptor(self):
if self._accept_event is not None:
self._accept_event.cancel()
self._accept_event = None
if self._acceptor_timer is not None:
self._acceptor_timer.cancel()
self.acceptor_timer = None
def stop(self):
self.stop_acceptor()
self.pool.join(timeout=self.timeout)
self.pool.kill(block=True, timeout=1)
self.alive = False
def run(self): def run(self):
self.socket.setblocking(1) self.socket.setblocking(1)
# start to accept pool = Pool(self.worker_connections)
self.acceptor() server = GGeventServer(self.socket, self.handle, spawn=pool,
worker=self)
# enter in the main loop
server.start()
try: try:
while self.alive: while self.alive:
gevent.spawn(self.notify) self.notify()
if self.ppid != os.getppid(): if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self) self.log.info("Parent changed, shutting down: %s" % self)
break break
gevent.sleep(self.timeout) gevent.sleep(self.timeout)
except: except:
pass pass
self.stop()
with gevent.Timeout(self.timeout, False):
gevent.spawn(server.stop).join()
def init_process(self): def init_process(self):
#gevent doesn't reinitialize dns for us after forking #gevent doesn't reinitialize dns for us after forking
#here's the workaround #here's the workaround
gevent.core.dns_shutdown(fail_requests=1) gevent.core.dns_shutdown(fail_requests=1)
gevent.core.dns_init() gevent.core.dns_init()
# init the pool
self.pool = Pool(self.worker_connections)
self.pool._semaphore.rawlink(self._acceptor)
super(GeventWorker, self).init_process() super(GeventWorker, self).init_process()
class GeventBaseWorker(Worker): class GeventBaseWorker(Worker):
@ -176,11 +126,11 @@ class GeventBaseWorker(Worker):
server.stop() server.stop()
break break
gevent.sleep(0.1) gevent.sleep(0.1)
self.pool.join(timeout=self.timeout)
self.pool.kill(block=True, timeout=1)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
with gevent.Timeout(self.timeout, False):
gevent.spawn(server.stop).join()
class WSGIHandler(wsgi.WSGIHandler): class WSGIHandler(wsgi.WSGIHandler):
def log_request(self, *args): def log_request(self, *args):