reuse asyncio code in the threaded worker

This commit is contained in:
benoitc 2014-05-30 11:07:35 +02:00
parent 81810d9f04
commit 7f9d745eb5
5 changed files with 620 additions and 466 deletions

View File

@ -133,8 +133,7 @@ class Arbiter(object):
listeners_str = ",".join([str(l) for l in self.LISTENERS])
self.log.debug("Arbiter booted")
self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
self.log.info("Using worker: %s",
self.cfg.settings['worker_class'].get())
self.log.info("Using worker: %s", self.cfg.worker_class_str)
self.cfg.when_ready(self)

View File

@ -88,6 +88,16 @@ class Config(object):
return parser
@property
def worker_class_str(self):
uri = self.settings['worker_class'].get()
## are we using a threaded worker?
is_sync = uri.endswith('SyncWorker') or uri == 'sync'
if is_sync and self.threads > 1:
return "threads"
return uri
@property
def worker_class(self):
uri = self.settings['worker_class'].get()

View File

@ -1,451 +0,0 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
""" A module implementing Poller depending on the platform. A pollster
allows you to register an fd, and retrieve events on it. """
import select
import sys
from .util import fd_, close_on_exec
class PollerBase(object):
def addfd(self, fd, mode, repeat=True):
""" add a filed escriptor to poll.
fdevent Parameters:
* fd : file descriptor or file object
* mode: 'r' to wait for read events, 'w' to wait for write events
* repeat: true or false . to continuously wait on this event or
not (default is true).
"""
raise NotImplementedError
def delfd(self, fd, mode):
""" stop to poll for the event on this file descriptor
Parameters:
* fd : file descriptor or file object
* mode: 'r' to wait for read events, 'w' to wait for write events
"""
raise NotImplementedError
def waitfd(self, nsec):
""" return one event from the pollster.
return: (fd, mode)
"""
self._wait(nsec)
if self.events:
return self.events.pop(0)
return None
def wait(self, nsec):
""" return all events raised in the pollster when calling the
function.
return: [(fd, mode), ....]
"""
events = self._wait(nsec)
self.events = []
return events
def close(self):
""" close the pollster """
raise NotImplementedError
class SelectPoller(PollerBase):
def __init__(self):
self.read_fds = {}
self.write_fds = {}
self.events = []
def addfd(self, fd, mode, repeat=True):
fd = fd_(fd)
if mode == 'r':
self.read_fds[fd] = repeat
elif mode == 'w':
self.write_fds[fd] = repeat
else:
raise ValueError('unkown mode {0}'.format(mode))
def delfd(self, fd, mode):
if mode == 'r' and fd in self.read_fds:
del self.read_fds[fd]
elif fd in self.write_fds:
del self.write_fds[fd]
def _wait(self, nsec):
read_fds = [fd for fd in self.read_fds]
write_fds = [fd for fd in self.write_fds]
if len(self.events) == 0:
try:
r, w, e = select.select(read_fds, write_fds, [], nsec)
except select.error as e:
if e.args[0] != errno.EINTR:
raise
events = []
for fd in r:
if fd in self.read_fds:
if self.read_fds[fd] == False:
del self.read_fds[fd]
events.append((fd, 'r'))
for fd in w:
if fd in self.write_fds:
if self.write_fds[fd] == False:
del self.write_fds[fd]
events.append((fd, 'w'))
self.events.extend(events)
return self.events
def close(self):
self.read_fds = []
self.write_fds = []
if hasattr(select, 'kqueue'):
class KQueuePoller(PollerBase):
def __init__(self):
self.kq = select.kqueue()
close_on_exec(self.kq.fileno())
self.events = []
self.max_ev = 0
def addfd(self, fd, mode, repeat=True):
fd = fd_(fd)
self.max_ev += 1
if mode == 'r':
kmode = select.KQ_FILTER_READ
else:
kmode = select.KQ_FILTER_WRITE
flags = select.KQ_EV_ADD
if sys.platform.startswith("darwin"):
flags |= select.KQ_EV_ENABLE
if not repeat:
flags |= select.KQ_EV_ONESHOT
ev = select.kevent(fd, kmode, flags)
self.kq.control([ev], 0, 0)
def delfd(self, fd, mode):
fd = fd_(fd)
self.max_ev -= 1
if fd < 0:
return
if mode == 'r':
kmode = select.KQ_FILTER_READ
else:
kmode = select.KQ_FILTER_WRITE
ev = select.kevent(fd_(fd), select.KQ_FILTER_READ,
select.KQ_EV_DELETE)
self.kq.control([ev], 0)
def _wait(self, nsec=0):
if len(self.events) == 0:
try:
events = self.kq.control(None, self.max_ev, nsec)
except select.error as e:
if e.args[0] != errno.EINTR:
raise
# process events
all_events = []
for ev in events:
if ev.filter == select.KQ_FILTER_READ:
mode = 'r'
else:
mode = 'w'
all_events.append((fd_(ev.ident), mode))
self.events.extend(all_events)
# return all events
return self.events
def close(self):
self.kq.close()
if hasattr(select, "epoll"):
class EpollPoller(PollerBase):
def __init__(self):
self.poll = select.epoll()
close_on_exec(self.poll.fileno())
self.fds = {}
self.events = []
def addfd(self, fd, mode, repeat=True):
fd = fd_(fd)
if mode == 'r':
mode = (select.EPOLLIN, repeat)
else:
mode = (select.EPOLLOUT, repeat)
if fd in self.fds:
modes = self.fds[fd]
if mode in self.fds[fd]:
# already registered for this mode
return
modes.append(mode)
addfd_ = self.poll.modify
else:
modes = [mode]
addfd_ = self.poll.register
# append the new mode to fds
self.fds[fd] = modes
mask = 0
for mode, r in modes:
mask |= mode
if not repeat:
mask |= select.EPOLLONESHOT
addfd_(fd, mask)
def delfd(self, fd, mode):
fd = fd_(fd)
if mode == 'r':
mode = select.POLLIN | select.POLLPRI
else:
mode = select.POLLOUT
if fd not in self.fds:
return
modes = []
for m, r in self.fds[fd]:
if mode != m:
modes.append((m, r))
if not modes:
# del the fd from the poll
self.poll.unregister(fd)
del self.fds[fd]
else:
# modify the fd in the poll
self.fds[fd] = modes
m, r = modes[0]
mask = m[0]
if r:
mask |= select.EPOLLONESHOT
self.poll.modify(fd, mask)
def _wait(self, nsec=0):
# wait for the events
if len(self.events) == 0:
try:
events = self.poll.poll(nsec)
except select.error as e:
if e.args[0] != errno.EINTR:
raise
if events:
all_events = []
fds = {}
for fd, ev in events:
fd = fd_(fd)
if ev & select.EPOLLIN:
mode = 'r'
else:
mode = 'w'
all_events.append((fd, mode))
if fd in fds:
fds[fd].append(mode)
else:
fds[fd] = [mode]
# eventually remove the mode from the list if repeat
# was set to False and modify the poll if needed.
modes = []
for m, r in self.fds[fd]:
if not r:
continue
modes.append((m, r))
if modes != self.fds[fd]:
self.fds[fd] = modes
mask = 0
for m, r in modes:
mask |= m
self.poll.modify(fd, mask)
self.events.extend(all_events)
# return all events
return self.events
def close(self):
self.poll.close()
if hasattr(select, "poll") or hasattr(select, "devpoll"):
class _PollerBase(PollerBase):
POLL_IMPL = None
def __init__(self):
self.poll = self.POLL_IMPL()
self.fds = {}
self.events = []
def addfd(self, fd, mode, repeat=True):
fd = fd_(fd)
if mode == 'r':
mode = (select.POLLIN, repeat)
else:
mode = (select.POLLOUT, repeat)
if fd in self.fds:
modes = self.fds[fd]
if mode in modes:
# already registered for this mode
return
modes.append(mode)
addfd_ = self.poll.modify
else:
modes = [mode]
addfd_ = self.poll.register
# append the new mode to fds
self.fds[fd] = modes
mask = 0
for mode, r in modes:
mask |= mode
addfd_(fd, mask)
def delfd(self, fd, mode):
fd = fd_(fd)
if mode == 'r':
mode = select.POLLIN | select.POLLPRI
else:
mode = select.POLLOUT
if fd not in self.fds:
return
modes = []
for m, r in self.fds[fd]:
if mode != m:
modes.append((m, r))
if not modes:
# del the fd from the poll
self.poll.unregister(fd)
del self.fds[fd]
else:
# modify the fd in the poll
self.fds[fd] = modes
m, r = modes[0]
mask = m[0]
self.poll.modify(fd, mask)
def _wait(self, nsec=0):
# wait for the events
if len(self.events) == 0:
try:
events = self.poll.poll(nsec)
except select.error as e:
if e.args[0] != errno.EINTR:
raise
all_events = []
for fd, ev in events:
fd = fd_(fd)
if fd not in self.fds:
continue
if ev & select.POLLIN or ev & select.POLLPRI:
mode = 'r'
else:
mode = 'w'
# add new event to the list
all_events.append((fd, mode))
# eventually remove the mode from the list if repeat
# was set to False and modify the poll if needed.
modes = []
for m, r in self.fds[fd]:
if not r:
continue
modes.append((m, r))
if not modes:
self.poll.unregister(fd)
else:
mask = 0
if modes != self.fds[fd]:
mask |= m
self.poll.modify(fd, mask)
self.events.extend(all_events)
return self.events
def close(self):
for fd in self.fds:
self.poll.unregister(fd)
self.fds = []
self.poll = None
if hasattr(select, "devpoll"):
class DevPollPoller(_PollerBase):
POLL_IMPL = select.devpoll
if hasattr(select, "poll"):
class PollPoller(_PollerBase):
POLL_IMPL = select.poll
# choose the best implementation depending on the platform.
if 'KQueuePoller' in globals():
DefaultPoller = KQueuePoller
elif 'EpollPoller' in globals():
DefaultPoller = EpollPoller
elif 'DevpollPoller' in globals():
DefaultPoller = DevpollPoller
elif 'PollPoller' in globals():
DefaultPoller = PollPoller
else:
DefaultPoller = SelectPoller

