diff --git a/examples/test.py b/examples/test.py index 610cf5b1..77bef952 100644 --- a/examples/test.py +++ b/examples/test.py @@ -16,7 +16,7 @@ def app(environ, start_response): """Simplest possible application object""" errors = environ['wsgi.errors'] - pprint.pprint(('ENVIRON', environ), stream=errors) +# pprint.pprint(('ENVIRON', environ), stream=errors) data = b'Hello, World!\n' status = '200 OK' diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 77002653..a05750ea 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -131,8 +131,7 @@ class Arbiter(object): listeners_str = ",".join([str(l) for l in self.LISTENERS]) self.log.debug("Arbiter booted") self.log.info("Listening at: %s (%s)", listeners_str, self.pid) - self.log.info("Using worker: %s", - self.cfg.settings['worker_class'].get()) + self.log.info("Using worker: %s", self.cfg.worker_class_str) self.cfg.when_ready(self) diff --git a/gunicorn/config.py b/gunicorn/config.py index 17fef369..5a165724 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -83,14 +83,34 @@ class Config(object): return parser + @property + def worker_class_str(self): + uri = self.settings['worker_class'].get() + + ## are we using a threaded worker? + is_sync = uri.endswith('SyncWorker') or uri == 'sync' + if is_sync and self.threads > 1: + return "threads" + return uri + @property def worker_class(self): uri = self.settings['worker_class'].get() + + ## are we using a threaded worker? + is_sync = uri.endswith('SyncWorker') or uri == 'sync' + if is_sync and self.threads > 1: + uri = "gunicorn.workers.gthread.ThreadWorker" + worker_class = util.load_class(uri) if hasattr(worker_class, "setup"): worker_class.setup() return worker_class + @property + def threads(self): + return self.settings['threads'].get() + @property def workers(self): return self.settings['workers'].get() @@ -538,6 +558,26 @@ class WorkerClass(Setting): can also load the gevent class with ``egg:gunicorn#gevent`` """ +class WorkerThreads(Setting): + name = "threads" + section = "Worker Processes" + cli = ["--threads"] + meta = "INT" + validator = validate_pos_int + type = int + default = 1 + desc = """\ + The number of worker threads for handling requests. + + Run each worker with the specified number of threads. + + A positive integer generally in the 2-4 x $(NUM_CORES) range. You'll + want to vary this a bit to find the best for your particular + application's work load. + + If it is not defined, the default is 1. + """ + class WorkerConnections(Setting): name = "worker_connections" diff --git a/gunicorn/util.py b/gunicorn/util.py index de0ef613..9f6b79c9 100644 --- a/gunicorn/util.py +++ b/gunicorn/util.py @@ -269,7 +269,6 @@ def set_non_blocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK fcntl.fcntl(fd, fcntl.F_SETFL, flags) - def close(sock): try: sock.close() diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py new file mode 100644 index 00000000..122d923e --- /dev/null +++ b/gunicorn/workers/gthread.py @@ -0,0 +1,345 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +# design: +# a threaded worker accepts connections in the main loop, accepted +# connections are are added to the thread pool as a connection job. On +# keepalive connections are put back in the loop waiting for an event. +# If no event happen after the keep alive timeout, the connectoin is +# closed. + +from collections import deque +from datetime import datetime +import errno +from functools import partial +import os +import operator +import socket +import ssl +import sys +import time + +from .. import http +from ..http import wsgi +from .. import util +from . import base +from .. import six + + +try: + import concurrent.futures as futures +except ImportError: + raise RuntimeError(""" + You need 'concurrent' installed to use this worker with this python + version. + """) + +try: + from asyncio import selectors +except ImportError: + try: + from trollius import selectors + except ImportError: + raise RuntimeError(""" + You need 'trollius' installed to use this worker with this python + version. + """) + + +class TConn(object): + + def __init__(self, cfg, listener, sock, addr): + self.cfg = cfg + self.listener = listener + self.sock = sock + self.addr = addr + + self.timeout = None + self.parser = None + + # set the socket to non blocking + self.sock.setblocking(False) + + def init(self): + self.sock.setblocking(True) + if self.parser is None: + # wrap the socket if needed + if self.cfg.is_ssl: + self.sock = ssl.wrap_socket(client, server_side=True, + **self.cfg.ssl_options) + + + # initialize the parser + self.parser = http.RequestParser(self.cfg, self.sock) + return True + return False + + def set_timeout(self): + # set the timeout + self.timeout = time.time() + self.cfg.keepalive + + def __lt__(self, other): + return self.timeout < other.timeout + + __cmp__ = __lt__ + + +class ThreadWorker(base.Worker): + + def __init__(self, *args, **kwargs): + super(ThreadWorker, self).__init__(*args, **kwargs) + self.worker_connections = self.cfg.worker_connections + + # initialise the pool + self.tpool = None + self.poller = None + self.futures = deque() + self._keep = deque() + + def _wrap_future(self, fs, conn): + fs.conn = conn + self.futures.append(fs) + fs.add_done_callback(self.finish_request) + + def init_process(self): + self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) + self.poller = selectors.DefaultSelector() + super(ThreadWorker, self).init_process() + + def accept(self, listener): + try: + client, addr = listener.accept() + conn = TConn(self.cfg, listener, client, addr) + + # wait for the read event to handle the connection + self.poller.register(client, selectors.EVENT_READ, + partial(self.handle_client, conn)) + + except socket.error as e: + if e.args[0] not in (errno.EAGAIN, + errno.ECONNABORTED, errno.EWOULDBLOCK): + raise + + def handle_client(self, conn, client): + # unregister the client from the poller + self.poller.unregister(client) + + # submit the connection to a worker + fs = self.tpool.submit(self.handle, conn) + self._wrap_future(fs, conn) + + def murder_keepalived(self): + now = time.time() + while True: + try: + conn = self._keep.popleft() + except IndexError: + break + + delta = conn.timeout - now + if delta > 0: + self._keep.appendleft(conn) + break + else: + # remove the connection from the queue + conn = self._keep.popleft() + + # remove the socket from the poller + self.poller.unregister(conn.sock) + + # close the socket + util.close(conn.sock) + + def run(self): + # init listeners, add them to the event loop + for s in self.sockets: + s.setblocking(False) + self.poller.register(s, selectors.EVENT_READ, self.accept) + + timeout = self.cfg.timeout or 0.5 + + while self.alive: + # If our parent changed then we shut down. + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s", self) + return + + # notify the arbiter we are alive + self.notify() + + events = self.poller.select(0.2) + for key, mask in events: + callback = key.data + callback(key.fileobj) + + # hanle keepalive timeouts + self.murder_keepalived() + + # if we more connections than the max number of connections + # accepted on a worker, wait until some complete or exit. + if len(self.futures) >= self.worker_connections: + res = futures.wait(self.futures, timeout=timeout) + if not res: + self.log.info("max requests achieved") + break + + # shutdown the pool + self.poller.close() + self.tpool.shutdown(False) + + # wait for the workers + futures.wait(self.futures, timeout=self.cfg.graceful_timeout) + + # if we have still fures running, try to close them + while True: + try: + fs = self.futures.popleft() + except IndexError: + break + + sock = fs.conn.sock + + # the future is not running, cancel it + if not fs.done() and not fs.running(): + fs.cancel() + + # make sure we close the sockets after the graceful timeout + util.close(sock) + + def finish_request(self, fs): + try: + (keepalive, conn) = fs.result() + # if the connection should be kept alived add it + # to the eventloop and record it + if keepalive: + # flag the socket as non blocked + conn.sock.setblocking(False) + + # register the connection + conn.set_timeout() + self._keep.append(conn) + + # add the socket to the event loop + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.handle_client, conn)) + else: + util.close(conn.sock) + except: + # an exception happened, make sure to close the + # socket. + util.close(fs.conn.sock) + finally: + # remove the future from our list + try: + self.futures.remove(fs) + except ValueError: + pass + + def handle(self, conn): + if not conn.init(): + # connection kept alive + try: + self._keep.remove(conn) + except ValueError: + pass + + keepalive = False + req = None + try: + req = six.next(conn.parser) + if not req: + return (False, conn) + + # handle the request + keepalive = self.handle_request(req, conn) + if keepalive: + return (keepalive, conn) + except http.errors.NoMoreData as e: + self.log.debug("Ignored premature client disconnection. %s", e) + + except StopIteration as e: + self.log.debug("Closing connection. %s", e) + except ssl.SSLError as e: + if e.args[0] == ssl.SSL_ERROR_EOF: + self.log.debug("ssl connection closed") + conn.sock.close() + else: + self.log.debug("Error processing SSL request.") + self.handle_error(req, conn.sock, conn.addr, e) + + except socket.error as e: + if e.args[0] not in (errno.EPIPE, errno.ECONNRESET): + self.log.exception("Socket error processing request.") + else: + if e.args[0] == errno.ECONNRESET: + self.log.debug("Ignoring connection reset") + else: + self.log.debug("Ignoring connection epipe") + except Exception as e: + self.handle_error(req, conn.sock, conn.addr, e) + + return (False, conn) + + def handle_request(self, req, conn): + environ = {} + resp = None + try: + self.cfg.pre_request(self, req) + request_start = datetime.now() + resp, environ = wsgi.create(req, conn.sock, conn.addr, + conn.listener.getsockname(), self.cfg) + environ["wsgi.multithread"] = True + + self.nr += 1 + + if self.alive and self.nr >= self.max_requests: + self.log.info("Autorestarting worker after current request.") + resp.force_close() + self.alive = False + + if not self.cfg.keepalive: + resp.force_close() + + respiter = self.wsgi(environ, resp.start_response) + try: + if isinstance(respiter, environ['wsgi.file_wrapper']): + resp.write_file(respiter) + else: + for item in respiter: + resp.write(item) + + resp.close() + request_time = datetime.now() - request_start + self.log.access(resp, req, environ, request_time) + finally: + if hasattr(respiter, "close"): + respiter.close() + + if resp.should_close(): + self.log.debug("Closing connection.") + return False + except socket.error: + exc_info = sys.exc_info() + # pass to next try-except level + six.reraise(exc_info[0], exc_info[1], exc_info[2]) + except Exception: + if resp and resp.headers_sent: + # If the requests have already been sent, we should close the + # connection to indicate the error. + self.log.exception("Error handling request") + try: + conn.sock.shutdown(socket.SHUT_RDWR) + conn.sock.close() + except socket.error: + pass + raise StopIteration() + raise + finally: + try: + self.cfg.post_request(self, req, environ, resp) + except Exception: + self.log.exception("Exception in post_request hook") + + return True diff --git a/setup.py b/setup.py index 6b6e474e..be54106f 100644 --- a/setup.py +++ b/setup.py @@ -10,6 +10,7 @@ import sys from gunicorn import __version__ + CLASSIFIERS = [ 'Development Status :: 4 - Beta', 'Environment :: Other Environment', @@ -62,6 +63,8 @@ class PyTest(Command): raise SystemExit(errno) +REQUIREMENTS = [] + setup( name = 'gunicorn', version = __version__, @@ -81,6 +84,8 @@ setup( tests_require = tests_require, cmdclass = {'test': PyTest}, + install_requires = REQUIREMENTS, + entry_points=""" [console_scripts]