new gevent worker. Use event to read on socket. Also don't notify so

often the fs
This commit is contained in:
benoitc 2010-08-11 16:28:18 +02:00
parent 2ddbe800a5
commit 568f9c3ee1
2 changed files with 83 additions and 46 deletions

View File

@ -28,19 +28,16 @@ class AsyncWorker(base.Worker):
def handle(self, client, addr): def handle(self, client, addr):
try: try:
parser = http.RequestParser(client) parser = http.RequestParser(client)
try: try:
if self.cfg.keepalive > 0: while True:
while True: req = None
req = None with self.timeout_ctx():
with self.timeout_ctx(): req = parser.next()
req = parser.next()
if not req: if not req:
break break
self.handle_request(req, client, addr)
else:
req = parser.next()
self.handle_request(req, client, addr) self.handle_request(req, client, addr)
except StopIteration: except StopIteration:
pass pass
except socket.error, e: except socket.error, e:

View File

@ -13,6 +13,7 @@ import sys
import gevent import gevent
from gevent import monkey from gevent import monkey
monkey.noisy = False monkey.noisy = False
from gevent import core
from gevent import greenlet from gevent import greenlet
from gevent.pool import Pool from gevent.pool import Pool
from gevent import pywsgi, wsgi from gevent import pywsgi, wsgi
@ -35,6 +36,15 @@ BASE_WSGI_ENV = {
class GeventWorker(AsyncWorker): class GeventWorker(AsyncWorker):
min_delay = 0.01
max_delay = 1
def __init__(self, *args, **kwargs):
super(GeventWorker, self).__init__(*args, **kwargs)
self.pool = None
self._accept_event = None
self._acceptor_timer = None
self.delay = self.min_delay
@classmethod @classmethod
def setup(cls): def setup(cls):
@ -44,50 +54,78 @@ class GeventWorker(AsyncWorker):
def timeout_ctx(self): def timeout_ctx(self):
return gevent.Timeout(self.cfg.keepalive, False) return gevent.Timeout(self.cfg.keepalive, False)
def acceptor(self):
if self._accept_event is None:
self._accept_event = core.read_event(self.socket.fileno(), self._do_accept, persist=True)
def _acceptor(self, event): def _acceptor(self, event):
if self.alive: if self._accept_event is None:
if not self.alive:
return
# create a read event
self._accept_event = core.read_event(self.socket.fileno(), self._do_accept, persist=True)
def _do_accept(self, event, _evtype):
try:
try:
conn, addr = self.socket.accept()
except socket.error, e:
if err[0] == errno.EAGAIN:
sys.exc_clear()
return
raise
self.delay = self.min_delay
self.pool.spawn(self.handle, conn, addr)
except socket.error, e:
if e[0] not in (errno.EBADF, errno.EINVAL, errno.ENOTSOCK):
self.alive = False
return
except:
self.log.exception("Unexpected error in acceptor. Sepuku.")
self.stop()
return return
def acceptor(self, pool): if self.delay >= 0:
while self.alive: self.stop_acceptor()
try: self._start_accepting_timer = core.timer(self.delay,
try: self.acceptor)
conn, addr = self.socket.accept() self.delay = min(self.max_delay, self.delay*2)
except socket.error, e:
if err[0] == errno.EAGAIN:
sys.exc_clear()
return
raise
pool.spawn(self.handle, conn, addr) def stop_acceptor(self):
except: if self._accept_event is not None:
raise self._accept_event.cancel()
self.log.exception("Unexpected error in acceptor. Sepuku.") self._accept_event = None
self.alive = False
if self._acceptor_timer is not None:
self._acceptor_timer.cancel()
self.acceptor_timer = None
def stop(self):
self.stop_acceptor()
self.pool.join(timeout=self.timeout)
self.pool.kill(block=True, timeout=1)
self.alive = False
def run(self): def run(self):
self.socket.setblocking(1) self.socket.setblocking(1)
pool = Pool(self.worker_connections)
pool._semaphore.rawlink(self._acceptor)
acceptor = gevent.spawn(self.acceptor, pool)
# start to accept
self.acceptor()
# enter in the main loop
try: try:
while self.alive: while self.alive:
self.notify() gevent.spawn(self.notify)
if self.ppid != os.getppid(): if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self) self.log.info("Parent changed, shutting down: %s" % self)
break break
gevent.sleep(0.1) gevent.sleep(self.timeout)
except KeyboardInterrupt: except:
pass pass
self.stop()
# end
gevent.kill(acceptor)
pool.join(timeout=self.timeout)
pool.kill(block=True, timeout=1)
def init_process(self): def init_process(self):
#gevent doesn't reinitialize dns for us after forking #gevent doesn't reinitialize dns for us after forking
@ -95,10 +133,12 @@ class GeventWorker(AsyncWorker):
gevent.core.dns_shutdown(fail_requests=1) gevent.core.dns_shutdown(fail_requests=1)
gevent.core.dns_init() gevent.core.dns_init()
# init the pool
self.pool = Pool(self.worker_connections)
self.pool._semaphore.rawlink(self._acceptor)
super(GeventWorker, self).init_process() super(GeventWorker, self).init_process()
class GeventBaseWorker(Worker): class GeventBaseWorker(Worker):
"""\ """\
This base class is used for the two variants of workers that use This base class is used for the two variants of workers that use