Refactoring worker code.

Also went through gunicorn.main lightly.
Paul J. Davis 2010-04-15 19:58:08 -04:00
parent 13b847e5ce
commit 51f1f22665
17 changed files with 574 additions and 555 deletions

View File

@@ -18,7 +18,7 @@ import traceback
from gunicorn.pidfile import Pidfile
from gunicorn.sock import create_socket
from gunicorn.worker import Worker
from gunicorn.workers.sync import SyncWorker
from gunicorn import util
class Arbiter(object):
@@ -47,21 +47,34 @@ class Arbiter(object):
pidfile = Pidfile()
def __init__(self, address, num_workers, app, **kwargs):
def __init__(self, cfg, app):
self.cfg = cfg
self.app = app
self.address = cfg.address
self.num_workers = cfg.num_workers
self.debug = cfg.debug
self.timeout = cfg.timeout
self.proc_name = cfg.proc_name
self.worker_class = cfg.worker_class
self.address = address
self.num_workers = num_workers
self.worker_age = 0
self.app = app
self._pidfile = None
self.worker_age = 0
self.reexec_pid = 0
self.master_name = "Master"
self.opts = kwargs
self.debug = kwargs.get("debug", False)
self.conf = kwargs.get("config", {})
self.timeout = self.conf['timeout']
self.reexec_pid = 0
self.debug = kwargs.get("debug", False)
self.log = logging.getLogger(__name__)
self.opts = kwargs
self._pidfile = None
self.master_name = "Master"
self.proc_name = self.conf['proc_name']
self.worker_class = kwargs.get("workerclass", SyncWorker)
self.log = logging.getLogger(__name__)
# get current path, try to use PWD env first
try:
@@ -345,9 +358,6 @@ class Arbiter(object):
pid, age = wpid, worker.age
self.kill_worker(pid, signal.SIGQUIT)
def init_worker(self, worker_age, pid, listener, app, timeout, conf):
return Worker(worker_age, pid, listener, app, timeout, conf)
def spawn_workers(self):
"""\
Spawn new workers as needed.
@@ -358,8 +368,8 @@ class Arbiter(object):
for i in range(self.num_workers - len(self.WORKERS.keys())):
self.worker_age += 1
worker = self.init_worker(self.worker_age, self.pid, self.LISTENER,
self.app, self.timeout/2.0, self.conf)
worker = self.worker_class(self.worker_age, self.pid, self.LISTENER,
self.app, self.timeout/2.0, self.cfg)
self.cfg.before_fork(self, worker)
pid = os.fork()
if pid != 0:
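The hooks the arbiter invokes around the fork are plain callables on the config object; the after_fork default appears in the Config hunk further down. As a hedged illustration only (assuming before_fork can be set from a gunicorn.conf.py file, the DEFAULT_CONFIG_FILE shown in config.py below, the same way the after_fork default suggests), server hooks might look like this:

# Hypothetical gunicorn.conf.py snippet; hook signatures follow the
# after_fork default shown in the Config DEFAULTS below.
def before_fork(server, worker):
    # The arbiter calls cfg.before_fork(self, worker) just before os.fork().
    server.log.info("Forking worker (age: %s)" % worker.age)

def after_fork(server, worker):
    # Counterpart hook; its default implementation is shown in config.py below.
    server.log.info("Worker spawned (pid: %s)" % worker.pid)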

View File

@@ -1,130 +0,0 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import errno
import os
import select
import socket
import traceback
from gunicorn import http
from gunicorn.http.tee import UnexpectedEOF
from gunicorn import util
from gunicorn.worker import Worker
ALREADY_HANDLED = object()
class KeepaliveResponse(http.Response):
def default_headers(self):
if self.req.parser.should_close:
connection_hdr = "close"
else:
connection_hdr = "keep-alive"
return [
"HTTP/1.1 %s\r\n" % self.status,
"Server: %s\r\n" % self.version,
"Date: %s\r\n" % util.http_date(),
"Connection: %s\r\n" % connection_hdr
]
class KeepaliveRequest(http.Request):
RESPONSE_CLASS = KeepaliveResponse
def read(self):
ret = select.select([self.socket], [], [], self.conf.keepalive)
if not ret[0]:
return
try:
return super(KeepaliveRequest, self).read()
except socket.error, e:
if e[0] == 54:
return
raise
class KeepaliveWorker(Worker):
def __init__(self, *args, **kwargs):
Worker.__init__(self, *args, **kwargs)
self.nb_connections = 0
self.worker_connections = self.conf.worker_connections
def handle(self, client, addr):
self.nb_connections += 1
try:
self.init_sock(client)
while True:
req = KeepaliveRequest(client, addr, self.address, self.conf)
try:
environ = req.read()
if not environ or not req.parser.headers:
return
respiter = self.app(environ, req.start_response)
if respiter == ALREADY_HANDLED:
break
for item in respiter:
req.response.write(item)
req.response.close()
if hasattr(respiter, "close"):
respiter.close()
if req.parser.should_close:
break
except Exception, e:
#Only send back traceback in HTTP in debug mode.
if not self.debug:
raise
util.write_error(client, traceback.format_exc())
break
except socket.error, e:
if e[0] != errno.EPIPE:
self.log.exception("Error processing request.")
else:
self.log.warn("Ignoring EPIPE")
except UnexpectedEOF:
self.log.exception("remote closed the connection unexpectedly.")
except Exception, e:
self.log.exception("Error processing request.")
try:
# Last ditch attempt to notify the client of an error.
mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n"
util.write_nonblock(client, mesg)
except:
pass
return
finally:
self.nb_connections -= 1
util.close(client)
def run(self):
self.init_process()
self.socket.setblocking(1)
while self.alive:
self.notify()
# If our parent changed then we shut down.
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self)
return
if self.nb_connections > self.worker_connections:
continue
try:
ret = select.select([self.socket], [], [], 1)
if ret[0]:
self.accept()
except select.error, e:
if e[0] == errno.EINTR:
continue
if e[0] == errno.EBADF:
continue
raise
except KeyboardInterrupt :
return

View File

@@ -1,50 +0,0 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from __future__ import with_statement
import errno
import collections
import eventlet
from eventlet.green import os
from eventlet.green import socket
from eventlet import greenio
from eventlet.hubs import trampoline
from eventlet.timeout import Timeout
from gunicorn import util
from gunicorn import arbiter
from gunicorn.async.base import KeepaliveWorker
class EventletWorker(KeepaliveWorker):
def init_process(self):
super(EventletWorker, self).init_process()
self.pool = eventlet.GreenPool(self.worker_connections)
def accept(self):
with Timeout(0.1, False):
try:
client, addr = self.socket.accept()
self.pool.spawn_n(self.handle, client, addr)
except socket.error, e:
if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN, errno.ECONNABORTED):
return
raise
class EventletArbiter(arbiter.Arbiter):
@classmethod
def setup(cls):
import eventlet
if eventlet.version_info < (0,9,7):
raise RuntimeError("You need eventlet >= 0.9.7")
eventlet.monkey_patch(all=False, socket=True, select=True, thread=True)
def init_worker(self, worker_age, pid, listener, app, timeout, conf):
return EventletWorker(worker_age, pid, listener, app, timeout, conf)

View File

@@ -1,44 +0,0 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from __future__ import with_statement
import errno
import os
import gevent
from gevent import Timeout, socket
from gevent.greenlet import Greenlet
from gevent.pool import Pool
from gunicorn import arbiter
from gunicorn import util
from gunicorn.async.base import KeepaliveWorker
class GEventWorker(KeepaliveWorker):
def init_process(self):
super(GEventWorker, self).init_process()
self.pool = Pool(self.worker_connections)
def accept(self):
with Timeout(0.1, False):
try:
client, addr = self.socket.accept()
self.pool.spawn(self.handle, client, addr)
except socket.error, e:
if e[0] in (errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNABORTED):
return
raise
class GEventArbiter(arbiter.Arbiter):
@classmethod
def setup(cls):
from gevent import monkey
monkey.patch_all(dns=False)
def init_worker(self, worker_age, pid, listener, app, timeout, conf):
return GEventWorker(worker_age, pid, listener, app, timeout, conf)

View File

@@ -15,7 +15,6 @@ class Config(object):
DEFAULT_CONFIG_FILE = 'gunicorn.conf.py'
DEFAULTS = dict(
arbiter="egg:gunicorn",
backlog=2048,
bind='127.0.0.1:8000',
daemon=False,
@@ -34,6 +33,7 @@ class Config(object):
user=None,
workers=1,
worker_connections=1000,
worker_class="egg:gunicorn#sync",
after_fork=lambda server, worker: server.log.info(
"Worker spawned (pid: %s)" % worker.pid),
@@ -99,12 +99,9 @@ class Config(object):
return self.conf.iteritems()
@property
def arbiter(self):
uri = self.conf.get('arbiter', 'egg:gunicorn')
arbiter = util.parse_arbiter_uri(uri)
if hasattr(arbiter, 'setup'):
arbiter.setup()
return arbiter
def worker_class(self):
uri = self.conf.get('workertype', None) or 'egg:gunicorn#sync'
return util.load_worker_class(uri)
@property
def workers(self):
@@ -116,7 +113,7 @@ class Config(object):
if self.conf['debug'] == True:
workers = 1
return workers
@property
def address(self):
if not self.conf['bind']:

View File

@@ -3,9 +3,11 @@
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import errno
import logging
import os
import re
try:
from cStringIO import StringIO
except ImportError:
@@ -16,7 +18,7 @@ from urllib import unquote
from gunicorn import __version__
from gunicorn.http.parser import Parser
from gunicorn.http.response import Response
from gunicorn.http.response import Response, KeepAliveResponse
from gunicorn.http.tee import TeeInput
from gunicorn.util import CHUNK_SIZE
@@ -168,3 +170,15 @@ class Request(object):
self.response = self.RESPONSE_CLASS(self, status, headers)
return self.response.write
class KeepaliveRequest(http.Request):
RESPONSE_CLASS = KeepAliveResponse
def read(self):
try:
return super(KeepaliveRequest, self).read()
except socket.error, e:
if e[0] == errno.ECONNRESET:
return
raise

View File

@@ -57,3 +57,17 @@ class Response(object):
self.send_headers()
if self.chunked:
write_chunk(self.socket, "")
class KeepAliveResponse(Response):
def default_headers(self):
connection = "keep-alive"
if self.req.parser.should_close:
connection = "close"
return [
"HTTP/1.1 %s\r\n" % self.status,
"Server: %s\r\n" % self.version,
"Date: %s\r\n" % http_date(),
"Connection: %s\r\n" % connection
]

View File

@@ -9,6 +9,7 @@ import os
import pkg_resources
import sys
from gunicorn.arbiter import Arbiter
from gunicorn.config import Config
from gunicorn.debug import spew
from gunicorn import util, __version__
@@ -32,8 +33,8 @@ def options():
help='Address to listen on. Ex. 127.0.0.1:8000 or unix:/tmp/gunicorn.sock'),
op.make_option('-w', '--workers', dest='workers',
help='Number of workers to spawn. [1]'),
op.make_option('-a', '--arbiter', dest='arbiter',
help="gunicorn arbiter entry point or module "+
op.make_option('-k', '--worker-class', dest='klass',
help="The type of request processing to use "+
"[egg:gunicorn#main]"),
op.make_option('-p','--pid', dest='pidfile',
help='set the background PID FILE'),
@@ -57,135 +58,88 @@ def options():
default=False, help="Install a trace hook")
]
def configure_logging(opts):
"""
Set level of logging, and choose where to display/save logs (file or standard output).
"""
handlers = []
if opts['logfile'] != "-":
handlers.append(logging.FileHandler(opts['logfile']))
else:
handlers.append(logging.StreamHandler())
loglevel = LOG_LEVELS.get(opts['loglevel'].lower(), logging.INFO)
logger = logging.getLogger('gunicorn')
logger.setLevel(loglevel)
format = r"%(asctime)s [%(process)d] [%(levelname)s] %(message)s"
datefmt = r"%Y-%m-%d %H:%M:%S"
for h in handlers:
h.setFormatter(logging.Formatter(format, datefmt))
logger.addHandler(h)
def daemonize():
""" if daemon option is set, this function will daemonize the master.
It's based on this activestate recipe :
http://code.activestate.com/recipes/278731/
"""
if not 'GUNICORN_FD' in os.environ:
if os.fork() == 0:
os.setsid()
if os.fork() != 0:
os.umask(0)
else:
os._exit(0)
else:
os._exit(0)
maxfd = util.get_maxfd()
# Iterate through and close all file descriptors.
for fd in range(0, maxfd):
try:
os.close(fd)
except OSError: # ERROR, fd wasn't open to begin with (ignored)
pass
os.open(util.REDIRECT_TO, os.O_RDWR)
os.dup2(0, 1)
os.dup2(0, 2)
def main(usage, get_app):
""" function used by different runners to setup options
and launch the arbiter. """
parser = op.OptionParser(usage=usage, option_list=options(),
version="%prog " + __version__)
"""\
Used by the various runners to set up options and
launch the arbiter.
"""
vrs = "%prog " + __version__
parser = op.OptionParser(usage=usage, option_list=options(), version=vrs)
opts, args = parser.parse_args()
app = get_app(parser, opts, args)
conf = Config(opts.__dict__, opts.config)
if conf['spew']:
cfg = Config(opts.__dict__, opts.config)
if cfg.spew:
spew()
arbiter = conf.arbiter(conf.address, conf.workers, app, config=conf,
debug=conf['debug'], pidfile=conf['pidfile'])
if conf['daemon']:
if cfg.daemon:
daemonize()
else:
os.setpgrp()
configure_logging(conf)
arbiter.run()
configure_logging(cfg)
Arbiter(cfg, app).run()
def paste_server(app, global_conf=None, host="127.0.0.1", port=None,
*args, **kwargs):
""" Paster server entrypoint to add to your paster ini file:
def paste_server(app, gcfg=None, host="127.0.0.1", port=None, *args, **kwargs):
"""\
A paster server.
[server:main]
use = egg:gunicorn#main
host = 127.0.0.1
port = 5000
The entry point in your paster ini file should look like this:
[server:main]
use = egg:gunicorn#main
host = 127.0.0.1
port = 5000
"""
options = kwargs.copy()
opts = kwargs.copy()
if port and not host.startswith("unix:"):
bind = "%s:%s" % (host, port)
else:
bind = host
options['bind'] = bind
opts['bind'] = bind
if global_conf:
for key, value in list(global_conf.items()):
if gcfg:
for key, value in list(gcfg.items()):
if value and value is not None:
if key == "debug":
value = (value == "true")
options[key] = value
options['default_proc_name'] = options['__file__']
opts[key] = value
opts['default_proc_name'] = opts['__file__']
conf = Config(options)
if conf['spew']:
cfg = Config(opts)
if cfg.spew:
spew()
arbiter = conf.arbiter(conf.address, conf.workers, app, debug=conf["debug"],
pidfile=conf["pidfile"], config=conf)
if conf["daemon"] :
if cfg.daemon:
daemonize()
else:
os.setpgrp()
configure_logging(conf)
arbiter.run()
configure_logging(cfg)
Arbiter(cfg, app).run()
def run():
""" main runner used for gunicorn command to launch generic wsgi application """
"""\
The ``gunicorn`` command line runner for launching Gunicorn with
generic WSGI applications.
"""
sys.path.insert(0, os.getcwd())
def get_app(parser, opts, args):
if len(args) != 1:
parser.error("No application module specified.")
opts.default_proc_name = args[0]
try:
return util.import_app(args[0])
except:
parser.error("Failed to import application module.")
except Exception, e:
parser.error("Failed to import application module:\n %s" % e)
main("%prog [OPTIONS] APP_MODULE", get_app)
def run_django():
""" django runner for gunicorn_django command used to launch django applications """
"""\
The ``gunicorn_django`` command line runner for launching Django
applications.
"""
def settings_notfound(path):
error = "Settings file '%s' not found in current folder.\n" % path
@@ -228,30 +182,30 @@ def run_django():
main("%prog [OPTIONS] [SETTINGS_PATH]", get_app)
def run_paster():
""" runner used for gunicorn_paster command to launch paster compatible applications
(pylons, turbogears2, ...) """
"""\
The ``gunicorn_paster`` command for launching Paster compatible
applications like Pylons or Turbogears2.
"""
from paste.deploy import loadapp, loadwsgi
def get_app(parser, opts, args):
if len(args) != 1:
parser.error("No application name specified.")
config_file = os.path.abspath(os.path.normpath(
os.path.join(os.getcwd(), args[0])))
if not os.path.exists(config_file):
cfgfname = os.path.normpath(os.path.join(os.getcwd(), args[0]))
cfgfname = os.path.abspath(cfgfname)
if not os.path.exists(cfgfname):
parser.error("Config file not found.")
config_url = 'config:%s' % config_file
relative_to = os.path.dirname(config_file)
cfgurl = 'config:%s' % cfgfname
relpath = os.path.dirname(cfgfname)
# load module in sys path
sys.path.insert(0, relative_to)
# add to eggs
pkg_resources.working_set.add_entry(relative_to)
ctx = loadwsgi.loadcontext(loadwsgi.SERVER, config_url,
relative_to=relative_to)
ctx = loadwsgi.loadcontext(loadwsgi.SERVER, cfgurl, relative_to=relpath)
if not opts.workers:
opts.workers = ctx.local_conf.get('workers', 1)
@@ -284,4 +238,52 @@ def run_paster():
return app
main("%prog [OPTIONS] pasteconfig.ini", get_app)
def daemonize():
"""\
Standard daemonization of a process. Code is based on the
ActiveState recipe at:
http://code.activestate.com/recipes/278731/
"""
if not 'GUNICORN_FD' in os.environ:
if os.fork() == 0:
os.setsid()
if os.fork() != 0:
os.umask(0)
else:
os._exit(0)
else:
os._exit(0)
maxfd = util.get_maxfd()
# Iterate through and close all file descriptors.
for fd in range(0, maxfd):
try:
os.close(fd)
except OSError: # ERROR, fd wasn't open to begin with (ignored)
pass
os.open(util.REDIRECT_TO, os.O_RDWR)
os.dup2(0, 1)
os.dup2(0, 2)
def configure_logging(opts):
"""\
Set the log level and choose the destination for log output.
"""
handlers = []
if opts['logfile'] != "-":
handlers.append(logging.FileHandler(opts['logfile']))
else:
handlers.append(logging.StreamHandler())
loglevel = LOG_LEVELS.get(opts['loglevel'].lower(), logging.INFO)
logger = logging.getLogger('gunicorn')
logger.setLevel(loglevel)
format = r"%(asctime)s [%(process)d] [%(levelname)s] %(message)s"
datefmt = r"%Y-%m-%d %H:%M:%S"
for h in handlers:
h.setFormatter(logging.Formatter(format, datefmt))
logger.addHandler(h)
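Putting the new pieces together, a minimal sketch of embedding the refactored entry flow (the WSGI app and the option values are hypothetical; the daemonizing, process-group and logging setup performed by main() above are omitted):

from gunicorn.arbiter import Arbiter
from gunicorn.config import Config

def app(environ, start_response):
    # Hypothetical placeholder WSGI application.
    body = "Hello, world!\n"
    start_response("200 OK", [("Content-Type", "text/plain"),
                              ("Content-Length", str(len(body)))])
    return [body]

# Option names follow the DEFAULTS in config.py above; unspecified
# settings presumably fall back to those defaults.
cfg = Config({"bind": "127.0.0.1:8000", "workers": 2})
Arbiter(cfg, app).run()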

View File

@@ -43,7 +43,7 @@ except ImportError:
def _setproctitle(title):
return
def parse_arbiter_uri(uri):
def load_worker_class(uri):
if uri.startswith("egg:"):
# uses entry points
entry_str = uri.split("egg:")[1]
@@ -53,8 +53,7 @@ def parse_arbiter_uri(uri):
dist = entry_str
name = "main"
return pkg_resources.load_entry_point(dist, "gunicorn.arbiter",
name)
return pkg_resources.load_entry_point(dist, "gunicorn.arbiter", name)
else:
components = uri.split('.')
if len(components) == 1:
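For clarity, a hedged restatement (not gunicorn code) of the URI form the egg: branch of load_worker_class accepts; the "#" split is an assumption since that part of the hunk is elided, while the "main" fallback is visible above:

def split_egg_uri(uri):
    # Mirrors the egg: branch: "egg:DIST#NAME" -> (DIST, NAME),
    # with NAME defaulting to "main" when no "#" is present.
    entry_str = uri.split("egg:")[1]
    if "#" in entry_str:
        dist, name = entry_str.rsplit("#", 1)
    else:
        dist, name = entry_str, "main"
    return dist, name

assert split_egg_uri("egg:gunicorn#sync") == ("gunicorn", "sync")
assert split_egg_uri("egg:gunicorn") == ("gunicorn", "main")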

View File

@@ -1,197 +0,0 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import errno
import logging
import os
import select
import signal
import socket
import sys
import tempfile
import traceback
from gunicorn import http
from gunicorn import util
from gunicorn.http.tee import UnexpectedEOF
class Worker(object):
SIGNALS = map(
lambda x: getattr(signal, "SIG%s" % x),
"HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()
)
PIPE = []
def __init__(self, age, ppid, socket, app, timeout, conf):
self.nr = 0
self.age = age
self.ppid = ppid
self.debug = conf['debug']
self.conf = conf
self.socket = socket
self.timeout = timeout
self.fd, self.tmpname = tempfile.mkstemp(prefix="wgunicorn-")
util.chown(self.tmpname, conf.uid, conf.gid)
self.tmp = os.fdopen(self.fd, "r+b")
self.app = app
self.alive = True
self.log = logging.getLogger(__name__)
self.spinner = 0
self.address = self.socket.getsockname()
def __str__(self):
return "<Worker %s>" % os.getpid()
@property
def pid(self):
return os.getpid()
def init_signals(self):
map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS)
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGUSR1, self.handle_usr1)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_exit)
signal.signal(signal.SIGWINCH, self.handle_winch)
def handle_usr1(self, sig, frame):
self.nr = -65536;
try:
map(lambda p: p.close(), self.PIPE)
except:
pass
def handle_quit(self, sig, frame):
self.alive = False
def handle_exit(self, sig, frame):
sys.exit(0)
def handle_winch(self, sig, fname):
# Ignore SIGWINCH in worker. Fix crash on OpenBSD
return
def notify(self):
"""\
Notify our parent process that we're still alive.
"""
self.spinner = (self.spinner+1) % 2
if getattr(os, 'fchmod', None):
os.fchmod(self.tmp.fileno(), self.spinner)
else:
os.chmod(self.tmpname, self.spinner)
def init_process(self):
util.set_owner_process(self.conf.uid, self.conf.gid)
# init pipe
self.PIPE = os.pipe()
map(util.set_non_blocking, self.PIPE)
map(util.close_on_exec, self.PIPE)
# prevent inheritance
util.close_on_exec(self.socket)
util.close_on_exec(self.fd)
self.init_signals()
def accept(self):
try:
client, addr = self.socket.accept()
self.init_sock(client)
self.handle(client, addr)
self.nr += 1
except socket.error, e:
if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
raise
def init_sock(self, sock):
sock.setblocking(1)
util.close_on_exec(sock)
def run(self):
self.init_process()
self.nr = 0
# self.socket appears to lose its blocking status after
# we fork in the arbiter. Reset it here.
self.socket.setblocking(0)
while self.alive:
self.nr = 0
self.notify()
# accept a new connection
self.accept()
# Keep processing clients until no one is waiting.
# This prevents the need to select() for every
# client that we process.
if self.nr > 0:
continue
# If our parent changed then we shut down.
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self)
return
try:
self.notify()
ret = select.select([self.socket], [], self.PIPE, self.timeout)
if ret[0]:
continue
except select.error, e:
if e[0] == errno.EINTR:
continue
if e[0] == errno.EBADF:
if self.nr < 0:
continue
else:
return
raise
def handle(self, client, addr):
try:
req = http.Request(client, addr, self.address, self.conf)
try:
environ = req.read()
if not environ or not req.parser.status_line:
return
respiter = self.app(environ, req.start_response)
for item in respiter:
req.response.write(item)
req.response.close()
if hasattr(respiter, "close"):
respiter.close()
except socket.error:
raise
except Exception, e:
# Only send back traceback in HTTP in debug mode.
if not self.debug:
raise
util.write_error(client, traceback.format_exc())
return
except socket.error, e:
if e[0] != errno.EPIPE:
self.log.exception("Error processing request.")
else:
self.log.warn("Ignoring EPIPE")
except UnexpectedEOF:
self.log.exception("remote closed the connection unexpectedly.")
except Exception, e:
self.log.exception("Error processing request.")
try:
# Last ditch attempt to notify the client of an error.
mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n"
util.write_nonblock(client, mesg)
except:
pass
finally:
util.close(client)

gunicorn/workers/async.py Normal file
View File

@@ -0,0 +1,70 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import errno
import os
import select
import socket
import traceback
from gunicorn import http
from gunicorn.http.tee import UnexpectedEOF
from gunicorn import util
from gunicorn.workers.base import Worker
ALREADY_HANDLED = object()
class AsyncWorker(Worker):
def __init__(self, *args, **kwargs):
Worker.__init__(self, *args, **kwargs)
self.worker_connections = self.conf.worker_connections
def handle(self, client, addr):
try:
while self.handle_request(client, addr):
pass
except socket.error, e:
if e[0] != errno.EPIPE:
self.log.exception("Error processing request.")
else:
self.log.warn("Ignoring EPIPE")
except UnexpectedEOF:
self.log.exception("Client closed the connection unexpectedly.")
except Exception, e:
self.log.exception("Error processing request.")
try:
# Last ditch attempt to notify the client of an error.
mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n"
util.write_nonblock(client, mesg)
except:
pass
return
finally:
util.close(client)
def handle_request(self, client, addr):
req = KeepaliveRequest(client, addr, self.address, self.conf)
try:
environ = req.read()
if not environ or not req.parser.headers:
return False
respiter = self.app(environ, req.start_response)
if respiter == ALREADY_HANDLED:
return False
for item in respiter:
req.response.write(item)
req.response.close()
if hasattr(respiter, "close"):
respiter.close()
if req.parser.should_close:
return False
except Exception, e:
#Only send back traceback in HTTP in debug mode.
if not self.debug:
raise
util.write_error(client, traceback.format_exc())
return False
return True

gunicorn/workers/base.py Normal file
View File

@@ -0,0 +1,116 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import errno
import logging
import os
import select
import signal
import socket
import sys
import tempfile
import traceback
from gunicorn import util
class Worker(object):
SIGNALS = map(
lambda x: getattr(signal, "SIG%s" % x),
"HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()
)
PIPE = []
def __init__(self, age, ppid, socket, app, timeout, conf):
"""\
This is called pre-fork so it shouldn't do anything to the
current process. If there's a need to make process wide
changes you'll want to do that in ``self.init_process()``.
"""
self.age = age
self.ppid = ppid
self.socket = socket
self.app = app
self.timeout = timeout
self.conf = conf
self.nr = 0
self.alive = True
self.spinner = 0
self.log = logging.getLogger(__name__)
self.debug = conf.get('debug', False)
self.address = self.socket.getsockname()
self.fd, self.tmpname = tempfile.mkstemp(prefix="wgunicorn-")
util.chown(self.tmpname, conf.uid, conf.gid)
self.tmp = os.fdopen(self.fd, "r+b")
def __str__(self):
return "<Worker %s>" % self.pid
@property
def pid(self):
return os.getpid()
def notify(self):
"""\
Your worker subclass must arrange to have this method called
once every ``self.timeout`` seconds. If you fail in accomplishing
this task, the master process will murder your workers.
"""
self.spinner = (self.spinner+1) % 2
if getattr(os, 'fchmod', None):
os.fchmod(self.tmp.fileno(), self.spinner)
else:
os.chmod(self.tmpname, self.spinner)
def run(self):
"""\
This is the mainloop of a worker process. You should override
this method in a subclass to provide the intended behaviour
for your particular evil schemes.
"""
raise NotImplementedError()
def init_process(self):
"""\
If you override this method in a subclass, the last statement
in the function should be to call this method with
super(MyWorkerClass, self).init_process() so that the ``run()``
loop is initiated.
"""
util.set_owner_process(self.conf.uid, self.conf.gid)
# For waking ourselves up
self.PIPE = os.pipe()
map(util.set_non_blocking, self.PIPE)
map(util.close_on_exec, self.PIPE)
# Prevent fd inheritance
util.close_on_exec(self.socket)
util.close_on_exec(self.fd)
self.init_signals()
# Enter main run loop
self.run()
def init_signals(self):
map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS)
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_exit)
signal.signal(signal.SIGWINCH, self.handle_winch)
def handle_quit(self, sig, frame):
self.alive = False
def handle_exit(self, sig, frame):
sys.exit(0)
def handle_winch(self, sig, fname):
# Ignore SIGWINCH in worker. Fixes a crash on OpenBSD.
return
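To make the base-class contract above concrete, a hedged sketch of the smallest possible subclass (purely illustrative, not part of this commit): it only overrides run(), relies on init_process() to install signals and the wake-up pipe before calling run(), and calls notify() often enough that the arbiter's timeout check never fires.

import time

from gunicorn.workers.base import Worker

class NullWorker(Worker):
    """Hypothetical do-nothing worker used only to illustrate the base API."""

    def run(self):
        while self.alive:
            # Heartbeat: touch the temp file so the master sees us as live.
            self.notify()
            # A real worker would block on select()/accept() against
            # self.socket here; just sleep well under self.timeout instead.
            time.sleep(min(1.0, self.timeout / 2.0))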

View File

@@ -0,0 +1,82 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from __future__ import with_statement
import collections
import errno
import traceback
import eventlet
import eventlet.debug
from eventlet.green import os
from eventlet.green import socket
from eventlet import greenio
from eventlet import greenlet
from eventlet import greenpool
from eventlet import greenthread
from gunicorn import util
from gunicorn import arbiter
from gunicorn.workers.async import AsyncWorker, ALREADY_HANDLED
from gunicorn.http.tee import UnexpectedEOF
eventlet.debug.hub_exceptions(True)
class EventletWorker(AsyncWorker):
def __init__(self, *args, **kwargs):
super(EventletWorker, self).__init__(*args, **kwargs)
if eventlet.version_info < (0,9,7):
raise RuntimeError("You need eventlet >= 0.9.7")
def init_process(self):
eventlet.monkey_patch(all=False, socket=True, select=True)
self.socket = greenio.GreenSocket(self.socket)
super(EventletWorker, self).init_process()
def run(self):
self.init_process()
self.socket.setblocking(1)
pool = greenpool.GreenPool(self.worker_connections)
acceptor = greenthread.spawn(self.acceptor, pool)
while True:
self.notify()
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self)
greenthread.kill(acceptor, eventlet.StopServe)
break
eventlet.sleep(0.1)
with eventlet.Timeout(self.timeout, False):
pool.waitall()
def acceptor(self, pool):
server_gt = greenthread.getcurrent()
while True:
try:
conn, addr = self.socket.accept()
gt = pool.spawn(self.handle, conn, addr)
gt.link(self.cleanup, conn)
conn, addr, gt = None, None, None
except eventlet.StopServe:
return
def cleanup(self, thread, conn):
try:
try:
thread.wait()
finally:
conn.close()
except greenlet.GreenletExit:
pass
except Exception:
self.log.exception("Unhandled exception in worker.")

View File

@@ -0,0 +1,36 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from __future__ import with_statement
import errno
import os
import gevent
from gevent import monkey
from gevent import socket
from gevent.greenlet import Greenlet
from gevent.pool import Pool
from gunicorn import util
from gunicorn.workers.async import AsyncWorker
class GEventWorker(AsyncWorker):
def init_process(self):
monkey.patch_all()
super(GEventWorker, self).init_process()
def run(self):
raise NotImplementedError()
def accept(self):
try:
client, addr = self.socket.accept()
self.pool.spawn(self.handle, client, addr)
except socket.error, e:
if e[0] in (errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNABORTED):
return
raise

gunicorn/workers/sync.py Normal file
View File

@@ -0,0 +1,100 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import errno
import os
import select
import socket
import traceback
from gunicorn import http
from gunicorn import util
from gunicorn.http.tee import UnexpectedEOF
from gunicorn.workers.base import Worker
class SyncWorker(Worker):
def run(self):
self.init_process()
self.nr = 0
# self.socket appears to lose its blocking status after
# we fork in the arbiter. Reset it here.
self.socket.setblocking(0)
while self.alive:
self.nr = 0
self.notify()
# Accept a connection. If we get an error telling us
# that no connection is waiting we fall down to the
# select which is where we'll wait for a bit for new
# workers to come give us some love.
try:
client, addr = self.socket.accept()
client.setblocking(1)
util.close_on_exec(client)
self.handle(client, addr)
self.nr += 1
except socket.error, e:
if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
raise
# Keep processing clients until no one is waiting. This
# prevents the need to select() for every client that we
# process.
if self.nr > 0:
continue
# If our parent changed then we shut down.
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self)
return
try:
self.notify()
ret = select.select([self.socket], [], self.PIPE, self.timeout)
if ret[0]:
continue
except select.error, e:
if e[0] == errno.EINTR:
continue
if e[0] == errno.EBADF:
if self.nr < 0:
continue
else:
return
raise
def handle(self, client, addr):
try:
self.handle_request(client, addr)
except socket.error, e:
if e[0] != errno.EPIPE:
self.log.exception("Error processing request.")
else:
self.log.warn("Ignoring EPIPE")
except UnexpectedEOF:
self.log.exception("Client closed the connection unexpectedly.")
except Exception, e:
self.log.exception("Error processing request.")
try:
# Last ditch attempt to notify the client of an error.
mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n"
util.write_nonblock(client, mesg)
except:
pass
finally:
util.close(client)
def handle_request(self, client, addr):
req = http.Request(client, addr, self.address, self.conf)
try:
environ = req.read()
if not environ or not req.parser.status_line:
return
respiter = self.app(environ, req.start_response)
for item in respiter:
req.response.write(item)
req.response.close()
if hasattr(respiter, "close"):
respiter.close()
except socket.error:
raise
except Exception, e:
# Only send back traceback in HTTP in debug mode.
if not self.debug:
raise
util.write_error(client, traceback.format_exc())
return

View File

@@ -48,10 +48,10 @@ setup(
gunicorn_django=gunicorn.main:run_django
gunicorn_paster=gunicorn.main:run_paster
[gunicorn.arbiter]
main=gunicorn.arbiter:Arbiter
eventlet=gunicorn.async.eventlet_server:EventletArbiter
gevent=gunicorn.async.gevent_server:GEventArbiter
[gunicorn.worker]
sync=gunicorn.workers.sync:SyncWorker
eventlet=gunicorn.workers.geventlet:EventletWorker
gevent=gunicorn.workers.ggevent:GEventWorker
[paste.server_runner]
main=gunicorn.main:paste_server
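A hedged sketch of resolving the new [gunicorn.worker] entries by hand with pkg_resources (this assumes gunicorn is installed so its entry points are registered; load_worker_class in util.py is the code path that does this for egg: URIs):

import pkg_resources

# Look up the sync worker declared under [gunicorn.worker] above.
sync_cls = pkg_resources.load_entry_point("gunicorn", "gunicorn.worker", "sync")
print sync_cls   # e.g. <class 'gunicorn.workers.sync.SyncWorker'>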