diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 68b23696..88a7e1cb 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -17,6 +17,7 @@ import tempfile import time from gunicorn.worker import Worker +from gunicorn import util class Arbiter(object): @@ -34,9 +35,7 @@ class Arbiter(object): (getattr(signal, name), name[3:].lower()) for name in dir(signal) if name[:3] == "SIG" and name[3] != "_" ) - - - + def __init__(self, address, num_workers, modname, **kwargs): self.address = address @@ -96,8 +95,11 @@ class Arbiter(object): def valid_pidfile(self, path): try: with open(path, "r") as f: - pid = int(f.read() or 0) - if pid <= 0: return + try: + pid = int(f.read()) + except: + return None + if pid <= 0: return None try: os.kill(pid, 0) @@ -116,14 +118,10 @@ class Arbiter(object): if self.PIPE: map(lambda p: p.close(), self.PIPE) self.PIPE = pair = os.pipe() - map(self.set_non_blocking, pair) - map(lambda p: fcntl.fcntl(p, fcntl.F_SETFD, fcntl.FD_CLOEXEC), pair) + map(util.set_non_blocking, pair) + map(util.close_on_exec, pair) map(lambda s: signal.signal(s, self.signal), self.SIGNALS) signal.signal(signal.SIGCHLD, self.handle_chld) - - def set_non_blocking(self, fd): - flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK - fcntl.fcntl(fd, fcntl.F_SETFL, flags) def signal(self, sig, frame): if len(self.SIG_QUEUE) < 5: @@ -146,6 +144,7 @@ class Arbiter(object): self.log.error("should be a non GUNICORN environnement") else: raise + for i in range(5): try: @@ -292,7 +291,7 @@ class Arbiter(object): if not graceful: sig = signal.SIGTERM limit = time.time() + self.timeout - while len(self.WORKERS) and time.time() < limit: + while self.WORKERS or time.time() > limit: self.kill_workers(sig) time.sleep(0.1) self.reap_workers() @@ -343,12 +342,12 @@ class Arbiter(object): continue worker = Worker(i, self.pid, self.LISTENER, self.modname, - self.timeout, self.debug) + self.timeout, self.PIPE, self.debug) pid = os.fork() if pid != 0: self.WORKERS[pid] = worker continue - + # Process Child worker_pid = os.getpid() try: @@ -356,7 +355,6 @@ class Arbiter(object): worker.run() sys.exit(0) except SystemExit: - raise except: self.log.exception("Exception in worker process.") @@ -364,22 +362,21 @@ class Arbiter(object): finally: worker.tmp.close() self.log.info("Worker %s exiting." % worker_pid) + os._exit(127) def kill_workers(self, sig): for pid in self.WORKERS.keys(): self.kill_worker(pid, sig) def kill_worker(self, pid, sig): - worker = self.WORKERS.pop(pid) + try: os.kill(pid, sig) - kpid, stat = os.waitpid(pid, os.WNOHANG) - if kpid: - self.log.warning("Problem killing process: %s" % pid) except OSError, e: if e.errno == errno.ESRCH: + worker = self.WORKERS.pop(pid) try: worker.tmp.close() except: pass - + raise diff --git a/gunicorn/main.py b/gunicorn/main.py index 1fe88c7f..1f85be62 100644 --- a/gunicorn/main.py +++ b/gunicorn/main.py @@ -7,10 +7,10 @@ import logging import optparse as op import os -import resource import sys from gunicorn.arbiter import Arbiter +from gunicorn import util LOG_LEVELS = { "critical": logging.CRITICAL, @@ -21,12 +21,6 @@ LOG_LEVELS = { } UMASK = 0 -MAXFD = 1024 -if (hasattr(os, "devnull")): - REDIRECT_TO = os.devnull -else: - REDIRECT_TO = "/dev/null" - def options(): return [ @@ -75,9 +69,7 @@ def daemonize(logger): else: os._exit(0) - maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] - if (maxfd == resource.RLIM_INFINITY): - maxfd = 1024 + maxfd = util.get_maxfd() # Iterate through and close all file descriptors. for fd in range(0, maxfd): @@ -86,8 +78,7 @@ def daemonize(logger): except OSError: # ERROR, fd wasn't open to begin with (ignored) pass - - os.open(REDIRECT_TO, os.O_RDWR) + os.open(util.REDIRECT_TO, os.O_RDWR) os.dup2(0, 1) os.dup2(0, 2) diff --git a/gunicorn/util.py b/gunicorn/util.py index 311c87be..87584fd7 100644 --- a/gunicorn/util.py +++ b/gunicorn/util.py @@ -5,10 +5,19 @@ import errno import fcntl +import os +import resource import select import socket import time +MAXFD = 1024 +if (hasattr(os, "devnull")): + REDIRECT_TO = os.devnull +else: + REDIRECT_TO = "/dev/null" + + timeout_default = object() @@ -20,10 +29,20 @@ weekdayname = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] monthname = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] + +def get_maxfd(): + maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + if (maxfd == resource.RLIM_INFINITY): + maxfd = MAXFD + return maxfd def close_on_exec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC fcntl.fcntl(fd, fcntl.F_SETFL, flags) + +def set_non_blocking(fd): + flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) def close(sock): try: diff --git a/gunicorn/worker.py b/gunicorn/worker.py index 8e74c409..00c2a7fe 100644 --- a/gunicorn/worker.py +++ b/gunicorn/worker.py @@ -24,39 +24,52 @@ class Worker(object): lambda x: getattr(signal, "SIG%s" % x), "HUP QUIT INT TERM TTIN TTOU USR1".split() ) + + PIPE = [] def __init__(self, workerid, ppid, socket, app, timeout, - debug=False): + pipe, debug=False): + self.nr = 0 self.id = workerid self.ppid = ppid self.debug = debug + self.socket = socket self.timeout = timeout / 2.0 fd, tmpname = tempfile.mkstemp() self.tmp = os.fdopen(fd, "r+b") self.tmpname = tmpname - - # prevent inherientence - self.socket = socket - util.close_on_exec(self.socket) - self.socket.setblocking(0) - - util.close_on_exec(fd) - - self.address = self.socket.getsockname() - self.app = app self.alive = True self.log = logging.getLogger(__name__) - + + # init pipe + self.PIPE = pipe + map(util.set_non_blocking, pipe) + map(util.close_on_exec, pipe) + + # prevent inherientence + util.close_on_exec(self.socket) + self.socket.setblocking(0) + util.close_on_exec(fd) + + self.address = self.socket.getsockname() + def init_signals(self): map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS) signal.signal(signal.SIGQUIT, self.handle_quit) + signal.signal(signal.SIGUSR1, self.handle_usr1) signal.signal(signal.SIGTERM, self.handle_exit) signal.signal(signal.SIGINT, self.handle_exit) - signal.signal(signal.SIGUSR1, self.handle_quit) - - def handle_quit(self, sig, frame): + + def handle_usr1(self, *args): + self.nr = -65536; + try: + map(lambda p: p.close(), self.PIPE) + except: + pass + + def handle_quit(self, *args): self.alive = False def handle_exit(self, sig, frame): @@ -71,14 +84,16 @@ class Worker(object): def run(self): self.init_signals() spinner = 0 + self.nr = 0 while self.alive: - nr = 0 + self.nr = 0 # Accept until we hit EAGAIN. We're betting that when we're # processing clients that more clients are waiting. When # there's no more clients waiting we go back to the select() # loop and wait for some lovin. while self.alive: + self.nr = 0 try: client, addr = self.socket.accept() @@ -89,19 +104,21 @@ class Worker(object): # to signal that this worker process is alive. spinner = (spinner+1) % 2 self._fchmod(spinner) - nr += 1 + self.nr += 1 except socket.error, e: if e[0] in (errno.EAGAIN, errno.ECONNABORTED): break # Uh oh! - raise - if nr == 0: break + if self.nr == 0: break + + if self.ppid != os.getppid(): + break while self.alive: spinner = (spinner+1) % 2 self._fchmod(spinner) try: - ret = select.select([self.socket], [], [], + ret = select.select([self.socket], [], self.PIPE, self.timeout) if ret[0]: break