inherit PIPE and fix bug in reap_workers

This commit is contained in:
Benoit Chesneau 2010-01-29 22:55:01 +01:00
parent 6dff13c309
commit 6d4ef85e39
4 changed files with 76 additions and 52 deletions

View File

@ -17,6 +17,7 @@ import tempfile
import time import time
from gunicorn.worker import Worker from gunicorn.worker import Worker
from gunicorn import util
class Arbiter(object): class Arbiter(object):
@ -35,8 +36,6 @@ class Arbiter(object):
if name[:3] == "SIG" and name[3] != "_" if name[:3] == "SIG" and name[3] != "_"
) )
def __init__(self, address, num_workers, modname, def __init__(self, address, num_workers, modname,
**kwargs): **kwargs):
self.address = address self.address = address
@ -96,8 +95,11 @@ class Arbiter(object):
def valid_pidfile(self, path): def valid_pidfile(self, path):
try: try:
with open(path, "r") as f: with open(path, "r") as f:
pid = int(f.read() or 0) try:
if pid <= 0: return pid = int(f.read())
except:
return None
if pid <= 0: return None
try: try:
os.kill(pid, 0) os.kill(pid, 0)
@ -116,15 +118,11 @@ class Arbiter(object):
if self.PIPE: if self.PIPE:
map(lambda p: p.close(), self.PIPE) map(lambda p: p.close(), self.PIPE)
self.PIPE = pair = os.pipe() self.PIPE = pair = os.pipe()
map(self.set_non_blocking, pair) map(util.set_non_blocking, pair)
map(lambda p: fcntl.fcntl(p, fcntl.F_SETFD, fcntl.FD_CLOEXEC), pair) map(util.close_on_exec, pair)
map(lambda s: signal.signal(s, self.signal), self.SIGNALS) map(lambda s: signal.signal(s, self.signal), self.SIGNALS)
signal.signal(signal.SIGCHLD, self.handle_chld) signal.signal(signal.SIGCHLD, self.handle_chld)
def set_non_blocking(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
def signal(self, sig, frame): def signal(self, sig, frame):
if len(self.SIG_QUEUE) < 5: if len(self.SIG_QUEUE) < 5:
self.SIG_QUEUE.append(sig) self.SIG_QUEUE.append(sig)
@ -147,6 +145,7 @@ class Arbiter(object):
else: else:
raise raise
for i in range(5): for i in range(5):
try: try:
sock = self.init_socket(addr) sock = self.init_socket(addr)
@ -292,7 +291,7 @@ class Arbiter(object):
if not graceful: if not graceful:
sig = signal.SIGTERM sig = signal.SIGTERM
limit = time.time() + self.timeout limit = time.time() + self.timeout
while len(self.WORKERS) and time.time() < limit: while self.WORKERS or time.time() > limit:
self.kill_workers(sig) self.kill_workers(sig)
time.sleep(0.1) time.sleep(0.1)
self.reap_workers() self.reap_workers()
@ -343,7 +342,7 @@ class Arbiter(object):
continue continue
worker = Worker(i, self.pid, self.LISTENER, self.modname, worker = Worker(i, self.pid, self.LISTENER, self.modname,
self.timeout, self.debug) self.timeout, self.PIPE, self.debug)
pid = os.fork() pid = os.fork()
if pid != 0: if pid != 0:
self.WORKERS[pid] = worker self.WORKERS[pid] = worker
@ -356,7 +355,6 @@ class Arbiter(object):
worker.run() worker.run()
sys.exit(0) sys.exit(0)
except SystemExit: except SystemExit:
raise raise
except: except:
self.log.exception("Exception in worker process.") self.log.exception("Exception in worker process.")
@ -364,22 +362,21 @@ class Arbiter(object):
finally: finally:
worker.tmp.close() worker.tmp.close()
self.log.info("Worker %s exiting." % worker_pid) self.log.info("Worker %s exiting." % worker_pid)
os._exit(127)
def kill_workers(self, sig): def kill_workers(self, sig):
for pid in self.WORKERS.keys(): for pid in self.WORKERS.keys():
self.kill_worker(pid, sig) self.kill_worker(pid, sig)
def kill_worker(self, pid, sig): def kill_worker(self, pid, sig):
worker = self.WORKERS.pop(pid)
try: try:
os.kill(pid, sig) os.kill(pid, sig)
kpid, stat = os.waitpid(pid, os.WNOHANG)
if kpid:
self.log.warning("Problem killing process: %s" % pid)
except OSError, e: except OSError, e:
if e.errno == errno.ESRCH: if e.errno == errno.ESRCH:
worker = self.WORKERS.pop(pid)
try: try:
worker.tmp.close() worker.tmp.close()
except: except:
pass pass
raise

View File

@ -7,10 +7,10 @@
import logging import logging
import optparse as op import optparse as op
import os import os
import resource
import sys import sys
from gunicorn.arbiter import Arbiter from gunicorn.arbiter import Arbiter
from gunicorn import util
LOG_LEVELS = { LOG_LEVELS = {
"critical": logging.CRITICAL, "critical": logging.CRITICAL,
@ -21,12 +21,6 @@ LOG_LEVELS = {
} }
UMASK = 0 UMASK = 0
MAXFD = 1024
if (hasattr(os, "devnull")):
REDIRECT_TO = os.devnull
else:
REDIRECT_TO = "/dev/null"
def options(): def options():
return [ return [
@ -75,9 +69,7 @@ def daemonize(logger):
else: else:
os._exit(0) os._exit(0)
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] maxfd = util.get_maxfd()
if (maxfd == resource.RLIM_INFINITY):
maxfd = 1024
# Iterate through and close all file descriptors. # Iterate through and close all file descriptors.
for fd in range(0, maxfd): for fd in range(0, maxfd):
@ -86,8 +78,7 @@ def daemonize(logger):
except OSError: # ERROR, fd wasn't open to begin with (ignored) except OSError: # ERROR, fd wasn't open to begin with (ignored)
pass pass
os.open(util.REDIRECT_TO, os.O_RDWR)
os.open(REDIRECT_TO, os.O_RDWR)
os.dup2(0, 1) os.dup2(0, 1)
os.dup2(0, 2) os.dup2(0, 2)

View File

@ -5,10 +5,19 @@
import errno import errno
import fcntl import fcntl
import os
import resource
import select import select
import socket import socket
import time import time
MAXFD = 1024
if (hasattr(os, "devnull")):
REDIRECT_TO = os.devnull
else:
REDIRECT_TO = "/dev/null"
timeout_default = object() timeout_default = object()
@ -21,10 +30,20 @@ monthname = [None,
'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
def get_maxfd():
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if (maxfd == resource.RLIM_INFINITY):
maxfd = MAXFD
return maxfd
def close_on_exec(fd): def close_on_exec(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
fcntl.fcntl(fd, fcntl.F_SETFL, flags) fcntl.fcntl(fd, fcntl.F_SETFL, flags)
def set_non_blocking(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
def close(sock): def close(sock):
try: try:
sock.close() sock.close()

View File

@ -25,38 +25,51 @@ class Worker(object):
"HUP QUIT INT TERM TTIN TTOU USR1".split() "HUP QUIT INT TERM TTIN TTOU USR1".split()
) )
PIPE = []
def __init__(self, workerid, ppid, socket, app, timeout, def __init__(self, workerid, ppid, socket, app, timeout,
debug=False): pipe, debug=False):
self.nr = 0
self.id = workerid self.id = workerid
self.ppid = ppid self.ppid = ppid
self.debug = debug self.debug = debug
self.socket = socket
self.timeout = timeout / 2.0 self.timeout = timeout / 2.0
fd, tmpname = tempfile.mkstemp() fd, tmpname = tempfile.mkstemp()
self.tmp = os.fdopen(fd, "r+b") self.tmp = os.fdopen(fd, "r+b")
self.tmpname = tmpname self.tmpname = tmpname
# prevent inherientence
self.socket = socket
util.close_on_exec(self.socket)
self.socket.setblocking(0)
util.close_on_exec(fd)
self.address = self.socket.getsockname()
self.app = app self.app = app
self.alive = True self.alive = True
self.log = logging.getLogger(__name__) self.log = logging.getLogger(__name__)
# init pipe
self.PIPE = pipe
map(util.set_non_blocking, pipe)
map(util.close_on_exec, pipe)
# prevent inherientence
util.close_on_exec(self.socket)
self.socket.setblocking(0)
util.close_on_exec(fd)
self.address = self.socket.getsockname()
def init_signals(self): def init_signals(self):
map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS) map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS)
signal.signal(signal.SIGQUIT, self.handle_quit) signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGUSR1, self.handle_usr1)
signal.signal(signal.SIGTERM, self.handle_exit) signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_exit) signal.signal(signal.SIGINT, self.handle_exit)
signal.signal(signal.SIGUSR1, self.handle_quit)
def handle_quit(self, sig, frame): def handle_usr1(self, *args):
self.nr = -65536;
try:
map(lambda p: p.close(), self.PIPE)
except:
pass
def handle_quit(self, *args):
self.alive = False self.alive = False
def handle_exit(self, sig, frame): def handle_exit(self, sig, frame):
@ -71,14 +84,16 @@ class Worker(object):
def run(self): def run(self):
self.init_signals() self.init_signals()
spinner = 0 spinner = 0
self.nr = 0
while self.alive: while self.alive:
nr = 0 self.nr = 0
# Accept until we hit EAGAIN. We're betting that when we're # Accept until we hit EAGAIN. We're betting that when we're
# processing clients that more clients are waiting. When # processing clients that more clients are waiting. When
# there's no more clients waiting we go back to the select() # there's no more clients waiting we go back to the select()
# loop and wait for some lovin. # loop and wait for some lovin.
while self.alive: while self.alive:
self.nr = 0
try: try:
client, addr = self.socket.accept() client, addr = self.socket.accept()
@ -89,19 +104,21 @@ class Worker(object):
# to signal that this worker process is alive. # to signal that this worker process is alive.
spinner = (spinner+1) % 2 spinner = (spinner+1) % 2
self._fchmod(spinner) self._fchmod(spinner)
nr += 1 self.nr += 1
except socket.error, e: except socket.error, e:
if e[0] in (errno.EAGAIN, errno.ECONNABORTED): if e[0] in (errno.EAGAIN, errno.ECONNABORTED):
break # Uh oh! break # Uh oh!
raise raise
if nr == 0: break if self.nr == 0: break
if self.ppid != os.getppid():
break
while self.alive: while self.alive:
spinner = (spinner+1) % 2 spinner = (spinner+1) % 2
self._fchmod(spinner) self._fchmod(spinner)
try: try:
ret = select.select([self.socket], [], [], ret = select.select([self.socket], [], self.PIPE,
self.timeout) self.timeout)
if ret[0]: if ret[0]:
break break