[thread] close sockets at graceful shutdown

The run loop has to change slightly to support graceful shutdown.
There is no way to interrupt a call to `futures.wait`, so instead
the pattern, used by the async workers, is to sleep for at most
one second. The poll is extended to a one-second timeout
to match.

Since threads are preemptively scheduled, it's possible that the
listener is closed when the request is actually handled. For this
reason it is necessary to slightly refactor the TConn class to store
the listening socket name. The name is checked once at the start of
the worker run loop.

Ref #922
This commit is contained in:
Randall Leeds 2016-03-20 17:34:55 -07:00
parent 5b32dde3ef
commit f2418a95e0

View File

@@ -44,11 +44,11 @@ except ImportError:
class TConn(object): class TConn(object):
def __init__(self, cfg, listener, sock, addr): def __init__(self, cfg, sock, client, server):
self.cfg = cfg self.cfg = cfg
self.listener = listener
self.sock = sock self.sock = sock
self.addr = addr self.client = client
self.server = server
self.timeout = None self.timeout = None
self.parser = None self.parser = None
@@ -127,11 +127,11 @@ class ThreadWorker(base.Worker):
fs = self.tpool.submit(self.handle, conn) fs = self.tpool.submit(self.handle, conn)
self._wrap_future(fs, conn) self._wrap_future(fs, conn)
def accept(self, listener): def accept(self, server, listener):
try: try:
client, addr = listener.accept() sock, client = listener.accept()
# initialize the connection object # initialize the connection object
conn = TConn(self.cfg, listener, client, addr) conn = TConn(self.cfg, sock, client, server)
self.nr_conns += 1 self.nr_conns += 1
# enqueue the job # enqueue the job
self.enqueue_req(conn) self.enqueue_req(conn)
@@ -192,11 +192,13 @@ class ThreadWorker(base.Worker):
def run(self): def run(self):
# init listeners, add them to the event loop # init listeners, add them to the event loop
for s in self.sockets: for sock in self.sockets:
s.setblocking(False) sock.setblocking(False)
self.poller.register(s, selectors.EVENT_READ, self.accept) # a race condition during graceful shutdown may make the listener
# name unavailable in the request handler so capture it once here
timeout = self.cfg.timeout or 0.5 server = sock.getsockname()
acceptor = partial(self.accept, server)
self.poller.register(sock, selectors.EVENT_READ, acceptor)
while self.alive: while self.alive:
# notify the arbiter we are alive # notify the arbiter we are alive
@@ -205,33 +207,37 @@ class ThreadWorker(base.Worker):
# can we accept more connections? # can we accept more connections?
if self.nr_conns < self.worker_connections: if self.nr_conns < self.worker_connections:
# wait for an event # wait for an event
events = self.poller.select(0.02) events = self.poller.select(1.0)
for key, mask in events: for key, mask in events:
callback = key.data callback = key.data
callback(key.fileobj) callback(key.fileobj)
# check (but do not wait) for finished requests
result = futures.wait(self.futures, timeout=0,
return_when=futures.FIRST_COMPLETED)
else:
# wait for a request to finish
result = futures.wait(self.futures, timeout=1.0,
return_when=futures.FIRST_COMPLETED)
# clean up finished requests
for fut in result.done:
self.futures.remove(fut)
if not self.is_parent_alive(): if not self.is_parent_alive():
break break
# handle keepalive timeouts # handle keepalive timeouts
self.murder_keepalived() self.murder_keepalived()
# if the number of connections is < to the max we can handle at
# the same time there is no need to wait for one
if len(self.futures) < self.cfg.threads:
continue
result = futures.wait(self.futures, timeout=timeout,
return_when=futures.FIRST_COMPLETED)
if not result.done:
break
else:
[self.futures.remove(f) for f in result.done]
self.tpool.shutdown(False) self.tpool.shutdown(False)
self.poller.close() self.poller.close()
for s in self.sockets:
s.close()
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def finish_request(self, fs): def finish_request(self, fs):
if fs.cancelled(): if fs.cancelled():
fs.conn.close() fs.conn.close()
@@ -285,7 +291,7 @@ class ThreadWorker(base.Worker):
conn.sock.close() conn.sock.close()
else: else:
self.log.debug("Error processing SSL request.") self.log.debug("Error processing SSL request.")
self.handle_error(req, conn.sock, conn.addr, e) self.handle_error(req, conn.sock, conn.client, e)
except EnvironmentError as e: except EnvironmentError as e:
if e.errno not in (errno.EPIPE, errno.ECONNRESET): if e.errno not in (errno.EPIPE, errno.ECONNRESET):
@@ -296,7 +302,7 @@ class ThreadWorker(base.Worker):
else: else:
self.log.debug("Ignoring connection epipe") self.log.debug("Ignoring connection epipe")
except Exception as e: except Exception as e:
self.handle_error(req, conn.sock, conn.addr, e) self.handle_error(req, conn.sock, conn.client, e)
return (False, conn) return (False, conn)
@@ -306,8 +312,8 @@ class ThreadWorker(base.Worker):
try: try:
self.cfg.pre_request(self, req) self.cfg.pre_request(self, req)
request_start = datetime.now() request_start = datetime.now()
resp, environ = wsgi.create(req, conn.sock, conn.addr, resp, environ = wsgi.create(req, conn.sock, conn.client,
conn.listener.getsockname(), self.cfg) conn.server, self.cfg)
environ["wsgi.multithread"] = True environ["wsgi.multithread"] = True
self.nr += 1 self.nr += 1
if self.alive and self.nr >= self.max_requests: if self.alive and self.nr >= self.max_requests: