fix kqueue poller.

this change initialise the event loop after the process has forked so we
make sure to inherit from the file descriptor.

Also fix the number of events we are waiting for. The python
implementation requires a positive number.
This commit is contained in:
benoitc 2014-05-13 13:57:34 +02:00
parent eadc526192
commit 67800292e0
2 changed files with 26 additions and 10 deletions

View File

@ -8,6 +8,7 @@
allows you to register an fd, and retrieve events on it. """ allows you to register an fd, and retrieve events on it. """
import select import select
import sys
from .util import fd_, close_on_exec from .util import fd_, close_on_exec
@ -131,10 +132,13 @@ if hasattr(select, 'kqueue'):
self.kq = select.kqueue() self.kq = select.kqueue()
close_on_exec(self.kq.fileno()) close_on_exec(self.kq.fileno())
self.events = [] self.events = []
self.max_ev = 0
def addfd(self, fd, mode, repeat=True): def addfd(self, fd, mode, repeat=True):
fd = fd_(fd) fd = fd_(fd)
self.max_ev += 1
if mode == 'r': if mode == 'r':
kmode = select.KQ_FILTER_READ kmode = select.KQ_FILTER_READ
else: else:
@ -148,12 +152,17 @@ if hasattr(select, 'kqueue'):
if not repeat: if not repeat:
flags |= select.KQ_EV_ONESHOT flags |= select.KQ_EV_ONESHOT
ev = select.kevent(fd_(fd), kmode, flags) ev = select.kevent(fd, kmode, flags)
self.kq.control([ev], 0) self.kq.control([ev], 0, 0)
def delfd(self, fd, mode): def delfd(self, fd, mode):
fd = fd_(fd) fd = fd_(fd)
self.max_ev -= 1
if fd < 0:
return
if mode == 'r': if mode == 'r':
kmode = select.KQ_FILTER_READ kmode = select.KQ_FILTER_READ
else: else:
@ -166,7 +175,7 @@ if hasattr(select, 'kqueue'):
def _wait(self, nsec=0): def _wait(self, nsec=0):
if len(self.events) == 0: if len(self.events) == 0:
try: try:
events = self.kq.control(None, 0, nsec) events = self.kq.control(None, self.max_ev, nsec)
except select.error as e: except select.error as e:
if e.args[0] != errno.EINTR: if e.args[0] != errno.EINTR:
raise raise
@ -328,7 +337,7 @@ if hasattr(select, "epoll"):
self.poll.close() self.poll.close()
if hasattr(select, "poll") or hasattr(select, "epoll"): if hasattr(select, "poll") or hasattr(select, "devpoll"):
class _PollerBase(object): class _PollerBase(object):
@ -466,8 +475,8 @@ if hasattr(select, "poll") or hasattr(select, "epoll"):
# choose the best implementation depending on the platform. # choose the best implementation depending on the platform.
if 'KqueuePoller' in globals(): if 'KQueuePoller' in globals():
DefaultPoller = KqueuePoller DefaultPoller = KQueuePoller
elif 'EpollPoller' in globals(): elif 'EpollPoller' in globals():
DefaultPoller = EpollPoller DefaultPoller = EpollPoller
elif 'DevpollPoller' in globals(): elif 'DevpollPoller' in globals():

View File

@ -52,8 +52,8 @@ class ThreadWorker(base.Worker):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(ThreadWorker, self).__init__(*args, **kwargs) super(ThreadWorker, self).__init__(*args, **kwargs)
# initialise the pool # initialise the pool
self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) self.tpool = None
self.poller = fdevents.DefaultPoller() self.poller = None
self.futures = set() self.futures = set()
self._heap = [] self._heap = []
self.keepalived = {} self.keepalived = {}
@ -64,9 +64,15 @@ class ThreadWorker(base.Worker):
fs.addr = addr fs.addr = addr
self.futures.add(fs) self.futures.add(fs)
def init_process(self):
self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
self.poller = fdevents.DefaultPoller()
super(ThreadWorker, self).init_process()
def run(self): def run(self):
# init listeners, add them to the event loop
for s in self.sockets: for s in self.sockets:
s.setblocking(0) s.setblocking(False)
self.poller.addfd(s, 'r') self.poller.addfd(s, 'r')
listeners = dict([(s.fileno(), s) for s in self.sockets]) listeners = dict([(s.fileno(), s) for s in self.sockets])
@ -170,13 +176,14 @@ class ThreadWorker(base.Worker):
break break
else: else:
# remove the socket from the poller # remove the socket from the poller
self.poller.delfd(conn.sock.fileno(), 'r') self.poller.delfd(conn.sock, 'r')
# close the socket # close the socket
util.close(conn.sock) util.close(conn.sock)
# shutdown the pool # shutdown the pool
self.tpool.shutdown(False) self.tpool.shutdown(False)
self.poller.close()
# wait for the workers # wait for the workers
futures.wait(self.futures, timeout=self.cfg.graceful_timeout) futures.wait(self.futures, timeout=self.cfg.graceful_timeout)