diff --git a/.gitignore b/.gitignore index 63230128..07c3cc01 100755 --- a/.gitignore +++ b/.gitignore @@ -12,9 +12,8 @@ setuptools-* .DS_Store *.so .Python -distribute-0.6.8-py2.6.egg -distribute-0.6.8.tar.gz -gunicorn.egg-info +*.egg +*.egg-info nohup.out .coverage examples/frameworks/pylonstest/PasteScript* diff --git a/gunicorn/_compat.py b/gunicorn/_compat.py index 63844929..9a4480c3 100644 --- a/gunicorn/_compat.py +++ b/gunicorn/_compat.py @@ -1,8 +1,13 @@ +import sys + from gunicorn import six +PY33 = (sys.version_info >= (3, 3)) + def _check_if_pyc(fname): - """ Returns True if the extension is .pyc, False if .py and None if otherwise """ + """Return True if the extension is .pyc, False if .py + and None if otherwise""" from imp import find_module from os.path import realpath, dirname, basename, splitext @@ -14,11 +19,10 @@ def _check_if_pyc(fname): # Validate and fetch try: fileobj, fullpath, (_, _, pytype) = find_module(module_name, [dirpath]) - except ImportError: - raise IOError("Cannot find config file. Path maybe incorrect! : {0}".format(filepath)) - - return (pytype, fileobj, fullpath) + raise IOError("Cannot find config file. " + "Path maybe incorrect! : {0}".format(filepath)) + return pytype, fileobj, fullpath def _get_codeobj(pyfile): @@ -92,3 +96,107 @@ else: import urllib unquote_to_wsgi_str = urllib.unquote + + +# The following code adapted from trollius.py33_exceptions +def _wrap_error(exc, mapping, key): + if key not in mapping: + return + new_err_cls = mapping[key] + new_err = new_err_cls(*exc.args) + + # raise a new exception with the original traceback + if hasattr(exc, '__traceback__'): + traceback = exc.__traceback__ + else: + traceback = sys.exc_info()[2] + six.reraise(new_err_cls, new_err, traceback) + +if PY33: + import builtins + + BlockingIOError = builtins.BlockingIOError + BrokenPipeError = builtins.BrokenPipeError + ChildProcessError = builtins.ChildProcessError + ConnectionRefusedError = builtins.ConnectionRefusedError + ConnectionResetError = builtins.ConnectionResetError + InterruptedError = builtins.InterruptedError + ConnectionAbortedError = builtins.ConnectionAbortedError + PermissionError = builtins.PermissionError + FileNotFoundError = builtins.FileNotFoundError + ProcessLookupError = builtins.ProcessLookupError + + def wrap_error(func, *args, **kw): + return func(*args, **kw) +else: + import errno + import select + import socket + + class BlockingIOError(OSError): + pass + + class BrokenPipeError(OSError): + pass + + class ChildProcessError(OSError): + pass + + class ConnectionRefusedError(OSError): + pass + + class InterruptedError(OSError): + pass + + class ConnectionResetError(OSError): + pass + + class ConnectionAbortedError(OSError): + pass + + class PermissionError(OSError): + pass + + class FileNotFoundError(OSError): + pass + + class ProcessLookupError(OSError): + pass + + _MAP_ERRNO = { + errno.EACCES: PermissionError, + errno.EAGAIN: BlockingIOError, + errno.EALREADY: BlockingIOError, + errno.ECHILD: ChildProcessError, + errno.ECONNABORTED: ConnectionAbortedError, + errno.ECONNREFUSED: ConnectionRefusedError, + errno.ECONNRESET: ConnectionResetError, + errno.EINPROGRESS: BlockingIOError, + errno.EINTR: InterruptedError, + errno.ENOENT: FileNotFoundError, + errno.EPERM: PermissionError, + errno.EPIPE: BrokenPipeError, + errno.ESHUTDOWN: BrokenPipeError, + errno.EWOULDBLOCK: BlockingIOError, + errno.ESRCH: ProcessLookupError, + } + + def wrap_error(func, *args, **kw): + """ + Wrap socket.error, IOError, OSError, select.error to raise new specialized + exceptions of Python 3.3 like InterruptedError (PEP 3151). + """ + try: + return func(*args, **kw) + except (socket.error, IOError, OSError) as exc: + if hasattr(exc, 'winerror'): + _wrap_error(exc, _MAP_ERRNO, exc.winerror) + # _MAP_ERRNO does not contain all Windows errors. + # For some errors like "file not found", exc.errno should + # be used (ex: ENOENT). + _wrap_error(exc, _MAP_ERRNO, exc.errno) + raise + except select.error as exc: + if exc.args: + _wrap_error(exc, _MAP_ERRNO, exc.args[0]) + raise diff --git a/gunicorn/selectors.py b/gunicorn/selectors.py new file mode 100644 index 00000000..cdae5690 --- /dev/null +++ b/gunicorn/selectors.py @@ -0,0 +1,592 @@ +"""Selectors module. + +This module allows high-level and efficient I/O multiplexing, built upon the +`select` module primitives. + +The following code adapted from trollius.selectors. +""" + + +from abc import ABCMeta, abstractmethod +from collections import namedtuple, Mapping +import math +import select +import sys + +from gunicorn._compat import wrap_error, InterruptedError +from gunicorn import six + + +# generic events, that must be mapped to implementation-specific ones +EVENT_READ = (1 << 0) +EVENT_WRITE = (1 << 1) + + +def _fileobj_to_fd(fileobj): + """Return a file descriptor from a file object. + + Parameters: + fileobj -- file object or file descriptor + + Returns: + corresponding file descriptor + + Raises: + ValueError if the object is invalid + """ + if isinstance(fileobj, six.integer_types): + fd = fileobj + else: + try: + fd = int(fileobj.fileno()) + except (AttributeError, TypeError, ValueError): + raise ValueError("Invalid file object: " + "{0!r}".format(fileobj)) + if fd < 0: + raise ValueError("Invalid file descriptor: {0}".format(fd)) + return fd + + +SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) +"""Object used to associate a file object to its backing file descriptor, +selected event mask and attached data.""" + + +class _SelectorMapping(Mapping): + """Mapping of file objects to selector keys.""" + + def __init__(self, selector): + self._selector = selector + + def __len__(self): + return len(self._selector._fd_to_key) + + def __getitem__(self, fileobj): + try: + fd = self._selector._fileobj_lookup(fileobj) + return self._selector._fd_to_key[fd] + except KeyError: + raise KeyError("{0!r} is not registered".format(fileobj)) + + def __iter__(self): + return iter(self._selector._fd_to_key) + + +class BaseSelector(six.with_metaclass(ABCMeta)): + """Selector abstract base class. + + A selector supports registering file objects to be monitored for specific + I/O events. + + A file object is a file descriptor or any object with a `fileno()` method. + An arbitrary object can be attached to the file object, which can be used + for example to store context information, a callback, etc. + + A selector can use various implementations (select(), poll(), epoll()...) + depending on the platform. The default `Selector` class uses the most + efficient implementation on the current platform. + """ + + @abstractmethod + def register(self, fileobj, events, data=None): + """Register a file object. + + Parameters: + fileobj -- file object or file descriptor + events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) + data -- attached data + + Returns: + SelectorKey instance + + Raises: + ValueError if events is invalid + KeyError if fileobj is already registered + OSError if fileobj is closed or otherwise is unacceptable to + the underlying system call (if a system call is made) + + Note: + OSError may or may not be raised + """ + raise NotImplementedError + + @abstractmethod + def unregister(self, fileobj): + """Unregister a file object. + + Parameters: + fileobj -- file object or file descriptor + + Returns: + SelectorKey instance + + Raises: + KeyError if fileobj is not registered + + Note: + If fileobj is registered but has since been closed this does + *not* raise OSError (even if the wrapped syscall does) + """ + raise NotImplementedError + + def modify(self, fileobj, events, data=None): + """Change a registered file object monitored events or attached data. + + Parameters: + fileobj -- file object or file descriptor + events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) + data -- attached data + + Returns: + SelectorKey instance + + Raises: + Anything that unregister() or register() raises + """ + self.unregister(fileobj) + return self.register(fileobj, events, data) + + @abstractmethod + def select(self, timeout=None): + """Perform the actual selection, until some monitored file objects are + ready or a timeout expires. + + Parameters: + timeout -- if timeout > 0, this specifies the maximum wait time, in + seconds + if timeout <= 0, the select() call won't block, and will + report the currently ready file objects + if timeout is None, select() will block until a monitored + file object becomes ready + + Returns: + list of (key, events) for ready file objects + `events` is a bitwise mask of EVENT_READ|EVENT_WRITE + """ + raise NotImplementedError + + def close(self): + """Close the selector. + + This must be called to make sure that any underlying resource is freed. + """ + pass + + def get_key(self, fileobj): + """Return the key associated to a registered file object. + + Returns: + SelectorKey for this file object + """ + mapping = self.get_map() + try: + return mapping[fileobj] + except KeyError: + raise KeyError("{0!r} is not registered".format(fileobj)) + + @abstractmethod + def get_map(self): + """Return a mapping of file objects to selector keys.""" + raise NotImplementedError + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + +class _BaseSelectorImpl(BaseSelector): + """Base selector implementation.""" + + def __init__(self): + # this maps file descriptors to keys + self._fd_to_key = {} + # read-only mapping returned by get_map() + self._map = _SelectorMapping(self) + + def _fileobj_lookup(self, fileobj): + """Return a file descriptor from a file object. + + This wraps _fileobj_to_fd() to do an exhaustive search in case + the object is invalid but we still have it in our map. This + is used by unregister() so we can unregister an object that + was previously registered even if it is closed. It is also + used by _SelectorMapping. + """ + try: + return _fileobj_to_fd(fileobj) + except ValueError: + # Do an exhaustive search. + for key in self._fd_to_key.values(): + if key.fileobj is fileobj: + return key.fd + # Raise ValueError after all. + raise + + def register(self, fileobj, events, data=None): + if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): + raise ValueError("Invalid events: {0!r}".format(events)) + + key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data) + + if key.fd in self._fd_to_key: + raise KeyError("{0!r} (FD {1}) is already registered" + .format(fileobj, key.fd)) + + self._fd_to_key[key.fd] = key + return key + + def unregister(self, fileobj): + try: + key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) + except KeyError: + raise KeyError("{0!r} is not registered".format(fileobj)) + return key + + def modify(self, fileobj, events, data=None): + # TODO: Subclasses can probably optimize this even further. + try: + key = self._fd_to_key[self._fileobj_lookup(fileobj)] + except KeyError: + raise KeyError("{0!r} is not registered".format(fileobj)) + if events != key.events: + self.unregister(fileobj) + key = self.register(fileobj, events, data) + elif data != key.data: + # Use a shortcut to update the data. + key = key._replace(data=data) + self._fd_to_key[key.fd] = key + return key + + def close(self): + self._fd_to_key.clear() + + def get_map(self): + return self._map + + def _key_from_fd(self, fd): + """Return the key associated to a given file descriptor. + + Parameters: + fd -- file descriptor + + Returns: + corresponding key, or None if not found + """ + try: + return self._fd_to_key[fd] + except KeyError: + return None + + +class SelectSelector(_BaseSelectorImpl): + """Select-based selector.""" + + def __init__(self): + super(SelectSelector, self).__init__() + self._readers = set() + self._writers = set() + + def register(self, fileobj, events, data=None): + key = super(SelectSelector, self).register(fileobj, events, data) + if events & EVENT_READ: + self._readers.add(key.fd) + if events & EVENT_WRITE: + self._writers.add(key.fd) + return key + + def unregister(self, fileobj): + key = super(SelectSelector, self).unregister(fileobj) + self._readers.discard(key.fd) + self._writers.discard(key.fd) + return key + + if sys.platform == 'win32': + def _select(self, r, w, _, timeout=None): + r, w, x = select.select(r, w, w, timeout) + return r, w + x, [] + else: + _select = select.select + + def select(self, timeout=None): + timeout = None if timeout is None else max(timeout, 0) + ready = [] + try: + r, w, _ = wrap_error(self._select, + self._readers, self._writers, [], timeout) + except InterruptedError: + return ready + r = set(r) + w = set(w) + for fd in r | w: + events = 0 + if fd in r: + events |= EVENT_READ + if fd in w: + events |= EVENT_WRITE + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + +if hasattr(select, 'poll'): + + class PollSelector(_BaseSelectorImpl): + """Poll-based selector.""" + + def __init__(self): + super(PollSelector, self).__init__() + self._poll = select.poll() + + def register(self, fileobj, events, data=None): + key = super(PollSelector, self).register(fileobj, events, data) + poll_events = 0 + if events & EVENT_READ: + poll_events |= select.POLLIN + if events & EVENT_WRITE: + poll_events |= select.POLLOUT + self._poll.register(key.fd, poll_events) + return key + + def unregister(self, fileobj): + key = super(PollSelector, self).unregister(fileobj) + self._poll.unregister(key.fd) + return key + + def select(self, timeout=None): + if timeout is None: + timeout = None + elif timeout <= 0: + timeout = 0 + else: + # poll() has a resolution of 1 millisecond, round away from + # zero to wait *at least* timeout seconds. + timeout = int(math.ceil(timeout * 1e3)) + ready = [] + try: + fd_event_list = wrap_error(self._poll.poll, timeout) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.POLLIN: + events |= EVENT_WRITE + if event & ~select.POLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + +if hasattr(select, 'epoll'): + + class EpollSelector(_BaseSelectorImpl): + """Epoll-based selector.""" + + def __init__(self): + super(EpollSelector, self).__init__() + self._epoll = select.epoll() + + def fileno(self): + return self._epoll.fileno() + + def register(self, fileobj, events, data=None): + key = super(EpollSelector, self).register(fileobj, events, data) + epoll_events = 0 + if events & EVENT_READ: + epoll_events |= select.EPOLLIN + if events & EVENT_WRITE: + epoll_events |= select.EPOLLOUT + self._epoll.register(key.fd, epoll_events) + return key + + def unregister(self, fileobj): + key = super(EpollSelector, self).unregister(fileobj) + try: + self._epoll.unregister(key.fd) + except OSError: + # This can happen if the FD was closed since it + # was registered. + pass + return key + + def select(self, timeout=None): + if timeout is None: + timeout = -1 + elif timeout <= 0: + timeout = 0 + else: + # epoll_wait() has a resolution of 1 millisecond, round away + # from zero to wait *at least* timeout seconds. + timeout = math.ceil(timeout * 1e3) * 1e-3 + max_ev = len(self._fd_to_key) + ready = [] + try: + fd_event_list = wrap_error(self._epoll.poll, timeout, max_ev) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.EPOLLIN: + events |= EVENT_WRITE + if event & ~select.EPOLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + def close(self): + self._epoll.close() + super(EpollSelector, self).close() + + +if hasattr(select, 'devpoll'): + + class DevpollSelector(_BaseSelectorImpl): + """Solaris /dev/poll selector.""" + + def __init__(self): + super(DevpollSelector, self).__init__() + self._devpoll = select.devpoll() + + def fileno(self): + return self._devpoll.fileno() + + def register(self, fileobj, events, data=None): + key = super(DevpollSelector, self).register(fileobj, events, data) + poll_events = 0 + if events & EVENT_READ: + poll_events |= select.POLLIN + if events & EVENT_WRITE: + poll_events |= select.POLLOUT + self._devpoll.register(key.fd, poll_events) + return key + + def unregister(self, fileobj): + key = super(DevpollSelector, self).unregister(fileobj) + self._devpoll.unregister(key.fd) + return key + + def select(self, timeout=None): + if timeout is None: + timeout = None + elif timeout <= 0: + timeout = 0 + else: + # devpoll() has a resolution of 1 millisecond, round away from + # zero to wait *at least* timeout seconds. + timeout = math.ceil(timeout * 1e3) + ready = [] + try: + fd_event_list = self._devpoll.poll(timeout) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.POLLIN: + events |= EVENT_WRITE + if event & ~select.POLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + def close(self): + self._devpoll.close() + super(DevpollSelector, self).close() + + +if hasattr(select, 'kqueue'): + + class KqueueSelector(_BaseSelectorImpl): + """Kqueue-based selector.""" + + def __init__(self): + super(KqueueSelector, self).__init__() + self._kqueue = select.kqueue() + + def fileno(self): + return self._kqueue.fileno() + + def register(self, fileobj, events, data=None): + key = super(KqueueSelector, self).register(fileobj, events, data) + if events & EVENT_READ: + kev = select.kevent(key.fd, select.KQ_FILTER_READ, + select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + if events & EVENT_WRITE: + kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, + select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + return key + + def unregister(self, fileobj): + key = super(KqueueSelector, self).unregister(fileobj) + if key.events & EVENT_READ: + kev = select.kevent(key.fd, select.KQ_FILTER_READ, + select.KQ_EV_DELETE) + try: + self._kqueue.control([kev], 0, 0) + except OSError: + # This can happen if the FD was closed since it + # was registered. + pass + if key.events & EVENT_WRITE: + kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, + select.KQ_EV_DELETE) + try: + self._kqueue.control([kev], 0, 0) + except OSError: + # See comment above. + pass + return key + + def select(self, timeout=None): + timeout = None if timeout is None else max(timeout, 0) + max_ev = len(self._fd_to_key) + ready = [] + try: + kev_list = wrap_error(self._kqueue.control, + None, max_ev, timeout) + except InterruptedError: + return ready + for kev in kev_list: + fd = kev.ident + flag = kev.filter + events = 0 + if flag == select.KQ_FILTER_READ: + events |= EVENT_READ + if flag == select.KQ_FILTER_WRITE: + events |= EVENT_WRITE + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + def close(self): + self._kqueue.close() + super(KqueueSelector, self).close() + + +# Choose the best implementation: roughly, epoll|kqueue|devpoll > poll > select. +# select() also can't accept a FD > FD_SETSIZE (usually around 1024) +if 'KqueueSelector' in globals(): + DefaultSelector = KqueueSelector +elif 'EpollSelector' in globals(): + DefaultSelector = EpollSelector +elif 'DevpollSelector' in globals(): + DefaultSelector = DevpollSelector +elif 'PollSelector' in globals(): + DefaultSelector = PollSelector +else: + DefaultSelector = SelectSelector diff --git a/setup.py b/setup.py index a0486b6a..e302a3d2 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,10 @@ fname = os.path.join(os.path.dirname(__file__), 'requirements_dev.txt') with open(fname) as f: tests_require = list(map(lambda l: l.strip(), f.readlines())) +if sys.version_info[:2] < (3, 3): + tests_require.append('mock') +if sys.version_info[:2] < (2, 7): + tests_require.append('unittest2') class PyTestCommand(TestCommand): user_options = [ diff --git a/tests/support.py b/tests/support.py new file mode 100644 index 00000000..1fffa408 --- /dev/null +++ b/tests/support.py @@ -0,0 +1,33 @@ +import functools +import sys +import unittest + +HOST = "127.0.0.1" + + +def requires_mac_ver(*min_version): + """Decorator raising SkipTest if the OS is Mac OS X and the OS X + version if less than min_version. + + For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version + is lesser than 10.5. + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kw): + if sys.platform == 'darwin': + version_txt = platform.mac_ver()[0] + try: + version = tuple(map(int, version_txt.split('.'))) + except ValueError: + pass + else: + if version < min_version: + min_version_txt = '.'.join(map(str, min_version)) + raise unittest.SkipTest( + "Mac OS X %s or higher required, not %s" + % (min_version_txt, version_txt)) + return func(*args, **kw) + wrapper.min_version = min_version + return wrapper + return decorator diff --git a/tests/test_selectors.py b/tests/test_selectors.py new file mode 100644 index 00000000..647c50ed --- /dev/null +++ b/tests/test_selectors.py @@ -0,0 +1,442 @@ +# The following code adapted from CPython (see Lib/test/test_selectors.py) + +import errno +import os +import random +import signal +import socket +from time import sleep +try: + import unittest2 as unittest +except ImportError: + import unittest +try: + import unittest.mock as mock +except ImportError: + import mock +try: + from time import monotonic as time +except ImportError: + from time import time +try: + import resource +except ImportError: + resource = None + +from gunicorn import selectors +import support + + +if hasattr(socket, 'socketpair'): + socketpair = socket.socketpair +else: + def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0): + with socket.socket(family, type, proto) as l: + l.bind((support.HOST, 0)) + l.listen() + c = socket.socket(family, type, proto) + try: + c.connect(l.getsockname()) + caddr = c.getsockname() + while True: + a, addr = l.accept() + # check that we've got the correct client + if addr == caddr: + return c, a + a.close() + except OSError: + c.close() + raise + + +def find_ready_matching(ready, flag): + match = [] + for key, events in ready: + if events & flag: + match.append(key.fileobj) + return match + + +class BaseSelectorTestCase(object): + + def make_socketpair(self): + rd, wr = socketpair() + self.addCleanup(rd.close) + self.addCleanup(wr.close) + return rd, wr + + def test_register(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = self.make_socketpair() + + key = s.register(rd, selectors.EVENT_READ, "data") + self.assertIsInstance(key, selectors.SelectorKey) + self.assertEqual(key.fileobj, rd) + self.assertEqual(key.fd, rd.fileno()) + self.assertEqual(key.events, selectors.EVENT_READ) + self.assertEqual(key.data, "data") + + # register an unknown event + self.assertRaises(ValueError, s.register, 0, 999999) + + # register an invalid FD + self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ) + + # register twice + self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ) + + # register the same FD, but with a different object + self.assertRaises(KeyError, s.register, rd.fileno(), + selectors.EVENT_READ) + + def test_unregister(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = self.make_socketpair() + + s.register(rd, selectors.EVENT_READ) + s.unregister(rd) + + # unregister an unknown file obj + self.assertRaises(KeyError, s.unregister, 999999) + + # unregister twice + self.assertRaises(KeyError, s.unregister, rd) + + def test_unregister_after_fd_close(self): + s = self.SELECTOR() + self.addCleanup(s.close) + rd, wr = self.make_socketpair() + r, w = rd.fileno(), wr.fileno() + s.register(r, selectors.EVENT_READ) + s.register(w, selectors.EVENT_WRITE) + rd.close() + wr.close() + s.unregister(r) + s.unregister(w) + + def test_unregister_after_socket_close(self): + s = self.SELECTOR() + self.addCleanup(s.close) + rd, wr = self.make_socketpair() + s.register(rd, selectors.EVENT_READ) + s.register(wr, selectors.EVENT_WRITE) + rd.close() + wr.close() + s.unregister(rd) + s.unregister(wr) + + def test_modify(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = self.make_socketpair() + + key = s.register(rd, selectors.EVENT_READ) + + # modify events + key2 = s.modify(rd, selectors.EVENT_WRITE) + self.assertNotEqual(key.events, key2.events) + self.assertEqual(key2, s.get_key(rd)) + + s.unregister(rd) + + # modify data + d1 = object() + d2 = object() + + key = s.register(rd, selectors.EVENT_READ, d1) + key2 = s.modify(rd, selectors.EVENT_READ, d2) + self.assertEqual(key.events, key2.events) + self.assertNotEqual(key.data, key2.data) + self.assertEqual(key2, s.get_key(rd)) + self.assertEqual(key2.data, d2) + + # modify unknown file obj + self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ) + + # modify use a shortcut + d3 = object() + s.register = mock.Mock() + s.unregister = mock.Mock() + + s.modify(rd, selectors.EVENT_READ, d3) + self.assertFalse(s.register.called) + self.assertFalse(s.unregister.called) + + def test_close(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = self.make_socketpair() + + s.register(rd, selectors.EVENT_READ) + s.register(wr, selectors.EVENT_WRITE) + + s.close() + self.assertRaises(KeyError, s.get_key, rd) + self.assertRaises(KeyError, s.get_key, wr) + + def test_get_key(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = self.make_socketpair() + + key = s.register(rd, selectors.EVENT_READ, "data") + self.assertEqual(key, s.get_key(rd)) + + # unknown file obj + self.assertRaises(KeyError, s.get_key, 999999) + + def test_get_map(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = self.make_socketpair() + + keys = s.get_map() + self.assertFalse(keys) + self.assertEqual(len(keys), 0) + self.assertEqual(list(keys), []) + key = s.register(rd, selectors.EVENT_READ, "data") + self.assertIn(rd, keys) + self.assertEqual(key, keys[rd]) + self.assertEqual(len(keys), 1) + self.assertEqual(list(keys), [rd.fileno()]) + self.assertEqual(list(keys.values()), [key]) + + # unknown file obj + with self.assertRaises(KeyError): + keys[999999] + + # Read-only mapping + with self.assertRaises(TypeError): + del keys[rd] + + def test_select(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = self.make_socketpair() + + s.register(rd, selectors.EVENT_READ) + wr_key = s.register(wr, selectors.EVENT_WRITE) + + result = s.select() + for key, events in result: + self.assertTrue(isinstance(key, selectors.SelectorKey)) + self.assertTrue(events) + self.assertFalse(events & ~(selectors.EVENT_READ | + selectors.EVENT_WRITE)) + + self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result) + + def test_context_manager(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = self.make_socketpair() + + with s as sel: + sel.register(rd, selectors.EVENT_READ) + sel.register(wr, selectors.EVENT_WRITE) + + self.assertRaises(KeyError, s.get_key, rd) + self.assertRaises(KeyError, s.get_key, wr) + + def test_fileno(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + if hasattr(s, 'fileno'): + fd = s.fileno() + self.assertTrue(isinstance(fd, int)) + self.assertGreaterEqual(fd, 0) + + def test_selector(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + NUM_SOCKETS = 12 + MSG = b" This is a test." + MSG_LEN = len(MSG) + readers = [] + writers = [] + r2w = {} + w2r = {} + + for i in range(NUM_SOCKETS): + rd, wr = self.make_socketpair() + s.register(rd, selectors.EVENT_READ) + s.register(wr, selectors.EVENT_WRITE) + readers.append(rd) + writers.append(wr) + r2w[rd] = wr + w2r[wr] = rd + + bufs = [] + + while writers: + ready = s.select() + ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE) + if not ready_writers: + self.fail("no sockets ready for writing") + wr = random.choice(ready_writers) + wr.send(MSG) + + for i in range(10): + ready = s.select() + ready_readers = find_ready_matching(ready, + selectors.EVENT_READ) + if ready_readers: + break + # there might be a delay between the write to the write end and + # the read end is reported ready + sleep(0.1) + else: + self.fail("no sockets ready for reading") + self.assertEqual([w2r[wr]], ready_readers) + rd = ready_readers[0] + buf = rd.recv(MSG_LEN) + self.assertEqual(len(buf), MSG_LEN) + bufs.append(buf) + s.unregister(r2w[rd]) + s.unregister(rd) + writers.remove(r2w[rd]) + + self.assertEqual(bufs, [MSG] * NUM_SOCKETS) + + def test_timeout(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = self.make_socketpair() + + s.register(wr, selectors.EVENT_WRITE) + t = time() + self.assertEqual(1, len(s.select(0))) + self.assertEqual(1, len(s.select(-1))) + self.assertLess(time() - t, 0.5) + + s.unregister(wr) + s.register(rd, selectors.EVENT_READ) + t = time() + self.assertFalse(s.select(0)) + self.assertFalse(s.select(-1)) + self.assertLess(time() - t, 0.5) + + t0 = time() + self.assertFalse(s.select(1)) + t1 = time() + dt = t1 - t0 + self.assertTrue(0.8 <= dt <= 1.6, dt) + + @unittest.skipUnless(hasattr(signal, "alarm"), + "signal.alarm() required for this test") + def test_select_interrupt(self): + s = self.SELECTOR() + self.addCleanup(s.close) + + rd, wr = self.make_socketpair() + + orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None) + self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler) + self.addCleanup(signal.alarm, 0) + + signal.alarm(1) + + s.register(rd, selectors.EVENT_READ) + t = time() + self.assertFalse(s.select(2)) + self.assertLess(time() - t, 2.5) + + +class ScalableSelectorMixIn: + + # see issue #18963 for why it's skipped on older OS X versions + @support.requires_mac_ver(10, 5) + @unittest.skipUnless(resource, "Test needs resource module") + def test_above_fd_setsize(self): + # A scalable implementation should have no problem with more than + # FD_SETSIZE file descriptors. Since we don't know the value, we just + # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling. + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard)) + self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE, + (soft, hard)) + NUM_FDS = min(hard, 2**16) + except (OSError, ValueError): + NUM_FDS = soft + + # guard for already allocated FDs (stdin, stdout...) + NUM_FDS -= 32 + + s = self.SELECTOR() + self.addCleanup(s.close) + + for i in range(NUM_FDS // 2): + try: + rd, wr = self.make_socketpair() + except OSError: + # too many FDs, skip - note that we should only catch EMFILE + # here, but apparently *BSD and Solaris can fail upon connect() + # or bind() with EADDRNOTAVAIL, so let's be safe + self.skipTest("FD limit reached") + + try: + s.register(rd, selectors.EVENT_READ) + s.register(wr, selectors.EVENT_WRITE) + except OSError as e: + if e.errno == errno.ENOSPC: + # this can be raised by epoll if we go over + # fs.epoll.max_user_watches sysctl + self.skipTest("FD limit reached") + raise + + self.assertEqual(NUM_FDS // 2, len(s.select())) + + +class DefaultSelectorTestCase(BaseSelectorTestCase, unittest.TestCase): + + SELECTOR = selectors.DefaultSelector + + +class SelectSelectorTestCase(BaseSelectorTestCase, unittest.TestCase): + + SELECTOR = selectors.SelectSelector + + +@unittest.skipUnless(hasattr(selectors, 'PollSelector'), + "Test needs selectors.PollSelector") +class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn, unittest.TestCase): + + SELECTOR = getattr(selectors, 'PollSelector', None) + + +@unittest.skipUnless(hasattr(selectors, 'EpollSelector'), + "Test needs selectors.EpollSelector") +class EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn, unittest.TestCase): + + SELECTOR = getattr(selectors, 'EpollSelector', None) + + +@unittest.skipUnless(hasattr(selectors, 'KqueueSelector'), + "Test needs selectors.KqueueSelector)") +class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn, unittest.TestCase): + + SELECTOR = getattr(selectors, 'KqueueSelector', None) + + +@unittest.skipUnless(hasattr(selectors, 'DevpollSelector'), + "Test needs selectors.DevpollSelector") +class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn, unittest.TestCase): + + SELECTOR = getattr(selectors, 'DevpollSelector', None) + +if __name__ == "__main__": + unittest.main()