don't use core.read_event it's not needed here since we balance only

when a connection come
This commit is contained in:
benoitc 2010-08-11 13:55:50 +02:00
parent 0616e30e90
commit 80cd1f40ed

View File

@ -14,7 +14,6 @@ import gevent
from gevent import monkey from gevent import monkey
monkey.noisy = False monkey.noisy = False
from gevent import greenlet from gevent import greenlet
from gevent import core
from gevent.pool import Pool from gevent.pool import Pool
from gevent import pywsgi, wsgi from gevent import pywsgi, wsgi
@ -36,11 +35,6 @@ BASE_WSGI_ENV = {
class GeventWorker(AsyncWorker): class GeventWorker(AsyncWorker):
def __init__(self, *args, **kwargs):
super(GeventWorker, self).__init__(*args, **kwargs)
self._accept_event = None
self.pool = Pool(self.worker_connections)
@classmethod @classmethod
def setup(cls): def setup(cls):
@ -49,40 +43,34 @@ class GeventWorker(AsyncWorker):
def timeout_ctx(self): def timeout_ctx(self):
return gevent.Timeout(self.cfg.keepalive, False) return gevent.Timeout(self.cfg.keepalive, False)
def acceptor(self):
self.pool._semaphore.rawlink(self._acceptor)
if self._accept_event is None:
self._accept_event = core.read_event(self.socket.fileno(),
self._do_accept, persist=True)
def _acceptor(self, event): def _acceptor(self, event):
if self.alive:
if self._accept_event is None: return
if not self.alive:
return
self._accept_event = core.read_event(self.socket.fileno(),
self._do_accept, persist=True)
def _do_accept(self, event, _evtype): def acceptor(self, pool):
try: while self.alive:
try: try:
conn, addr = self.socket.accept() try:
except socket.error, e: conn, addr = self.socket.accept()
if err[0] == errno.EAGAIN: except socket.error, e:
sys.exc_clear() if err[0] == errno.EAGAIN:
return sys.exc_clear()
raise return
raise
self.pool.spawn(self.handle, conn, addr) pool.spawn(self.handle, conn, addr)
except: except:
self.log.exception("Unexpected error in acceptor. Sepuku.") raise
self.alive = False self.log.exception("Unexpected error in acceptor. Sepuku.")
self.alive = False
def run(self): def run(self):
self.socket.setblocking(1) self.socket.setblocking(1)
acceptor = gevent.spawn(self.acceptor) pool = Pool(self.worker_connections)
pool._semaphore.rawlink(self._acceptor)
acceptor = gevent.spawn(self.acceptor, pool)
try: try:
while self.alive: while self.alive:
@ -95,23 +83,20 @@ class GeventWorker(AsyncWorker):
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
# stop accepting:
if self._accept_event is not None:
self._accept_event.cancel()
# end # end
gevent.kill(acceptor) gevent.kill(acceptor)
self.pool.join(timeout=self.timeout) pool.join(timeout=self.timeout)
self.pool.kill(block=True, timeout=1) pool.kill(block=True, timeout=1)
def init_process(self): def init_process(self):
#gevent doesn't reinitialize dns for us after forking #gevent doesn't reinitialize dns for us after forking
#here's the workaround #here's the workaround
gevent.core.dns_shutdown(fail_requests=1) gevent.core.dns_shutdown(fail_requests=1)
gevent.core.dns_init() gevent.core.dns_init()
super(GeventWorker, self).init_process() super(GeventWorker, self).init_process()
self._accept_event = None
class GeventBaseWorker(Worker): class GeventBaseWorker(Worker):