mirror of
https://github.com/frappe/gunicorn.git
synced 2026-01-14 11:09:11 +08:00
More work on signals.
I removed some of the signals that are more advanced than I want to write. After we get things working solid we'll need to go back and revisit the hot code swapping, halting workers, and log handling. There's still a weird racey lockup when hitting it with ab. I don't have httperf installed on this machine so I can't try a different tool to see if its just ab acting weird. I'm pretty sure I've seen ab do this before so I'm not too concerned.
This commit is contained in:
parent
d5956ed5f3
commit
89bae0daf2
@ -23,21 +23,22 @@ class Arbiter(object):
|
|||||||
SIG_QUEUE = []
|
SIG_QUEUE = []
|
||||||
SIGNALS = map(
|
SIGNALS = map(
|
||||||
lambda x: getattr(signal, "SIG%s" % x),
|
lambda x: getattr(signal, "SIG%s" % x),
|
||||||
"WINCH CHLD QUIT INT TERM USR1 USR2 HUP TTIN TTOU".split()
|
"QUIT INT TERM TTIN TTOU".split()
|
||||||
)
|
)
|
||||||
SIG_NAMES = dict(
|
SIG_NAMES = dict(
|
||||||
(getattr(signal, name), name) for name in dir(signal)
|
(getattr(signal, name), name[3:].lower()) for name in dir(signal)
|
||||||
if name[:3] == "SIG"
|
if name[:3] == "SIG"
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, address, num_workers, modname):
|
def __init__(self, address, num_workers, modname):
|
||||||
log.info("Booting Arbiter.")
|
|
||||||
self.address = address
|
self.address = address
|
||||||
self.num_workers = num_workers
|
self.num_workers = num_workers
|
||||||
self.modname = modname
|
self.modname = modname
|
||||||
|
self.timeout = 30
|
||||||
self.pid = os.getpid()
|
self.pid = os.getpid()
|
||||||
self.init_signals()
|
self.init_signals()
|
||||||
self.listen(self.address)
|
self.listen(self.address)
|
||||||
|
log.info("Booted Arbiter: %s" % os.getpid())
|
||||||
|
|
||||||
def init_signals(self):
|
def init_signals(self):
|
||||||
if self.PIPE:
|
if self.PIPE:
|
||||||
@ -63,22 +64,6 @@ class Arbiter(object):
|
|||||||
if e.errno not in [errno.EAGAIN, errno.EINTR]:
|
if e.errno not in [errno.EAGAIN, errno.EINTR]:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def sleep(self):
|
|
||||||
try:
|
|
||||||
ready = select.select([self.PIPE[0]], [], [], 1)
|
|
||||||
if not ready[0]:
|
|
||||||
return
|
|
||||||
while os.read(self.PIPE[0], 1):
|
|
||||||
pass
|
|
||||||
except select.error, e:
|
|
||||||
if e[0] not in [errno.EAGAIN, errno.EINTR]:
|
|
||||||
raise
|
|
||||||
except OSError, e:
|
|
||||||
if e.errno not in [errno.EAGAIN, errno.EINTR]:
|
|
||||||
raise
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
sys.exit()
|
|
||||||
|
|
||||||
def listen(self, addr):
|
def listen(self, addr):
|
||||||
for i in range(5):
|
for i in range(5):
|
||||||
try:
|
try:
|
||||||
@ -106,30 +91,107 @@ class Arbiter(object):
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
|
sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
|
||||||
if sig is not None:
|
|
||||||
log.info("SIGNAL: %s" % self.SIG_NAMES.get(sig, "Unknown"))
|
|
||||||
if sig is None:
|
if sig is None:
|
||||||
self.sleep()
|
self.sleep()
|
||||||
elif sig is signal.SIGINT:
|
continue
|
||||||
self.kill_workers(signal.SIGINT)
|
|
||||||
sys.exit(1)
|
|
||||||
elif sig is signal.SIGQUIT:
|
|
||||||
self.kill_workers(signal.SIGTERM)
|
|
||||||
sys.exit(0)
|
|
||||||
else:
|
|
||||||
name = self.SIG_NAMES.get(sig, "UNKNOWN")
|
|
||||||
log.warn("IGNORED: %s" % name)
|
|
||||||
|
|
||||||
|
if sig not in self.SIG_NAMES:
|
||||||
|
log.info("Ignoring unknown signal: %s" % sig)
|
||||||
|
continue
|
||||||
|
|
||||||
|
signame = self.SIG_NAMES.get(sig)
|
||||||
|
handler = getattr(self, "handle_%s" % signame, None)
|
||||||
|
if not handler:
|
||||||
|
log.error("Unhandled signal: %s" % signame)
|
||||||
|
continue
|
||||||
|
|
||||||
|
log.info("Handling signal: %s" % signame)
|
||||||
|
handler()
|
||||||
|
|
||||||
|
self.murder_workers()
|
||||||
self.reap_workers()
|
self.reap_workers()
|
||||||
self.manage_workers()
|
self.manage_workers()
|
||||||
|
|
||||||
|
except StopIteration:
|
||||||
|
break
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
self.kill_workers(signal.SIGTERM)
|
self.stop(False)
|
||||||
sys.exit()
|
sys.exit(-1)
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.kill_workers(signal.SIGTERM)
|
|
||||||
log.exception("Unhandled exception in main loop.")
|
log.exception("Unhandled exception in main loop.")
|
||||||
sys.exit()
|
self.stop(False)
|
||||||
|
sys.exit(-1)
|
||||||
|
|
||||||
|
log.info("Master is shutting down.")
|
||||||
|
self.stop()
|
||||||
|
|
||||||
|
def handle_quit(self):
|
||||||
|
raise StopIteration
|
||||||
|
|
||||||
|
def handle_int(self):
|
||||||
|
self.stop(False)
|
||||||
|
raise StopIteration
|
||||||
|
|
||||||
|
def handle_term(self):
|
||||||
|
self.stop(False)
|
||||||
|
raise StopIteration
|
||||||
|
|
||||||
|
def handle_ttin(self):
|
||||||
|
self.num_workers += 1
|
||||||
|
|
||||||
|
def handle_ttou(self):
|
||||||
|
if self.num_workers > 0:
|
||||||
|
self.num_workers -= 1
|
||||||
|
|
||||||
|
def sleep(self):
|
||||||
|
try:
|
||||||
|
ready = select.select([self.PIPE[0]], [], [], 1)
|
||||||
|
if not ready[0]:
|
||||||
|
return
|
||||||
|
while os.read(self.PIPE[0], 1):
|
||||||
|
pass
|
||||||
|
except select.error, e:
|
||||||
|
if e[0] not in [errno.EAGAIN, errno.EINTR]:
|
||||||
|
raise
|
||||||
|
except OSError, e:
|
||||||
|
if e.errno not in [errno.EAGAIN, errno.EINTR]:
|
||||||
|
raise
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
sys.exit()
|
||||||
|
|
||||||
|
def stop(self, graceful=True):
|
||||||
|
self.LISTENER.close()
|
||||||
|
sig = signal.SIGQUIT
|
||||||
|
if not graceful:
|
||||||
|
sig = signal.SIGTERM
|
||||||
|
limit = time.time() + self.timeout
|
||||||
|
while len(self.WORKERS) and time.time() < limit:
|
||||||
|
self.kill_workers(sig)
|
||||||
|
time.sleep(0.1)
|
||||||
|
self.reap_workers()
|
||||||
|
self.kill_workers(signal.SIGKILL)
|
||||||
|
|
||||||
|
def murder_workers(self):
|
||||||
|
for (pid, worker) in self.WORKERS.iteritems():
|
||||||
|
diff = time.time() - os.fstat(worker.tmp.fileno()).st_mtime
|
||||||
|
if diff < self.timeout:
|
||||||
|
continue
|
||||||
|
self.kill_worker(pid, signal.SIGKILL)
|
||||||
|
|
||||||
|
def reap_workers(self):
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
wpid, status = os.waitpid(-1, os.WNOHANG)
|
||||||
|
if not wpid:
|
||||||
|
break
|
||||||
|
worker = self.WORKERS.pop(wpid, None)
|
||||||
|
if not worker:
|
||||||
|
continue
|
||||||
|
worker.tmp.close()
|
||||||
|
except OSError, e:
|
||||||
|
if e.errno == errno.ECHILD:
|
||||||
|
pass
|
||||||
|
|
||||||
def manage_workers(self):
|
def manage_workers(self):
|
||||||
if len(self.WORKERS.keys()) < self.num_workers:
|
if len(self.WORKERS.keys()) < self.num_workers:
|
||||||
@ -145,7 +207,7 @@ class Arbiter(object):
|
|||||||
if i in workers:
|
if i in workers:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
worker = Worker(i, self.LISTENER, self.modname)
|
worker = Worker(i, self.pid, self.LISTENER, self.modname)
|
||||||
pid = os.fork()
|
pid = os.fork()
|
||||||
if pid != 0:
|
if pid != 0:
|
||||||
self.WORKERS[pid] = worker
|
self.WORKERS[pid] = worker
|
||||||
@ -155,29 +217,14 @@ class Arbiter(object):
|
|||||||
try:
|
try:
|
||||||
log.info("Worker %s booting" % os.getpid())
|
log.info("Worker %s booting" % os.getpid())
|
||||||
worker.run()
|
worker.run()
|
||||||
log.info("Worker %s exiting" % os.getpid())
|
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
except SystemExit:
|
except SystemExit:
|
||||||
pass
|
raise
|
||||||
except:
|
except:
|
||||||
log.exception("Exception in worker process.")
|
log.exception("Exception in worker process.")
|
||||||
sys.exit(-1)
|
sys.exit(-1)
|
||||||
finally:
|
finally:
|
||||||
log.info("Done.")
|
log.info("Worker %s exiting." % os.getpid())
|
||||||
|
|
||||||
def reap_workers(self):
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
wpid, status = os.waitpid(-1, os.WNOHANG)
|
|
||||||
if not wpid:
|
|
||||||
break
|
|
||||||
worker = self.WORKERS.pop(wpid)
|
|
||||||
if not worker:
|
|
||||||
continue
|
|
||||||
worker.tmp.close()
|
|
||||||
except OSError, e:
|
|
||||||
if e.errno == errno.ECHILD:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def kill_workers(self, sig):
|
def kill_workers(self, sig):
|
||||||
for pid in self.WORKERS.keys():
|
for pid in self.WORKERS.keys():
|
||||||
@ -187,6 +234,9 @@ class Arbiter(object):
|
|||||||
worker = self.WORKERS.pop(pid)
|
worker = self.WORKERS.pop(pid)
|
||||||
try:
|
try:
|
||||||
os.kill(pid, sig)
|
os.kill(pid, sig)
|
||||||
|
except OSError, e:
|
||||||
|
if e.errno == errno.ESRCH:
|
||||||
|
pass
|
||||||
finally:
|
finally:
|
||||||
worker.tmp.close()
|
worker.tmp.close()
|
||||||
|
|
||||||
|
|||||||
@ -122,7 +122,6 @@ class HTTPRequest(object):
|
|||||||
# bad headers
|
# bad headers
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def body_length(self):
|
def body_length(self):
|
||||||
transfert_encoding = self.headers.get('TRANSFERT-ENCODING')
|
transfert_encoding = self.headers.get('TRANSFERT-ENCODING')
|
||||||
content_length = self.headers.get('CONTENT-LENGTH')
|
content_length = self.headers.get('CONTENT-LENGTH')
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import os
|
|||||||
import select
|
import select
|
||||||
import signal
|
import signal
|
||||||
import socket
|
import socket
|
||||||
|
import sys
|
||||||
|
|
||||||
import http
|
import http
|
||||||
import util
|
import util
|
||||||
@ -15,11 +16,13 @@ class Worker(object):
|
|||||||
|
|
||||||
SIGNALS = map(
|
SIGNALS = map(
|
||||||
lambda x: getattr(signal, "SIG%s" % x),
|
lambda x: getattr(signal, "SIG%s" % x),
|
||||||
"WINCH QUIT INT TERM USR1 USR2 HUP TTIN TTOU".split()
|
"QUIT INT TERM TTIN TTOU".split()
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, workerid, socket, module):
|
def __init__(self, workerid, ppid, socket, module):
|
||||||
|
self.alive = True
|
||||||
self.id = workerid
|
self.id = workerid
|
||||||
|
self.ppid = ppid
|
||||||
self.socket = socket
|
self.socket = socket
|
||||||
self.address = socket.getsockname()
|
self.address = socket.getsockname()
|
||||||
self.tmp = os.tmpfile()
|
self.tmp = os.tmpfile()
|
||||||
@ -27,18 +30,35 @@ class Worker(object):
|
|||||||
|
|
||||||
def init_signals(self):
|
def init_signals(self):
|
||||||
map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS)
|
map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS)
|
||||||
|
signal.signal(signal.SIGQUIT, self.handle_quit)
|
||||||
|
signal.signal(signal.SIGTERM, self.handle_exit)
|
||||||
|
signal.signal(signal.SIGINT, self.handle_exit)
|
||||||
|
|
||||||
|
def handle_quit(self, sig, frame):
|
||||||
|
self.alive = False
|
||||||
|
|
||||||
|
def handle_exit(self, sig, frame):
|
||||||
|
sys.exit(-1)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.init_signals()
|
self.init_signals()
|
||||||
while True:
|
spinner = 0
|
||||||
|
while self.alive:
|
||||||
# Wait for a request to handle.
|
# Wait for a request to handle.
|
||||||
while True:
|
while self.alive:
|
||||||
ret = select.select([self.socket], [], [], 2.0)
|
try:
|
||||||
|
ret = select.select([self.socket], [], [], 2.0)
|
||||||
|
except select.error, e:
|
||||||
|
if e[0] != errno.EINTR:
|
||||||
|
raise
|
||||||
if ret[0]:
|
if ret[0]:
|
||||||
break
|
break
|
||||||
|
|
||||||
# Accept until we hit EAGAIN
|
# Accept until we hit EAGAIN. We're betting that when we're
|
||||||
while True:
|
# processing clients that more clients are waiting. When
|
||||||
|
# there's no more clients waiting we go back to the select()
|
||||||
|
# loop and wait for some lovin.
|
||||||
|
while self.alive:
|
||||||
try:
|
try:
|
||||||
(conn, addr) = self.socket.accept()
|
(conn, addr) = self.socket.accept()
|
||||||
except socket.error, e:
|
except socket.error, e:
|
||||||
@ -46,7 +66,6 @@ class Worker(object):
|
|||||||
break # Jump back to select
|
break # Jump back to select
|
||||||
raise # Uh oh!
|
raise # Uh oh!
|
||||||
|
|
||||||
#log.info("Client connected: %s:%s" % addr)
|
|
||||||
conn.setblocking(1)
|
conn.setblocking(1)
|
||||||
try:
|
try:
|
||||||
self.handle(conn, addr)
|
self.handle(conn, addr)
|
||||||
@ -55,6 +74,11 @@ class Worker(object):
|
|||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
# Update the fd mtime on each client completion
|
||||||
|
# to signal that this worker process is alive.
|
||||||
|
spinner = (spinner+1) % 2
|
||||||
|
os.fchmod(self.tmp.fileno(), spinner)
|
||||||
|
|
||||||
def handle(self, conn, client):
|
def handle(self, conn, client):
|
||||||
while True:
|
while True:
|
||||||
req = http.HTTPRequest(conn, client, self.address)
|
req = http.HTTPRequest(conn, client, self.address)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user