585
gunicorn/selectors.py Normal file
View File

@ -0,0 +1,585 @@
"""Selectors module.
This module allows high-level and efficient I/O multiplexing, built upon the
`select` module primitives.
"""
from abc import ABCMeta, abstractmethod
from collections import namedtuple, Mapping
import math
import select
import sys
# generic events, that must be mapped to implementation-specific ones
EVENT_READ = (1 << 0)
EVENT_WRITE = (1 << 1)
def _fileobj_to_fd(fileobj):
"""Return a file descriptor from a file object.
Parameters:
fileobj -- file object or file descriptor
Returns:
corresponding file descriptor
Raises:
ValueError if the object is invalid
"""
if isinstance(fileobj, int):
fd = fileobj
else:
try:
fd = int(fileobj.fileno())
except (AttributeError, TypeError, ValueError):
raise ValueError("Invalid file object: "
"{!r}".format(fileobj)) from None
if fd < 0:
raise ValueError("Invalid file descriptor: {}".format(fd))
return fd
SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
"""Object used to associate a file object to its backing file descriptor,
selected event mask and attached data."""
class _SelectorMapping(Mapping):
"""Mapping of file objects to selector keys."""
def __init__(self, selector):
self._selector = selector
def __len__(self):
return len(self._selector._fd_to_key)
def __getitem__(self, fileobj):
try:
fd = self._selector._fileobj_lookup(fileobj)
return self._selector._fd_to_key[fd]
except KeyError:
raise KeyError("{!r} is not registered".format(fileobj)) from None
def __iter__(self):
return iter(self._selector._fd_to_key)
class BaseSelector(metaclass=ABCMeta):
"""Selector abstract base class.
A selector supports registering file objects to be monitored for specific
I/O events.
A file object is a file descriptor or any object with a `fileno()` method.
An arbitrary object can be attached to the file object, which can be used
for example to store context information, a callback, etc.
A selector can use various implementations (select(), poll(), epoll()...)
depending on the platform. The default `Selector` class uses the most
efficient implementation on the current platform.
"""
@abstractmethod
def register(self, fileobj, events, data=None):
"""Register a file object.
Parameters:
fileobj -- file object or file descriptor
events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
data -- attached data
Returns:
SelectorKey instance
Raises:
ValueError if events is invalid
KeyError if fileobj is already registered
OSError if fileobj is closed or otherwise is unacceptable to
the underlying system call (if a system call is made)
Note:
OSError may or may not be raised
"""
raise NotImplementedError
@abstractmethod
def unregister(self, fileobj):
"""Unregister a file object.
Parameters:
fileobj -- file object or file descriptor
Returns:
SelectorKey instance
Raises:
KeyError if fileobj is not registered
Note:
If fileobj is registered but has since been closed this does
*not* raise OSError (even if the wrapped syscall does)
"""
raise NotImplementedError
def modify(self, fileobj, events, data=None):
"""Change a registered file object monitored events or attached data.
Parameters:
fileobj -- file object or file descriptor
events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
data -- attached data
Returns:
SelectorKey instance
Raises:
Anything that unregister() or register() raises
"""
self.unregister(fileobj)
return self.register(fileobj, events, data)
@abstractmethod
def select(self, timeout=None):
"""Perform the actual selection, until some monitored file objects are
ready or a timeout expires.
Parameters:
timeout -- if timeout > 0, this specifies the maximum wait time, in
seconds
if timeout <= 0, the select() call won't block, and will
report the currently ready file objects
if timeout is None, select() will block until a monitored
file object becomes ready
Returns:
list of (key, events) for ready file objects
`events` is a bitwise mask of EVENT_READ|EVENT_WRITE
"""
raise NotImplementedError
def close(self):
"""Close the selector.
This must be called to make sure that any underlying resource is freed.
"""
pass
def get_key(self, fileobj):
"""Return the key associated to a registered file object.
Returns:
SelectorKey for this file object
"""
mapping = self.get_map()
try:
return mapping[fileobj]
except KeyError:
raise KeyError("{!r} is not registered".format(fileobj)) from None
@abstractmethod
def get_map(self):
"""Return a mapping of file objects to selector keys."""
raise NotImplementedError
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
class _BaseSelectorImpl(BaseSelector):
"""Base selector implementation."""
def __init__(self):
# this maps file descriptors to keys
self._fd_to_key = {}
# read-only mapping returned by get_map()
self._map = _SelectorMapping(self)
def _fileobj_lookup(self, fileobj):
"""Return a file descriptor from a file object.
This wraps _fileobj_to_fd() to do an exhaustive search in case
the object is invalid but we still have it in our map. This
is used by unregister() so we can unregister an object that
was previously registered even if it is closed. It is also
used by _SelectorMapping.
"""
try:
return _fileobj_to_fd(fileobj)
except ValueError:
# Do an exhaustive search.
for key in self._fd_to_key.values():
if key.fileobj is fileobj:
return key.fd
# Raise ValueError after all.
raise
def register(self, fileobj, events, data=None):
if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
raise ValueError("Invalid events: {!r}".format(events))
key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
if key.fd in self._fd_to_key:
raise KeyError("{!r} (FD {}) is already registered"
.format(fileobj, key.fd))
self._fd_to_key[key.fd] = key
return key
def unregister(self, fileobj):
try:
key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
except KeyError:
raise KeyError("{!r} is not registered".format(fileobj)) from None
return key
def modify(self, fileobj, events, data=None):
# TODO: Subclasses can probably optimize this even further.
try:
key = self._fd_to_key[self._fileobj_lookup(fileobj)]
except KeyError:
raise KeyError("{!r} is not registered".format(fileobj)) from None
if events != key.events:
self.unregister(fileobj)
key = self.register(fileobj, events, data)
elif data != key.data:
# Use a shortcut to update the data.
key = key._replace(data=data)
self._fd_to_key[key.fd] = key
return key
def close(self):
self._fd_to_key.clear()
def get_map(self):
return self._map
def _key_from_fd(self, fd):
"""Return the key associated to a given file descriptor.
Parameters:
fd -- file descriptor
Returns:
corresponding key, or None if not found
"""
try:
return self._fd_to_key[fd]
except KeyError:
return None
class SelectSelector(_BaseSelectorImpl):
"""Select-based selector."""
def __init__(self):
super().__init__()
self._readers = set()
self._writers = set()
def register(self, fileobj, events, data=None):
key = super().register(fileobj, events, data)
if events & EVENT_READ:
self._readers.add(key.fd)
if events & EVENT_WRITE:
self._writers.add(key.fd)
return key
def unregister(self, fileobj):
key = super().unregister(fileobj)
self._readers.discard(key.fd)
self._writers.discard(key.fd)
return key
if sys.platform == 'win32':
def _select(self, r, w, _, timeout=None):
r, w, x = select.select(r, w, w, timeout)
return r, w + x, []
else:
_select = select.select
def select(self, timeout=None):
timeout = None if timeout is None else max(timeout, 0)
ready = []
try:
r, w, _ = self._select(self._readers, self._writers, [], timeout)
except InterruptedError:
return ready
r = set(r)
w = set(w)
for fd in r | w:
events = 0
if fd in r:
events |= EVENT_READ
if fd in w:
events |= EVENT_WRITE
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
if hasattr(select, 'poll'):
class PollSelector(_BaseSelectorImpl):
"""Poll-based selector."""
def __init__(self):
super().__init__()
self._poll = select.poll()
def register(self, fileobj, events, data=None):
key = super().register(fileobj, events, data)
poll_events = 0
if events & EVENT_READ:
poll_events |= select.POLLIN
if events & EVENT_WRITE:
poll_events |= select.POLLOUT
self._poll.register(key.fd, poll_events)
return key
def unregister(self, fileobj):
key = super().unregister(fileobj)
self._poll.unregister(key.fd)
return key
def select(self, timeout=None):
if timeout is None:
timeout = None
elif timeout <= 0:
timeout = 0
else:
# poll() has a resolution of 1 millisecond, round away from
# zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1e3)
ready = []
try:
fd_event_list = self._poll.poll(timeout)
except InterruptedError:
return ready
for fd, event in fd_event_list:
events = 0
if event & ~select.POLLIN:
events |= EVENT_WRITE
if event & ~select.POLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
if hasattr(select, 'epoll'):
class EpollSelector(_BaseSelectorImpl):
"""Epoll-based selector."""
def __init__(self):
super().__init__()
self._epoll = select.epoll()
def fileno(self):
return self._epoll.fileno()
def register(self, fileobj, events, data=None):
key = super().register(fileobj, events, data)
epoll_events = 0
if events & EVENT_READ:
epoll_events |= select.EPOLLIN
if events & EVENT_WRITE:
epoll_events |= select.EPOLLOUT
self._epoll.register(key.fd, epoll_events)
return key
def unregister(self, fileobj):
key = super().unregister(fileobj)
try:
self._epoll.unregister(key.fd)
except OSError:
# This can happen if the FD was closed since it
# was registered.
pass
return key
def select(self, timeout=None):
if timeout is None:
timeout = -1
elif timeout <= 0:
timeout = 0
else:
# epoll_wait() has a resolution of 1 millisecond, round away
# from zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1e3) * 1e-3
max_ev = len(self._fd_to_key)
ready = []
try:
fd_event_list = self._epoll.poll(timeout, max_ev)
except InterruptedError:
return ready
for fd, event in fd_event_list:
events = 0
if event & ~select.EPOLLIN:
events |= EVENT_WRITE
if event & ~select.EPOLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def close(self):
self._epoll.close()
super().close()
if hasattr(select, 'devpoll'):
class DevpollSelector(_BaseSelectorImpl):
"""Solaris /dev/poll selector."""
def __init__(self):
super().__init__()
self._devpoll = select.devpoll()
def fileno(self):
return self._devpoll.fileno()
def register(self, fileobj, events, data=None):
key = super().register(fileobj, events, data)
poll_events = 0
if events & EVENT_READ:
poll_events |= select.POLLIN
if events & EVENT_WRITE:
poll_events |= select.POLLOUT
self._devpoll.register(key.fd, poll_events)
return key
def unregister(self, fileobj):
key = super().unregister(fileobj)
self._devpoll.unregister(key.fd)
return key
def select(self, timeout=None):
if timeout is None:
timeout = None
elif timeout <= 0:
timeout = 0
else:
# devpoll() has a resolution of 1 millisecond, round away from
# zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1e3)
ready = []
try:
fd_event_list = self._devpoll.poll(timeout)
except InterruptedError:
return ready
for fd, event in fd_event_list:
events = 0
if event & ~select.POLLIN:
events |= EVENT_WRITE
if event & ~select.POLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def close(self):
self._devpoll.close()
super().close()
if hasattr(select, 'kqueue'):
class KqueueSelector(_BaseSelectorImpl):
"""Kqueue-based selector."""
def __init__(self):
super().__init__()
self._kqueue = select.kqueue()
def fileno(self):
return self._kqueue.fileno()
def register(self, fileobj, events, data=None):
key = super().register(fileobj, events, data)
if events & EVENT_READ:
kev = select.kevent(key.fd, select.KQ_FILTER_READ,
select.KQ_EV_ADD)
self._kqueue.control([kev], 0, 0)
if events & EVENT_WRITE:
kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
select.KQ_EV_ADD)
self._kqueue.control([kev], 0, 0)
return key
def unregister(self, fileobj):
key = super().unregister(fileobj)
if key.events & EVENT_READ:
kev = select.kevent(key.fd, select.KQ_FILTER_READ,
select.KQ_EV_DELETE)
try:
self._kqueue.control([kev], 0, 0)
except OSError:
# This can happen if the FD was closed since it
# was registered.
pass
if key.events & EVENT_WRITE:
kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
select.KQ_EV_DELETE)
try:
self._kqueue.control([kev], 0, 0)
except OSError:
# See comment above.
pass
return key
def select(self, timeout=None):
timeout = None if timeout is None else max(timeout, 0)
max_ev = len(self._fd_to_key)
ready = []
try:
kev_list = self._kqueue.control(None, max_ev, timeout)
except InterruptedError:
return ready
for kev in kev_list:
fd = kev.ident
flag = kev.filter
events = 0
if flag == select.KQ_FILTER_READ:
events |= EVENT_READ
if flag == select.KQ_FILTER_WRITE:
events |= EVENT_WRITE
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def close(self):
self._kqueue.close()
super().close()
# Choose the best implementation: roughly, epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
DefaultSelector = PollSelector
else:
DefaultSelector = SelectSelector

