Fix eventlet shutdown to actively shut down the workers.

This commit is contained in:
Randall Leeds 2014-10-09 10:49:57 +01:00
parent fcba1a6c1c
commit 415956da23

View File

@ -5,6 +5,7 @@
from functools import partial from functools import partial
import errno import errno
import sys
try: try:
import eventlet import eventlet
@ -16,9 +17,10 @@ if eventlet.version_info < (0, 9, 7):
raise RuntimeError("You need eventlet >= 0.9.7") raise RuntimeError("You need eventlet >= 0.9.7")
from eventlet import hubs from eventlet import hubs, greenthread
from eventlet.greenio import GreenSocket from eventlet.greenio import GreenSocket
from eventlet.hubs import trampoline from eventlet.hubs import trampoline
import greenlet
from gunicorn.http.wsgi import sendfile as o_sendfile from gunicorn.http.wsgi import sendfile as o_sendfile
from gunicorn.workers.async import AsyncWorker from gunicorn.workers.async import AsyncWorker
@ -33,6 +35,47 @@ def _eventlet_sendfile(fdout, fdin, offset, nbytes):
else: else:
raise raise
def _eventlet_serve(sock, handle, concurrency):
"""
Serve requests forever.
This code is nearly identical to ``eventlet.convenience.serve`` except
that it attempts to join the pool at the end, which allows for gunicorn
graceful shutdowns.
"""
pool = eventlet.greenpool.GreenPool(concurrency)
server_gt = eventlet.greenthread.getcurrent()
while True:
try:
conn, addr = sock.accept()
gt = pool.spawn(handle, conn, addr)
gt.link(_eventlet_stop, server_gt, conn)
conn, addr, gt = None, None, None
except eventlet.StopServe:
pool.waitall()
return
def _eventlet_stop(client, server, conn):
"""
Stop a greenlet handling a request and close its connection.
This code is lifted from eventlet so as not to depend on undocumented
functions in the library.
"""
try:
try:
client.wait()
finally:
conn.close()
except greenlet.GreenletExit:
pass
except Exception:
greenthread.kill(server, *sys.exc_info())
def patch_sendfile(): def patch_sendfile():
from gunicorn.http import wsgi from gunicorn.http import wsgi
@ -60,16 +103,13 @@ class EventletWorker(AsyncWorker):
super(EventletWorker, self).handle(listener, client, addr) super(EventletWorker, self).handle(listener, client, addr)
if not self.alive:
raise eventlet.StopServe()
def run(self): def run(self):
acceptors = [] acceptors = []
for sock in self.sockets: for sock in self.sockets:
sock = GreenSocket(sock) gsock = GreenSocket(sock)
sock.setblocking(1) gsock.setblocking(1)
hfun = partial(self.handle, sock) hfun = partial(self.handle, gsock)
acceptor = eventlet.spawn(eventlet.serve, sock, hfun, acceptor = eventlet.spawn(_eventlet_serve, gsock, hfun,
self.worker_connections) self.worker_connections)
acceptors.append(acceptor) acceptors.append(acceptor)
@ -82,6 +122,7 @@ class EventletWorker(AsyncWorker):
self.notify() self.notify()
try: try:
with eventlet.Timeout(self.cfg.graceful_timeout) as t: with eventlet.Timeout(self.cfg.graceful_timeout) as t:
[a.kill(eventlet.StopServe()) for a in acceptors]
[a.wait() for a in acceptors] [a.wait() for a in acceptors]
except eventlet.Timeout as te: except eventlet.Timeout as te:
if te != t: if te != t: