Handle HUP. There are still dead processes left behind after a HUP that need to be removed.
This commit is contained in:
Benoit Chesneau 2010-01-10 19:53:34 +01:00
parent fb61e9b6ba
commit a98d9d9e3e

View File

@ -61,6 +61,7 @@ class Arbiter(object):
self.num_workers = num_workers self.num_workers = num_workers
self.modname = modname self.modname = modname
self.timeout = 30 self.timeout = 30
self.reexec_pid = 0
self.pid = os.getpid() self.pid = os.getpid()
self.init_signals() self.init_signals()
self.listen(self.address) self.listen(self.address)
@ -81,16 +82,27 @@ class Arbiter(object):
def signal(self, sig, frame):
    """Record a received signal and wake the arbiter.

    ``sig`` is appended to ``self.SIG_QUEUE`` unless five signals are
    already pending, in which case it is dropped and a warning is logged.
    ``frame`` is accepted but not used.
    """
    if len(self.SIG_QUEUE) < 5:
        self.SIG_QUEUE.append(sig)
    else:
        log.warn("Ignoring rapid signaling: %s" % sig)
    # Always nudge the arbiter, whether or not the signal was queued.
    self.wakeup()
def listen(self, addr): def listen(self, addr):
if 'GUNICORN_FD' in os.environ:
fd = int(os.environ['GUNICORN_FD'])
del os.environ['GUNICORN_FD']
try:
sock = self.init_socket_fromfd(fd, addr)
self.LISTENER = sock
return
except socket.error, e:
if e[0] == errno.ENOTCONN:
log.error("should be a non GUNICORN environnement")
else:
raise
for i in range(5): for i in range(5):
try: try:
sock = self.init_socket(addr) sock = self.init_socket(addr)
@ -103,19 +115,25 @@ class Arbiter(object):
log.error("Retrying in 1 second.") log.error("Retrying in 1 second.")
time.sleep(1) time.sleep(1)
def init_socket_fromfd(self, fd, address):
    """Build a TCP socket object from an already-open descriptor.

    ``fd`` is the integer descriptor to wrap. ``address`` is accepted for
    symmetry with ``init_socket`` but is not used here.

    The returned socket owns a *duplicate* of ``fd``; ``fd`` itself is
    closed by this method and must not be used by the caller afterwards.
    Socket options are applied through ``self.set_sockopts``.
    """
    sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
    # socket.fromfd() dup()s the descriptor, so the one we were handed would
    # otherwise stay open for the life of the process (one leak per re-exec).
    os.close(fd)
    self.set_sockopts(sock)
    return sock
def init_socket(self, address):
    """Create a TCP socket bound to ``address`` and start listening.

    Socket options are applied through ``self.set_sockopts`` before the
    bind. The listen backlog is 2048. Returns the listening socket.

    Any ``socket.error`` raised while configuring, binding or listening is
    re-raised after the half-initialised socket has been closed, so a caller
    that retries does not accumulate open descriptors.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        self.set_sockopts(sock)
        sock.bind(address)
        sock.listen(2048)
    except socket.error:
        sock.close()
        raise
    return sock
def set_sockopts(self, sock):
    """Apply the arbiter's TCP options to ``sock``.

    Sets SO_REUSEADDR and TCP_NODELAY, then TCP_CORK when the platform's
    ``socket`` module defines it, otherwise TCP_NOPUSH when that is
    defined. Binding and listening are left to the caller. Returns None.
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    if hasattr(socket, "TCP_CORK"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1)
    elif hasattr(socket, "TCP_NOPUSH"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NOPUSH, 1)
def run(self): def run(self):
self.manage_workers() self.manage_workers()
@ -156,16 +174,15 @@ class Arbiter(object):
log.info("Master is shutting down.") log.info("Master is shutting down.")
self.stop() self.stop()
sys.exit(0)
def handle_chld(self):
    """Wake the arbiter.

    NOTE(review): the name suggests this is dispatched for SIGCHLD so the
    main loop gets a chance to reap exited children; the dispatch code is
    not visible here -- confirm.
    """
    self.wakeup()
def handle_hup(self):
    """Handle a hang-up: log it, re-exec, then raise StopIteration.

    Raises:
        StopIteration: always, once ``self.reexec()`` has returned.
        NOTE(review): presumably caught by the main loop to stop
        iterating -- that loop is not visible here, confirm.
    """
    log.info("Master hang up.")
    self.reexec()
    raise StopIteration
def handle_quit(self): def handle_quit(self):
self.stop(False) self.stop(False)
@ -187,13 +204,12 @@ class Arbiter(object):
self.num_workers -= 1 self.num_workers -= 1
def wakeup(self):
    """Wake up the arbiter by writing one byte to ``self.PIPE[1]``.

    EAGAIN and EINTR failures are ignored; any other error from the write
    is re-raised.
    """
    try:
        os.write(self.PIPE[1], '.')
    except (OSError, IOError):
        # os.write() reports failures as OSError; IOError alone would let
        # EAGAIN/EINTR escape. sys.exc_info() is used instead of an
        # "except ..., e" binding so the exception tuple can be caught.
        e = sys.exc_info()[1]
        if e.errno not in [errno.EAGAIN, errno.EINTR]:
            raise
def sleep(self): def sleep(self):
try: try:
@ -223,6 +239,13 @@ class Arbiter(object):
self.reap_workers() self.reap_workers()
self.kill_workers(signal.SIGKILL) self.kill_workers(signal.SIGKILL)
def reexec(self):
    """Fork, then replace one side of the fork with a fresh program image.

    The result of ``os.fork()`` is stored in ``self.reexec_pid``. The
    process that takes the branch below exports the listener's descriptor
    number in the ``GUNICORN_FD`` environment variable, switches the
    listener to blocking mode and execs ``sys.argv[0]`` with the original
    arguments; it never returns from this method. The other process
    returns normally.
    """
    self.reexec_pid = os.fork()
    # NOTE(review): os.fork() returns 0 in the child, so this branch runs in
    # the *parent* -- the original process is the one replaced by exec and
    # the forked child is what returns to the caller. reap_workers compares
    # reexec_pid with reaped pids, which hints that the parent may have been
    # meant to survive instead; confirm which side should exec.
    if self.reexec_pid != 0:
        os.environ['GUNICORN_FD'] = str(self.LISTENER.fileno())
        self.LISTENER.setblocking(1)
        # Argument unpacking replaces the deprecated apply() builtin.
        os.execlp(sys.argv[0], *sys.argv)
def murder_workers(self): def murder_workers(self):
for (pid, worker) in list(self.WORKERS.items()): for (pid, worker) in list(self.WORKERS.items()):
diff = time.time() - os.fstat(worker.tmp.fileno()).st_mtime diff = time.time() - os.fstat(worker.tmp.fileno()).st_mtime
@ -234,12 +257,17 @@ class Arbiter(object):
try: try:
while True: while True:
wpid, status = os.waitpid(-1, os.WNOHANG) wpid, status = os.waitpid(-1, os.WNOHANG)
if not wpid: if not wpid: break
break if self.reexec_pid == wpid:
worker = self.WORKERS.pop(wpid, None) log.error("reaped %s" % str(status))
if not worker: self.reexec_pid = 0
continue else:
worker.tmp.close() worker = self.WORKERS.pop(wpid, None)
if not worker:
continue
worker.tmp.close()
log.info("repead %s \nworker %s" % (str(status),
str(worker.id)))
except OSError, e: except OSError, e:
if e.errno == errno.ECHILD: if e.errno == errno.ECHILD:
pass pass