View File

@ -24,11 +24,18 @@ import time
from .. import http
from ..http import wsgi
from .. import fdevents
from .. import util
from . import base
from .. import six
try:
from asyncio import selectors
except ImportError:
from .. import selectors
ACCEPT = 0
KEEPALIVED = 1
class TConn():
@ -66,16 +73,15 @@ class ThreadWorker(base.Worker):
def init_process(self):
self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
self.poller = fdevents.DefaultPoller()
self.poller = selectors.DefaultSelector()
super(ThreadWorker, self).init_process()
def run(self):
# init listeners, add them to the event loop
for s in self.sockets:
s.setblocking(False)
self.poller.addfd(s, 'r')
self.poller.register(s, selectors.EVENT_READ, ACCEPT)
listeners = dict([(s.fileno(), s) for s in self.sockets])
while self.alive:
# If our parent changed then we shut down.
@ -86,13 +92,14 @@ class ThreadWorker(base.Worker):
# notify the arbiter we are alive
self.notify()
events = self.poller.wait(0.01)
events = self.poller.select(0.01)
if events:
for (fd, mode) in events:
for key, mask in events:
fs = None
client = None
if fd in listeners:
listener = listeners[fd]
if key.data == ACCEPT:
listener = key.fileobj
# start to accept connections
try:
client, addr = listener.accept()
@ -107,16 +114,20 @@ class ThreadWorker(base.Worker):
if e.args[0] not in (errno.EAGAIN,
errno.ECONNABORTED, errno.EWOULDBLOCK):
raise
elif fd in self.keepalived:
else:
# get the client connection
client = self.keepalived[fd]
client = key.data
# remove it from the heap
try:
del self._heap[operator.indexOf(self._heap, client)]
except (KeyError, IndexError):
pass
self.poller.unregister(key.fileobj)
# add a job to the pool
fs = self.tpool.submit(self.handle, client.listener,
client.sock, client.addr, client.parser)
@ -146,10 +157,10 @@ class ThreadWorker(base.Worker):
# register the connection
heapq.heappush(self._heap, tconn)
self.keepalived[fs.sock.fileno()] = tconn
# add the socket to the event loop
self.poller.addfd(fs.sock, 'r', False)
self.poller.register(fs.sock, selectors.EVENT_READ,
tconn)
else:
# at this point the connection should be
# closed but we make sure it is.
@ -176,7 +187,7 @@ class ThreadWorker(base.Worker):
break
else:
# remove the socket from the poller
self.poller.delfd(conn.sock, 'r')
self.poller.unregister(conn.sock)
# close the socket
util.close(conn.sock)