refactor the gthread worker for a better usage of asyncio

we have the possibility to pass a data payload to the poller when
registering a file object. We are using this possibility to pass a
callback. the callback will either accept or handle a connection when
the read event is triggered.

while I am here make the future result asynchronous so we don't block
the I/O event handling.
This commit is contained in:
benoitc 2014-05-30 15:59:47 +02:00
parent 7f9d745eb5
commit f8b415496d

View File

@ -58,32 +58,112 @@ class ThreadWorker(base.Worker):
def __init__(self, *args, **kwargs):
super(ThreadWorker, self).__init__(*args, **kwargs)
self.worker_connections = self.cfg.worker_connections
self.idle_workers = 0
# initialise the pool
self.tpool = None
self.poller = None
self.futures = set()
self._heap = []
self.keepalived = {}
self.clients = {}
def _wrap_future(self, fs, listener, client, addr):
fs.listener = listener
fs.sock = client
fs.addr = addr
def _wrap_future(self, fs, conn):
fs.conn = conn
self.futures.add(fs)
fs.add_done_callback(self.finish_request)
def _unregister_keepalive(self, conn):
try:
del self._heap[operator.indexOf(self._heap, conn)]
except (KeyError, IndexError, ValueError):
pass
def init_process(self):
self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
self.poller = selectors.DefaultSelector()
super(ThreadWorker, self).init_process()
def acceptor(self, listener):
try:
client, addr = listener.accept()
client.setblocking(False)
# wrap the socket if needed
if self.cfg.is_ssl:
client = ssl.wrap_socket(client, server_side=True,
**self.cfg.ssl_options)
# initialise the parser
parser = http.RequestParser(self.cfg, client)
# register the connection
tconn = TConn(self, listener, client, addr, parser)
self.clients[client] = tconn
# wait for the read event to handle the connection
self.poller.register(client, selectors.EVENT_READ,
self.handle_client)
except socket.error as e:
if e.args[0] not in (errno.EAGAIN,
errno.ECONNABORTED, errno.EWOULDBLOCK):
raise
def handle_client(self, client):
# unregister the client from the poller
self.poller.unregister(client)
try:
conn = self.clients[client]
# maybe unregister the keepalive from the heap
self._unregister_keepalive(conn)
# submit the connection to a worker
fs = self.tpool.submit(self.handle, conn)
self.idle_workers += 1
self._wrap_future(fs, conn)
except KeyError:
# no connection registered
return
def murder_keepalived(self):
now = time.time()
while True:
if not len(self._heap):
break
conn = heapq.heappop(self._heap)
delta = conn.timeout - now
if delta > 0:
heapq.heappush(self._heap, conn)
break
else:
# make sure the connection can't be handled
try:
del self.clients[conn.sock]
except KeyError:
pass
# remove the socket from the poller
self.poller.unregister(conn.sock)
# close the socket
util.close(conn.sock)
def run(self):
# init listeners, add them to the event loop
for s in self.sockets:
s.setblocking(False)
self.poller.register(s, selectors.EVENT_READ, ACCEPT)
self.poller.register(s, selectors.EVENT_READ, self.acceptor)
while self.alive:
# If our parent changed then we shut down.
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s", self)
@ -92,116 +172,32 @@ class ThreadWorker(base.Worker):
# notify the arbiter we are alive
self.notify()
events = self.poller.select(0.01)
if events:
for key, mask in events:
fs = None
client = None
if key.data == ACCEPT:
listener = key.fileobj
# start to accept connections
try:
client, addr = listener.accept()
# add a job to the pool
fs = self.tpool.submit(self.handle, listener,
client, addr, False)
self._wrap_future(fs, listener, client, addr)
except socket.error as e:
if e.args[0] not in (errno.EAGAIN,
errno.ECONNABORTED, errno.EWOULDBLOCK):
raise
else:
# get the client connection
client = key.data
# remove it from the heap
try:
del self._heap[operator.indexOf(self._heap, client)]
except (KeyError, IndexError):
pass
self.poller.unregister(key.fileobj)
# add a job to the pool
fs = self.tpool.submit(self.handle, client.listener,
client.sock, client.addr, client.parser)
# wrap the future
self._wrap_future(fs, client.listener, client.sock,
client.addr)
# handle jobs, we give a chance to all jobs to be executed.
if self.futures:
self.notify()
res = futures.wait(self.futures, timeout=self.timeout,
return_when=futures.FIRST_COMPLETED)
for fs in res.done:
try:
(keepalive, parser) = fs.result()
# if the connection should be kept alived add it
# to the eventloop and record it
if keepalive:
# flag the socket as non blocked
fs.sock.setblocking(0)
tconn = TConn(self, fs.listener, fs.sock,
fs.addr, parser)
# register the connection
heapq.heappush(self._heap, tconn)
# add the socket to the event loop
self.poller.register(fs.sock, selectors.EVENT_READ,
tconn)
else:
# at this point the connection should be
# closed but we make sure it is.
util.close(fs.sock)
except:
# an exception happened, make sure to close the
# socket.
util.close(fs.sock)
finally:
# remove the future from our list
self.futures.remove(fs)
events = self.poller.select(1.0)
for key, mask in events:
callback = key.data
callback(key.fileobj)
# hanle keepalive timeouts
now = time.time()
while True:
if not len(self._heap):
break
self.murder_keepalived()
conn = heapq.heappop(self._heap)
delta = conn.timeout - now
if delta > 0:
heapq.heappush(self._heap, conn)
# if we more connections than the max number of connections
# accepted on a worker, wait until some complete or exit.
if self.idle_workers >= self.worker_connections:
futures.wait(self.futures, timeout=self.cfg.timeout)
if not res:
self.log.info("max requests achieved")
break
else:
# remove the socket from the poller
self.poller.unregister(conn.sock)
# close the socket
util.close(conn.sock)
# shutdown the pool
self.tpool.shutdown(False)
self.poller.close()
self.tpool.shutdown(False)
# wait for the workers
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
# if we have still fures running, try to close them
for fs in self.futures:
sock = fs.sock
sock = fs.conn.sock
# the future is not running, cancel it
if not fs.done() and not fs.running():
@ -211,27 +207,51 @@ class ThreadWorker(base.Worker):
util.close(sock)
def handle(self, listener, client, addr, parser):
def finish_request(self, fs):
try:
(keepalive, conn) = fs.result()
# if the connection should be kept alived add it
# to the eventloop and record it
if keepalive:
# flag the socket as non blocked
conn.sock.setblocking(False)
# register the connection
heapq.heappush(self._heap, conn)
# add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ,
self.handle_client)
else:
try:
del self.clients[conn.sock]
except KeyError:
pass
util.close(fs.conn.sock)
except:
# an exception happened, make sure to close the
# socket.
util.close(fs.conn.sock)
finally:
# remove the future from our list
self.futures.remove(fs)
self.idle_workers -= 1
def handle(self, conn):
keepalive = False
req = None
try:
client.setblocking(1)
conn.sock.setblocking(1)
# wrap the connection
if not parser:
if self.cfg.is_ssl:
client = ssl.wrap_socket(client, server_side=True,
**self.cfg.ssl_options)
parser = http.RequestParser(self.cfg, client)
req = six.next(parser)
req = six.next(conn.parser)
if not req:
return (False, None)
# handle the request
keepalive = self.handle_request(listener, req, client, addr)
keepalive = self.handle_request(req, conn)
if keepalive:
return (keepalive, parser)
return (keepalive, conn)
except http.errors.NoMoreData as e:
self.log.debug("Ignored premature client disconnection. %s", e)
@ -240,10 +260,10 @@ class ThreadWorker(base.Worker):
except ssl.SSLError as e:
if e.args[0] == ssl.SSL_ERROR_EOF:
self.log.debug("ssl connection closed")
client.close()
conn.sock.close()
else:
self.log.debug("Error processing SSL request.")
self.handle_error(req, client, addr, e)
self.handle_error(req, conn.sock, conn.addr, e)
except socket.error as e:
if e.args[0] not in (errno.EPIPE, errno.ECONNRESET):
@ -252,27 +272,30 @@ class ThreadWorker(base.Worker):
if e.args[0] == errno.ECONNRESET:
self.log.debug("Ignoring connection reset")
else:
self.log.debug("Ignoring EPIPE")
self.log.debug("Ignoring connection epipe")
except Exception as e:
self.handle_error(req, client, addr, e)
self.handle_error(req, conn.sock, conn.addr, e)
return (False, None)
return (False, conn)
def handle_request(self, listener, req, client, addr):
def handle_request(self, req, conn):
environ = {}
resp = None
try:
self.cfg.pre_request(self, req)
request_start = datetime.now()
resp, environ = wsgi.create(req, client, addr,
listener.getsockname(), self.cfg)
resp, environ = wsgi.create(req, conn.sock, conn.addr,
conn.listener.getsockname(), self.cfg)
environ["wsgi.multithread"] = True
self.nr += 1
if self.nr >= self.max_requests:
if self.alive and self.nr >= self.max_requests:
self.log.info("Autorestarting worker after current request.")
resp.force_close()
self.alive = False
if not self.cfg.keepalive:
resp.force_close()
@ -283,6 +306,7 @@ class ThreadWorker(base.Worker):
else:
for item in respiter:
resp.write(item)
resp.close()
request_time = datetime.now() - request_start
self.log.access(resp, req, environ, request_time)
@ -303,8 +327,8 @@ class ThreadWorker(base.Worker):
# connection to indicate the error.
self.log.exception("Error handling request")
try:
client.shutdown(socket.SHUT_RDWR)
client.close()
conn.sock.shutdown(socket.SHUT_RDWR)
conn.sock.close()
except socket.error:
pass
raise StopIteration()