From 1f92511430b59a6645e59d408d8cb21d6d37114a Mon Sep 17 00:00:00 2001 From: benoitc Date: Sat, 20 Dec 2014 16:52:22 +0100 Subject: [PATCH] make sure we can alwas accept connection. This is done by limiting the number of kept alived requests to MAX WORKER CONNECTIONS - N Threads so at any time we can accept N connection. --- gunicorn/workers/gthread.py | 67 +++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 2f1afc56..579f9d83 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -18,6 +18,7 @@ import os import socket import ssl import sys +from threading import RLock import time from .. import http @@ -84,16 +85,19 @@ class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super(ThreadWorker, self).__init__(*args, **kwargs) self.worker_connections = self.cfg.worker_connections + self.max_keepalived = self.cfg.worker_connections - self.cfg.threads # initialise the pool self.tpool = None self.poller = None + self._lock = None self.futures = deque() self._keep = deque() def init_process(self): self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) self.poller = selectors.DefaultSelector() + self._lock = RLock() super(ThreadWorker, self).init_process() def _wrap_future(self, fs, conn): @@ -121,15 +125,15 @@ class ThreadWorker(base.Worker): raise def reuse_connection(self, conn, client): - # unregister the client from the poller - self.poller.unregister(client) - - # remove the connection from keepalive - try: - self._keep.remove(conn) - except ValueError: - # race condition - return + with self._lock: + # unregister the client from the poller + self.poller.unregister(client) + # remove the connection from keepalive + try: + self._keep.remove(conn) + except ValueError: + # race condition + return # submit the connection to a worker self.enqueue_req(conn) @@ -137,26 +141,28 @@ class ThreadWorker(base.Worker): def murder_keepalived(self): now = time.time() while True: - try: - # remove the connection from the queue - conn = self._keep.popleft() - except IndexError: - break + with self._lock: + try: + # remove the connection from the queue + conn = self._keep.popleft() + except IndexError: + break delta = conn.timeout - now if delta > 0: # add the connection back to the queue - self._keep.appendleft(conn) + with self._lock: + self._keep.appendleft(conn) break else: self.nr -= 1 # remove the socket from the poller - try: - self.poller.unregister(conn.sock) - except socket.error as e: - if e.args[0] == errno.EBADF: - pass - raise + with self._lock: + try: + self.poller.unregister(conn.sock) + except socket.error as e: + if e.args[0] != errno.EBADF: + raise # close the socket conn.close() @@ -200,12 +206,14 @@ class ThreadWorker(base.Worker): return_when=futures.FIRST_COMPLETED) if not result.done: - self.tpool.shutdown(False) - self.poller.close() - return + break + else: [self.futures.remove(f) for f in result.done] + self.tpool.shutdown(False) + self.poller.close() + def finish_request(self, fs): if fs.cancelled(): @@ -222,11 +230,12 @@ class ThreadWorker(base.Worker): # register the connection conn.set_timeout() - self._keep.append(conn) + with self._lock: + self._keep.append(conn) - # add the socket to the event loop - self.poller.register(conn.sock, selectors.EVENT_READ, - partial(self.reuse_connection, conn)) + # add the socket to the event loop + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.reuse_connection, conn)) else: self.nr -= 1 conn.close() @@ -291,6 +300,8 @@ class ThreadWorker(base.Worker): if not self.cfg.keepalive: resp.force_close() + elif len(self._keep) >= self.max_keepalived: + resp.force_close() respiter = self.wsgi(environ, resp.start_response) try: