From 7f9d745eb5919967c682d28e2a6237d62efab97e Mon Sep 17 00:00:00 2001 From: benoitc Date: Fri, 30 May 2014 11:07:35 +0200 Subject: [PATCH] reuse asyncio code in the threaded worker --- gunicorn/arbiter.py | 3 +- gunicorn/config.py | 10 + gunicorn/fdevents.py | 451 --------------------------- gunicorn/selectors.py | 585 ++++++++++++++++++++++++++++++++++++ gunicorn/workers/gthread.py | 37 ++- 5 files changed, 620 insertions(+), 466 deletions(-) delete mode 100644 gunicorn/fdevents.py create mode 100644 gunicorn/selectors.py diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index bd2016d2..e0ffb225 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -133,8 +133,7 @@ class Arbiter(object): listeners_str = ",".join([str(l) for l in self.LISTENERS]) self.log.debug("Arbiter booted") self.log.info("Listening at: %s (%s)", listeners_str, self.pid) - self.log.info("Using worker: %s", - self.cfg.settings['worker_class'].get()) + self.log.info("Using worker: %s", self.cfg.worker_class_str) self.cfg.when_ready(self) diff --git a/gunicorn/config.py b/gunicorn/config.py index cd4d1784..4c28dae3 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -88,6 +88,16 @@ class Config(object): return parser + @property + def worker_class_str(self): + uri = self.settings['worker_class'].get() + + ## are we using a threaded worker? + is_sync = uri.endswith('SyncWorker') or uri == 'sync' + if is_sync and self.threads > 1: + return "threads" + return uri + @property def worker_class(self): uri = self.settings['worker_class'].get() diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py deleted file mode 100644 index 3a273f5d..00000000 --- a/gunicorn/fdevents.py +++ /dev/null @@ -1,451 +0,0 @@ -# -*- coding: utf-8 - -# -# This file is part of gunicorn released under the MIT license. -# See the NOTICE for more information. - - -""" A module implementing Poller depending on the platform. A pollster -allows you to register an fd, and retrieve events on it. """ - -import select -import sys - -from .util import fd_, close_on_exec - - -class PollerBase(object): - - def addfd(self, fd, mode, repeat=True): - """ add a filed escriptor to poll. - - fdevent Parameters: - - * fd : file descriptor or file object - * mode: 'r' to wait for read events, 'w' to wait for write events - * repeat: true or false . to continuously wait on this event or - not (default is true). - """ - - raise NotImplementedError - - def delfd(self, fd, mode): - """ stop to poll for the event on this file descriptor - - Parameters: - - * fd : file descriptor or file object - * mode: 'r' to wait for read events, 'w' to wait for write events - """ - - raise NotImplementedError - - def waitfd(self, nsec): - """ return one event from the pollster. - - return: (fd, mode) - """ - self._wait(nsec) - if self.events: - return self.events.pop(0) - return None - - - def wait(self, nsec): - """ return all events raised in the pollster when calling the - function. - - return: [(fd, mode), ....] - """ - events = self._wait(nsec) - self.events = [] - return events - - def close(self): - """ close the pollster """ - raise NotImplementedError - - -class SelectPoller(PollerBase): - - def __init__(self): - self.read_fds = {} - self.write_fds = {} - self.events = [] - - def addfd(self, fd, mode, repeat=True): - fd = fd_(fd) - - if mode == 'r': - self.read_fds[fd] = repeat - elif mode == 'w': - self.write_fds[fd] = repeat - else: - raise ValueError('unkown mode {0}'.format(mode)) - - def delfd(self, fd, mode): - if mode == 'r' and fd in self.read_fds: - del self.read_fds[fd] - elif fd in self.write_fds: - del self.write_fds[fd] - - def _wait(self, nsec): - read_fds = [fd for fd in self.read_fds] - write_fds = [fd for fd in self.write_fds] - - if len(self.events) == 0: - try: - r, w, e = select.select(read_fds, write_fds, [], nsec) - except select.error as e: - if e.args[0] != errno.EINTR: - raise - - events = [] - for fd in r: - if fd in self.read_fds: - if self.read_fds[fd] == False: - del self.read_fds[fd] - events.append((fd, 'r')) - - for fd in w: - if fd in self.write_fds: - if self.write_fds[fd] == False: - del self.write_fds[fd] - events.append((fd, 'w')) - - self.events.extend(events) - return self.events - - def close(self): - self.read_fds = [] - self.write_fds = [] - -if hasattr(select, 'kqueue'): - - class KQueuePoller(PollerBase): - - def __init__(self): - self.kq = select.kqueue() - close_on_exec(self.kq.fileno()) - self.events = [] - self.max_ev = 0 - - def addfd(self, fd, mode, repeat=True): - fd = fd_(fd) - - self.max_ev += 1 - - if mode == 'r': - kmode = select.KQ_FILTER_READ - else: - kmode = select.KQ_FILTER_WRITE - - flags = select.KQ_EV_ADD - - if sys.platform.startswith("darwin"): - flags |= select.KQ_EV_ENABLE - - if not repeat: - flags |= select.KQ_EV_ONESHOT - - ev = select.kevent(fd, kmode, flags) - self.kq.control([ev], 0, 0) - - def delfd(self, fd, mode): - fd = fd_(fd) - - self.max_ev -= 1 - - if fd < 0: - return - - if mode == 'r': - kmode = select.KQ_FILTER_READ - else: - kmode = select.KQ_FILTER_WRITE - - ev = select.kevent(fd_(fd), select.KQ_FILTER_READ, - select.KQ_EV_DELETE) - self.kq.control([ev], 0) - - def _wait(self, nsec=0): - if len(self.events) == 0: - try: - events = self.kq.control(None, self.max_ev, nsec) - except select.error as e: - if e.args[0] != errno.EINTR: - raise - - # process events - all_events = [] - for ev in events: - if ev.filter == select.KQ_FILTER_READ: - mode = 'r' - else: - mode = 'w' - all_events.append((fd_(ev.ident), mode)) - - self.events.extend(all_events) - - # return all events - return self.events - - def close(self): - self.kq.close() - -if hasattr(select, "epoll"): - class EpollPoller(PollerBase): - - def __init__(self): - self.poll = select.epoll() - close_on_exec(self.poll.fileno()) - self.fds = {} - self.events = [] - - def addfd(self, fd, mode, repeat=True): - fd = fd_(fd) - - if mode == 'r': - mode = (select.EPOLLIN, repeat) - else: - mode = (select.EPOLLOUT, repeat) - - if fd in self.fds: - modes = self.fds[fd] - if mode in self.fds[fd]: - # already registered for this mode - return - modes.append(mode) - addfd_ = self.poll.modify - else: - modes = [mode] - addfd_ = self.poll.register - - # append the new mode to fds - self.fds[fd] = modes - - mask = 0 - for mode, r in modes: - mask |= mode - - if not repeat: - mask |= select.EPOLLONESHOT - - addfd_(fd, mask) - - def delfd(self, fd, mode): - fd = fd_(fd) - - if mode == 'r': - mode = select.POLLIN | select.POLLPRI - else: - mode = select.POLLOUT - - if fd not in self.fds: - return - - modes = [] - for m, r in self.fds[fd]: - if mode != m: - modes.append((m, r)) - - if not modes: - # del the fd from the poll - self.poll.unregister(fd) - del self.fds[fd] - else: - # modify the fd in the poll - self.fds[fd] = modes - m, r = modes[0] - mask = m[0] - if r: - mask |= select.EPOLLONESHOT - - self.poll.modify(fd, mask) - - def _wait(self, nsec=0): - # wait for the events - if len(self.events) == 0: - try: - events = self.poll.poll(nsec) - except select.error as e: - if e.args[0] != errno.EINTR: - raise - - if events: - all_events = [] - fds = {} - for fd, ev in events: - fd = fd_(fd) - if ev & select.EPOLLIN: - mode = 'r' - else: - mode = 'w' - - all_events.append((fd, mode)) - - if fd in fds: - fds[fd].append(mode) - else: - fds[fd] = [mode] - - # eventually remove the mode from the list if repeat - # was set to False and modify the poll if needed. - modes = [] - for m, r in self.fds[fd]: - if not r: - continue - modes.append((m, r)) - - if modes != self.fds[fd]: - self.fds[fd] = modes - mask = 0 - for m, r in modes: - mask |= m - self.poll.modify(fd, mask) - - self.events.extend(all_events) - - # return all events - return self.events - - def close(self): - self.poll.close() - - -if hasattr(select, "poll") or hasattr(select, "devpoll"): - - class _PollerBase(PollerBase): - - POLL_IMPL = None - - def __init__(self): - self.poll = self.POLL_IMPL() - self.fds = {} - self.events = [] - - def addfd(self, fd, mode, repeat=True): - fd = fd_(fd) - if mode == 'r': - mode = (select.POLLIN, repeat) - else: - mode = (select.POLLOUT, repeat) - - if fd in self.fds: - modes = self.fds[fd] - if mode in modes: - # already registered for this mode - return - modes.append(mode) - addfd_ = self.poll.modify - else: - modes = [mode] - addfd_ = self.poll.register - - # append the new mode to fds - self.fds[fd] = modes - - mask = 0 - for mode, r in modes: - mask |= mode - - addfd_(fd, mask) - - def delfd(self, fd, mode): - fd = fd_(fd) - - if mode == 'r': - mode = select.POLLIN | select.POLLPRI - else: - mode = select.POLLOUT - - if fd not in self.fds: - return - - modes = [] - for m, r in self.fds[fd]: - if mode != m: - modes.append((m, r)) - - if not modes: - # del the fd from the poll - self.poll.unregister(fd) - del self.fds[fd] - else: - # modify the fd in the poll - self.fds[fd] = modes - m, r = modes[0] - mask = m[0] - self.poll.modify(fd, mask) - - def _wait(self, nsec=0): - # wait for the events - if len(self.events) == 0: - try: - events = self.poll.poll(nsec) - except select.error as e: - if e.args[0] != errno.EINTR: - raise - - all_events = [] - for fd, ev in events: - fd = fd_(fd) - - if fd not in self.fds: - continue - - if ev & select.POLLIN or ev & select.POLLPRI: - mode = 'r' - else: - mode = 'w' - - # add new event to the list - all_events.append((fd, mode)) - - # eventually remove the mode from the list if repeat - # was set to False and modify the poll if needed. - modes = [] - for m, r in self.fds[fd]: - if not r: - continue - modes.append((m, r)) - - if not modes: - self.poll.unregister(fd) - else: - mask = 0 - if modes != self.fds[fd]: - mask |= m - self.poll.modify(fd, mask) - - - self.events.extend(all_events) - return self.events - - def close(self): - for fd in self.fds: - self.poll.unregister(fd) - self.fds = [] - self.poll = None - - - if hasattr(select, "devpoll"): - - class DevPollPoller(_PollerBase): - POLL_IMPL = select.devpoll - - if hasattr(select, "poll"): - class PollPoller(_PollerBase): - POLL_IMPL = select.poll - - -# choose the best implementation depending on the platform. -if 'KQueuePoller' in globals(): - DefaultPoller = KQueuePoller -elif 'EpollPoller' in globals(): - DefaultPoller = EpollPoller -elif 'DevpollPoller' in globals(): - DefaultPoller = DevpollPoller -elif 'PollPoller' in globals(): - DefaultPoller = PollPoller -else: - DefaultPoller = SelectPoller diff --git a/gunicorn/selectors.py b/gunicorn/selectors.py new file mode 100644 index 00000000..4e9ae6ec --- /dev/null +++ b/gunicorn/selectors.py @@ -0,0 +1,585 @@ +"""Selectors module. + +This module allows high-level and efficient I/O multiplexing, built upon the +`select` module primitives. +""" + + +from abc import ABCMeta, abstractmethod +from collections import namedtuple, Mapping +import math +import select +import sys + + +# generic events, that must be mapped to implementation-specific ones +EVENT_READ = (1 << 0) +EVENT_WRITE = (1 << 1) + + +def _fileobj_to_fd(fileobj): + """Return a file descriptor from a file object. + + Parameters: + fileobj -- file object or file descriptor + + Returns: + corresponding file descriptor + + Raises: + ValueError if the object is invalid + """ + if isinstance(fileobj, int): + fd = fileobj + else: + try: + fd = int(fileobj.fileno()) + except (AttributeError, TypeError, ValueError): + raise ValueError("Invalid file object: " + "{!r}".format(fileobj)) from None + if fd < 0: + raise ValueError("Invalid file descriptor: {}".format(fd)) + return fd + + +SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) +"""Object used to associate a file object to its backing file descriptor, +selected event mask and attached data.""" + + +class _SelectorMapping(Mapping): + """Mapping of file objects to selector keys.""" + + def __init__(self, selector): + self._selector = selector + + def __len__(self): + return len(self._selector._fd_to_key) + + def __getitem__(self, fileobj): + try: + fd = self._selector._fileobj_lookup(fileobj) + return self._selector._fd_to_key[fd] + except KeyError: + raise KeyError("{!r} is not registered".format(fileobj)) from None + + def __iter__(self): + return iter(self._selector._fd_to_key) + + +class BaseSelector(metaclass=ABCMeta): + """Selector abstract base class. + + A selector supports registering file objects to be monitored for specific + I/O events. + + A file object is a file descriptor or any object with a `fileno()` method. + An arbitrary object can be attached to the file object, which can be used + for example to store context information, a callback, etc. + + A selector can use various implementations (select(), poll(), epoll()...) + depending on the platform. The default `Selector` class uses the most + efficient implementation on the current platform. + """ + + @abstractmethod + def register(self, fileobj, events, data=None): + """Register a file object. + + Parameters: + fileobj -- file object or file descriptor + events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) + data -- attached data + + Returns: + SelectorKey instance + + Raises: + ValueError if events is invalid + KeyError if fileobj is already registered + OSError if fileobj is closed or otherwise is unacceptable to + the underlying system call (if a system call is made) + + Note: + OSError may or may not be raised + """ + raise NotImplementedError + + @abstractmethod + def unregister(self, fileobj): + """Unregister a file object. + + Parameters: + fileobj -- file object or file descriptor + + Returns: + SelectorKey instance + + Raises: + KeyError if fileobj is not registered + + Note: + If fileobj is registered but has since been closed this does + *not* raise OSError (even if the wrapped syscall does) + """ + raise NotImplementedError + + def modify(self, fileobj, events, data=None): + """Change a registered file object monitored events or attached data. + + Parameters: + fileobj -- file object or file descriptor + events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) + data -- attached data + + Returns: + SelectorKey instance + + Raises: + Anything that unregister() or register() raises + """ + self.unregister(fileobj) + return self.register(fileobj, events, data) + + @abstractmethod + def select(self, timeout=None): + """Perform the actual selection, until some monitored file objects are + ready or a timeout expires. + + Parameters: + timeout -- if timeout > 0, this specifies the maximum wait time, in + seconds + if timeout <= 0, the select() call won't block, and will + report the currently ready file objects + if timeout is None, select() will block until a monitored + file object becomes ready + + Returns: + list of (key, events) for ready file objects + `events` is a bitwise mask of EVENT_READ|EVENT_WRITE + """ + raise NotImplementedError + + def close(self): + """Close the selector. + + This must be called to make sure that any underlying resource is freed. + """ + pass + + def get_key(self, fileobj): + """Return the key associated to a registered file object. + + Returns: + SelectorKey for this file object + """ + mapping = self.get_map() + try: + return mapping[fileobj] + except KeyError: + raise KeyError("{!r} is not registered".format(fileobj)) from None + + @abstractmethod + def get_map(self): + """Return a mapping of file objects to selector keys.""" + raise NotImplementedError + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + +class _BaseSelectorImpl(BaseSelector): + """Base selector implementation.""" + + def __init__(self): + # this maps file descriptors to keys + self._fd_to_key = {} + # read-only mapping returned by get_map() + self._map = _SelectorMapping(self) + + def _fileobj_lookup(self, fileobj): + """Return a file descriptor from a file object. + + This wraps _fileobj_to_fd() to do an exhaustive search in case + the object is invalid but we still have it in our map. This + is used by unregister() so we can unregister an object that + was previously registered even if it is closed. It is also + used by _SelectorMapping. + """ + try: + return _fileobj_to_fd(fileobj) + except ValueError: + # Do an exhaustive search. + for key in self._fd_to_key.values(): + if key.fileobj is fileobj: + return key.fd + # Raise ValueError after all. + raise + + def register(self, fileobj, events, data=None): + if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): + raise ValueError("Invalid events: {!r}".format(events)) + + key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data) + + if key.fd in self._fd_to_key: + raise KeyError("{!r} (FD {}) is already registered" + .format(fileobj, key.fd)) + + self._fd_to_key[key.fd] = key + return key + + def unregister(self, fileobj): + try: + key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) + except KeyError: + raise KeyError("{!r} is not registered".format(fileobj)) from None + return key + + def modify(self, fileobj, events, data=None): + # TODO: Subclasses can probably optimize this even further. + try: + key = self._fd_to_key[self._fileobj_lookup(fileobj)] + except KeyError: + raise KeyError("{!r} is not registered".format(fileobj)) from None + if events != key.events: + self.unregister(fileobj) + key = self.register(fileobj, events, data) + elif data != key.data: + # Use a shortcut to update the data. + key = key._replace(data=data) + self._fd_to_key[key.fd] = key + return key + + def close(self): + self._fd_to_key.clear() + + def get_map(self): + return self._map + + def _key_from_fd(self, fd): + """Return the key associated to a given file descriptor. + + Parameters: + fd -- file descriptor + + Returns: + corresponding key, or None if not found + """ + try: + return self._fd_to_key[fd] + except KeyError: + return None + + +class SelectSelector(_BaseSelectorImpl): + """Select-based selector.""" + + def __init__(self): + super().__init__() + self._readers = set() + self._writers = set() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + if events & EVENT_READ: + self._readers.add(key.fd) + if events & EVENT_WRITE: + self._writers.add(key.fd) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._readers.discard(key.fd) + self._writers.discard(key.fd) + return key + + if sys.platform == 'win32': + def _select(self, r, w, _, timeout=None): + r, w, x = select.select(r, w, w, timeout) + return r, w + x, [] + else: + _select = select.select + + def select(self, timeout=None): + timeout = None if timeout is None else max(timeout, 0) + ready = [] + try: + r, w, _ = self._select(self._readers, self._writers, [], timeout) + except InterruptedError: + return ready + r = set(r) + w = set(w) + for fd in r | w: + events = 0 + if fd in r: + events |= EVENT_READ + if fd in w: + events |= EVENT_WRITE + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + +if hasattr(select, 'poll'): + + class PollSelector(_BaseSelectorImpl): + """Poll-based selector.""" + + def __init__(self): + super().__init__() + self._poll = select.poll() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + poll_events = 0 + if events & EVENT_READ: + poll_events |= select.POLLIN + if events & EVENT_WRITE: + poll_events |= select.POLLOUT + self._poll.register(key.fd, poll_events) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._poll.unregister(key.fd) + return key + + def select(self, timeout=None): + if timeout is None: + timeout = None + elif timeout <= 0: + timeout = 0 + else: + # poll() has a resolution of 1 millisecond, round away from + # zero to wait *at least* timeout seconds. + timeout = math.ceil(timeout * 1e3) + ready = [] + try: + fd_event_list = self._poll.poll(timeout) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.POLLIN: + events |= EVENT_WRITE + if event & ~select.POLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + +if hasattr(select, 'epoll'): + + class EpollSelector(_BaseSelectorImpl): + """Epoll-based selector.""" + + def __init__(self): + super().__init__() + self._epoll = select.epoll() + + def fileno(self): + return self._epoll.fileno() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + epoll_events = 0 + if events & EVENT_READ: + epoll_events |= select.EPOLLIN + if events & EVENT_WRITE: + epoll_events |= select.EPOLLOUT + self._epoll.register(key.fd, epoll_events) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + try: + self._epoll.unregister(key.fd) + except OSError: + # This can happen if the FD was closed since it + # was registered. + pass + return key + + def select(self, timeout=None): + if timeout is None: + timeout = -1 + elif timeout <= 0: + timeout = 0 + else: + # epoll_wait() has a resolution of 1 millisecond, round away + # from zero to wait *at least* timeout seconds. + timeout = math.ceil(timeout * 1e3) * 1e-3 + max_ev = len(self._fd_to_key) + ready = [] + try: + fd_event_list = self._epoll.poll(timeout, max_ev) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.EPOLLIN: + events |= EVENT_WRITE + if event & ~select.EPOLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + def close(self): + self._epoll.close() + super().close() + + +if hasattr(select, 'devpoll'): + + class DevpollSelector(_BaseSelectorImpl): + """Solaris /dev/poll selector.""" + + def __init__(self): + super().__init__() + self._devpoll = select.devpoll() + + def fileno(self): + return self._devpoll.fileno() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + poll_events = 0 + if events & EVENT_READ: + poll_events |= select.POLLIN + if events & EVENT_WRITE: + poll_events |= select.POLLOUT + self._devpoll.register(key.fd, poll_events) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._devpoll.unregister(key.fd) + return key + + def select(self, timeout=None): + if timeout is None: + timeout = None + elif timeout <= 0: + timeout = 0 + else: + # devpoll() has a resolution of 1 millisecond, round away from + # zero to wait *at least* timeout seconds. + timeout = math.ceil(timeout * 1e3) + ready = [] + try: + fd_event_list = self._devpoll.poll(timeout) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.POLLIN: + events |= EVENT_WRITE + if event & ~select.POLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + def close(self): + self._devpoll.close() + super().close() + + +if hasattr(select, 'kqueue'): + + class KqueueSelector(_BaseSelectorImpl): + """Kqueue-based selector.""" + + def __init__(self): + super().__init__() + self._kqueue = select.kqueue() + + def fileno(self): + return self._kqueue.fileno() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + if events & EVENT_READ: + kev = select.kevent(key.fd, select.KQ_FILTER_READ, + select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + if events & EVENT_WRITE: + kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, + select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + if key.events & EVENT_READ: + kev = select.kevent(key.fd, select.KQ_FILTER_READ, + select.KQ_EV_DELETE) + try: + self._kqueue.control([kev], 0, 0) + except OSError: + # This can happen if the FD was closed since it + # was registered. + pass + if key.events & EVENT_WRITE: + kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, + select.KQ_EV_DELETE) + try: + self._kqueue.control([kev], 0, 0) + except OSError: + # See comment above. + pass + return key + + def select(self, timeout=None): + timeout = None if timeout is None else max(timeout, 0) + max_ev = len(self._fd_to_key) + ready = [] + try: + kev_list = self._kqueue.control(None, max_ev, timeout) + except InterruptedError: + return ready + for kev in kev_list: + fd = kev.ident + flag = kev.filter + events = 0 + if flag == select.KQ_FILTER_READ: + events |= EVENT_READ + if flag == select.KQ_FILTER_WRITE: + events |= EVENT_WRITE + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + def close(self): + self._kqueue.close() + super().close() + + +# Choose the best implementation: roughly, epoll|kqueue|devpoll > poll > select. +# select() also can't accept a FD > FD_SETSIZE (usually around 1024) +if 'KqueueSelector' in globals(): + DefaultSelector = KqueueSelector +elif 'EpollSelector' in globals(): + DefaultSelector = EpollSelector +elif 'DevpollSelector' in globals(): + DefaultSelector = DevpollSelector +elif 'PollSelector' in globals(): + DefaultSelector = PollSelector +else: + DefaultSelector = SelectSelector diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index f34c7867..3acfd40f 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -24,11 +24,18 @@ import time from .. import http from ..http import wsgi -from .. import fdevents from .. import util from . import base from .. import six +try: + from asyncio import selectors +except ImportError: + from .. import selectors + + +ACCEPT = 0 +KEEPALIVED = 1 class TConn(): @@ -66,16 +73,15 @@ class ThreadWorker(base.Worker): def init_process(self): self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) - self.poller = fdevents.DefaultPoller() + self.poller = selectors.DefaultSelector() super(ThreadWorker, self).init_process() def run(self): # init listeners, add them to the event loop for s in self.sockets: s.setblocking(False) - self.poller.addfd(s, 'r') + self.poller.register(s, selectors.EVENT_READ, ACCEPT) - listeners = dict([(s.fileno(), s) for s in self.sockets]) while self.alive: # If our parent changed then we shut down. @@ -86,13 +92,14 @@ class ThreadWorker(base.Worker): # notify the arbiter we are alive self.notify() - events = self.poller.wait(0.01) + events = self.poller.select(0.01) if events: - for (fd, mode) in events: + for key, mask in events: fs = None client = None - if fd in listeners: - listener = listeners[fd] + if key.data == ACCEPT: + listener = key.fileobj + # start to accept connections try: client, addr = listener.accept() @@ -107,16 +114,20 @@ class ThreadWorker(base.Worker): if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise - elif fd in self.keepalived: + + else: # get the client connection - client = self.keepalived[fd] + client = key.data # remove it from the heap + try: del self._heap[operator.indexOf(self._heap, client)] except (KeyError, IndexError): pass + self.poller.unregister(key.fileobj) + # add a job to the pool fs = self.tpool.submit(self.handle, client.listener, client.sock, client.addr, client.parser) @@ -146,10 +157,10 @@ class ThreadWorker(base.Worker): # register the connection heapq.heappush(self._heap, tconn) - self.keepalived[fs.sock.fileno()] = tconn # add the socket to the event loop - self.poller.addfd(fs.sock, 'r', False) + self.poller.register(fs.sock, selectors.EVENT_READ, + tconn) else: # at this point the connection should be # closed but we make sure it is. @@ -176,7 +187,7 @@ class ThreadWorker(base.Worker): break else: # remove the socket from the poller - self.poller.delfd(conn.sock, 'r') + self.poller.unregister(conn.sock) # close the socket util.close(conn.sock)