some code documentation.

This commit is contained in:
Benoit Chesneau 2010-02-16 17:17:47 +01:00
parent f4098aa20d
commit 3c82b1985b
2 changed files with 69 additions and 2 deletions

View File

@ -21,6 +21,10 @@ from gunicorn.worker import Worker
from gunicorn import util
class Arbiter(object):
"""
Arbiter maintain the workers processes alive. It launches or kill them if needed.
It also manage application reloading via SIGHUP/USR2.
"""
START_CTX = {}
@ -71,6 +75,7 @@ class Arbiter(object):
}
def start(self):
""" Really initialize the arbiter. Strat to listen and set pidfile if needed."""
self.pid = os.getpid()
self.init_signals()
self.LISTENER = create_socket(self.address)
@ -102,9 +107,11 @@ class Arbiter(object):
os.rename(fname, path)
os.close(fd)
self._pidfile = path
pidfile = property(_get_pidfile, _set_pidfile, _del_pidfile)
pidfile = property(_get_pidfile, _set_pidfile, _del_pidfile,
"manage creation/delettion of pidfile")
def unlink_pidfile(self, path):
""" delete pidfile"""
try:
with open(path, "r") as f:
if int(f.read() or 0) == self.pid:
@ -113,6 +120,7 @@ class Arbiter(object):
pass
def valid_pidfile(self, path):
""" Validate pidfile and make it stale if needed"""
try:
with open(path, "r") as f:
wpid = int(f.read() or 0)
@ -132,6 +140,8 @@ class Arbiter(object):
raise
def init_signals(self):
""" Init master signals handling. Most of signal are queued. Childs signals
only wake up the master"""
if self.PIPE:
map(lambda p: p.close(), self.PIPE)
self.PIPE = pair = os.pipe()
@ -148,6 +158,7 @@ class Arbiter(object):
self.log.warn("Ignoring rapid signaling: %s" % sig)
def run(self):
""" main master loop. Launch to start the master"""
self.start()
self.manage_workers()
while True:
@ -191,41 +202,52 @@ class Arbiter(object):
sys.exit(0)
def handle_chld(self, sig, frame):
""" SIGCHLD handling """
self.wakeup()
self.reap_workers()
def handle_hup(self):
""" HUP handling . We relaunch gracefully the workers and app while
reloading configuration."""
self.log.info("%s hang up." % self.master_name)
self.reexec()
raise StopIteration
def handle_quit(self):
""" SIGQUIT handling"""
raise StopIteration
def handle_int(self):
""" SIGINT handling """
self.stop(False)
raise StopIteration
def handle_term(self):
""" SIGTERM handling """
self.stop(False)
raise StopIteration
def handle_ttin(self):
""" SIGTTIN handling. Increase number of workers."""
self.num_workers += 1
self.manage_workers()
def handle_ttou(self):
""" SIGTTOU handling. Decrease number of workers."""
if self.num_workers > 0:
self.num_workers -= 1
self.manage_workers()
def handle_usr1(self):
""" SIGUSR1 handling. send USR1 to workers (which will kill it)"""
self.kill_workers(signal.SIGUSR1)
def handle_usr2(self):
""" SIGUSR2 handling. relaunch WORKERS and reload app but don't kill old master/workers"""
self.reexec()
def handle_winch(self):
""" SIGWINCH handling """
if os.getppid() == 1 or os.getpgrp() != os.getpid():
self.logger.info("graceful stop of workers")
self.kill_workers(True)
@ -233,7 +255,7 @@ class Arbiter(object):
self.log.info("SIGWINCH ignored. not daemonized")
def wakeup(self):
# Wake up the arbiter
""" Wake up the arbiter by writing to the PIPE"""
try:
os.write(self.PIPE[1], '.')
except IOError, e:
@ -241,6 +263,8 @@ class Arbiter(object):
raise
def sleep(self):
""" Master sleep and wake up when its PIPE change or timeout"""
try:
ready = select.select([self.PIPE[0]], [], [], 1.0)
if not ready[0]:
@ -257,6 +281,11 @@ class Arbiter(object):
sys.exit()
def stop(self, graceful=True):
""" Stop workers
:attr graceful: boolean, by default is True. If True workers will be killed gracefully
(ie. we trying to wait end of client connection)
"""
self.LISTENER = None
sig = signal.SIGQUIT
if not graceful:
@ -269,6 +298,7 @@ class Arbiter(object):
self.kill_workers(signal.SIGKILL)
def reexec(self):
""" relaunch the master """
if self.pidfile:
old_pidfile = "%s.oldbin" % self.pidfile
self.pidfile = old_pidfile
@ -284,6 +314,7 @@ class Arbiter(object):
os.execlp(self.START_CTX[0], *self.START_CTX['argv'])
def murder_workers(self):
""" kill unused/iddle workers"""
for (pid, worker) in list(self.WORKERS.items()):
diff = time.time() - os.fstat(worker.tmp.fileno()).st_ctime
if diff <= self.timeout:
@ -292,6 +323,7 @@ class Arbiter(object):
self.kill_worker(pid, signal.SIGKILL)
def reap_workers(self):
""" reap workers """
try:
while True:
wpid, status = os.waitpid(-1, os.WNOHANG)
@ -308,6 +340,7 @@ class Arbiter(object):
pass
def manage_workers(self):
""" maintain number of workers """
if len(self.WORKERS.keys()) < self.num_workers:
self.spawn_workers()
@ -316,6 +349,7 @@ class Arbiter(object):
self.kill_worker(pid, signal.SIGQUIT)
def spawn_workers(self):
""" span new workers """
workers = set(w.id for w in self.WORKERS.values())
for i in range(self.num_workers):
if i in workers:
@ -345,10 +379,18 @@ class Arbiter(object):
self.log.info("Worker %s exiting." % worker_pid)
def kill_workers(self, sig):
""" kill all workers with signal sig
:attr sig: `signal.SIG*` value
"""
for pid in self.WORKERS.keys():
self.kill_worker(pid, sig)
def kill_worker(self, pid, sig):
""" kill a worker
:attr pid: int, worker pid
:attr sig: `signal.SIG*` value
"""
try:
os.kill(pid, sig)
except OSError, e:

