diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 527a8038..fce66387 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -23,21 +23,22 @@ class Arbiter(object): SIG_QUEUE = [] SIGNALS = map( lambda x: getattr(signal, "SIG%s" % x), - "WINCH CHLD QUIT INT TERM USR1 USR2 HUP TTIN TTOU".split() + "QUIT INT TERM TTIN TTOU".split() ) SIG_NAMES = dict( - (getattr(signal, name), name) for name in dir(signal) + (getattr(signal, name), name[3:].lower()) for name in dir(signal) if name[:3] == "SIG" ) def __init__(self, address, num_workers, modname): - log.info("Booting Arbiter.") self.address = address self.num_workers = num_workers self.modname = modname + self.timeout = 30 self.pid = os.getpid() self.init_signals() self.listen(self.address) + log.info("Booted Arbiter: %s" % os.getpid()) def init_signals(self): if self.PIPE: @@ -63,22 +64,6 @@ class Arbiter(object): if e.errno not in [errno.EAGAIN, errno.EINTR]: raise - def sleep(self): - try: - ready = select.select([self.PIPE[0]], [], [], 1) - if not ready[0]: - return - while os.read(self.PIPE[0], 1): - pass - except select.error, e: - if e[0] not in [errno.EAGAIN, errno.EINTR]: - raise - except OSError, e: - if e.errno not in [errno.EAGAIN, errno.EINTR]: - raise - except KeyboardInterrupt: - sys.exit() - def listen(self, addr): for i in range(5): try: @@ -106,30 +91,107 @@ class Arbiter(object): while True: try: sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None - if sig is not None: - log.info("SIGNAL: %s" % self.SIG_NAMES.get(sig, "Unknown")) + if sig is None: self.sleep() - elif sig is signal.SIGINT: - self.kill_workers(signal.SIGINT) - sys.exit(1) - elif sig is signal.SIGQUIT: - self.kill_workers(signal.SIGTERM) - sys.exit(0) - else: - name = self.SIG_NAMES.get(sig, "UNKNOWN") - log.warn("IGNORED: %s" % name) + continue + if sig not in self.SIG_NAMES: + log.info("Ignoring unknown signal: %s" % sig) + continue + + signame = self.SIG_NAMES.get(sig) + handler = getattr(self, "handle_%s" % signame, None) + if not handler: + log.error("Unhandled signal: %s" % signame) + continue + + log.info("Handling signal: %s" % signame) + handler() + + self.murder_workers() self.reap_workers() self.manage_workers() + except StopIteration: + break except KeyboardInterrupt: - self.kill_workers(signal.SIGTERM) - sys.exit() + self.stop(False) + sys.exit(-1) except Exception, e: - self.kill_workers(signal.SIGTERM) log.exception("Unhandled exception in main loop.") - sys.exit() + self.stop(False) + sys.exit(-1) + + log.info("Master is shutting down.") + self.stop() + + def handle_quit(self): + raise StopIteration + + def handle_int(self): + self.stop(False) + raise StopIteration + + def handle_term(self): + self.stop(False) + raise StopIteration + + def handle_ttin(self): + self.num_workers += 1 + + def handle_ttou(self): + if self.num_workers > 0: + self.num_workers -= 1 + + def sleep(self): + try: + ready = select.select([self.PIPE[0]], [], [], 1) + if not ready[0]: + return + while os.read(self.PIPE[0], 1): + pass + except select.error, e: + if e[0] not in [errno.EAGAIN, errno.EINTR]: + raise + except OSError, e: + if e.errno not in [errno.EAGAIN, errno.EINTR]: + raise + except KeyboardInterrupt: + sys.exit() + + def stop(self, graceful=True): + self.LISTENER.close() + sig = signal.SIGQUIT + if not graceful: + sig = signal.SIGTERM + limit = time.time() + self.timeout + while len(self.WORKERS) and time.time() < limit: + self.kill_workers(sig) + time.sleep(0.1) + self.reap_workers() + self.kill_workers(signal.SIGKILL) + + def murder_workers(self): + for (pid, worker) in self.WORKERS.iteritems(): + diff = time.time() - os.fstat(worker.tmp.fileno()).st_mtime + if diff < self.timeout: + continue + self.kill_worker(pid, signal.SIGKILL) + + def reap_workers(self): + try: + while True: + wpid, status = os.waitpid(-1, os.WNOHANG) + if not wpid: + break + worker = self.WORKERS.pop(wpid, None) + if not worker: + continue + worker.tmp.close() + except OSError, e: + if e.errno == errno.ECHILD: + pass def manage_workers(self): if len(self.WORKERS.keys()) < self.num_workers: @@ -145,7 +207,7 @@ class Arbiter(object): if i in workers: continue - worker = Worker(i, self.LISTENER, self.modname) + worker = Worker(i, self.pid, self.LISTENER, self.modname) pid = os.fork() if pid != 0: self.WORKERS[pid] = worker @@ -155,29 +217,14 @@ class Arbiter(object): try: log.info("Worker %s booting" % os.getpid()) worker.run() - log.info("Worker %s exiting" % os.getpid()) sys.exit(0) except SystemExit: - pass + raise except: log.exception("Exception in worker process.") sys.exit(-1) finally: - log.info("Done.") - - def reap_workers(self): - try: - while True: - wpid, status = os.waitpid(-1, os.WNOHANG) - if not wpid: - break - worker = self.WORKERS.pop(wpid) - if not worker: - continue - worker.tmp.close() - except OSError, e: - if e.errno == errno.ECHILD: - pass + log.info("Worker %s exiting." % os.getpid()) def kill_workers(self, sig): for pid in self.WORKERS.keys(): @@ -187,6 +234,9 @@ class Arbiter(object): worker = self.WORKERS.pop(pid) try: os.kill(pid, sig) + except OSError, e: + if e.errno == errno.ESRCH: + pass finally: worker.tmp.close() diff --git a/gunicorn/http/request.py b/gunicorn/http/request.py index bd7488ca..0a41cfde 100644 --- a/gunicorn/http/request.py +++ b/gunicorn/http/request.py @@ -121,8 +121,7 @@ class HTTPRequest(object): except ValueError: # bad headers pass - - + def body_length(self): transfert_encoding = self.headers.get('TRANSFERT-ENCODING') content_length = self.headers.get('CONTENT-LENGTH') diff --git a/gunicorn/worker.py b/gunicorn/worker.py index 0b690acd..521925ba 100644 --- a/gunicorn/worker.py +++ b/gunicorn/worker.py @@ -5,6 +5,7 @@ import os import select import signal import socket +import sys import http import util @@ -15,11 +16,13 @@ class Worker(object): SIGNALS = map( lambda x: getattr(signal, "SIG%s" % x), - "WINCH QUIT INT TERM USR1 USR2 HUP TTIN TTOU".split() + "QUIT INT TERM TTIN TTOU".split() ) - def __init__(self, workerid, socket, module): + def __init__(self, workerid, ppid, socket, module): + self.alive = True self.id = workerid + self.ppid = ppid self.socket = socket self.address = socket.getsockname() self.tmp = os.tmpfile() @@ -27,18 +30,35 @@ class Worker(object): def init_signals(self): map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS) + signal.signal(signal.SIGQUIT, self.handle_quit) + signal.signal(signal.SIGTERM, self.handle_exit) + signal.signal(signal.SIGINT, self.handle_exit) + def handle_quit(self, sig, frame): + self.alive = False + + def handle_exit(self, sig, frame): + sys.exit(-1) + def run(self): self.init_signals() - while True: + spinner = 0 + while self.alive: # Wait for a request to handle. - while True: - ret = select.select([self.socket], [], [], 2.0) + while self.alive: + try: + ret = select.select([self.socket], [], [], 2.0) + except select.error, e: + if e[0] != errno.EINTR: + raise if ret[0]: break - # Accept until we hit EAGAIN - while True: + # Accept until we hit EAGAIN. We're betting that when we're + # processing clients that more clients are waiting. When + # there's no more clients waiting we go back to the select() + # loop and wait for some lovin. + while self.alive: try: (conn, addr) = self.socket.accept() except socket.error, e: @@ -46,7 +66,6 @@ class Worker(object): break # Jump back to select raise # Uh oh! - #log.info("Client connected: %s:%s" % addr) conn.setblocking(1) try: self.handle(conn, addr) @@ -55,6 +74,11 @@ class Worker(object): finally: conn.close() + # Update the fd mtime on each client completion + # to signal that this worker process is alive. + spinner = (spinner+1) % 2 + os.fchmod(self.tmp.fileno(), spinner) + def handle(self, conn, client): while True: req = http.HTTPRequest(conn, client, self.address)