From 7349c4fb9a02863d9a7fc518c14ffeaf5460f7a7 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sun, 30 Mar 2014 15:27:53 +0200 Subject: [PATCH 01/25] add `--threads` param --- gunicorn/config.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/gunicorn/config.py b/gunicorn/config.py index f272bcc9..58ccc26f 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -91,11 +91,21 @@ class Config(object): @property def worker_class(self): uri = self.settings['worker_class'].get() + + ## are we using a threaded worker? + is_sync = uri.endswith('SyncWorker') or uri == 'sync' + if is_sync and self.threads > 1: + uri = "gunicorn.workers.gthread.ThreadedWorker" + worker_class = util.load_class(uri) if hasattr(worker_class, "setup"): worker_class.setup() return worker_class + @property + def threads(self): + return self.settings['threads'].get() + @property def workers(self): return self.settings['workers'].get() @@ -550,6 +560,27 @@ class WorkerClass(Setting): can also load the gevent class with ``egg:gunicorn#gevent`` """ +class WorkerThreads(Setting): + name = "threads" + section = "Worker Processes" + cli = ["--threads"] + meta = "INT" + validator = validate_pos_int + type = int + default = 1 + desc = """\ + The number of worker threads for handling requests. + + Run each worker in prethreaded mode with the specified number of + threads per worker. + + A positive integer generally in the 2-4 x $(NUM_CORES) range. You'll + want to vary this a bit to find the best for your particular + application's work load. + + If it is not defined, the default is 1. + """ + class WorkerConnections(Setting): name = "worker_connections" From 5f0a329b58cfefcbf11da0ab252917d50e3e8ae4 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sun, 30 Mar 2014 15:28:19 +0200 Subject: [PATCH 02/25] add fdevents module This module add a new cross platform event poller to gunicorn. It allows you to listen on different fds in an efficient manner. On linux it's using epoll, bsd/darwin kqueue... --- gunicorn/fdevents.py | 467 +++++++++++++++++++++++++++++++++++++++++++ gunicorn/util.py | 4 + 2 files changed, 471 insertions(+) create mode 100644 gunicorn/fdevents.py diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py new file mode 100644 index 00000000..3c928be6 --- /dev/null +++ b/gunicorn/fdevents.py @@ -0,0 +1,467 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + + +""" module implementing Poller depending on the platform. A pollster +allows you to register an fd, and retrieve events on it. """ + +import select + +from .util import fd_, close_on_exec + + +class PollerBase(object): + + def addfd(self, fd, mode, repeat=True): + """ add a filed escriptor to poll. + + Parameters: + + * fd : file descriptor or file object + * mode: 'r' to wait for read events, 'w' to wait for write events + * repeat: true or false . to continuously wait on this event or + not (default is true). + """ + + raise NotImplementedError + + def delfd(self, fd, mode): + """ stop to poll for the event on this file descriptor + + Parameters: + + * fd : file descriptor or file object + * mode: 'r' to wait for read events, 'w' to wait for write events + """ + + raise NotImplementedError + + def waitfd(self, nsec): + """ return one event from the pollster. + + return: (fd, mode) + """ + raise NotImplementedError + + def wait(self, nsec): + """ return all events raised in the pollster when calling the + function. + + return: [(fd, mode), ....] 
+ """ + raise NotImplementedError + + def close(self): + """ close the pollster """ + raise NotImplementedError + + +class SelectPoller(PollerBase): + + def __init__(self): + self.read_fds = {} + self.write_fds = {} + self.events = [] + + def addfd(self, fd, mode, repeat=True): + fd = fd_(fd) + + if mode == 'r': + self.read_fds[fd] = repeat + else: + self.write_fds[fd] = repeat + + def delfd(self, fd, mode): + if mode == 'r' and fd in self.read_fds: + del self.read_fds[fd] + elif fd in self.write_fds: + del self.write_fds[fd] + + def _wait(self, nsec): + read_fds = [fd for fd in self.read_fds] + write_fds = [fd for fd in self.write_fds] + + if len(self.events) == 0: + try: + r, w, e = select.select(read_fds, write_fds, [], nsec) + except select.error as e: + if e.args[0] == errno.EINTR: + continue + raise + + events = [] + for fd in r: + if fd in self.read_fds: + if self.read_fds[fd] == False: + del self.read_fds[fd] + events.append((fd, 'r')) + + for fd in w: + if fd in self.write_fds: + if self.write_fds[fd] == False: + del self.write_fds[fd] + events.append((fd, 'w')) + + self.events.extend(events) + return self.events + + def waitfd(self, nsec): + self._wait(nsec) + if self.events: + return self.events.pop(0) + return None + + def wait(self, nsec): + events = self._wait(nsec) + self.events = [] + return events + + def close(self): + self.read_fds = [] + self.write_fds = [] + +if hasattr(selec, 'kqueue') + + class KQueuePoller(object): + + def __init__(self): + self.kq = select.kqueue() + close_on_exec(self.kq.fileno()) + self.events = [] + + def addfd(self, fd, mode, repeat=True): + if mode == 'r': + kmode = select.KQ_FILTER_READ + else: + kmode = select.KQ_FILTER_WRITE + + flags = select.KQ_EV_ADD + + if sys.platform.startswith("darwin"): + flags |= select.KQ_EV_ENABLE + + if not repeat: + flags |= select.KQ_EV_ONESHOT + + ev = select.kevent(fd_(fd), kmode, flags) + self.kq.control([ev], 0) + + def delfd(self, fd, mode): + if mode == 'r': + kmode = select.KQ_FILTER_READ + else: + kmode = select.KQ_FILTER_WRITE + + ev = select.kevent(fd_(fd), select.KQ_FILTER_READ, + select.KQ_EV_DELETE) + self.kq.control([ev], 0) + + def _wait(self, nsec=0): + if len(self.events) == 0: + try: + events = self.kq.control(None, 0, nsec) + except select.error as e: + if e.args[0] == errno.EINTR: + continue + raise + + # process events + all_events = [] + for ev in events: + if ev.filter == select.KQ_FILTER_READ: + mode = 'r' + else: + mode = 'w' + all_events.append((fd_(ev.ident), mode)) + + self.events.extend(all_events) + + # return all events + return self.events + + def waitfd(self, nsec=0): + self._wait(nsec) + if self.events: + return self.events.pop(0) + return None + + def wait(self, nsec=0): + events = self._wait(nsec) + self.events = [] + return events + + def close(self): + self.kq.close() + +if hasattr(select, "epoll"): + class EpollPoller(object): + + def __init__(self): + self.poll = select.epoll() + close_on_exec(self.poll.fileno()) + self.fds = {} + self.events = [] + + def addfd(self, fd, mode, repeat=True): + if mode == 'r': + mode = (select.EPOLLIN, repeat) + else: + mode = (select.EPOLLOUT, repeat) + + if fd in self.fds: + modes = self.fds[fd] + if mode in self.fds[fd]: + # already registered for this mode + return + modes.append(mode) + addfd_ = self.poll.modify + else: + modes = [mode] + addfd_ = self.poll.register + + # append the new mode to fds + self.fds[fd] = modes + + mask = 0 + for mode, r in modes: + mask |= mode + + if not repeat: + mask |= select.EPOLLONESHOT + + addfd_(fd, 
mask) + + def delfd(self, fd, mode): + if mode == 'r': + mode = select.POLLIN | select.POLLPRI + else: + mode = select.POLLOUT + + if fd not in self.fds: + return + + modes = [] + for m, r in self.fds[fd]: + if mode != m: + modes.append((m, r)) + + if not modes: + # del the fd from the poll + self.poll.unregister(fd) + del self.fds[fd] + else: + # modify the fd in the poll + self.fds[fd] = modes + m, r = modes[0] + mask = m[0] + if r: + mask |= select.EPOLLONESHOT + + self.poll.modify(fd, mask) + + def _wait(self, nsec=0): + # wait for the events + if len(self.events) == 0: + try: + events = self.poll.poll(nsec) + except select.error as e: + if e.args[0] == errno.EINTR: + continue + raise + + if events: + all_events = [] + fds = {} + for fd, ev in self.events: + fd = fd_(fd) + if ev == select.EPOLLIN: + mode = 'r' + else: + mode = 'w' + + all_events.append((fd, mode)) + + if fd in fds: + fds[fd].append(mode) + else: + fds[fd] = [mode] + + # eventually remove the mode from the list if repeat + # was set to False and modify the poll if needed. + modes = [] + for m, r in self.fds[fd]: + if not r: + continue + modes.append(m, r) + + if modes != self.fds[fd]: + self.fds[fd] = modes + mask = 0 + for m, r in modes: + mask |= m + self.poll.modify(fd, mask) + + self.events.extend(all_events) + + # return all events + return self.events + + def waitfd(self, nsec=0): + self._wait(nsec) + return self.events.pop(0) + + def wait(self, nsec=0): + events = self._wait(nsec) + self.events = [] + return events + + def close(self): + self.poll.close() + + +if hasattr(select, "poll") or hasattr(select, "epoll"): + + class _PollerBase(object): + + POLL_IMPL = None + + def __init__(self): + self.poll = self.POLL_IMPL() + self.fds = {} + self.events = [] + + def addfd(self, fd, mode, repeat=True): + fd = fd_(fd) + if mode == 'r': + mode = (select.POLLIN, repeat) + else: + mode = (select.POLLOUT, repeat) + + if fd in self.fds: + modes = self.fds[fd] + if mode in modes: + # already registered for this mode + return + modes.append(mode) + addfd_ = self.poll.modify + else: + modes = [mode] + addfd_ = self.poll.register + + # append the new mode to fds + self.fds[fd] = modes + + mask = 0 + for mode, r in modes: + mask |= mode + + addfd_(fd, mask) + + def delfd(self, fd, mode): + fd = fd_(fd) + + if mode == 'r': + mode = select.POLLIN | select.POLLPRI + else: + mode = select.POLLOUT + + if fd not in self.fds: + return + + modes = [] + for m, r in self.fds[fd]: + if mode != m: + modes.append((m, r)) + + if not modes: + # del the fd from the poll + self.poll.unregister(fd) + del self.fds[fd] + else: + # modify the fd in the poll + self.fds[fd] = modes + m, r = modes[0] + mask = m[0] + self.poll.modify(fd, mask) + + def _wait(self, nsec=0): + # wait for the events + if len(self.events) == 0: + try: + events = self.poll.poll(nsec) + except select.error as e: + if e.args[0] == errno.EINTR: + continue + raise + + all_events = [] + for fd, ev in events: + fd = fd_(fd) + + if fd not in self.fds: + continue + + if ev == select.POLLIN or ev == select.POLLPRI: + mode = 'r' + else: + mode = 'w' + + # add new event to the list + all_events.append((fd, mode)) + + # eventually remove the mode from the list if repeat + # was set to False and modify the poll if needed. 
+ modes = [] + for m, r in self.fds[fd]: + if not r: + continue + modes.append(m, r) + + if not modes: + self.poll.unregister(fd) + else: + mask = 0 + if modes != self.fds[fd]: + mask |= m + self.poll.modify(fd, mask) + + + self.events.extend(all_events) + return self.events + + def waitfd(self, nsec=0): + self._wait(nsec) + if self.events: + return self.events.pop(0) + return None + + def close(self): + for fd in self.fds: + self.poll.unregister(fd) + self.fds = [] + self.poll = None + + + if hasattr(select, "devpoll"): + + class DevPollPoller(_PollerBase): + POLL_IMPL = select.devpoll + + if hasattr(select, "poll"): + class PollPoller(_PollerBase): + POLL_IMPL = select.poll + + +# choose the best implementation depending on the platform. +if 'KqueuePoller' in globals(): + DefaultPoller = KqueuePoller +elif 'EpollPoller' in globals(): + DefaultPoller = EpollPoller +elif 'DevpollPoller' in globals(): + DefaultPoller = DevpollPoller +elif 'PollPoller' in globals(): + DefaultPoller = PollPoller +else: + DefaultPoller = SelectPoller diff --git a/gunicorn/util.py b/gunicorn/util.py index de0ef613..07db3dba 100644 --- a/gunicorn/util.py +++ b/gunicorn/util.py @@ -269,6 +269,10 @@ def set_non_blocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK fcntl.fcntl(fd, fcntl.F_SETFL, flags) +def fd_(fd): + if hasattr(fd, "fileno"): + return int(fd.fileno()) + return fd def close(sock): try: From 67866f275fe673324a240148f76e240c10cb0b6a Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 13 May 2014 10:21:48 +0200 Subject: [PATCH 03/25] add missing gthreads worker --- gunicorn/fdevents.py | 2 +- gunicorn/workers/gthread.py | 301 ++++++++++++++++++++++++++++++++++++ 2 files changed, 302 insertions(+), 1 deletion(-) create mode 100644 gunicorn/workers/gthread.py diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py index 3c928be6..ccd5bd61 100644 --- a/gunicorn/fdevents.py +++ b/gunicorn/fdevents.py @@ -17,7 +17,7 @@ class PollerBase(object): def addfd(self, fd, mode, repeat=True): """ add a filed escriptor to poll. - Parameters: + fdevent Parameters: * fd : file descriptor or file object * mode: 'r' to wait for read events, 'w' to wait for write events diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py new file mode 100644 index 00000000..b6d6247b --- /dev/null +++ b/gunicorn/workers/gthread.py @@ -0,0 +1,301 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +# design: +# a threaded worker accepts connections in the main loop, accepted +# connections are are added to the thread pool as a connection job. On +# keepalive connections are put back in the loop waiting for an event. +# If no event happen after the keep alive timeout, the connectoin is +# closed. + + +import concurrent.futures as futures +from datetime import datetime +import errno +import heapq +import os +import socket +import ssl +import sys +import time + +from .. import http +from ..http import wsgi +from .. import fdevents +from .. import util +from . import base +from .. 
import six + + +class TConn(): + + def __init__(self, worker, listener, sock, addr): + self.listener = listener + self.sock = sock + self.addr = addr + self.when = fs.timeout + + # set the timeout + self.timeout = time.time() + worker.cfg.keepalive + + def __lt__(self, other): + return self.timeout < other.timeout + + __cmp__ = __lt__ + + +class ThreadWorker(base.worker): + + def __init__(self, *args, **kwargs): + super(ThreadWorker, self).__init__(*args, **kwargs) + # initialise the pool + self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) + self.poller = fdevents.DefaultPoller() + self.futures = set() + self._heap = [] + self.keepalived = {} + + def _wrap(self, fs, listener, client, addr): + fs.listener = listener + fs.sock = client + fs.addr = addr + + def run(self): + for s in self.sockets: + s.setblocking(False) + self.poller.add_fd(s, 'r') + + listeners = dict([(s.fileno(), s) for s in self.sockets]) + while self.alive: + + # If our parent changed then we shut down. + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s", self) + return + + # notify the arbiter we are alive + self.notify() + + events = self.poller.wait(0.1) + if events: + for (fd, mode) in events: + fs = None + client = None + if fd in listeners: + # start to accept connections + try: + client, addr = sock.accept() + + # add a job to the pool + fs = self.tpool.submit(self.handle, listeners[fd], + client, addr, False) + + self._wrap_future(fs, listemers[fd], + client, addr) + + except socket.error as e: + if e.args[0] not in (errno.EAGAIN, + errno.ECONNABORTED, errno.EWOULDBLOCK): + raise + else: + # keepalive connection + if fd in self.keepalived: + # get the client connection + client = self.keepalived[fd] + + # remove it from the heap + try: + del self._heap[operator.indexOf(self._heap, t)] + except (KeyError, IndexError): + pass + + # add a job to the pool + fs = self.tpool.submit(self.handle, client.listener, + client.sock, client.addr, True) + + self._wrap_future(fs, client.listener, + client.sock, client.addr) + + if fs is not None: + self.futures.add(fs) + + # handle jobs, we give a chance to all jobs to be executed. + if self.futures: + res = futures.wait([fs for fs in self.futures], + timeout=self.timeout, + return_when=futures.ALL_COMPLETED) + + for fs in res: + # remove the future from our list + self.futures.remove(fs) + + try: + result = fs.result() + # if the connection should be kept alived add it + # to the eventloop and record it + if result and result is not None: + # flag the socket as non blocked + fs.sock.setblocking(0) + util.close_on_exec(fs.sock) + + tconn = TConn(self, fs.listener, fs.sock, + fs.addr) + + # register the connection + heapq.heappush(self._heap, tconn) + self.keepalived[fs.sock.fileno()] = tconn + + # add the socket to the event loop + self.poller.add_fd(fs.sock.fileno(), 'r') + else: + # at this point the connection should be + # closed but we make sure it is. + util.close(fs.sock) + except: + # an exception happened, make sure to close the + # socket. 
+ util.close(fs.sock) + + + # hanle keepalive timeouts + now = time.time() + while True: + if not len(self._heap): + continue + + conn = heapq.heappop(self._heap) + delta = t.timeout = now + if delta > 0: + heapq.heappush(self._heap, t) + break + else: + # remove the socket from the poller + self.poller.del_fd(conn.sock.fileno(), 'r') + # close the socket + conn.sock.close() + + + # shutdown the pool + self.tpool.shutdown(False) + + # wait for the workers + futures.wait([fs for fs in self.futures], + timeout=self.cfg.graceful_timeout) + + # if we have still fures running, try to close them + for fs in self.futures: + sock = fs.sock + + # the future is not running, cancel it + if not fs.done() and not fs.running(): + fs.cancel() + + # make sure we close the sockets after the graceful timeout + util.close(sock) + + + def handle(self, listener, client, addr, keepalived): + keepalive = False + try: + # wrap the connection + if not keepalived and self.cfg.is_ssl: + client = ssl.wrap_socket(client, server_side=True, + **self.cfg.ssl_options) + + client.setblocking(1) + util.close_on_exec(client) + + parser = http.RequestParser(self.cfg, sock) + req = six.next(parser) + + # handle the request + keepalive = self.handle_request(listener, req, client, addr) + except http.errors.NoMoreData as e: + self.log.debug("Ignored premature client disconnection. %s", e) + + except StopIteration as e: + self.log.debug("Closing connection. %s", e) + + except ssl.SSLError as e: + if e.args[0] == ssl.SSL_ERROR_EOF: + self.log.debug("ssl connection closed") + client.close() + else: + self.log.debug("Error processing SSL request.") + self.handle_error(req, client, addr, e) + + except socket.error as e: + if e.args[0] not in (errno.EPIPE, errno.ECONNRESET): + self.log.exception("Socket error processing request.") + else: + if e.args[0] == errno.ECONNRESET: + self.log.debug("Ignoring connection reset") + else: + self.log.debug("Ignoring EPIPE") + except Exception as e: + self.handle_error(req, client, addr, e) + finally: + if not keepalive: + util.close(client) + return keepalive + + def handle_request(self, listener, req, client, addr): + environ = {} + resp = None + try: + self.cfg.pre_request(self, req) + request_start = datetime.now() + resp, environ = wsgi.create(req, client, addr, + listener.getsockname(), self.cfg) + environ["wsgi.multithread"] = True + + self.nr += 1 + if self.nr >= self.max_requests: + self.log.info("Autorestarting worker after current request.") + self.alive = False + + if not self.cfg.keepalive: + resp.force_close() + + respiter = self.wsgi(environ, resp.start_response) + try: + if isinstance(respiter, environ['wsgi.file_wrapper']): + resp.write_file(respiter) + else: + for item in respiter: + resp.write(item) + resp.close() + request_time = datetime.now() - request_start + self.log.access(resp, req, environ, request_time) + finally: + if hasattr(respiter, "close"): + respiter.close() + + if resp.should_close(): + raise StopIteration() + + except socket.error: + exc_info = sys.exc_info() + # pass to next try-except level + six.reraise(exc_info[0], exc_info[1], exc_info[2]) + except Exception: + if resp and resp.headers_sent: + # If the requests have already been sent, we should close the + # connection to indicate the error. 
+ self.log.exception("Error handling request") + try: + client.shutdown(socket.SHUT_RDWR) + client.close() + except socket.error: + pass + raise StopIteration() + raise + finally: + try: + self.cfg.post_request(self, req, environ, resp) + except Exception: + self.log.exception("Exception in post_request hook") + + return True From c8f6269f29701db7da7e5b6afa591c59f8076e2c Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 13 May 2014 10:25:26 +0200 Subject: [PATCH 04/25] fix the fdevents module --- gunicorn/fdevents.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py index ccd5bd61..0bddb41e 100644 --- a/gunicorn/fdevents.py +++ b/gunicorn/fdevents.py @@ -4,7 +4,7 @@ # See the NOTICE for more information. -""" module implementing Poller depending on the platform. A pollster +""" A module implementing Poller depending on the platform. A pollster allows you to register an fd, and retrieve events on it. """ import select @@ -70,8 +70,10 @@ class SelectPoller(PollerBase): if mode == 'r': self.read_fds[fd] = repeat - else: + elif mode == 'w': self.write_fds[fd] = repeat + else: + raise ValueError('unkown mode {0}'.format(mode)) def delfd(self, fd, mode): if mode == 'r' and fd in self.read_fds: @@ -87,9 +89,8 @@ class SelectPoller(PollerBase): try: r, w, e = select.select(read_fds, write_fds, [], nsec) except select.error as e: - if e.args[0] == errno.EINTR: - continue - raise + if e.args[0] != errno.EINTR: + raise events = [] for fd in r: @@ -122,7 +123,7 @@ class SelectPoller(PollerBase): self.read_fds = [] self.write_fds = [] -if hasattr(selec, 'kqueue') +if hasattr(select, 'kqueue'): class KQueuePoller(object): @@ -163,9 +164,8 @@ if hasattr(selec, 'kqueue') try: events = self.kq.control(None, 0, nsec) except select.error as e: - if e.args[0] == errno.EINTR: - continue - raise + if e.args[0] != errno.EINTR: + raise # process events all_events = [] @@ -267,9 +267,8 @@ if hasattr(select, "epoll"): try: events = self.poll.poll(nsec) except select.error as e: - if e.args[0] == errno.EINTR: - continue - raise + if e.args[0] != errno.EINTR: + raise if events: all_events = [] @@ -392,9 +391,8 @@ if hasattr(select, "poll") or hasattr(select, "epoll"): try: events = self.poll.poll(nsec) except select.error as e: - if e.args[0] == errno.EINTR: - continue - raise + if e.args[0] != errno.EINTR: + raise all_events = [] for fd, ev in events: From c353eaaceebea1ef03c2e2329d88e994740093c3 Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 13 May 2014 10:29:26 +0200 Subject: [PATCH 05/25] fix ThreadWorker --- gunicorn/workers/gthread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index b6d6247b..a0aebf2f 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -46,7 +46,7 @@ class TConn(): __cmp__ = __lt__ -class ThreadWorker(base.worker): +class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super(ThreadWorker, self).__init__(*args, **kwargs) From 6aa99e44415a2a43f0f9476966a782a8e5d42a71 Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 13 May 2014 12:30:57 +0200 Subject: [PATCH 06/25] fix keepalive --- examples/test.py | 2 +- gunicorn/config.py | 2 +- gunicorn/fdevents.py | 16 +++-- gunicorn/workers/gthread.py | 121 ++++++++++++++++++------------------ 4 files changed, 74 insertions(+), 67 deletions(-) diff --git a/examples/test.py b/examples/test.py index 610cf5b1..77bef952 100644 --- 
a/examples/test.py +++ b/examples/test.py @@ -16,7 +16,7 @@ def app(environ, start_response): """Simplest possible application object""" errors = environ['wsgi.errors'] - pprint.pprint(('ENVIRON', environ), stream=errors) +# pprint.pprint(('ENVIRON', environ), stream=errors) data = b'Hello, World!\n' status = '200 OK' diff --git a/gunicorn/config.py b/gunicorn/config.py index 58ccc26f..cd4d1784 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -95,7 +95,7 @@ class Config(object): ## are we using a threaded worker? is_sync = uri.endswith('SyncWorker') or uri == 'sync' if is_sync and self.threads > 1: - uri = "gunicorn.workers.gthread.ThreadedWorker" + uri = "gunicorn.workers.gthread.ThreadWorker" worker_class = util.load_class(uri) if hasattr(worker_class, "setup"): diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py index 0bddb41e..1fee335a 100644 --- a/gunicorn/fdevents.py +++ b/gunicorn/fdevents.py @@ -133,6 +133,8 @@ if hasattr(select, 'kqueue'): self.events = [] def addfd(self, fd, mode, repeat=True): + fd = fd_(fd) + if mode == 'r': kmode = select.KQ_FILTER_READ else: @@ -150,6 +152,8 @@ if hasattr(select, 'kqueue'): self.kq.control([ev], 0) def delfd(self, fd, mode): + fd = fd_(fd) + if mode == 'r': kmode = select.KQ_FILTER_READ else: @@ -205,6 +209,8 @@ if hasattr(select, "epoll"): self.events = [] def addfd(self, fd, mode, repeat=True): + fd = fd_(fd) + if mode == 'r': mode = (select.EPOLLIN, repeat) else: @@ -234,6 +240,8 @@ if hasattr(select, "epoll"): addfd_(fd, mask) def delfd(self, fd, mode): + fd = fd_(fd) + if mode == 'r': mode = select.POLLIN | select.POLLPRI else: @@ -273,9 +281,9 @@ if hasattr(select, "epoll"): if events: all_events = [] fds = {} - for fd, ev in self.events: + for fd, ev in events: fd = fd_(fd) - if ev == select.EPOLLIN: + if ev & select.EPOLLIN: mode = 'r' else: mode = 'w' @@ -293,7 +301,7 @@ if hasattr(select, "epoll"): for m, r in self.fds[fd]: if not r: continue - modes.append(m, r) + modes.append((m, r)) if modes != self.fds[fd]: self.fds[fd] = modes @@ -401,7 +409,7 @@ if hasattr(select, "poll") or hasattr(select, "epoll"): if fd not in self.fds: continue - if ev == select.POLLIN or ev == select.POLLPRI: + if ev & select.POLLIN or ev & select.POLLPRI: mode = 'r' else: mode = 'w' diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index a0aebf2f..6336e6de 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -16,6 +16,7 @@ from datetime import datetime import errno import heapq import os +import operator import socket import ssl import sys @@ -31,11 +32,11 @@ from .. 
import six class TConn(): - def __init__(self, worker, listener, sock, addr): + def __init__(self, worker, listener, sock, addr, parser): self.listener = listener self.sock = sock self.addr = addr - self.when = fs.timeout + self.parser = parser # set the timeout self.timeout = time.time() + worker.cfg.keepalive @@ -57,15 +58,16 @@ class ThreadWorker(base.Worker): self._heap = [] self.keepalived = {} - def _wrap(self, fs, listener, client, addr): + def _wrap_future(self, fs, listener, client, addr): fs.listener = listener fs.sock = client fs.addr = addr + self.futures.add(fs) def run(self): for s in self.sockets: - s.setblocking(False) - self.poller.add_fd(s, 'r') + s.setblocking(0) + self.poller.addfd(s, 'r') listeners = dict([(s.fileno(), s) for s in self.sockets]) while self.alive: @@ -78,77 +80,70 @@ class ThreadWorker(base.Worker): # notify the arbiter we are alive self.notify() - events = self.poller.wait(0.1) + events = self.poller.wait(0.01) if events: for (fd, mode) in events: fs = None client = None if fd in listeners: + listener = listeners[fd] # start to accept connections try: - client, addr = sock.accept() + client, addr = listener.accept() # add a job to the pool - fs = self.tpool.submit(self.handle, listeners[fd], + fs = self.tpool.submit(self.handle, listener, client, addr, False) - self._wrap_future(fs, listemers[fd], - client, addr) + self._wrap_future(fs, listener, client, addr) except socket.error as e: if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise - else: - # keepalive connection - if fd in self.keepalived: - # get the client connection - client = self.keepalived[fd] + elif fd in self.keepalived: + # get the client connection + client = self.keepalived[fd] - # remove it from the heap - try: - del self._heap[operator.indexOf(self._heap, t)] - except (KeyError, IndexError): - pass + # remove it from the heap + try: + del self._heap[operator.indexOf(self._heap, client)] + except (KeyError, IndexError): + pass - # add a job to the pool - fs = self.tpool.submit(self.handle, client.listener, - client.sock, client.addr, True) + # add a job to the pool + fs = self.tpool.submit(self.handle, client.listener, + client.sock, client.addr, client.parser) - self._wrap_future(fs, client.listener, - client.sock, client.addr) - - if fs is not None: - self.futures.add(fs) + # wrap the future + self._wrap_future(fs, client.listener, client.sock, + client.addr) # handle jobs, we give a chance to all jobs to be executed. if self.futures: - res = futures.wait([fs for fs in self.futures], - timeout=self.timeout, - return_when=futures.ALL_COMPLETED) + self.notify() - for fs in res: - # remove the future from our list - self.futures.remove(fs) + res = futures.wait(self.futures, timeout=self.timeout, + return_when=futures.FIRST_COMPLETED) + for fs in res.done: try: - result = fs.result() + (keepalive, parser) = fs.result() # if the connection should be kept alived add it # to the eventloop and record it - if result and result is not None: + if keepalive: # flag the socket as non blocked fs.sock.setblocking(0) - util.close_on_exec(fs.sock) tconn = TConn(self, fs.listener, fs.sock, - fs.addr) + fs.addr, parser) # register the connection heapq.heappush(self._heap, tconn) self.keepalived[fs.sock.fileno()] = tconn # add the socket to the event loop - self.poller.add_fd(fs.sock.fileno(), 'r') + self.poller.addfd(fs.sock, 'r', False) else: # at this point the connection should be # closed but we make sure it is. 
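
The hunk above submits each accepted (or kept-alive) connection to the thread pool and then harvests finished jobs with futures.wait(..., return_when=FIRST_COMPLETED), so the loop never blocks on jobs that are still running. A minimal, self-contained sketch of that harvesting pattern (illustrative only; the slow_echo job is invented for the example and is not part of the patch):

    import concurrent.futures as futures
    import time

    def slow_echo(value, delay):
        time.sleep(delay)
        return value

    pool = futures.ThreadPoolExecutor(max_workers=2)
    pending = {pool.submit(slow_echo, n, 0.05 * n) for n in range(4)}

    while pending:
        # wait() returns a (done, not_done) pair; only the finished futures
        # are handled here, the rest stay in `pending` for the next iteration.
        done, pending = futures.wait(pending, timeout=1.0,
                                     return_when=futures.FIRST_COMPLETED)
        for fs in done:
            print("finished:", fs.result())

    pool.shutdown()
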
@@ -157,32 +152,34 @@ class ThreadWorker(base.Worker): # an exception happened, make sure to close the # socket. util.close(fs.sock) + finally: + # remove the future from our list + self.futures.remove(fs) # hanle keepalive timeouts now = time.time() while True: if not len(self._heap): - continue + break conn = heapq.heappop(self._heap) - delta = t.timeout = now + delta = conn.timeout - now if delta > 0: - heapq.heappush(self._heap, t) + heapq.heappush(self._heap, conn) break else: # remove the socket from the poller - self.poller.del_fd(conn.sock.fileno(), 'r') + self.poller.delfd(conn.sock.fileno(), 'r') # close the socket - conn.sock.close() + util.close(conn.sock) # shutdown the pool self.tpool.shutdown(False) # wait for the workers - futures.wait([fs for fs in self.futures], - timeout=self.cfg.graceful_timeout) + futures.wait(self.futures, timeout=self.cfg.graceful_timeout) # if we have still fures running, try to close them for fs in self.futures: @@ -196,28 +193,32 @@ class ThreadWorker(base.Worker): util.close(sock) - def handle(self, listener, client, addr, keepalived): + def handle(self, listener, client, addr, parser): keepalive = False + req = None try: + client.setblocking(1) + # wrap the connection - if not keepalived and self.cfg.is_ssl: - client = ssl.wrap_socket(client, server_side=True, - **self.cfg.ssl_options) + if not parser: + if self.cfg.is_ssl: + client = ssl.wrap_socket(client, server_side=True, + **self.cfg.ssl_options) + parser = http.RequestParser(self.cfg, client) - client.setblocking(1) - util.close_on_exec(client) - - parser = http.RequestParser(self.cfg, sock) req = six.next(parser) + if not req: + return (False, None) # handle the request keepalive = self.handle_request(listener, req, client, addr) + if keepalive: + return (keepalive, parser) except http.errors.NoMoreData as e: self.log.debug("Ignored premature client disconnection. %s", e) except StopIteration as e: self.log.debug("Closing connection. 
%s", e) - except ssl.SSLError as e: if e.args[0] == ssl.SSL_ERROR_EOF: self.log.debug("ssl connection closed") @@ -236,10 +237,8 @@ class ThreadWorker(base.Worker): self.log.debug("Ignoring EPIPE") except Exception as e: self.handle_error(req, client, addr, e) - finally: - if not keepalive: - util.close(client) - return keepalive + + return (False, None) def handle_request(self, listener, req, client, addr): environ = {} @@ -274,8 +273,8 @@ class ThreadWorker(base.Worker): respiter.close() if resp.should_close(): - raise StopIteration() - + self.log.debug("Closing connection.") + return False except socket.error: exc_info = sys.exc_info() # pass to next try-except level From eadc526192b28a7ed87fda5915be8e292ffd92a1 Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 13 May 2014 13:21:14 +0200 Subject: [PATCH 07/25] fix PollPoller --- gunicorn/fdevents.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py index 1fee335a..f53ef047 100644 --- a/gunicorn/fdevents.py +++ b/gunicorn/fdevents.py @@ -423,7 +423,7 @@ if hasattr(select, "poll") or hasattr(select, "epoll"): for m, r in self.fds[fd]: if not r: continue - modes.append(m, r) + modes.append((m, r)) if not modes: self.poll.unregister(fd) @@ -443,6 +443,11 @@ if hasattr(select, "poll") or hasattr(select, "epoll"): return self.events.pop(0) return None + def wait(self, nsec=0): + events = self._wait(nsec) + self.events = [] + return events + def close(self): for fd in self.fds: self.poll.unregister(fd) From 67800292e05956cbd4e3b852011495b1da3a5537 Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 13 May 2014 13:57:34 +0200 Subject: [PATCH 08/25] fix kqueue poller. this change initialise the event loop after the process has forked so we make sure to inherit from the file descriptor. Also fix the number of events we are waiting for. The python implementation requires a positive number. --- gunicorn/fdevents.py | 21 +++++++++++++++------ gunicorn/workers/gthread.py | 15 +++++++++++---- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py index f53ef047..e50f2a0d 100644 --- a/gunicorn/fdevents.py +++ b/gunicorn/fdevents.py @@ -8,6 +8,7 @@ allows you to register an fd, and retrieve events on it. 
""" import select +import sys from .util import fd_, close_on_exec @@ -131,10 +132,13 @@ if hasattr(select, 'kqueue'): self.kq = select.kqueue() close_on_exec(self.kq.fileno()) self.events = [] + self.max_ev = 0 def addfd(self, fd, mode, repeat=True): fd = fd_(fd) + self.max_ev += 1 + if mode == 'r': kmode = select.KQ_FILTER_READ else: @@ -148,12 +152,17 @@ if hasattr(select, 'kqueue'): if not repeat: flags |= select.KQ_EV_ONESHOT - ev = select.kevent(fd_(fd), kmode, flags) - self.kq.control([ev], 0) + ev = select.kevent(fd, kmode, flags) + self.kq.control([ev], 0, 0) def delfd(self, fd, mode): fd = fd_(fd) + self.max_ev -= 1 + + if fd < 0: + return + if mode == 'r': kmode = select.KQ_FILTER_READ else: @@ -166,7 +175,7 @@ if hasattr(select, 'kqueue'): def _wait(self, nsec=0): if len(self.events) == 0: try: - events = self.kq.control(None, 0, nsec) + events = self.kq.control(None, self.max_ev, nsec) except select.error as e: if e.args[0] != errno.EINTR: raise @@ -328,7 +337,7 @@ if hasattr(select, "epoll"): self.poll.close() -if hasattr(select, "poll") or hasattr(select, "epoll"): +if hasattr(select, "poll") or hasattr(select, "devpoll"): class _PollerBase(object): @@ -466,8 +475,8 @@ if hasattr(select, "poll") or hasattr(select, "epoll"): # choose the best implementation depending on the platform. -if 'KqueuePoller' in globals(): - DefaultPoller = KqueuePoller +if 'KQueuePoller' in globals(): + DefaultPoller = KQueuePoller elif 'EpollPoller' in globals(): DefaultPoller = EpollPoller elif 'DevpollPoller' in globals(): diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 6336e6de..f34c7867 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -52,8 +52,8 @@ class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super(ThreadWorker, self).__init__(*args, **kwargs) # initialise the pool - self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) - self.poller = fdevents.DefaultPoller() + self.tpool = None + self.poller = None self.futures = set() self._heap = [] self.keepalived = {} @@ -64,9 +64,15 @@ class ThreadWorker(base.Worker): fs.addr = addr self.futures.add(fs) + def init_process(self): + self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) + self.poller = fdevents.DefaultPoller() + super(ThreadWorker, self).init_process() + def run(self): + # init listeners, add them to the event loop for s in self.sockets: - s.setblocking(0) + s.setblocking(False) self.poller.addfd(s, 'r') listeners = dict([(s.fileno(), s) for s in self.sockets]) @@ -170,13 +176,14 @@ class ThreadWorker(base.Worker): break else: # remove the socket from the poller - self.poller.delfd(conn.sock.fileno(), 'r') + self.poller.delfd(conn.sock, 'r') # close the socket util.close(conn.sock) # shutdown the pool self.tpool.shutdown(False) + self.poller.close() # wait for the workers futures.wait(self.futures, timeout=self.cfg.graceful_timeout) From 14f71ebf39abfd2d369d331225fc3c66af3a318c Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 13 May 2014 15:18:43 +0200 Subject: [PATCH 09/25] compatibility with python 2 Add support of the threaded worker on python 2.7. python 2.7 has no futures module. With this change the compatibility module is installed. 
--- setup.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/setup.py b/setup.py index 050ab617..36e33853 100644 --- a/setup.py +++ b/setup.py @@ -10,6 +10,7 @@ import sys from gunicorn import __version__ + CLASSIFIERS = [ 'Development Status :: 4 - Beta', 'Environment :: Other Environment', @@ -61,6 +62,11 @@ class PyTest(Command): raise SystemExit(errno) +REQUIREMENTS = [] +if sys.version_info[0] == 2: + REQUIREMENTS.append('futures') + + setup( name = 'gunicorn', version = __version__, @@ -80,6 +86,8 @@ setup( tests_require = tests_require, cmdclass = {'test': PyTest}, + install_requires = REQUIREMENTS, + entry_points=""" [console_scripts] From 81810d9f04a0999e6c2ee906132bf9dc42e66518 Mon Sep 17 00:00:00 2001 From: benoitc Date: Thu, 15 May 2014 08:03:06 +0200 Subject: [PATCH 10/25] reuse the code --- gunicorn/fdevents.py | 58 +++++++++----------------------------------- 1 file changed, 11 insertions(+), 47 deletions(-) diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py index e50f2a0d..3a273f5d 100644 --- a/gunicorn/fdevents.py +++ b/gunicorn/fdevents.py @@ -44,7 +44,11 @@ class PollerBase(object): return: (fd, mode) """ - raise NotImplementedError + self._wait(nsec) + if self.events: + return self.events.pop(0) + return None + def wait(self, nsec): """ return all events raised in the pollster when calling the @@ -52,7 +56,9 @@ class PollerBase(object): return: [(fd, mode), ....] """ - raise NotImplementedError + events = self._wait(nsec) + self.events = [] + return events def close(self): """ close the pollster """ @@ -109,24 +115,13 @@ class SelectPoller(PollerBase): self.events.extend(events) return self.events - def waitfd(self, nsec): - self._wait(nsec) - if self.events: - return self.events.pop(0) - return None - - def wait(self, nsec): - events = self._wait(nsec) - self.events = [] - return events - def close(self): self.read_fds = [] self.write_fds = [] if hasattr(select, 'kqueue'): - class KQueuePoller(object): + class KQueuePoller(PollerBase): def __init__(self): self.kq = select.kqueue() @@ -194,22 +189,11 @@ if hasattr(select, 'kqueue'): # return all events return self.events - def waitfd(self, nsec=0): - self._wait(nsec) - if self.events: - return self.events.pop(0) - return None - - def wait(self, nsec=0): - events = self._wait(nsec) - self.events = [] - return events - def close(self): self.kq.close() if hasattr(select, "epoll"): - class EpollPoller(object): + class EpollPoller(PollerBase): def __init__(self): self.poll = select.epoll() @@ -324,22 +308,13 @@ if hasattr(select, "epoll"): # return all events return self.events - def waitfd(self, nsec=0): - self._wait(nsec) - return self.events.pop(0) - - def wait(self, nsec=0): - events = self._wait(nsec) - self.events = [] - return events - def close(self): self.poll.close() if hasattr(select, "poll") or hasattr(select, "devpoll"): - class _PollerBase(object): + class _PollerBase(PollerBase): POLL_IMPL = None @@ -446,17 +421,6 @@ if hasattr(select, "poll") or hasattr(select, "devpoll"): self.events.extend(all_events) return self.events - def waitfd(self, nsec=0): - self._wait(nsec) - if self.events: - return self.events.pop(0) - return None - - def wait(self, nsec=0): - events = self._wait(nsec) - self.events = [] - return events - def close(self): for fd in self.fds: self.poll.unregister(fd) From 7f9d745eb5919967c682d28e2a6237d62efab97e Mon Sep 17 00:00:00 2001 From: benoitc Date: Fri, 30 May 2014 11:07:35 +0200 Subject: [PATCH 11/25] reuse asyncio code in the threaded worker --- gunicorn/arbiter.py | 3 +- 
gunicorn/config.py | 10 + gunicorn/fdevents.py | 451 --------------------------- gunicorn/selectors.py | 585 ++++++++++++++++++++++++++++++++++++ gunicorn/workers/gthread.py | 37 ++- 5 files changed, 620 insertions(+), 466 deletions(-) delete mode 100644 gunicorn/fdevents.py create mode 100644 gunicorn/selectors.py diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index bd2016d2..e0ffb225 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -133,8 +133,7 @@ class Arbiter(object): listeners_str = ",".join([str(l) for l in self.LISTENERS]) self.log.debug("Arbiter booted") self.log.info("Listening at: %s (%s)", listeners_str, self.pid) - self.log.info("Using worker: %s", - self.cfg.settings['worker_class'].get()) + self.log.info("Using worker: %s", self.cfg.worker_class_str) self.cfg.when_ready(self) diff --git a/gunicorn/config.py b/gunicorn/config.py index cd4d1784..4c28dae3 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -88,6 +88,16 @@ class Config(object): return parser + @property + def worker_class_str(self): + uri = self.settings['worker_class'].get() + + ## are we using a threaded worker? + is_sync = uri.endswith('SyncWorker') or uri == 'sync' + if is_sync and self.threads > 1: + return "threads" + return uri + @property def worker_class(self): uri = self.settings['worker_class'].get() diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py deleted file mode 100644 index 3a273f5d..00000000 --- a/gunicorn/fdevents.py +++ /dev/null @@ -1,451 +0,0 @@ -# -*- coding: utf-8 - -# -# This file is part of gunicorn released under the MIT license. -# See the NOTICE for more information. - - -""" A module implementing Poller depending on the platform. A pollster -allows you to register an fd, and retrieve events on it. """ - -import select -import sys - -from .util import fd_, close_on_exec - - -class PollerBase(object): - - def addfd(self, fd, mode, repeat=True): - """ add a filed escriptor to poll. - - fdevent Parameters: - - * fd : file descriptor or file object - * mode: 'r' to wait for read events, 'w' to wait for write events - * repeat: true or false . to continuously wait on this event or - not (default is true). - """ - - raise NotImplementedError - - def delfd(self, fd, mode): - """ stop to poll for the event on this file descriptor - - Parameters: - - * fd : file descriptor or file object - * mode: 'r' to wait for read events, 'w' to wait for write events - """ - - raise NotImplementedError - - def waitfd(self, nsec): - """ return one event from the pollster. - - return: (fd, mode) - """ - self._wait(nsec) - if self.events: - return self.events.pop(0) - return None - - - def wait(self, nsec): - """ return all events raised in the pollster when calling the - function. - - return: [(fd, mode), ....] 
- """ - events = self._wait(nsec) - self.events = [] - return events - - def close(self): - """ close the pollster """ - raise NotImplementedError - - -class SelectPoller(PollerBase): - - def __init__(self): - self.read_fds = {} - self.write_fds = {} - self.events = [] - - def addfd(self, fd, mode, repeat=True): - fd = fd_(fd) - - if mode == 'r': - self.read_fds[fd] = repeat - elif mode == 'w': - self.write_fds[fd] = repeat - else: - raise ValueError('unkown mode {0}'.format(mode)) - - def delfd(self, fd, mode): - if mode == 'r' and fd in self.read_fds: - del self.read_fds[fd] - elif fd in self.write_fds: - del self.write_fds[fd] - - def _wait(self, nsec): - read_fds = [fd for fd in self.read_fds] - write_fds = [fd for fd in self.write_fds] - - if len(self.events) == 0: - try: - r, w, e = select.select(read_fds, write_fds, [], nsec) - except select.error as e: - if e.args[0] != errno.EINTR: - raise - - events = [] - for fd in r: - if fd in self.read_fds: - if self.read_fds[fd] == False: - del self.read_fds[fd] - events.append((fd, 'r')) - - for fd in w: - if fd in self.write_fds: - if self.write_fds[fd] == False: - del self.write_fds[fd] - events.append((fd, 'w')) - - self.events.extend(events) - return self.events - - def close(self): - self.read_fds = [] - self.write_fds = [] - -if hasattr(select, 'kqueue'): - - class KQueuePoller(PollerBase): - - def __init__(self): - self.kq = select.kqueue() - close_on_exec(self.kq.fileno()) - self.events = [] - self.max_ev = 0 - - def addfd(self, fd, mode, repeat=True): - fd = fd_(fd) - - self.max_ev += 1 - - if mode == 'r': - kmode = select.KQ_FILTER_READ - else: - kmode = select.KQ_FILTER_WRITE - - flags = select.KQ_EV_ADD - - if sys.platform.startswith("darwin"): - flags |= select.KQ_EV_ENABLE - - if not repeat: - flags |= select.KQ_EV_ONESHOT - - ev = select.kevent(fd, kmode, flags) - self.kq.control([ev], 0, 0) - - def delfd(self, fd, mode): - fd = fd_(fd) - - self.max_ev -= 1 - - if fd < 0: - return - - if mode == 'r': - kmode = select.KQ_FILTER_READ - else: - kmode = select.KQ_FILTER_WRITE - - ev = select.kevent(fd_(fd), select.KQ_FILTER_READ, - select.KQ_EV_DELETE) - self.kq.control([ev], 0) - - def _wait(self, nsec=0): - if len(self.events) == 0: - try: - events = self.kq.control(None, self.max_ev, nsec) - except select.error as e: - if e.args[0] != errno.EINTR: - raise - - # process events - all_events = [] - for ev in events: - if ev.filter == select.KQ_FILTER_READ: - mode = 'r' - else: - mode = 'w' - all_events.append((fd_(ev.ident), mode)) - - self.events.extend(all_events) - - # return all events - return self.events - - def close(self): - self.kq.close() - -if hasattr(select, "epoll"): - class EpollPoller(PollerBase): - - def __init__(self): - self.poll = select.epoll() - close_on_exec(self.poll.fileno()) - self.fds = {} - self.events = [] - - def addfd(self, fd, mode, repeat=True): - fd = fd_(fd) - - if mode == 'r': - mode = (select.EPOLLIN, repeat) - else: - mode = (select.EPOLLOUT, repeat) - - if fd in self.fds: - modes = self.fds[fd] - if mode in self.fds[fd]: - # already registered for this mode - return - modes.append(mode) - addfd_ = self.poll.modify - else: - modes = [mode] - addfd_ = self.poll.register - - # append the new mode to fds - self.fds[fd] = modes - - mask = 0 - for mode, r in modes: - mask |= mode - - if not repeat: - mask |= select.EPOLLONESHOT - - addfd_(fd, mask) - - def delfd(self, fd, mode): - fd = fd_(fd) - - if mode == 'r': - mode = select.POLLIN | select.POLLPRI - else: - mode = select.POLLOUT - - if fd not 
in self.fds: - return - - modes = [] - for m, r in self.fds[fd]: - if mode != m: - modes.append((m, r)) - - if not modes: - # del the fd from the poll - self.poll.unregister(fd) - del self.fds[fd] - else: - # modify the fd in the poll - self.fds[fd] = modes - m, r = modes[0] - mask = m[0] - if r: - mask |= select.EPOLLONESHOT - - self.poll.modify(fd, mask) - - def _wait(self, nsec=0): - # wait for the events - if len(self.events) == 0: - try: - events = self.poll.poll(nsec) - except select.error as e: - if e.args[0] != errno.EINTR: - raise - - if events: - all_events = [] - fds = {} - for fd, ev in events: - fd = fd_(fd) - if ev & select.EPOLLIN: - mode = 'r' - else: - mode = 'w' - - all_events.append((fd, mode)) - - if fd in fds: - fds[fd].append(mode) - else: - fds[fd] = [mode] - - # eventually remove the mode from the list if repeat - # was set to False and modify the poll if needed. - modes = [] - for m, r in self.fds[fd]: - if not r: - continue - modes.append((m, r)) - - if modes != self.fds[fd]: - self.fds[fd] = modes - mask = 0 - for m, r in modes: - mask |= m - self.poll.modify(fd, mask) - - self.events.extend(all_events) - - # return all events - return self.events - - def close(self): - self.poll.close() - - -if hasattr(select, "poll") or hasattr(select, "devpoll"): - - class _PollerBase(PollerBase): - - POLL_IMPL = None - - def __init__(self): - self.poll = self.POLL_IMPL() - self.fds = {} - self.events = [] - - def addfd(self, fd, mode, repeat=True): - fd = fd_(fd) - if mode == 'r': - mode = (select.POLLIN, repeat) - else: - mode = (select.POLLOUT, repeat) - - if fd in self.fds: - modes = self.fds[fd] - if mode in modes: - # already registered for this mode - return - modes.append(mode) - addfd_ = self.poll.modify - else: - modes = [mode] - addfd_ = self.poll.register - - # append the new mode to fds - self.fds[fd] = modes - - mask = 0 - for mode, r in modes: - mask |= mode - - addfd_(fd, mask) - - def delfd(self, fd, mode): - fd = fd_(fd) - - if mode == 'r': - mode = select.POLLIN | select.POLLPRI - else: - mode = select.POLLOUT - - if fd not in self.fds: - return - - modes = [] - for m, r in self.fds[fd]: - if mode != m: - modes.append((m, r)) - - if not modes: - # del the fd from the poll - self.poll.unregister(fd) - del self.fds[fd] - else: - # modify the fd in the poll - self.fds[fd] = modes - m, r = modes[0] - mask = m[0] - self.poll.modify(fd, mask) - - def _wait(self, nsec=0): - # wait for the events - if len(self.events) == 0: - try: - events = self.poll.poll(nsec) - except select.error as e: - if e.args[0] != errno.EINTR: - raise - - all_events = [] - for fd, ev in events: - fd = fd_(fd) - - if fd not in self.fds: - continue - - if ev & select.POLLIN or ev & select.POLLPRI: - mode = 'r' - else: - mode = 'w' - - # add new event to the list - all_events.append((fd, mode)) - - # eventually remove the mode from the list if repeat - # was set to False and modify the poll if needed. 
- modes = [] - for m, r in self.fds[fd]: - if not r: - continue - modes.append((m, r)) - - if not modes: - self.poll.unregister(fd) - else: - mask = 0 - if modes != self.fds[fd]: - mask |= m - self.poll.modify(fd, mask) - - - self.events.extend(all_events) - return self.events - - def close(self): - for fd in self.fds: - self.poll.unregister(fd) - self.fds = [] - self.poll = None - - - if hasattr(select, "devpoll"): - - class DevPollPoller(_PollerBase): - POLL_IMPL = select.devpoll - - if hasattr(select, "poll"): - class PollPoller(_PollerBase): - POLL_IMPL = select.poll - - -# choose the best implementation depending on the platform. -if 'KQueuePoller' in globals(): - DefaultPoller = KQueuePoller -elif 'EpollPoller' in globals(): - DefaultPoller = EpollPoller -elif 'DevpollPoller' in globals(): - DefaultPoller = DevpollPoller -elif 'PollPoller' in globals(): - DefaultPoller = PollPoller -else: - DefaultPoller = SelectPoller diff --git a/gunicorn/selectors.py b/gunicorn/selectors.py new file mode 100644 index 00000000..4e9ae6ec --- /dev/null +++ b/gunicorn/selectors.py @@ -0,0 +1,585 @@ +"""Selectors module. + +This module allows high-level and efficient I/O multiplexing, built upon the +`select` module primitives. +""" + + +from abc import ABCMeta, abstractmethod +from collections import namedtuple, Mapping +import math +import select +import sys + + +# generic events, that must be mapped to implementation-specific ones +EVENT_READ = (1 << 0) +EVENT_WRITE = (1 << 1) + + +def _fileobj_to_fd(fileobj): + """Return a file descriptor from a file object. + + Parameters: + fileobj -- file object or file descriptor + + Returns: + corresponding file descriptor + + Raises: + ValueError if the object is invalid + """ + if isinstance(fileobj, int): + fd = fileobj + else: + try: + fd = int(fileobj.fileno()) + except (AttributeError, TypeError, ValueError): + raise ValueError("Invalid file object: " + "{!r}".format(fileobj)) from None + if fd < 0: + raise ValueError("Invalid file descriptor: {}".format(fd)) + return fd + + +SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) +"""Object used to associate a file object to its backing file descriptor, +selected event mask and attached data.""" + + +class _SelectorMapping(Mapping): + """Mapping of file objects to selector keys.""" + + def __init__(self, selector): + self._selector = selector + + def __len__(self): + return len(self._selector._fd_to_key) + + def __getitem__(self, fileobj): + try: + fd = self._selector._fileobj_lookup(fileobj) + return self._selector._fd_to_key[fd] + except KeyError: + raise KeyError("{!r} is not registered".format(fileobj)) from None + + def __iter__(self): + return iter(self._selector._fd_to_key) + + +class BaseSelector(metaclass=ABCMeta): + """Selector abstract base class. + + A selector supports registering file objects to be monitored for specific + I/O events. + + A file object is a file descriptor or any object with a `fileno()` method. + An arbitrary object can be attached to the file object, which can be used + for example to store context information, a callback, etc. + + A selector can use various implementations (select(), poll(), epoll()...) + depending on the platform. The default `Selector` class uses the most + efficient implementation on the current platform. + """ + + @abstractmethod + def register(self, fileobj, events, data=None): + """Register a file object. 
+ + Parameters: + fileobj -- file object or file descriptor + events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) + data -- attached data + + Returns: + SelectorKey instance + + Raises: + ValueError if events is invalid + KeyError if fileobj is already registered + OSError if fileobj is closed or otherwise is unacceptable to + the underlying system call (if a system call is made) + + Note: + OSError may or may not be raised + """ + raise NotImplementedError + + @abstractmethod + def unregister(self, fileobj): + """Unregister a file object. + + Parameters: + fileobj -- file object or file descriptor + + Returns: + SelectorKey instance + + Raises: + KeyError if fileobj is not registered + + Note: + If fileobj is registered but has since been closed this does + *not* raise OSError (even if the wrapped syscall does) + """ + raise NotImplementedError + + def modify(self, fileobj, events, data=None): + """Change a registered file object monitored events or attached data. + + Parameters: + fileobj -- file object or file descriptor + events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) + data -- attached data + + Returns: + SelectorKey instance + + Raises: + Anything that unregister() or register() raises + """ + self.unregister(fileobj) + return self.register(fileobj, events, data) + + @abstractmethod + def select(self, timeout=None): + """Perform the actual selection, until some monitored file objects are + ready or a timeout expires. + + Parameters: + timeout -- if timeout > 0, this specifies the maximum wait time, in + seconds + if timeout <= 0, the select() call won't block, and will + report the currently ready file objects + if timeout is None, select() will block until a monitored + file object becomes ready + + Returns: + list of (key, events) for ready file objects + `events` is a bitwise mask of EVENT_READ|EVENT_WRITE + """ + raise NotImplementedError + + def close(self): + """Close the selector. + + This must be called to make sure that any underlying resource is freed. + """ + pass + + def get_key(self, fileobj): + """Return the key associated to a registered file object. + + Returns: + SelectorKey for this file object + """ + mapping = self.get_map() + try: + return mapping[fileobj] + except KeyError: + raise KeyError("{!r} is not registered".format(fileobj)) from None + + @abstractmethod + def get_map(self): + """Return a mapping of file objects to selector keys.""" + raise NotImplementedError + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + +class _BaseSelectorImpl(BaseSelector): + """Base selector implementation.""" + + def __init__(self): + # this maps file descriptors to keys + self._fd_to_key = {} + # read-only mapping returned by get_map() + self._map = _SelectorMapping(self) + + def _fileobj_lookup(self, fileobj): + """Return a file descriptor from a file object. + + This wraps _fileobj_to_fd() to do an exhaustive search in case + the object is invalid but we still have it in our map. This + is used by unregister() so we can unregister an object that + was previously registered even if it is closed. It is also + used by _SelectorMapping. + """ + try: + return _fileobj_to_fd(fileobj) + except ValueError: + # Do an exhaustive search. + for key in self._fd_to_key.values(): + if key.fileobj is fileobj: + return key.fd + # Raise ValueError after all. 
+ raise + + def register(self, fileobj, events, data=None): + if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): + raise ValueError("Invalid events: {!r}".format(events)) + + key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data) + + if key.fd in self._fd_to_key: + raise KeyError("{!r} (FD {}) is already registered" + .format(fileobj, key.fd)) + + self._fd_to_key[key.fd] = key + return key + + def unregister(self, fileobj): + try: + key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) + except KeyError: + raise KeyError("{!r} is not registered".format(fileobj)) from None + return key + + def modify(self, fileobj, events, data=None): + # TODO: Subclasses can probably optimize this even further. + try: + key = self._fd_to_key[self._fileobj_lookup(fileobj)] + except KeyError: + raise KeyError("{!r} is not registered".format(fileobj)) from None + if events != key.events: + self.unregister(fileobj) + key = self.register(fileobj, events, data) + elif data != key.data: + # Use a shortcut to update the data. + key = key._replace(data=data) + self._fd_to_key[key.fd] = key + return key + + def close(self): + self._fd_to_key.clear() + + def get_map(self): + return self._map + + def _key_from_fd(self, fd): + """Return the key associated to a given file descriptor. + + Parameters: + fd -- file descriptor + + Returns: + corresponding key, or None if not found + """ + try: + return self._fd_to_key[fd] + except KeyError: + return None + + +class SelectSelector(_BaseSelectorImpl): + """Select-based selector.""" + + def __init__(self): + super().__init__() + self._readers = set() + self._writers = set() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + if events & EVENT_READ: + self._readers.add(key.fd) + if events & EVENT_WRITE: + self._writers.add(key.fd) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._readers.discard(key.fd) + self._writers.discard(key.fd) + return key + + if sys.platform == 'win32': + def _select(self, r, w, _, timeout=None): + r, w, x = select.select(r, w, w, timeout) + return r, w + x, [] + else: + _select = select.select + + def select(self, timeout=None): + timeout = None if timeout is None else max(timeout, 0) + ready = [] + try: + r, w, _ = self._select(self._readers, self._writers, [], timeout) + except InterruptedError: + return ready + r = set(r) + w = set(w) + for fd in r | w: + events = 0 + if fd in r: + events |= EVENT_READ + if fd in w: + events |= EVENT_WRITE + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + +if hasattr(select, 'poll'): + + class PollSelector(_BaseSelectorImpl): + """Poll-based selector.""" + + def __init__(self): + super().__init__() + self._poll = select.poll() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + poll_events = 0 + if events & EVENT_READ: + poll_events |= select.POLLIN + if events & EVENT_WRITE: + poll_events |= select.POLLOUT + self._poll.register(key.fd, poll_events) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._poll.unregister(key.fd) + return key + + def select(self, timeout=None): + if timeout is None: + timeout = None + elif timeout <= 0: + timeout = 0 + else: + # poll() has a resolution of 1 millisecond, round away from + # zero to wait *at least* timeout seconds. 
+ timeout = math.ceil(timeout * 1e3) + ready = [] + try: + fd_event_list = self._poll.poll(timeout) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.POLLIN: + events |= EVENT_WRITE + if event & ~select.POLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + +if hasattr(select, 'epoll'): + + class EpollSelector(_BaseSelectorImpl): + """Epoll-based selector.""" + + def __init__(self): + super().__init__() + self._epoll = select.epoll() + + def fileno(self): + return self._epoll.fileno() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + epoll_events = 0 + if events & EVENT_READ: + epoll_events |= select.EPOLLIN + if events & EVENT_WRITE: + epoll_events |= select.EPOLLOUT + self._epoll.register(key.fd, epoll_events) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + try: + self._epoll.unregister(key.fd) + except OSError: + # This can happen if the FD was closed since it + # was registered. + pass + return key + + def select(self, timeout=None): + if timeout is None: + timeout = -1 + elif timeout <= 0: + timeout = 0 + else: + # epoll_wait() has a resolution of 1 millisecond, round away + # from zero to wait *at least* timeout seconds. + timeout = math.ceil(timeout * 1e3) * 1e-3 + max_ev = len(self._fd_to_key) + ready = [] + try: + fd_event_list = self._epoll.poll(timeout, max_ev) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.EPOLLIN: + events |= EVENT_WRITE + if event & ~select.EPOLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + def close(self): + self._epoll.close() + super().close() + + +if hasattr(select, 'devpoll'): + + class DevpollSelector(_BaseSelectorImpl): + """Solaris /dev/poll selector.""" + + def __init__(self): + super().__init__() + self._devpoll = select.devpoll() + + def fileno(self): + return self._devpoll.fileno() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + poll_events = 0 + if events & EVENT_READ: + poll_events |= select.POLLIN + if events & EVENT_WRITE: + poll_events |= select.POLLOUT + self._devpoll.register(key.fd, poll_events) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._devpoll.unregister(key.fd) + return key + + def select(self, timeout=None): + if timeout is None: + timeout = None + elif timeout <= 0: + timeout = 0 + else: + # devpoll() has a resolution of 1 millisecond, round away from + # zero to wait *at least* timeout seconds. 
+ timeout = math.ceil(timeout * 1e3) + ready = [] + try: + fd_event_list = self._devpoll.poll(timeout) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.POLLIN: + events |= EVENT_WRITE + if event & ~select.POLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + def close(self): + self._devpoll.close() + super().close() + + +if hasattr(select, 'kqueue'): + + class KqueueSelector(_BaseSelectorImpl): + """Kqueue-based selector.""" + + def __init__(self): + super().__init__() + self._kqueue = select.kqueue() + + def fileno(self): + return self._kqueue.fileno() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + if events & EVENT_READ: + kev = select.kevent(key.fd, select.KQ_FILTER_READ, + select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + if events & EVENT_WRITE: + kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, + select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + if key.events & EVENT_READ: + kev = select.kevent(key.fd, select.KQ_FILTER_READ, + select.KQ_EV_DELETE) + try: + self._kqueue.control([kev], 0, 0) + except OSError: + # This can happen if the FD was closed since it + # was registered. + pass + if key.events & EVENT_WRITE: + kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, + select.KQ_EV_DELETE) + try: + self._kqueue.control([kev], 0, 0) + except OSError: + # See comment above. + pass + return key + + def select(self, timeout=None): + timeout = None if timeout is None else max(timeout, 0) + max_ev = len(self._fd_to_key) + ready = [] + try: + kev_list = self._kqueue.control(None, max_ev, timeout) + except InterruptedError: + return ready + for kev in kev_list: + fd = kev.ident + flag = kev.filter + events = 0 + if flag == select.KQ_FILTER_READ: + events |= EVENT_READ + if flag == select.KQ_FILTER_WRITE: + events |= EVENT_WRITE + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + + def close(self): + self._kqueue.close() + super().close() + + +# Choose the best implementation: roughly, epoll|kqueue|devpoll > poll > select. +# select() also can't accept a FD > FD_SETSIZE (usually around 1024) +if 'KqueueSelector' in globals(): + DefaultSelector = KqueueSelector +elif 'EpollSelector' in globals(): + DefaultSelector = EpollSelector +elif 'DevpollSelector' in globals(): + DefaultSelector = DevpollSelector +elif 'PollSelector' in globals(): + DefaultSelector = PollSelector +else: + DefaultSelector = SelectSelector diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index f34c7867..3acfd40f 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -24,11 +24,18 @@ import time from .. import http from ..http import wsgi -from .. import fdevents from .. import util from . import base from .. import six +try: + from asyncio import selectors +except ImportError: + from .. 
import selectors + + +ACCEPT = 0 +KEEPALIVED = 1 class TConn(): @@ -66,16 +73,15 @@ class ThreadWorker(base.Worker): def init_process(self): self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) - self.poller = fdevents.DefaultPoller() + self.poller = selectors.DefaultSelector() super(ThreadWorker, self).init_process() def run(self): # init listeners, add them to the event loop for s in self.sockets: s.setblocking(False) - self.poller.addfd(s, 'r') + self.poller.register(s, selectors.EVENT_READ, ACCEPT) - listeners = dict([(s.fileno(), s) for s in self.sockets]) while self.alive: # If our parent changed then we shut down. @@ -86,13 +92,14 @@ class ThreadWorker(base.Worker): # notify the arbiter we are alive self.notify() - events = self.poller.wait(0.01) + events = self.poller.select(0.01) if events: - for (fd, mode) in events: + for key, mask in events: fs = None client = None - if fd in listeners: - listener = listeners[fd] + if key.data == ACCEPT: + listener = key.fileobj + # start to accept connections try: client, addr = listener.accept() @@ -107,16 +114,20 @@ class ThreadWorker(base.Worker): if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise - elif fd in self.keepalived: + + else: # get the client connection - client = self.keepalived[fd] + client = key.data # remove it from the heap + try: del self._heap[operator.indexOf(self._heap, client)] except (KeyError, IndexError): pass + self.poller.unregister(key.fileobj) + # add a job to the pool fs = self.tpool.submit(self.handle, client.listener, client.sock, client.addr, client.parser) @@ -146,10 +157,10 @@ class ThreadWorker(base.Worker): # register the connection heapq.heappush(self._heap, tconn) - self.keepalived[fs.sock.fileno()] = tconn # add the socket to the event loop - self.poller.addfd(fs.sock, 'r', False) + self.poller.register(fs.sock, selectors.EVENT_READ, + tconn) else: # at this point the connection should be # closed but we make sure it is. @@ -176,7 +187,7 @@ class ThreadWorker(base.Worker): break else: # remove the socket from the poller - self.poller.delfd(conn.sock, 'r') + self.poller.unregister(conn.sock) # close the socket util.close(conn.sock) From f8b415496d6a3263f8dd13559be926d841c3a3a7 Mon Sep 17 00:00:00 2001 From: benoitc Date: Fri, 30 May 2014 15:59:47 +0200 Subject: [PATCH 12/25] refactor the gthread worker for a better usage of asyncio we have the possibility to pass a data payload to the poller when registering a file object. We are using this possibility to pass a callback. the callback will either accept or handle a connection when the read event is triggered. while I am here make the future result asynchronous so we don't block the I/O event handling. 
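
As a rough sketch of the callback-passing pattern this message describes (purely illustrative, not part of the patch; the demo_* names and the echo handler are invented), the selector's data payload carries a callable that the event loop simply invokes, while add_done_callback() lets future results be consumed without blocking the loop:

    import socket
    import selectors
    from concurrent.futures import ThreadPoolExecutor

    demo_selector = selectors.DefaultSelector()
    demo_pool = ThreadPoolExecutor(max_workers=4)

    def demo_handle(conn):
        conn.setblocking(True)              # the worker thread owns the socket now
        data = conn.recv(1024)
        conn.sendall(data or b"")
        conn.close()

    def demo_on_ready(conn):
        demo_selector.unregister(conn)      # stop polling while a thread handles it
        fut = demo_pool.submit(demo_handle, conn)
        fut.add_done_callback(lambda f: f.result())  # surface handler errors without blocking the loop

    def demo_accept(listener):
        conn, _addr = listener.accept()
        conn.setblocking(False)
        # the callback travels with the registration as the selector data payload
        demo_selector.register(conn, selectors.EVENT_READ, demo_on_ready)

    demo_listener = socket.socket()
    demo_listener.bind(("127.0.0.1", 0))
    demo_listener.listen(5)
    demo_listener.setblocking(False)
    demo_selector.register(demo_listener, selectors.EVENT_READ, demo_accept)

    while True:
        for key, _mask in demo_selector.select(timeout=1.0):
            key.data(key.fileobj)           # dispatch whichever callback was attached

Dispatching through key.data keeps the loop generic: accepting new connections and resuming keepalive clients become two callbacks registered the same way.
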
--- gunicorn/workers/gthread.py | 278 ++++++++++++++++++++---------------- 1 file changed, 151 insertions(+), 127 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 3acfd40f..dbcfdab5 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -58,32 +58,112 @@ class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super(ThreadWorker, self).__init__(*args, **kwargs) + self.worker_connections = self.cfg.worker_connections + self.idle_workers = 0 + # initialise the pool self.tpool = None self.poller = None self.futures = set() self._heap = [] - self.keepalived = {} + self.clients = {} - def _wrap_future(self, fs, listener, client, addr): - fs.listener = listener - fs.sock = client - fs.addr = addr + + def _wrap_future(self, fs, conn): + fs.conn = conn self.futures.add(fs) + fs.add_done_callback(self.finish_request) + + def _unregister_keepalive(self, conn): + try: + del self._heap[operator.indexOf(self._heap, conn)] + except (KeyError, IndexError, ValueError): + pass def init_process(self): self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) self.poller = selectors.DefaultSelector() super(ThreadWorker, self).init_process() + + def acceptor(self, listener): + try: + client, addr = listener.accept() + client.setblocking(False) + + # wrap the socket if needed + if self.cfg.is_ssl: + client = ssl.wrap_socket(client, server_side=True, + **self.cfg.ssl_options) + + # initialise the parser + parser = http.RequestParser(self.cfg, client) + + # register the connection + tconn = TConn(self, listener, client, addr, parser) + self.clients[client] = tconn + + # wait for the read event to handle the connection + self.poller.register(client, selectors.EVENT_READ, + self.handle_client) + + except socket.error as e: + if e.args[0] not in (errno.EAGAIN, + errno.ECONNABORTED, errno.EWOULDBLOCK): + raise + + def handle_client(self, client): + # unregister the client from the poller + self.poller.unregister(client) + + try: + conn = self.clients[client] + + # maybe unregister the keepalive from the heap + self._unregister_keepalive(conn) + + # submit the connection to a worker + fs = self.tpool.submit(self.handle, conn) + self.idle_workers += 1 + self._wrap_future(fs, conn) + + except KeyError: + # no connection registered + return + + def murder_keepalived(self): + now = time.time() + while True: + if not len(self._heap): + break + + conn = heapq.heappop(self._heap) + delta = conn.timeout - now + if delta > 0: + heapq.heappush(self._heap, conn) + break + else: + # make sure the connection can't be handled + try: + del self.clients[conn.sock] + except KeyError: + pass + + # remove the socket from the poller + self.poller.unregister(conn.sock) + + # close the socket + util.close(conn.sock) + + + def run(self): # init listeners, add them to the event loop for s in self.sockets: s.setblocking(False) - self.poller.register(s, selectors.EVENT_READ, ACCEPT) + self.poller.register(s, selectors.EVENT_READ, self.acceptor) while self.alive: - # If our parent changed then we shut down. 
if self.ppid != os.getppid(): self.log.info("Parent changed, shutting down: %s", self) @@ -92,116 +172,32 @@ class ThreadWorker(base.Worker): # notify the arbiter we are alive self.notify() - events = self.poller.select(0.01) - if events: - for key, mask in events: - fs = None - client = None - if key.data == ACCEPT: - listener = key.fileobj - - # start to accept connections - try: - client, addr = listener.accept() - - # add a job to the pool - fs = self.tpool.submit(self.handle, listener, - client, addr, False) - - self._wrap_future(fs, listener, client, addr) - - except socket.error as e: - if e.args[0] not in (errno.EAGAIN, - errno.ECONNABORTED, errno.EWOULDBLOCK): - raise - - else: - # get the client connection - client = key.data - - # remove it from the heap - - try: - del self._heap[operator.indexOf(self._heap, client)] - except (KeyError, IndexError): - pass - - self.poller.unregister(key.fileobj) - - # add a job to the pool - fs = self.tpool.submit(self.handle, client.listener, - client.sock, client.addr, client.parser) - - # wrap the future - self._wrap_future(fs, client.listener, client.sock, - client.addr) - - # handle jobs, we give a chance to all jobs to be executed. - if self.futures: - self.notify() - - res = futures.wait(self.futures, timeout=self.timeout, - return_when=futures.FIRST_COMPLETED) - - for fs in res.done: - try: - (keepalive, parser) = fs.result() - # if the connection should be kept alived add it - # to the eventloop and record it - if keepalive: - # flag the socket as non blocked - fs.sock.setblocking(0) - - tconn = TConn(self, fs.listener, fs.sock, - fs.addr, parser) - - # register the connection - heapq.heappush(self._heap, tconn) - - # add the socket to the event loop - self.poller.register(fs.sock, selectors.EVENT_READ, - tconn) - else: - # at this point the connection should be - # closed but we make sure it is. - util.close(fs.sock) - except: - # an exception happened, make sure to close the - # socket. - util.close(fs.sock) - finally: - # remove the future from our list - self.futures.remove(fs) - + events = self.poller.select(1.0) + for key, mask in events: + callback = key.data + callback(key.fileobj) # hanle keepalive timeouts - now = time.time() - while True: - if not len(self._heap): - break + self.murder_keepalived() - conn = heapq.heappop(self._heap) - delta = conn.timeout - now - if delta > 0: - heapq.heappush(self._heap, conn) + # if we more connections than the max number of connections + # accepted on a worker, wait until some complete or exit. 
+ if self.idle_workers >= self.worker_connections: + futures.wait(self.futures, timeout=self.cfg.timeout) + if not res: + self.log.info("max requests achieved") break - else: - # remove the socket from the poller - self.poller.unregister(conn.sock) - # close the socket - util.close(conn.sock) - # shutdown the pool - self.tpool.shutdown(False) self.poller.close() + self.tpool.shutdown(False) # wait for the workers futures.wait(self.futures, timeout=self.cfg.graceful_timeout) # if we have still fures running, try to close them for fs in self.futures: - sock = fs.sock + sock = fs.conn.sock # the future is not running, cancel it if not fs.done() and not fs.running(): @@ -211,27 +207,51 @@ class ThreadWorker(base.Worker): util.close(sock) - def handle(self, listener, client, addr, parser): + def finish_request(self, fs): + try: + (keepalive, conn) = fs.result() + # if the connection should be kept alived add it + # to the eventloop and record it + if keepalive: + # flag the socket as non blocked + conn.sock.setblocking(False) + + # register the connection + heapq.heappush(self._heap, conn) + + # add the socket to the event loop + self.poller.register(conn.sock, selectors.EVENT_READ, + self.handle_client) + else: + try: + del self.clients[conn.sock] + except KeyError: + pass + + util.close(fs.conn.sock) + except: + # an exception happened, make sure to close the + # socket. + util.close(fs.conn.sock) + finally: + # remove the future from our list + self.futures.remove(fs) + self.idle_workers -= 1 + + def handle(self, conn): keepalive = False req = None try: - client.setblocking(1) + conn.sock.setblocking(1) - # wrap the connection - if not parser: - if self.cfg.is_ssl: - client = ssl.wrap_socket(client, server_side=True, - **self.cfg.ssl_options) - parser = http.RequestParser(self.cfg, client) - - req = six.next(parser) + req = six.next(conn.parser) if not req: return (False, None) # handle the request - keepalive = self.handle_request(listener, req, client, addr) + keepalive = self.handle_request(req, conn) if keepalive: - return (keepalive, parser) + return (keepalive, conn) except http.errors.NoMoreData as e: self.log.debug("Ignored premature client disconnection. 
%s", e) @@ -240,10 +260,10 @@ class ThreadWorker(base.Worker): except ssl.SSLError as e: if e.args[0] == ssl.SSL_ERROR_EOF: self.log.debug("ssl connection closed") - client.close() + conn.sock.close() else: self.log.debug("Error processing SSL request.") - self.handle_error(req, client, addr, e) + self.handle_error(req, conn.sock, conn.addr, e) except socket.error as e: if e.args[0] not in (errno.EPIPE, errno.ECONNRESET): @@ -252,27 +272,30 @@ class ThreadWorker(base.Worker): if e.args[0] == errno.ECONNRESET: self.log.debug("Ignoring connection reset") else: - self.log.debug("Ignoring EPIPE") + self.log.debug("Ignoring connection epipe") except Exception as e: - self.handle_error(req, client, addr, e) + self.handle_error(req, conn.sock, conn.addr, e) - return (False, None) + return (False, conn) - def handle_request(self, listener, req, client, addr): + def handle_request(self, req, conn): environ = {} resp = None try: self.cfg.pre_request(self, req) request_start = datetime.now() - resp, environ = wsgi.create(req, client, addr, - listener.getsockname(), self.cfg) + resp, environ = wsgi.create(req, conn.sock, conn.addr, + conn.listener.getsockname(), self.cfg) environ["wsgi.multithread"] = True self.nr += 1 - if self.nr >= self.max_requests: + + if self.alive and self.nr >= self.max_requests: self.log.info("Autorestarting worker after current request.") + resp.force_close() self.alive = False + if not self.cfg.keepalive: resp.force_close() @@ -283,6 +306,7 @@ class ThreadWorker(base.Worker): else: for item in respiter: resp.write(item) + resp.close() request_time = datetime.now() - request_start self.log.access(resp, req, environ, request_time) @@ -303,8 +327,8 @@ class ThreadWorker(base.Worker): # connection to indicate the error. self.log.exception("Error handling request") try: - client.shutdown(socket.SHUT_RDWR) - client.close() + conn.sock.shutdown(socket.SHUT_RDWR) + conn.sock.close() except socket.error: pass raise StopIteration() From c8e93a6f218eb2435a5d21091e7523940ba0c8c9 Mon Sep 17 00:00:00 2001 From: benoitc Date: Fri, 30 May 2014 23:26:30 +0200 Subject: [PATCH 13/25] make the code simpler and fix issue with ab --- gunicorn/workers/gthread.py | 112 ++++++++++++++++-------------------- 1 file changed, 50 insertions(+), 62 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index dbcfdab5..417c00f9 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -10,7 +10,7 @@ # If no event happen after the keep alive timeout, the connectoin is # closed. 
- +from collections import deque import concurrent.futures as futures from datetime import datetime import errno @@ -39,14 +39,33 @@ KEEPALIVED = 1 class TConn(): - def __init__(self, worker, listener, sock, addr, parser): + def __init__(self, cfg, listener, sock, addr): + self.cfg = cfg self.listener = listener self.sock = sock self.addr = addr - self.parser = parser + self.timeout = None + self.parser = None + + # set the socket to non blocking + #self.sock.setblocking(False) + + + def maybe_init(self): + if self.parser is None: + # wrap the socket if needed + if self.cfg.is_ssl: + client = ssl.wrap_socket(client, server_side=True, + **self.cfg.ssl_options) + + + # initialize the parser + self.parser = http.RequestParser(self.cfg, self.sock) + + def set_timeout(self): # set the timeout - self.timeout = time.time() + worker.cfg.keepalive + self.timeout = time.time() + self.cfg.keepalive def __lt__(self, other): return self.timeout < other.timeout @@ -65,8 +84,7 @@ class ThreadWorker(base.Worker): self.tpool = None self.poller = None self.futures = set() - self._heap = [] - self.clients = {} + self._keep = deque() def _wrap_future(self, fs, conn): @@ -74,80 +92,48 @@ class ThreadWorker(base.Worker): self.futures.add(fs) fs.add_done_callback(self.finish_request) - def _unregister_keepalive(self, conn): - try: - del self._heap[operator.indexOf(self._heap, conn)] - except (KeyError, IndexError, ValueError): - pass - def init_process(self): self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) self.poller = selectors.DefaultSelector() super(ThreadWorker, self).init_process() - def acceptor(self, listener): + def accept(self, listener, *args): try: client, addr = listener.accept() - client.setblocking(False) - - # wrap the socket if needed - if self.cfg.is_ssl: - client = ssl.wrap_socket(client, server_side=True, - **self.cfg.ssl_options) - - # initialise the parser - parser = http.RequestParser(self.cfg, client) - - # register the connection - tconn = TConn(self, listener, client, addr, parser) - self.clients[client] = tconn + conn = TConn(self.cfg, listener, client, addr) # wait for the read event to handle the connection self.poller.register(client, selectors.EVENT_READ, - self.handle_client) + (self.handle_client, (conn,))) except socket.error as e: if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise - def handle_client(self, client): + def handle_client(self, client, conn): # unregister the client from the poller self.poller.unregister(client) - try: - conn = self.clients[client] + # submit the connection to a worker + fs = self.tpool.submit(self.handle, conn) + self.idle_workers += 1 + self._wrap_future(fs, conn) - # maybe unregister the keepalive from the heap - self._unregister_keepalive(conn) - - # submit the connection to a worker - fs = self.tpool.submit(self.handle, conn) - self.idle_workers += 1 - self._wrap_future(fs, conn) - - except KeyError: - # no connection registered - return def murder_keepalived(self): now = time.time() while True: - if not len(self._heap): + if not len(self._keep): break - conn = heapq.heappop(self._heap) - delta = conn.timeout - now + delta = self._keep[0].timeout - now if delta > 0: - heapq.heappush(self._heap, conn) break else: - # make sure the connection can't be handled - try: - del self.clients[conn.sock] - except KeyError: - pass + # remove the connection from the queue + conn = self._keep.popleft() # remove the socket from the poller self.poller.unregister(conn.sock) @@ -155,13 +141,12 @@ class 
ThreadWorker(base.Worker): # close the socket util.close(conn.sock) - - def run(self): # init listeners, add them to the event loop for s in self.sockets: s.setblocking(False) - self.poller.register(s, selectors.EVENT_READ, self.acceptor) + self.poller.register(s, selectors.EVENT_READ, + (self.accept,())) while self.alive: # If our parent changed then we shut down. @@ -174,8 +159,8 @@ class ThreadWorker(base.Worker): events = self.poller.select(1.0) for key, mask in events: - callback = key.data - callback(key.fileobj) + (callback, args) = key.data + callback(key.fileobj, *args) # hanle keepalive timeouts self.murder_keepalived() @@ -217,17 +202,13 @@ class ThreadWorker(base.Worker): conn.sock.setblocking(False) # register the connection - heapq.heappush(self._heap, conn) + conn.set_timeout() + self._keep.append(conn) # add the socket to the event loop self.poller.register(conn.sock, selectors.EVENT_READ, - self.handle_client) + (self.handle_client, (conn,))) else: - try: - del self.clients[conn.sock] - except KeyError: - pass - util.close(fs.conn.sock) except: # an exception happened, make sure to close the @@ -239,6 +220,13 @@ class ThreadWorker(base.Worker): self.idle_workers -= 1 def handle(self, conn): + try: + self._keep.remove(conn) + except ValueError: + pass + + conn.maybe_init() + keepalive = False req = None try: From 5ba749e9cac271b6959798d519c309daec49228b Mon Sep 17 00:00:00 2001 From: benoitc Date: Sat, 31 May 2014 00:17:29 +0200 Subject: [PATCH 14/25] some quick optimisations --- gunicorn/workers/gthread.py | 54 ++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 417c00f9..a07da55b 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -14,7 +14,7 @@ from collections import deque import concurrent.futures as futures from datetime import datetime import errno -import heapq +from functools import partial import os import operator import socket @@ -49,10 +49,10 @@ class TConn(): self.parser = None # set the socket to non blocking - #self.sock.setblocking(False) + self.sock.setblocking(False) - def maybe_init(self): + def init(self): if self.parser is None: # wrap the socket if needed if self.cfg.is_ssl: @@ -62,6 +62,8 @@ class TConn(): # initialize the parser self.parser = http.RequestParser(self.cfg, self.sock) + return True + return False def set_timeout(self): # set the timeout @@ -78,18 +80,17 @@ class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super(ThreadWorker, self).__init__(*args, **kwargs) self.worker_connections = self.cfg.worker_connections - self.idle_workers = 0 # initialise the pool self.tpool = None self.poller = None - self.futures = set() + self.futures = deque() self._keep = deque() def _wrap_future(self, fs, conn): fs.conn = conn - self.futures.add(fs) + self.futures.append(fs) fs.add_done_callback(self.finish_request) def init_process(self): @@ -97,28 +98,26 @@ class ThreadWorker(base.Worker): self.poller = selectors.DefaultSelector() super(ThreadWorker, self).init_process() - - def accept(self, listener, *args): + def accept(self, listener): try: client, addr = listener.accept() conn = TConn(self.cfg, listener, client, addr) # wait for the read event to handle the connection self.poller.register(client, selectors.EVENT_READ, - (self.handle_client, (conn,))) + partial(self.handle_client, conn)) except socket.error as e: if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise - 
def handle_client(self, client, conn): + def handle_client(self, conn, client): # unregister the client from the poller self.poller.unregister(client) # submit the connection to a worker fs = self.tpool.submit(self.handle, conn) - self.idle_workers += 1 self._wrap_future(fs, conn) @@ -145,8 +144,7 @@ class ThreadWorker(base.Worker): # init listeners, add them to the event loop for s in self.sockets: s.setblocking(False) - self.poller.register(s, selectors.EVENT_READ, - (self.accept,())) + self.poller.register(s, selectors.EVENT_READ, self.accept) while self.alive: # If our parent changed then we shut down. @@ -157,17 +155,17 @@ class ThreadWorker(base.Worker): # notify the arbiter we are alive self.notify() - events = self.poller.select(1.0) + events = self.poller.select(0.01) for key, mask in events: - (callback, args) = key.data - callback(key.fileobj, *args) + callback = key.data + callback(key.fileobj) # hanle keepalive timeouts self.murder_keepalived() # if we more connections than the max number of connections # accepted on a worker, wait until some complete or exit. - if self.idle_workers >= self.worker_connections: + if len(self.futures) >= self.worker_connections: futures.wait(self.futures, timeout=self.cfg.timeout) if not res: self.log.info("max requests achieved") @@ -207,25 +205,27 @@ class ThreadWorker(base.Worker): # add the socket to the event loop self.poller.register(conn.sock, selectors.EVENT_READ, - (self.handle_client, (conn,))) + partial(self.handle_client, conn)) else: - util.close(fs.conn.sock) + util.close(conn.sock) except: # an exception happened, make sure to close the # socket. util.close(fs.conn.sock) finally: # remove the future from our list - self.futures.remove(fs) - self.idle_workers -= 1 + try: + self.futures.remove(fs) + except ValueError: + pass def handle(self, conn): - try: - self._keep.remove(conn) - except ValueError: - pass - - conn.maybe_init() + if not conn.init(): + # connection kept alive + try: + self._keep.remove(conn) + except ValueError: + pass keepalive = False req = None From e8e9d285a63fba36c85e9c974b7a57c183937641 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sat, 31 May 2014 00:44:20 +0200 Subject: [PATCH 15/25] fixes --- gunicorn/workers/gthread.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index a07da55b..06a5d6e8 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -33,10 +33,6 @@ try: except ImportError: from .. import selectors - -ACCEPT = 0 -KEEPALIVED = 1 - class TConn(): def __init__(self, cfg, listener, sock, addr): @@ -166,7 +162,7 @@ class ThreadWorker(base.Worker): # if we more connections than the max number of connections # accepted on a worker, wait until some complete or exit. 
if len(self.futures) >= self.worker_connections: - futures.wait(self.futures, timeout=self.cfg.timeout) + res = futures.wait(self.futures, timeout=self.cfg.timeout) if not res: self.log.info("max requests achieved") break @@ -234,7 +230,7 @@ class ThreadWorker(base.Worker): req = six.next(conn.parser) if not req: - return (False, None) + return (False, conn) # handle the request keepalive = self.handle_request(req, conn) From fb53047b73b806e4e4cdd822201731d08ab137d8 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sat, 31 May 2014 01:15:05 +0200 Subject: [PATCH 16/25] fix timeout and socket ssl wrapping --- gunicorn/workers/gthread.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 06a5d6e8..f3d52804 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -33,6 +33,7 @@ try: except ImportError: from .. import selectors + class TConn(): def __init__(self, cfg, listener, sock, addr): @@ -47,12 +48,12 @@ class TConn(): # set the socket to non blocking self.sock.setblocking(False) - def init(self): + self.sock.setblocking(True) if self.parser is None: # wrap the socket if needed if self.cfg.is_ssl: - client = ssl.wrap_socket(client, server_side=True, + self.sock = ssl.wrap_socket(client, server_side=True, **self.cfg.ssl_options) @@ -142,6 +143,8 @@ class ThreadWorker(base.Worker): s.setblocking(False) self.poller.register(s, selectors.EVENT_READ, self.accept) + timeout = self.cfg.timeout or 0.5 + while self.alive: # If our parent changed then we shut down. if self.ppid != os.getppid(): @@ -162,7 +165,7 @@ class ThreadWorker(base.Worker): # if we more connections than the max number of connections # accepted on a worker, wait until some complete or exit. 
if len(self.futures) >= self.worker_connections: - res = futures.wait(self.futures, timeout=self.cfg.timeout) + res = futures.wait(self.futures, timeout=timeout) if not res: self.log.info("max requests achieved") break @@ -185,7 +188,6 @@ class ThreadWorker(base.Worker): # make sure we close the sockets after the graceful timeout util.close(sock) - def finish_request(self, fs): try: (keepalive, conn) = fs.result() @@ -226,8 +228,6 @@ class ThreadWorker(base.Worker): keepalive = False req = None try: - conn.sock.setblocking(1) - req = six.next(conn.parser) if not req: return (False, conn) From d775b576e8867e2560833ee471013a7da68abd75 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sat, 31 May 2014 01:21:05 +0200 Subject: [PATCH 17/25] improve worker shutdown --- gunicorn/workers/gthread.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index f3d52804..f47eb7fc 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -84,7 +84,6 @@ class ThreadWorker(base.Worker): self.futures = deque() self._keep = deque() - def _wrap_future(self, fs, conn): fs.conn = conn self.futures.append(fs) @@ -117,7 +116,6 @@ class ThreadWorker(base.Worker): fs = self.tpool.submit(self.handle, conn) self._wrap_future(fs, conn) - def murder_keepalived(self): now = time.time() while True: @@ -178,7 +176,12 @@ class ThreadWorker(base.Worker): futures.wait(self.futures, timeout=self.cfg.graceful_timeout) # if we have still fures running, try to close them - for fs in self.futures: + while True: + try: + fs = self.futures.popleft() + except IndexError: + break + sock = fs.conn.sock # the future is not running, cancel it @@ -279,7 +282,6 @@ class ThreadWorker(base.Worker): resp.force_close() self.alive = False - if not self.cfg.keepalive: resp.force_close() From b810a1d1a911f457d8351183ae46518fc0123d26 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sat, 31 May 2014 07:13:36 +0200 Subject: [PATCH 18/25] fix doc --- gunicorn/config.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gunicorn/config.py b/gunicorn/config.py index 4c28dae3..09113aa2 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -581,8 +581,7 @@ class WorkerThreads(Setting): desc = """\ The number of worker threads for handling requests. - Run each worker in prethreaded mode with the specified number of - threads per worker. + Run each worker with the specified number of threads. A positive integer generally in the 2-4 x $(NUM_CORES) range. You'll want to vary this a bit to find the best for your particular From b7cbb59bbc414eb3c03b163a8afdd296c613a4f6 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sat, 31 May 2014 07:18:39 +0200 Subject: [PATCH 19/25] remove useless code --- gunicorn/util.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/gunicorn/util.py b/gunicorn/util.py index 07db3dba..9f6b79c9 100644 --- a/gunicorn/util.py +++ b/gunicorn/util.py @@ -269,11 +269,6 @@ def set_non_blocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK fcntl.fcntl(fd, fcntl.F_SETFL, flags) -def fd_(fd): - if hasattr(fd, "fileno"): - return int(fd.fileno()) - return fd - def close(sock): try: sock.close() From eb17b13b1df62fc12c5eebb72bc650c61979b5ae Mon Sep 17 00:00:00 2001 From: Randall Leeds Date: Sat, 31 May 2014 13:28:16 -0700 Subject: [PATCH 20/25] Guard against race condition on threads keepalive Requests after the first on a keepalive connection remove themselves from the keepalive timeout queue. 
This presents a race condition where the main thread might try to access the first element of the queue after it has been removed. --- gunicorn/workers/gthread.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index f47eb7fc..789d869e 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -119,10 +119,11 @@ class ThreadWorker(base.Worker): def murder_keepalived(self): now = time.time() while True: - if not len(self._keep): + try: + delta = self._keep[0].timeout - now + except IndexError: break - delta = self._keep[0].timeout - now if delta > 0: break else: From 7e699b7d51b7a17f1d2bba7ff8bacb75991c54bf Mon Sep 17 00:00:00 2001 From: Randall Leeds Date: Sat, 31 May 2014 13:31:07 -0700 Subject: [PATCH 21/25] Use trollius on Py2 instead of bundling selectors --- gunicorn/selectors.py | 585 ------------------------------------ gunicorn/workers/gthread.py | 2 +- 2 files changed, 1 insertion(+), 586 deletions(-) delete mode 100644 gunicorn/selectors.py diff --git a/gunicorn/selectors.py b/gunicorn/selectors.py deleted file mode 100644 index 4e9ae6ec..00000000 --- a/gunicorn/selectors.py +++ /dev/null @@ -1,585 +0,0 @@ -"""Selectors module. - -This module allows high-level and efficient I/O multiplexing, built upon the -`select` module primitives. -""" - - -from abc import ABCMeta, abstractmethod -from collections import namedtuple, Mapping -import math -import select -import sys - - -# generic events, that must be mapped to implementation-specific ones -EVENT_READ = (1 << 0) -EVENT_WRITE = (1 << 1) - - -def _fileobj_to_fd(fileobj): - """Return a file descriptor from a file object. - - Parameters: - fileobj -- file object or file descriptor - - Returns: - corresponding file descriptor - - Raises: - ValueError if the object is invalid - """ - if isinstance(fileobj, int): - fd = fileobj - else: - try: - fd = int(fileobj.fileno()) - except (AttributeError, TypeError, ValueError): - raise ValueError("Invalid file object: " - "{!r}".format(fileobj)) from None - if fd < 0: - raise ValueError("Invalid file descriptor: {}".format(fd)) - return fd - - -SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) -"""Object used to associate a file object to its backing file descriptor, -selected event mask and attached data.""" - - -class _SelectorMapping(Mapping): - """Mapping of file objects to selector keys.""" - - def __init__(self, selector): - self._selector = selector - - def __len__(self): - return len(self._selector._fd_to_key) - - def __getitem__(self, fileobj): - try: - fd = self._selector._fileobj_lookup(fileobj) - return self._selector._fd_to_key[fd] - except KeyError: - raise KeyError("{!r} is not registered".format(fileobj)) from None - - def __iter__(self): - return iter(self._selector._fd_to_key) - - -class BaseSelector(metaclass=ABCMeta): - """Selector abstract base class. - - A selector supports registering file objects to be monitored for specific - I/O events. - - A file object is a file descriptor or any object with a `fileno()` method. - An arbitrary object can be attached to the file object, which can be used - for example to store context information, a callback, etc. - - A selector can use various implementations (select(), poll(), epoll()...) - depending on the platform. The default `Selector` class uses the most - efficient implementation on the current platform. 
- """ - - @abstractmethod - def register(self, fileobj, events, data=None): - """Register a file object. - - Parameters: - fileobj -- file object or file descriptor - events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) - data -- attached data - - Returns: - SelectorKey instance - - Raises: - ValueError if events is invalid - KeyError if fileobj is already registered - OSError if fileobj is closed or otherwise is unacceptable to - the underlying system call (if a system call is made) - - Note: - OSError may or may not be raised - """ - raise NotImplementedError - - @abstractmethod - def unregister(self, fileobj): - """Unregister a file object. - - Parameters: - fileobj -- file object or file descriptor - - Returns: - SelectorKey instance - - Raises: - KeyError if fileobj is not registered - - Note: - If fileobj is registered but has since been closed this does - *not* raise OSError (even if the wrapped syscall does) - """ - raise NotImplementedError - - def modify(self, fileobj, events, data=None): - """Change a registered file object monitored events or attached data. - - Parameters: - fileobj -- file object or file descriptor - events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) - data -- attached data - - Returns: - SelectorKey instance - - Raises: - Anything that unregister() or register() raises - """ - self.unregister(fileobj) - return self.register(fileobj, events, data) - - @abstractmethod - def select(self, timeout=None): - """Perform the actual selection, until some monitored file objects are - ready or a timeout expires. - - Parameters: - timeout -- if timeout > 0, this specifies the maximum wait time, in - seconds - if timeout <= 0, the select() call won't block, and will - report the currently ready file objects - if timeout is None, select() will block until a monitored - file object becomes ready - - Returns: - list of (key, events) for ready file objects - `events` is a bitwise mask of EVENT_READ|EVENT_WRITE - """ - raise NotImplementedError - - def close(self): - """Close the selector. - - This must be called to make sure that any underlying resource is freed. - """ - pass - - def get_key(self, fileobj): - """Return the key associated to a registered file object. - - Returns: - SelectorKey for this file object - """ - mapping = self.get_map() - try: - return mapping[fileobj] - except KeyError: - raise KeyError("{!r} is not registered".format(fileobj)) from None - - @abstractmethod - def get_map(self): - """Return a mapping of file objects to selector keys.""" - raise NotImplementedError - - def __enter__(self): - return self - - def __exit__(self, *args): - self.close() - - -class _BaseSelectorImpl(BaseSelector): - """Base selector implementation.""" - - def __init__(self): - # this maps file descriptors to keys - self._fd_to_key = {} - # read-only mapping returned by get_map() - self._map = _SelectorMapping(self) - - def _fileobj_lookup(self, fileobj): - """Return a file descriptor from a file object. - - This wraps _fileobj_to_fd() to do an exhaustive search in case - the object is invalid but we still have it in our map. This - is used by unregister() so we can unregister an object that - was previously registered even if it is closed. It is also - used by _SelectorMapping. - """ - try: - return _fileobj_to_fd(fileobj) - except ValueError: - # Do an exhaustive search. - for key in self._fd_to_key.values(): - if key.fileobj is fileobj: - return key.fd - # Raise ValueError after all. 
- raise - - def register(self, fileobj, events, data=None): - if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): - raise ValueError("Invalid events: {!r}".format(events)) - - key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data) - - if key.fd in self._fd_to_key: - raise KeyError("{!r} (FD {}) is already registered" - .format(fileobj, key.fd)) - - self._fd_to_key[key.fd] = key - return key - - def unregister(self, fileobj): - try: - key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) - except KeyError: - raise KeyError("{!r} is not registered".format(fileobj)) from None - return key - - def modify(self, fileobj, events, data=None): - # TODO: Subclasses can probably optimize this even further. - try: - key = self._fd_to_key[self._fileobj_lookup(fileobj)] - except KeyError: - raise KeyError("{!r} is not registered".format(fileobj)) from None - if events != key.events: - self.unregister(fileobj) - key = self.register(fileobj, events, data) - elif data != key.data: - # Use a shortcut to update the data. - key = key._replace(data=data) - self._fd_to_key[key.fd] = key - return key - - def close(self): - self._fd_to_key.clear() - - def get_map(self): - return self._map - - def _key_from_fd(self, fd): - """Return the key associated to a given file descriptor. - - Parameters: - fd -- file descriptor - - Returns: - corresponding key, or None if not found - """ - try: - return self._fd_to_key[fd] - except KeyError: - return None - - -class SelectSelector(_BaseSelectorImpl): - """Select-based selector.""" - - def __init__(self): - super().__init__() - self._readers = set() - self._writers = set() - - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) - if events & EVENT_READ: - self._readers.add(key.fd) - if events & EVENT_WRITE: - self._writers.add(key.fd) - return key - - def unregister(self, fileobj): - key = super().unregister(fileobj) - self._readers.discard(key.fd) - self._writers.discard(key.fd) - return key - - if sys.platform == 'win32': - def _select(self, r, w, _, timeout=None): - r, w, x = select.select(r, w, w, timeout) - return r, w + x, [] - else: - _select = select.select - - def select(self, timeout=None): - timeout = None if timeout is None else max(timeout, 0) - ready = [] - try: - r, w, _ = self._select(self._readers, self._writers, [], timeout) - except InterruptedError: - return ready - r = set(r) - w = set(w) - for fd in r | w: - events = 0 - if fd in r: - events |= EVENT_READ - if fd in w: - events |= EVENT_WRITE - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - -if hasattr(select, 'poll'): - - class PollSelector(_BaseSelectorImpl): - """Poll-based selector.""" - - def __init__(self): - super().__init__() - self._poll = select.poll() - - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) - poll_events = 0 - if events & EVENT_READ: - poll_events |= select.POLLIN - if events & EVENT_WRITE: - poll_events |= select.POLLOUT - self._poll.register(key.fd, poll_events) - return key - - def unregister(self, fileobj): - key = super().unregister(fileobj) - self._poll.unregister(key.fd) - return key - - def select(self, timeout=None): - if timeout is None: - timeout = None - elif timeout <= 0: - timeout = 0 - else: - # poll() has a resolution of 1 millisecond, round away from - # zero to wait *at least* timeout seconds. 
- timeout = math.ceil(timeout * 1e3) - ready = [] - try: - fd_event_list = self._poll.poll(timeout) - except InterruptedError: - return ready - for fd, event in fd_event_list: - events = 0 - if event & ~select.POLLIN: - events |= EVENT_WRITE - if event & ~select.POLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - -if hasattr(select, 'epoll'): - - class EpollSelector(_BaseSelectorImpl): - """Epoll-based selector.""" - - def __init__(self): - super().__init__() - self._epoll = select.epoll() - - def fileno(self): - return self._epoll.fileno() - - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) - epoll_events = 0 - if events & EVENT_READ: - epoll_events |= select.EPOLLIN - if events & EVENT_WRITE: - epoll_events |= select.EPOLLOUT - self._epoll.register(key.fd, epoll_events) - return key - - def unregister(self, fileobj): - key = super().unregister(fileobj) - try: - self._epoll.unregister(key.fd) - except OSError: - # This can happen if the FD was closed since it - # was registered. - pass - return key - - def select(self, timeout=None): - if timeout is None: - timeout = -1 - elif timeout <= 0: - timeout = 0 - else: - # epoll_wait() has a resolution of 1 millisecond, round away - # from zero to wait *at least* timeout seconds. - timeout = math.ceil(timeout * 1e3) * 1e-3 - max_ev = len(self._fd_to_key) - ready = [] - try: - fd_event_list = self._epoll.poll(timeout, max_ev) - except InterruptedError: - return ready - for fd, event in fd_event_list: - events = 0 - if event & ~select.EPOLLIN: - events |= EVENT_WRITE - if event & ~select.EPOLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def close(self): - self._epoll.close() - super().close() - - -if hasattr(select, 'devpoll'): - - class DevpollSelector(_BaseSelectorImpl): - """Solaris /dev/poll selector.""" - - def __init__(self): - super().__init__() - self._devpoll = select.devpoll() - - def fileno(self): - return self._devpoll.fileno() - - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) - poll_events = 0 - if events & EVENT_READ: - poll_events |= select.POLLIN - if events & EVENT_WRITE: - poll_events |= select.POLLOUT - self._devpoll.register(key.fd, poll_events) - return key - - def unregister(self, fileobj): - key = super().unregister(fileobj) - self._devpoll.unregister(key.fd) - return key - - def select(self, timeout=None): - if timeout is None: - timeout = None - elif timeout <= 0: - timeout = 0 - else: - # devpoll() has a resolution of 1 millisecond, round away from - # zero to wait *at least* timeout seconds. 
- timeout = math.ceil(timeout * 1e3) - ready = [] - try: - fd_event_list = self._devpoll.poll(timeout) - except InterruptedError: - return ready - for fd, event in fd_event_list: - events = 0 - if event & ~select.POLLIN: - events |= EVENT_WRITE - if event & ~select.POLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def close(self): - self._devpoll.close() - super().close() - - -if hasattr(select, 'kqueue'): - - class KqueueSelector(_BaseSelectorImpl): - """Kqueue-based selector.""" - - def __init__(self): - super().__init__() - self._kqueue = select.kqueue() - - def fileno(self): - return self._kqueue.fileno() - - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) - if events & EVENT_READ: - kev = select.kevent(key.fd, select.KQ_FILTER_READ, - select.KQ_EV_ADD) - self._kqueue.control([kev], 0, 0) - if events & EVENT_WRITE: - kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, - select.KQ_EV_ADD) - self._kqueue.control([kev], 0, 0) - return key - - def unregister(self, fileobj): - key = super().unregister(fileobj) - if key.events & EVENT_READ: - kev = select.kevent(key.fd, select.KQ_FILTER_READ, - select.KQ_EV_DELETE) - try: - self._kqueue.control([kev], 0, 0) - except OSError: - # This can happen if the FD was closed since it - # was registered. - pass - if key.events & EVENT_WRITE: - kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, - select.KQ_EV_DELETE) - try: - self._kqueue.control([kev], 0, 0) - except OSError: - # See comment above. - pass - return key - - def select(self, timeout=None): - timeout = None if timeout is None else max(timeout, 0) - max_ev = len(self._fd_to_key) - ready = [] - try: - kev_list = self._kqueue.control(None, max_ev, timeout) - except InterruptedError: - return ready - for kev in kev_list: - fd = kev.ident - flag = kev.filter - events = 0 - if flag == select.KQ_FILTER_READ: - events |= EVENT_READ - if flag == select.KQ_FILTER_WRITE: - events |= EVENT_WRITE - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def close(self): - self._kqueue.close() - super().close() - - -# Choose the best implementation: roughly, epoll|kqueue|devpoll > poll > select. -# select() also can't accept a FD > FD_SETSIZE (usually around 1024) -if 'KqueueSelector' in globals(): - DefaultSelector = KqueueSelector -elif 'EpollSelector' in globals(): - DefaultSelector = EpollSelector -elif 'DevpollSelector' in globals(): - DefaultSelector = DevpollSelector -elif 'PollSelector' in globals(): - DefaultSelector = PollSelector -else: - DefaultSelector = SelectSelector diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 789d869e..41e135f6 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -31,7 +31,7 @@ from .. import six try: from asyncio import selectors except ImportError: - from .. import selectors + from trollius import selectors class TConn(): From 3cda90a214c361507206a951d57f6ddce2e476d2 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sun, 1 Jun 2014 09:50:40 +0200 Subject: [PATCH 22/25] reduce CPU usage. 
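
A rough illustration of the tradeoff behind this one-line change (the helper below is invented for the example, not part of the patch): when nothing is ready, each pass through the worker loop costs one select() wakeup plus a heartbeat, so raising the poll timeout from 0.01s to 0.2s cuts idle wakeups from roughly 100/s to about 5/s while still notifying the arbiter well within the worker timeout:

    import selectors
    import socket
    import time

    def demo_idle_wakeups(poll_timeout, duration=1.0):
        # one quiet socket registered so select() really waits for the full timeout
        a, b = socket.socketpair()
        sel = selectors.DefaultSelector()
        sel.register(a, selectors.EVENT_READ)
        wakeups = 0
        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            sel.select(poll_timeout)   # nothing readable -> returns [] after poll_timeout
            wakeups += 1               # each wakeup is also a notify()/heartbeat pass
        sel.close(); a.close(); b.close()
        return wakeups

    # roughly 100 idle wakeups/s with the old 0.01s timeout vs ~5/s with 0.2s
    print(demo_idle_wakeups(0.01), demo_idle_wakeups(0.2))
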
--- gunicorn/workers/gthread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 41e135f6..5f660883 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -153,7 +153,7 @@ class ThreadWorker(base.Worker): # notify the arbiter we are alive self.notify() - events = self.poller.select(0.01) + events = self.poller.select(0.2) for key, mask in events: callback = key.data callback(key.fileobj) From abac771c44f7cb889ff03e0dc0b08037202ea897 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sun, 1 Jun 2014 20:36:48 +0200 Subject: [PATCH 23/25] fix race keepalived condition by popping/appending from left --- gunicorn/workers/gthread.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 5f660883..0f486642 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -120,11 +120,13 @@ class ThreadWorker(base.Worker): now = time.time() while True: try: - delta = self._keep[0].timeout - now + conn = self._keep.popleft() except IndexError: break + delta = conn.timeout - now if delta > 0: + self._keep.appendleft(conn) break else: # remove the connection from the queue From ff6169cc20fe0e97f9db7f670d2b56ce8c5573be Mon Sep 17 00:00:00 2001 From: benoitc Date: Sun, 1 Jun 2014 20:44:50 +0200 Subject: [PATCH 24/25] gthreads: only check requirements for python < 3.4 --- gunicorn/workers/gthread.py | 18 ++++++++++++++++-- setup.py | 3 --- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 0f486642..aedfdd93 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -11,7 +11,6 @@ # closed. from collections import deque -import concurrent.futures as futures from datetime import datetime import errno from functools import partial @@ -28,10 +27,25 @@ from .. import util from . import base from .. import six + +try: + import concurrent.futures as futures +except ImportError: + raise RuntimeError(""" + You need 'concurrent' installed to use this worker with this python + version. + """) + try: from asyncio import selectors except ImportError: - from trollius import selectors + try: + from trollius import selectors + except ImportError: + raise RuntimeError(""" + You need 'trollius' installed to use this worker with this python + version. + """) class TConn(): diff --git a/setup.py b/setup.py index 36e33853..6f5b5b69 100644 --- a/setup.py +++ b/setup.py @@ -63,9 +63,6 @@ class PyTest(Command): REQUIREMENTS = [] -if sys.version_info[0] == 2: - REQUIREMENTS.append('futures') - setup( name = 'gunicorn', From 8436389229ea881e314687c789ccc979df1fd0ff Mon Sep 17 00:00:00 2001 From: benoitc Date: Sun, 1 Jun 2014 22:30:46 +0200 Subject: [PATCH 25/25] define an object class instance. --- gunicorn/workers/gthread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index aedfdd93..122d923e 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -48,7 +48,7 @@ except ImportError: """) -class TConn(): +class TConn(object): def __init__(self, cfg, listener, sock, addr): self.cfg = cfg
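
Taken together, the last patches settle on a simple reaping pattern for keepalive connections: the reaper only ever touches the left end of the deque, so it cannot race with worker threads that remove arbitrary entries. A condensed, hypothetical sketch of that pattern (demo_* names invented; this is not the worker itself):

    import time
    from collections import deque

    class DemoConn(object):
        def __init__(self, sock, keepalive=2.0):
            self.sock = sock
            self.timeout = time.time() + keepalive

    def demo_murder_keepalived(keep, close):
        now = time.time()
        while True:
            try:
                conn = keep.popleft()          # oldest connection first
            except IndexError:
                break                          # queue drained
            if conn.timeout - now > 0:
                keep.appendleft(conn)          # head still alive: put it back and stop
                break
            close(conn.sock)                   # expired: close it and keep draining

    # usage sketch: requests append to the right as they finish,
    # the reaper runs once per event-loop iteration
    demo_keep = deque([DemoConn(sock=None, keepalive=0.0)])
    demo_murder_keepalived(demo_keep, close=lambda s: None)
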