View File

@ -27,6 +27,7 @@ LOG_LEVELS = {
UMASK = 0
def options():
""" build command lines options passed to OptParse object """
return [
op.make_option('-c', '--config', dest='config', type='string',
help='Config file. [%default]'),
@ -53,6 +54,9 @@ def options():
]
def configure_logging(opts):
"""
Set level of logging, and choose where to display/save logs (file or standard output).
"""
handlers = []
if opts['logfile'] != "-":
handlers.append(logging.FileHandler(opts['logfile']))
@ -68,6 +72,10 @@ def configure_logging(opts):
logger.addHandler(h)
def daemonize(umask):
""" if daemon option is set, this function will daemonize the master.
It's based on this activestate recipe :
http://code.activestate.com/recipes/278731/
"""
if not 'GUNICORN_FD' in os.environ:
if os.fork() == 0:
os.setsid()
@ -92,6 +100,7 @@ def daemonize(umask):
os.dup2(0, 2)
def set_owner_process(user,group):
""" set user and group of workers processes """
if group:
if group.isdigit() or isinstance(group, int):
gid = int(group)
@ -106,6 +115,9 @@ def set_owner_process(user,group):
os.setuid(uid)
def main(usage, get_app):
""" function used by different runners to setup options
ans launch the arbiter. """
parser = op.OptionParser(usage=usage, option_list=options(),
version="%prog " + __version__)
opts, args = parser.parse_args()
@ -134,6 +146,14 @@ def main(usage, get_app):
def paste_server(app, global_conf=None, host="127.0.0.1", port=None,
*args, **kwargs):
""" Paster server entrypoint to add to your paster ini file:
[server:main]
use = egg:gunicorn#main
host = 127.0.0.1
port = 5000
"""
bind_addr = util.parse_address(host, port)
@ -171,6 +191,8 @@ def paste_server(app, global_conf=None, host="127.0.0.1", port=None,
arbiter.run()
def run():
""" main runner used for gunicorn command to launch generic wsgi application """
sys.path.insert(0, os.getcwd())
def get_app(parser, opts, args):
@ -185,6 +207,7 @@ def run():
main("%prog [OPTIONS] APP_MODULE", get_app)
def run_django():
""" django runner for gunicorn_django command used to launch django applications """
def settings_notfound(path):
error = "Settings file '%s' not found in current folder.\n" % path
@ -222,6 +245,8 @@ def run_django():
main("%prog [OPTIONS] [SETTINGS_PATH]", get_app)
def run_paster():
""" runner used for gunicorn_paster command to launch paster compatible applications
(pylons, turbogears2, ...) """
from paste.deploy import loadapp, loadwsgi
def get_app(parser, opts, args):