improve gevent worker.

This commit is contained in:
benoitc 2010-08-11 10:27:42 +02:00
parent d1858d2284
commit 8b2a5c42f7
2 changed files with 64 additions and 36 deletions

View File

@ -28,13 +28,18 @@ class AsyncWorker(base.Worker):
def handle(self, client, addr):
try:
parser = http.RequestParser(client)
try:
while True:
req = None
with self.timeout_ctx():
req = parser.next()
if not req:
break
if self.cfg.keepalive > 0:
while True:
req = None
with self.timeout_ctx():
req = parser.next()
if not req:
break
self.handle_request(req, client, addr)
else:
req = parser.next()
self.handle_request(req, client, addr)
except StopIteration:
pass
@ -43,7 +48,7 @@ class AsyncWorker(base.Worker):
self.log.exception("Socket error processing request.")
else:
if e[0] == errno.ECONNRESET:
self.log.warn("Ignoring connection reset")
self.log.debug("Ignoring connection reset")
else:
self.log.debug("Ignoring EPIPE")
except Exception, e:

View File

@ -5,12 +5,16 @@
from __future__ import with_statement
import errno
import os
import socket
import sys
import gevent
from gevent import monkey
monkey.noisy = False
from gevent import greenlet
from gevent import core
from gevent.pool import Pool
from gevent import pywsgi, wsgi
@ -31,6 +35,12 @@ BASE_WSGI_ENV = {
}
class GeventWorker(AsyncWorker):
def __init__(self, *args, **kwargs):
super(GeventWorker, self).__init__(*args, **kwargs)
self._accept_event = None
self.pool = Pool(self.worker_connections)
@classmethod
def setup(cls):
@ -39,12 +49,39 @@ class GeventWorker(AsyncWorker):
def timeout_ctx(self):
return gevent.Timeout(self.cfg.keepalive, False)
def acceptor(self):
if self._accept_event is None:
self._accept_event = core.read_event(self.socket.fileno(),
self._do_accept, persist=True)
def _acceptor(self, event):
self.pool._semaphore.rawlink(self._acceptor)
if self._accept_event is None:
if not self.alive:
return
self._accept_event = core.read_event(self.socket.fileno(),
self._do_accept, persist=True)
def _do_accept(self, event, _evtype):
try:
try:
conn, addr = self.socket.accept()
except socket.error, e:
if err[0] == errno.EAGAIN:
sys.exc_clear()
return
raise
self.pool.spawn(self.handle, conn, addr)
except:
self.log.exception("Unexpected error in acceptor. Sepuku.")
os._exit(4)
def run(self):
self.socket.setblocking(1)
pool = Pool(self.worker_connections)
acceptor = gevent.spawn(self.acceptor, pool)
acceptor = gevent.spawn(self.acceptor)
try:
while self.alive:
@ -52,44 +89,29 @@ class GeventWorker(AsyncWorker):
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self)
gevent.kill(acceptor)
break
gevent.sleep(0.1)
pool.join(timeout=self.timeout)
except KeyboardInterrupt:
pass
# stop accepting:
if self._accept_event is not None:
self._accept_event.cancel()
# end
gevent.kill(acceptor)
self.pool.join(timeout=self.timeout)
self.pool.kill(block=True, timeout=1)
def init_process(self):
#gevent doesn't reinitialize dns for us after forking
#here's the workaround
gevent.core.dns_shutdown(fail_requests=1)
gevent.core.dns_init()
super(GeventWorker, self).init_process()
self._accept_event = None
def acceptor(self, pool):
gevent.getcurrent()
while self.alive:
try:
conn, addr = self.socket.accept()
gt = pool.spawn(self.handle, conn, addr)
gt._conn = conn
gt.link(self.cleanup)
conn, addr, gt = None, None, None
except greenlet.GreenletExit:
return
except:
self.log.exception("Unexpected error in acceptor. Sepuku.")
os._exit(4)
def cleanup(self, gt):
try:
gt.join()
except greenlet.GreenletExit:
pass
except Exception:
self.log.exception("Unhandled exception in worker.")
finally:
gt._conn.close()
class GeventBaseWorker(Worker):
"""\
@ -129,6 +151,7 @@ class GeventBaseWorker(Worker):
break
gevent.sleep(0.1)
self.pool.join(timeout=self.timeout)
self.pool.kill(block=True, timeout=1)
except KeyboardInterrupt:
pass