From 51f1f226651105e6dc8a7c245638612d9dafec40 Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Thu, 15 Apr 2010 19:58:08 -0400 Subject: [PATCH] Refactoring worker code. Also went through gunicorn.main lightly. --- gunicorn/arbiter.py | 40 +++-- gunicorn/async/base.py | 130 --------------- gunicorn/async/eventlet_server.py | 50 ------ gunicorn/async/gevent_server.py | 44 ----- gunicorn/config.py | 13 +- gunicorn/http/request.py | 16 +- gunicorn/http/response.py | 14 ++ gunicorn/main.py | 208 ++++++++++++------------ gunicorn/util.py | 5 +- gunicorn/worker.py | 197 ---------------------- gunicorn/{async => workers}/__init__.py | 0 gunicorn/workers/async.py | 70 ++++++++ gunicorn/workers/base.py | 116 +++++++++++++ gunicorn/workers/geventlet.py | 82 ++++++++++ gunicorn/workers/ggevent.py | 36 ++++ gunicorn/workers/sync.py | 100 ++++++++++++ setup.py | 8 +- 17 files changed, 574 insertions(+), 555 deletions(-) delete mode 100644 gunicorn/async/base.py delete mode 100644 gunicorn/async/eventlet_server.py delete mode 100644 gunicorn/async/gevent_server.py delete mode 100644 gunicorn/worker.py rename gunicorn/{async => workers}/__init__.py (100%) create mode 100644 gunicorn/workers/async.py create mode 100644 gunicorn/workers/base.py create mode 100644 gunicorn/workers/geventlet.py create mode 100644 gunicorn/workers/ggevent.py create mode 100644 gunicorn/workers/sync.py diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 21437607..c660ac18 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -18,7 +18,7 @@ import traceback from gunicorn.pidfile import Pidfile from gunicorn.sock import create_socket -from gunicorn.worker import Worker +from gunicorn.workers.sync import SyncWorker from gunicorn import util class Arbiter(object): @@ -47,21 +47,34 @@ class Arbiter(object): pidfile = Pidfile() - def __init__(self, address, num_workers, app, **kwargs): + def __init__(self, cfg, app): + self.cfg = cfg + self.app = app + + self.address = cfg.address + self.num_workers = cfg.num_workers + self.debug = cfg.debug + self.timeout = cfg.timeout + self.proc_name = cfg.proc_name + self.worker_class = cfg.worker_class + self.address = address self.num_workers = num_workers - self.worker_age = 0 self.app = app + + self._pidfile = None + self.worker_age = 0 + self.reexec_pid = 0 + self.master_name = "Master" + + self.opts = kwargs + self.debug = kwargs.get("debug", False) self.conf = kwargs.get("config", {}) self.timeout = self.conf['timeout'] - self.reexec_pid = 0 - self.debug = kwargs.get("debug", False) - self.log = logging.getLogger(__name__) - self.opts = kwargs - - self._pidfile = None - self.master_name = "Master" self.proc_name = self.conf['proc_name'] + self.worker_class = kwargs.get("workerclass", SyncWorker) + + self.log = logging.getLogger(__name__) # get current path, try to use PWD env first try: @@ -345,9 +358,6 @@ class Arbiter(object): pid, age = wpid, worker.age self.kill_worker(pid, signal.SIGQUIT) - def init_worker(self, worker_age, pid, listener, app, timeout, conf): - return Worker(worker_age, pid, listener, app, timeout, conf) - def spawn_workers(self): """\ Spawn new workers as needed. @@ -358,8 +368,8 @@ class Arbiter(object): for i in range(self.num_workers - len(self.WORKERS.keys())): self.worker_age += 1 - worker = self.init_worker(self.worker_age, self.pid, self.LISTENER, - self.app, self.timeout/2.0, self.conf) + worker = self.worker_class(self.worker_age, self.pid, self.LISTENER, + self.app, self.timeout/2.0, self.conf) self.conf.before_fork(self, worker) pid = os.fork() if pid != 0: diff --git a/gunicorn/async/base.py b/gunicorn/async/base.py deleted file mode 100644 index dce84ac8..00000000 --- a/gunicorn/async/base.py +++ /dev/null @@ -1,130 +0,0 @@ -# -*- coding: utf-8 - -# -# This file is part of gunicorn released under the MIT license. -# See the NOTICE for more information. - -import errno -import os -import select -import socket -import traceback - -from gunicorn import http -from gunicorn.http.tee import UnexpectedEOF -from gunicorn import util -from gunicorn.worker import Worker - -ALREADY_HANDLED = object() - -class KeepaliveResponse(http.Response): - - def default_headers(self): - if self.req.parser.should_close: - connection_hdr = "close" - else: - connection_hdr = "keep-alive" - - return [ - "HTTP/1.1 %s\r\n" % self.status, - "Server: %s\r\n" % self.version, - "Date: %s\r\n" % util.http_date(), - "Connection: %s\r\n" % connection_hdr - ] - -class KeepaliveRequest(http.Request): - - RESPONSE_CLASS = KeepaliveResponse - - def read(self): - ret = select.select([self.socket], [], [], self.conf.keepalive) - if not ret[0]: - return - try: - return super(KeepaliveRequest, self).read() - except socket.error, e: - if e[0] == 54: - return - raise - -class KeepaliveWorker(Worker): - - def __init__(self, *args, **kwargs): - Worker.__init__(self, *args, **kwargs) - self.nb_connections = 0 - self.worker_connections = self.conf.worker_connections - - def handle(self, client, addr): - - self.nb_connections += 1 - try: - self.init_sock(client) - while True: - req = KeepaliveRequest(client, addr, self.address, self.conf) - try: - environ = req.read() - if not environ or not req.parser.headers: - return - respiter = self.app(environ, req.start_response) - if respiter == ALREADY_HANDLED: - break - for item in respiter: - req.response.write(item) - req.response.close() - if hasattr(respiter, "close"): - respiter.close() - if req.parser.should_close: - break - except Exception, e: - #Only send back traceback in HTTP in debug mode. - if not self.debug: - raise - util.write_error(client, traceback.format_exc()) - break - except socket.error, e: - if e[0] != errno.EPIPE: - self.log.exception("Error processing request.") - else: - self.log.warn("Ignoring EPIPE") - except UnexpectedEOF: - self.log.exception("remote closed the connection unexpectedly.") - except Exception, e: - self.log.exception("Error processing request.") - try: - # Last ditch attempt to notify the client of an error. - mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n" - util.write_nonblock(client, mesg) - except: - pass - return - finally: - self.nb_connections -= 1 - util.close(client) - - def run(self): - self.init_process() - self.socket.setblocking(1) - - while self.alive: - self.notify() - - # If our parent changed then we shut down. - if self.ppid != os.getppid(): - self.log.info("Parent changed, shutting down: %s" % self) - return - - if self.nb_connections > self.worker_connections: - continue - - try: - ret = select.select([self.socket], [], [], 1) - if ret[0]: - self.accept() - except select.error, e: - if e[0] == errno.EINTR: - continue - if e[0] == errno.EBADF: - continue - raise - except KeyboardInterrupt : - return - diff --git a/gunicorn/async/eventlet_server.py b/gunicorn/async/eventlet_server.py deleted file mode 100644 index 0ce98aeb..00000000 --- a/gunicorn/async/eventlet_server.py +++ /dev/null @@ -1,50 +0,0 @@ -# -*- coding: utf-8 - -# -# This file is part of gunicorn released under the MIT license. -# See the NOTICE for more information. - -from __future__ import with_statement - -import errno - -import collections -import eventlet -from eventlet.green import os -from eventlet.green import socket -from eventlet import greenio -from eventlet.hubs import trampoline -from eventlet.timeout import Timeout - -from gunicorn import util -from gunicorn import arbiter -from gunicorn.async.base import KeepaliveWorker - - -class EventletWorker(KeepaliveWorker): - - def init_process(self): - super(EventletWorker, self).init_process() - self.pool = eventlet.GreenPool(self.worker_connections) - - def accept(self): - with Timeout(0.1, False): - try: - client, addr = self.socket.accept() - self.pool.spawn_n(self.handle, client, addr) - except socket.error, e: - if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN, errno.ECONNABORTED): - return - raise - - -class EventletArbiter(arbiter.Arbiter): - - @classmethod - def setup(cls): - import eventlet - if eventlet.version_info < (0,9,7): - raise RuntimeError("You need eventlet >= 0.9.7") - eventlet.monkey_patch(all=False, socket=True, select=True, thread=True) - - def init_worker(self, worker_age, pid, listener, app, timeout, conf): - return EventletWorker(worker_age, pid, listener, app, timeout, conf) diff --git a/gunicorn/async/gevent_server.py b/gunicorn/async/gevent_server.py deleted file mode 100644 index e5cb2325..00000000 --- a/gunicorn/async/gevent_server.py +++ /dev/null @@ -1,44 +0,0 @@ -# -*- coding: utf-8 - -# -# This file is part of gunicorn released under the MIT license. -# See the NOTICE for more information. - -from __future__ import with_statement - -import errno -import os - -import gevent -from gevent import Timeout, socket -from gevent.greenlet import Greenlet -from gevent.pool import Pool - -from gunicorn import arbiter -from gunicorn import util -from gunicorn.async.base import KeepaliveWorker - -class GEventWorker(KeepaliveWorker): - - def init_process(self): - super(GEventWorker, self).init_process() - self.pool = Pool(self.worker_connections) - - def accept(self): - with Timeout(0.1, False): - try: - client, addr = self.socket.accept() - self.pool.spawn(self.handle, client, addr) - except socket.error, e: - if e[0] in (errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNABORTED): - return - raise - -class GEventArbiter(arbiter.Arbiter): - - @classmethod - def setup(cls): - from gevent import monkey - monkey.patch_all(dns=False) - - def init_worker(self, worker_age, pid, listener, app, timeout, conf): - return GEventWorker(worker_age, pid, listener, app, timeout, conf) diff --git a/gunicorn/config.py b/gunicorn/config.py index e09f8cda..49ece6d2 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -15,7 +15,6 @@ class Config(object): DEFAULT_CONFIG_FILE = 'gunicorn.conf.py' DEFAULTS = dict( - arbiter="egg:gunicorn", backlog=2048, bind='127.0.0.1:8000', daemon=False, @@ -34,6 +33,7 @@ class Config(object): user=None, workers=1, worker_connections=1000, + worker_class="egg:gunicorn#sync", after_fork=lambda server, worker: server.log.info( "Worker spawned (pid: %s)" % worker.pid), @@ -99,12 +99,9 @@ class Config(object): return self.conf.iteritems() @property - def arbiter(self): - uri = self.conf.get('arbiter', 'egg:gunicorn') - arbiter = util.parse_arbiter_uri(uri) - if hasattr(arbiter, 'setup'): - arbiter.setup() - return arbiter + def worker_class(self): + uri = self.conf.get('workertype', None) or 'egg:gunicorn#sync' + return util.load_worker_class(uri) @property def workers(self): @@ -116,7 +113,7 @@ class Config(object): if self.conf['debug'] == True: workers = 1 return workers - + @property def address(self): if not self.conf['bind']: diff --git a/gunicorn/http/request.py b/gunicorn/http/request.py index b06bbce7..74fbd508 100644 --- a/gunicorn/http/request.py +++ b/gunicorn/http/request.py @@ -3,9 +3,11 @@ # This file is part of gunicorn released under the MIT license. # See the NOTICE for more information. +import errno import logging import os import re + try: from cStringIO import StringIO except ImportError: @@ -16,7 +18,7 @@ from urllib import unquote from gunicorn import __version__ from gunicorn.http.parser import Parser -from gunicorn.http.response import Response +from gunicorn.http.response import Response, KeepAliveResponse from gunicorn.http.tee import TeeInput from gunicorn.util import CHUNK_SIZE @@ -168,3 +170,15 @@ class Request(object): self.response = self.RESPONSE_CLASS(self, status, headers) return self.response.write + +class KeepaliveRequest(http.Request): + + RESPONSE_CLASS = KeepAliveResponse + + def read(self): + try: + return super(KeepaliveRequest, self).read() + except socket.error, e: + if e[0] == errno.ECONNRESET: + return + raise diff --git a/gunicorn/http/response.py b/gunicorn/http/response.py index 1e123cb9..df5b51a7 100644 --- a/gunicorn/http/response.py +++ b/gunicorn/http/response.py @@ -57,3 +57,17 @@ class Response(object): self.send_headers() if self.chunked: write_chunk(self.socket, "") + +class KeepaliveResponse(Response): + + def default_headers(self): + connection = "keep-alive" + if self.req.parser.should_close: + connection = "close" + + return [ + "HTTP/1.1 %s\r\n" % self.status, + "Server: %s\r\n" % self.version, + "Date: %s\r\n" % http_date(), + "Connection: %s\r\n" % connection + ] diff --git a/gunicorn/main.py b/gunicorn/main.py index 7afada6d..7f4af297 100644 --- a/gunicorn/main.py +++ b/gunicorn/main.py @@ -9,6 +9,7 @@ import os import pkg_resources import sys +from gunicorn.arbiter import Arbiter from gunicorn.config import Config from gunicorn.debug import spew from gunicorn import util, __version__ @@ -32,8 +33,8 @@ def options(): help='Adress to listen on. Ex. 127.0.0.1:8000 or unix:/tmp/gunicorn.sock'), op.make_option('-w', '--workers', dest='workers', help='Number of workers to spawn. [1]'), - op.make_option('-a', '--arbiter', dest='arbiter', - help="gunicorn arbiter entry point or module "+ + op.make_option('-k', '--worker-class', dest='klass', + help="The type of request processing to use "+ "[egg:gunicorn#main]"), op.make_option('-p','--pid', dest='pidfile', help='set the background PID FILE'), @@ -57,135 +58,88 @@ def options(): default=False, help="Install a trace hook") ] -def configure_logging(opts): - """ - Set level of logging, and choose where to display/save logs (file or standard output). - """ - handlers = [] - if opts['logfile'] != "-": - handlers.append(logging.FileHandler(opts['logfile'])) - else: - handlers.append(logging.StreamHandler()) - - loglevel = LOG_LEVELS.get(opts['loglevel'].lower(), logging.INFO) - - logger = logging.getLogger('gunicorn') - logger.setLevel(loglevel) - format = r"%(asctime)s [%(process)d] [%(levelname)s] %(message)s" - datefmt = r"%Y-%m-%d %H:%M:%S" - for h in handlers: - h.setFormatter(logging.Formatter(format, datefmt)) - logger.addHandler(h) - -def daemonize(): - """ if daemon option is set, this function will daemonize the master. - It's based on this activestate recipe : - http://code.activestate.com/recipes/278731/ - """ - if not 'GUNICORN_FD' in os.environ: - if os.fork() == 0: - os.setsid() - if os.fork() != 0: - os.umask(0) - else: - os._exit(0) - else: - os._exit(0) - - maxfd = util.get_maxfd() - - # Iterate through and close all file descriptors. - for fd in range(0, maxfd): - try: - os.close(fd) - except OSError: # ERROR, fd wasn't open to begin with (ignored) - pass - - os.open(util.REDIRECT_TO, os.O_RDWR) - os.dup2(0, 1) - os.dup2(0, 2) - - - def main(usage, get_app): - """ function used by different runners to setup options - ans launch the arbiter. """ - - parser = op.OptionParser(usage=usage, option_list=options(), - version="%prog " + __version__) + """\ + Used by the various runners to setup options and + launch the arbiter. + """ + vrs = "%prog " + __version__ + parser = op.OptionParser(usage=usage, option_list=options(), version=vrs) opts, args = parser.parse_args() app = get_app(parser, opts, args) - conf = Config(opts.__dict__, opts.config) - if conf['spew']: + cfg = Config(opts.__dict__, opts.config) + if cfg.spew: spew() - arbiter = conf.arbiter(conf.address, conf.workers, app, config=conf, - debug=conf['debug'], pidfile=conf['pidfile']) - if conf['daemon']: + if cfg.daemon: daemonize() else: os.setpgrp() - configure_logging(conf) - arbiter.run() + configure_logging(cfg) + + Arbiter(cfg, app).run() -def paste_server(app, global_conf=None, host="127.0.0.1", port=None, - *args, **kwargs): - """ Paster server entrypoint to add to your paster ini file: +def paste_server(app, gcfg=None, host="127.0.0.1", port=None, *args, **kwargs): + """\ + A paster server. - [server:main] - use = egg:gunicorn#main - host = 127.0.0.1 - port = 5000 + Then entry point in your paster ini file should looks like this: + + [server:main] + use = egg:gunicorn#main + host = 127.0.0.1 + port = 5000 """ - options = kwargs.copy() + opts = kwargs.copy() if port and not host.startswith("unix:"): bind = "%s:%s" % (host, port) else: bind = host - options['bind'] = bind + opts['bind'] = bind - if global_conf: - for key, value in list(global_conf.items()): + if gcfg: + for key, value in list(gcfg.items()): if value and value is not None: if key == "debug": value = (value == "true") - options[key] = value - options['default_proc_name'] = options['__file__'] + opts[key] = value + opts['default_proc_name'] = opts['__file__'] - conf = Config(options) - if conf['spew']: + cfg = Config(opts) + if cfg.spew: spew() - arbiter = conf.arbiter(conf.address, conf.workers, app, debug=conf["debug"], - pidfile=conf["pidfile"], config=conf) - if conf["daemon"] : + if cfg.daemon: daemonize() else: os.setpgrp() - configure_logging(conf) - arbiter.run() + configure_logging(cfg) + + Arbiter(cfg, app).run() def run(): - """ main runner used for gunicorn command to launch generic wsgi application """ - + """\ + The ``gunicorn`` command line runner for launcing Gunicorn with + generic WSGI applications. + """ sys.path.insert(0, os.getcwd()) def get_app(parser, opts, args): if len(args) != 1: parser.error("No application module specified.") - - opts.default_proc_name = args[0] - try: return util.import_app(args[0]) - except: - parser.error("Failed to import application module.") + except Exception, e: + parser.error("Failed to import application module:\n %s" % e) main("%prog [OPTIONS] APP_MODULE", get_app) def run_django(): - """ django runner for gunicorn_django command used to launch django applications """ + """\ + The ``gunicorn_django`` command line runner for launching Django + applications. + """ def settings_notfound(path): error = "Settings file '%s' not found in current folder.\n" % path @@ -228,30 +182,30 @@ def run_django(): main("%prog [OPTIONS] [SETTINGS_PATH]", get_app) def run_paster(): - """ runner used for gunicorn_paster command to launch paster compatible applications - (pylons, turbogears2, ...) """ + """\ + The ``gunicorn_paster`` command for launcing Paster compatible + apllications like Pylons or Turbogears2 + """ from paste.deploy import loadapp, loadwsgi def get_app(parser, opts, args): if len(args) != 1: parser.error("No application name specified.") - config_file = os.path.abspath(os.path.normpath( - os.path.join(os.getcwd(), args[0]))) - - if not os.path.exists(config_file): + cfgfname = os.path.normpath(os.path.join(os.getcwd(), args[0])) + cfgfname = os.path.abspath(cfgfname) + if not os.path.exists(cfgfname): parser.error("Config file not found.") - config_url = 'config:%s' % config_file - relative_to = os.path.dirname(config_file) + cfgurl = 'config:%s' % cfgfname + relpath = os.path.dirname(cfgfname) # load module in sys path sys.path.insert(0, relative_to) # add to eggs pkg_resources.working_set.add_entry(relative_to) - ctx = loadwsgi.loadcontext(loadwsgi.SERVER, config_url, - relative_to=relative_to) + ctx = loadwsgi.loadcontext(loadwsgi.SERVER, cfgurl, relative_to=relpath) if not opts.workers: opts.workers = ctx.local_conf.get('workers', 1) @@ -284,4 +238,52 @@ def run_paster(): return app main("%prog [OPTIONS] pasteconfig.ini", get_app) - + +def daemonize(): + """\ + Standard daemonization of a process. Code is basd on the + ActiveState recipe at: + http://code.activestate.com/recipes/278731/ + """ + if not 'GUNICORN_FD' in os.environ: + if os.fork() == 0: + os.setsid() + if os.fork() != 0: + os.umask(0) + else: + os._exit(0) + else: + os._exit(0) + + maxfd = util.get_maxfd() + + # Iterate through and close all file descriptors. + for fd in range(0, maxfd): + try: + os.close(fd) + except OSError: # ERROR, fd wasn't open to begin with (ignored) + pass + + os.open(util.REDIRECT_TO, os.O_RDWR) + os.dup2(0, 1) + os.dup2(0, 2) + +def configure_logging(opts): + """\ + Set the log level and choose the destination for log output. + """ + handlers = [] + if opts['logfile'] != "-": + handlers.append(logging.FileHandler(opts['logfile'])) + else: + handlers.append(logging.StreamHandler()) + + loglevel = LOG_LEVELS.get(opts['loglevel'].lower(), logging.INFO) + + logger = logging.getLogger('gunicorn') + logger.setLevel(loglevel) + format = r"%(asctime)s [%(process)d] [%(levelname)s] %(message)s" + datefmt = r"%Y-%m-%d %H:%M:%S" + for h in handlers: + h.setFormatter(logging.Formatter(format, datefmt)) + logger.addHandler(h) diff --git a/gunicorn/util.py b/gunicorn/util.py index a066ffc3..7f7acdf3 100644 --- a/gunicorn/util.py +++ b/gunicorn/util.py @@ -43,7 +43,7 @@ except ImportError: def _setproctitle(title): return -def parse_arbiter_uri(uri): +def load_worker_class(uri): if uri.startswith("egg:"): # uses entry points entry_str = uri.split("egg:")[1] @@ -53,8 +53,7 @@ def parse_arbiter_uri(uri): dist = entry_str name = "main" - return pkg_resources.load_entry_point(dist, "gunicorn.arbiter", - name) + return pkg_resources.load_entry_point(dist, "gunicorn.arbiter", name) else: components = uri.split('.') if len(components) == 1: diff --git a/gunicorn/worker.py b/gunicorn/worker.py deleted file mode 100644 index 9bafff44..00000000 --- a/gunicorn/worker.py +++ /dev/null @@ -1,197 +0,0 @@ -# -*- coding: utf-8 - -# -# This file is part of gunicorn released under the MIT license. -# See the NOTICE for more information. - - -import errno -import logging -import os -import select -import signal -import socket -import sys -import tempfile -import traceback - -from gunicorn import http -from gunicorn import util -from gunicorn.http.tee import UnexpectedEOF - -class Worker(object): - - SIGNALS = map( - lambda x: getattr(signal, "SIG%s" % x), - "HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split() - ) - - PIPE = [] - - def __init__(self, age, ppid, socket, app, timeout, conf): - self.nr = 0 - self.age = age - self.ppid = ppid - self.debug = conf['debug'] - self.conf = conf - self.socket = socket - self.timeout = timeout - self.fd, self.tmpname = tempfile.mkstemp(prefix="wgunicorn-") - util.chown(self.tmpname, conf.uid, conf.gid) - self.tmp = os.fdopen(self.fd, "r+b") - self.app = app - self.alive = True - self.log = logging.getLogger(__name__) - self.spinner = 0 - self.address = self.socket.getsockname() - - def __str__(self): - return "" % os.getpid() - - @property - def pid(self): - return os.getpid() - - def init_signals(self): - map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS) - signal.signal(signal.SIGQUIT, self.handle_quit) - signal.signal(signal.SIGUSR1, self.handle_usr1) - signal.signal(signal.SIGTERM, self.handle_exit) - signal.signal(signal.SIGINT, self.handle_exit) - signal.signal(signal.SIGWINCH, self.handle_winch) - - def handle_usr1(self, sig, frame): - self.nr = -65536; - try: - map(lambda p: p.close(), self.PIPE) - except: - pass - - def handle_quit(self, sig, frame): - self.alive = False - - def handle_exit(self, sig, frame): - sys.exit(0) - - def handle_winch(self, sig, fname): - # Ignore SIGWINCH in worker. Fix crash on OpenBSD - return - - def notify(self): - """\ - Notify our parent process that we're still alive. - """ - self.spinner = (self.spinner+1) % 2 - if getattr(os, 'fchmod', None): - os.fchmod(self.tmp.fileno(), self.spinner) - else: - os.chmod(self.tmpname, self.spinner) - - def init_process(self): - util.set_owner_process(self.conf.uid, self.conf.gid) - - # init pipe - self.PIPE = os.pipe() - map(util.set_non_blocking, self.PIPE) - map(util.close_on_exec, self.PIPE) - - # prevent inherientence - util.close_on_exec(self.socket) - util.close_on_exec(self.fd) - self.init_signals() - - def accept(self): - try: - client, addr = self.socket.accept() - self.init_sock(client) - self.handle(client, addr) - self.nr += 1 - except socket.error, e: - if e[0] not in (errno.EAGAIN, errno.ECONNABORTED): - raise - - def init_sock(self, sock): - sock.setblocking(1) - util.close_on_exec(sock) - - def run(self): - self.init_process() - self.nr = 0 - - # self.socket appears to lose its blocking status after - # we fork in the arbiter. Reset it here. - self.socket.setblocking(0) - - while self.alive: - self.nr = 0 - self.notify() - - # accept a new connection - self.accept() - - # Keep processing clients until no one is waiting. - # This prevents the need to select() for every - # client that we process. - if self.nr > 0: - continue - - # If our parent changed then we shut down. - if self.ppid != os.getppid(): - self.log.info("Parent changed, shutting down: %s" % self) - return - - try: - self.notify() - ret = select.select([self.socket], [], self.PIPE, self.timeout) - if ret[0]: - continue - except select.error, e: - if e[0] == errno.EINTR: - continue - if e[0] == errno.EBADF: - if self.nr < 0: - continue - else: - return - raise - - def handle(self, client, addr): - try: - req = http.Request(client, addr, self.address, self.conf) - - try: - environ = req.read() - if not environ or not req.parser.status_line: - return - - respiter = self.app(environ, req.start_response) - for item in respiter: - req.response.write(item) - req.response.close() - if hasattr(respiter, "close"): - respiter.close() - except socket.error: - raise - except Exception, e: - # Only send back traceback in HTTP in debug mode. - if not self.debug: - raise - util.write_error(client, traceback.format_exc()) - return - - except socket.error, e: - if e[0] != errno.EPIPE: - self.log.exception("Error processing request.") - else: - self.log.warn("Ignoring EPIPE") - except UnexpectedEOF: - self.log.exception("remote closed the connection unexpectedly.") - except Exception, e: - self.log.exception("Error processing request.") - try: - # Last ditch attempt to notify the client of an error. - mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n" - util.write_nonblock(client, mesg) - except: - pass - finally: - util.close(client) diff --git a/gunicorn/async/__init__.py b/gunicorn/workers/__init__.py similarity index 100% rename from gunicorn/async/__init__.py rename to gunicorn/workers/__init__.py diff --git a/gunicorn/workers/async.py b/gunicorn/workers/async.py new file mode 100644 index 00000000..1820b83a --- /dev/null +++ b/gunicorn/workers/async.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import errno +import os +import select +import socket +import traceback + +from gunicorn import http +from gunicorn.http.tee import UnexpectedEOF +from gunicorn import util +from gunicorn.workers.base import Worker + +ALREADY_HANDLED = object() + +class AsyncWorker(Worker): + + def __init__(self, *args, **kwargs): + Worker.__init__(self, *args, **kwargs) + self.worker_connections = self.conf.worker_connections + + def handle(self, client, addr): + try: + while self.handle_request(client, addr): + pass + except socket.error, e: + if e[0] != errno.EPIPE: + self.log.exception("Error processing request.") + else: + self.log.warn("Ignoring EPIPE") + except UnexpectedEOF: + self.log.exception("Client closed the connection unexpectedly.") + except Exception, e: + self.log.exception("Error processing request.") + try: + # Last ditch attempt to notify the client of an error. + mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n" + util.write_nonblock(client, mesg) + except: + pass + return + finally: + util.close(client) + + def handle_request(self, client, addr): + req = KeepaliveRequest(client, addr, self.address, self.conf) + try: + environ = req.read() + if not environ or not req.parser.headers: + return False + respiter = self.app(environ, req.start_response) + if respiter == ALREADY_HANDLED: + return False + for item in respiter: + req.response.write(item) + req.response.close() + if hasattr(respiter, "close"): + respiter.close() + if req.parser.should_close: + return False + except Exception, e: + #Only send back traceback in HTTP in debug mode. + if not self.debug: + raise + util.write_error(client, traceback.format_exc()) + return False + return True diff --git a/gunicorn/workers/base.py b/gunicorn/workers/base.py new file mode 100644 index 00000000..be0774f3 --- /dev/null +++ b/gunicorn/workers/base.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + + +import errno +import logging +import os +import select +import signal +import socket +import sys +import tempfile +import traceback + +from gunicorn import util + +class Worker(object): + + SIGNALS = map( + lambda x: getattr(signal, "SIG%s" % x), + "HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split() + ) + + PIPE = [] + + def __init__(self, age, ppid, socket, app, timeout, conf): + """\ + This is called pre-fork so it shouldn't do anything to the + current process. If there's a need to make process wide + changes you'll want to do that in ``self.init_process()``. + """ + self.age = age + self.ppid = ppid + self.socket = socket + self.app = app + self.timeout = timeout + self.conf = conf + + self.nr = 0 + self.alive = True + self.spinner = 0 + self.log = logging.getLogger(__name__) + self.debug = conf.get('debug', False) + self.address = self.socket.getsockname() + + self.fd, self.tmpname = tempfile.mkstemp(prefix="wgunicorn-") + util.chown(self.tmpname, conf.uid, conf.gid) + self.tmp = os.fdopen(self.fd, "r+b") + + def __str__(self): + return "" % self.pid + + @property + def pid(self): + return os.getpid() + + def notify(self): + """\ + Your worker subclass must arrange to have this method called + once every ``self.timeout`` seconds. If you fail in accomplishing + this task, the master process will murder your workers. + """ + self.spinner = (self.spinner+1) % 2 + if getattr(os, 'fchmod', None): + os.fchmod(self.tmp.fileno(), self.spinner) + else: + os.chmod(self.tmpname, self.spinner) + + def run(self): + """\ + This is the mainloop of a worker process. You should override + this method in a subclass to provide the intended behaviour + for your particular evil schemes. + """ + raise NotImplementedError() + + def init_process(self): + """\ + If you override this method in a subclass, the last statement + in the function should be to call this method with + super(MyWorkerClass, self).init_process() so that the ``run()`` + loop is initiated. + """ + util.set_owner_process(self.conf.uid, self.conf.gid) + + # For waking ourselves up + self.PIPE = os.pipe() + map(util.set_non_blocking, self.PIPE) + map(util.close_on_exec, self.PIPE) + + # Prevent fd inherientence + util.close_on_exec(self.socket) + util.close_on_exec(self.fd) + self.init_signals() + + # Enter main run loop + self.run() + + def init_signals(self): + map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS) + signal.signal(signal.SIGQUIT, self.handle_quit) + signal.signal(signal.SIGTERM, self.handle_exit) + signal.signal(signal.SIGINT, self.handle_exit) + signal.signal(signal.SIGWINCH, self.handle_winch) + + def handle_quit(self, sig, frame): + self.alive = False + + def handle_exit(self, sig, frame): + sys.exit(0) + + def handle_winch(self, sig, fname): + # Ignore SIGWINCH in worker. Fixes a crash on OpenBSD. + return diff --git a/gunicorn/workers/geventlet.py b/gunicorn/workers/geventlet.py new file mode 100644 index 00000000..01b2d856 --- /dev/null +++ b/gunicorn/workers/geventlet.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +from __future__ import with_statement + +import collections +import errno +import traceback + +import eventlet +import eventlet.debug + +from eventlet.green import os +from eventlet.green import socket +from eventlet import greenio +from eventlet import greenlet +from eventlet import greenpool +from eventlet import greenthread + +from gunicorn import util +from gunicorn import arbiter +from gunicorn.workers.async import AsyncWorker, ALREADY_HANDLED +from gunicorn.http.tee import UnexpectedEOF + +eventlet.debug.hub_exceptions(True) + +class EventletWorker(AsyncWorker): + + def __init__(self, *args, **kwargs): + super(EventletWorker, self).__init__(*args, **kwargs) + if eventlet.version_info < (0,9,7): + raise RuntimeError("You need eventlet >= 0.9.7") + + def init_process(self): + eventlet.monkey_patch(all=False, socket=True, select=True) + self.socket = greenio.GreenSocket(self.socket) + super(EventletWorker, self).init_process() + + def run(self): + self.init_process() + self.socket.setblocking(1) + + pool = greenpool.GreenPool(self.worker_connections) + acceptor = greenthread.spawn(self.acceptor, pool) + + while True: + self.notify() + + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s" % self) + greenthread.kill(acceptor, eventlet.StopServe) + break + + eventlet.sleep(0.1) + + with evenlet.Timeout(self.timeout, False): + pool.waitall() + + def acceptor(self, pool): + server_gt = greenthread.getcurrent() + while True: + try: + conn, addr = self.socket.accept() + gt = pool.spawn(self.handle, conn, addr) + gt.link(self.cleanup, conn) + conn, addr, gt = None, None, None + except eventlet.StopServe: + return + + def cleanup(self, thread, conn): + try: + try: + thread.wait() + finally: + conn.close() + except greenlet.GreenletExit: + pass + except Exception: + self.log.exception("Unhandled exception in worker.") + diff --git a/gunicorn/workers/ggevent.py b/gunicorn/workers/ggevent.py new file mode 100644 index 00000000..b0eaa6d3 --- /dev/null +++ b/gunicorn/workers/ggevent.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +from __future__ import with_statement + +import errno +import os + +import gevent +from gevent import monkey +from gevent import socket +from gevent.greenlet import Greenlet +from gevent.pool import Pool + +from gunicorn import util +from gunicorn.workers.async import AsyncWorker + +class GEventWorker(KeepaliveWorker): + + def init_process(self): + monkey.patch_all() + super(GEventWorker, self).init_process() + + def run(self): + raise NotImplementedError() + + def accept(self): + try: + client, addr = self.socket.accept() + self.pool.spawn(self.handle, client, addr) + except socket.error, e: + if e[0] in (errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNABORTED): + return + raise diff --git a/gunicorn/workers/sync.py b/gunicorn/workers/sync.py new file mode 100644 index 00000000..b996637d --- /dev/null +++ b/gunicorn/workers/sync.py @@ -0,0 +1,100 @@ + +from gunicorn import http +from gunicorn.http.tee import UnexpectedEOF +from gunicorn.workers.base import Worker + +class SyncWorker(Worker): + + def run(self): + self.init_process() + self.nr = 0 + + # self.socket appears to lose its blocking status after + # we fork in the arbiter. Reset it here. + self.socket.setblocking(0) + + while self.alive: + self.nr = 0 + self.notify() + + # Accept a connection. If we get an error telling us + # that no connection is waiting we fall down to the + # select which is where we'll wait for a bit for new + # workers to come give us some love. + try: + client, addr = self.socket.accept() + client.setblocking(1) + util.close_on_exec(sock) + self.handle(client, addr) + self.nr += 1 + except socket.error, e: + if e[0] not in (errno.EAGAIN, errno.ECONNABORTED): + raise + + # Keep processing clients until no one is waiting. This + # prevents the need to select() for every client that we + # process. + if self.nr > 0: + continue + + # If our parent changed then we shut down. + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s" % self) + return + + try: + self.notify() + ret = select.select([self.socket], [], self.PIPE, self.timeout) + if ret[0]: + continue + except select.error, e: + if e[0] == errno.EINTR: + continue + if e[0] == errno.EBADF: + if self.nr < 0: + continue + else: + return + raise + + def handle(self, client, addr): + try: + self.handle_request(client, addr) + except socket.error, e: + if e[0] != errno.EPIPE: + self.log.exception("Error processing request.") + else: + self.log.warn("Ignoring EPIPE") + except UnexpectedEOF: + self.log.exception("Client closed the connection unexpectedly.") + except Exception, e: + self.log.exception("Error processing request.") + try: + # Last ditch attempt to notify the client of an error. + mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n" + util.write_nonblock(client, mesg) + except: + pass + finally: + util.close(client) + + def handle_request(self, client, addr): + req = http.Request(client, addr, self.address, self.conf) + try: + environ = req.read() + if not environ or not req.parser.status_line: + return + respiter = self.app(environ, req.start_response) + for item in respiter: + req.response.write(item) + req.response.close() + if hasattr(respiter, "close"): + respiter.close() + except socket.error: + raise + except Exception, e: + # Only send back traceback in HTTP in debug mode. + if not self.debug: + raise + util.write_error(client, traceback.format_exc()) + return diff --git a/setup.py b/setup.py index bcec80e3..015c814b 100644 --- a/setup.py +++ b/setup.py @@ -48,10 +48,10 @@ setup( gunicorn_django=gunicorn.main:run_django gunicorn_paster=gunicorn.main:run_paster - [gunicorn.arbiter] - main=gunicorn.arbiter:Arbiter - eventlet=gunicorn.async.eventlet_server:EventletArbiter - gevent=gunicorn.async.gevent_server:GEventArbiter + [gunicorn.worker] + sync=gunicorn.workers.sync:SyncWorker + eventlet=gunicorn.workers.geventlet:EventletWorker + gevent=gunicorn.workers.ggevent:GEventWorker [paste.server_runner] main=gunicorn.main:paste_server