diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index dbcfdab5..417c00f9 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -10,7 +10,7 @@ # If no event happen after the keep alive timeout, the connectoin is # closed. - +from collections import deque import concurrent.futures as futures from datetime import datetime import errno @@ -39,14 +39,33 @@ KEEPALIVED = 1 class TConn(): - def __init__(self, worker, listener, sock, addr, parser): + def __init__(self, cfg, listener, sock, addr): + self.cfg = cfg self.listener = listener self.sock = sock self.addr = addr - self.parser = parser + self.timeout = None + self.parser = None + + # set the socket to non blocking + #self.sock.setblocking(False) + + + def maybe_init(self): + if self.parser is None: + # wrap the socket if needed + if self.cfg.is_ssl: + client = ssl.wrap_socket(client, server_side=True, + **self.cfg.ssl_options) + + + # initialize the parser + self.parser = http.RequestParser(self.cfg, self.sock) + + def set_timeout(self): # set the timeout - self.timeout = time.time() + worker.cfg.keepalive + self.timeout = time.time() + self.cfg.keepalive def __lt__(self, other): return self.timeout < other.timeout @@ -65,8 +84,7 @@ class ThreadWorker(base.Worker): self.tpool = None self.poller = None self.futures = set() - self._heap = [] - self.clients = {} + self._keep = deque() def _wrap_future(self, fs, conn): @@ -74,80 +92,48 @@ class ThreadWorker(base.Worker): self.futures.add(fs) fs.add_done_callback(self.finish_request) - def _unregister_keepalive(self, conn): - try: - del self._heap[operator.indexOf(self._heap, conn)] - except (KeyError, IndexError, ValueError): - pass - def init_process(self): self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) self.poller = selectors.DefaultSelector() super(ThreadWorker, self).init_process() - def acceptor(self, listener): + def accept(self, listener, *args): try: client, addr = listener.accept() - client.setblocking(False) - - # wrap the socket if needed - if self.cfg.is_ssl: - client = ssl.wrap_socket(client, server_side=True, - **self.cfg.ssl_options) - - # initialise the parser - parser = http.RequestParser(self.cfg, client) - - # register the connection - tconn = TConn(self, listener, client, addr, parser) - self.clients[client] = tconn + conn = TConn(self.cfg, listener, client, addr) # wait for the read event to handle the connection self.poller.register(client, selectors.EVENT_READ, - self.handle_client) + (self.handle_client, (conn,))) except socket.error as e: if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise - def handle_client(self, client): + def handle_client(self, client, conn): # unregister the client from the poller self.poller.unregister(client) - try: - conn = self.clients[client] + # submit the connection to a worker + fs = self.tpool.submit(self.handle, conn) + self.idle_workers += 1 + self._wrap_future(fs, conn) - # maybe unregister the keepalive from the heap - self._unregister_keepalive(conn) - - # submit the connection to a worker - fs = self.tpool.submit(self.handle, conn) - self.idle_workers += 1 - self._wrap_future(fs, conn) - - except KeyError: - # no connection registered - return def murder_keepalived(self): now = time.time() while True: - if not len(self._heap): + if not len(self._keep): break - conn = heapq.heappop(self._heap) - delta = conn.timeout - now + delta = self._keep[0].timeout - now if delta > 0: - heapq.heappush(self._heap, conn) break else: - # make sure the connection can't be handled - try: - del self.clients[conn.sock] - except KeyError: - pass + # remove the connection from the queue + conn = self._keep.popleft() # remove the socket from the poller self.poller.unregister(conn.sock) @@ -155,13 +141,12 @@ class ThreadWorker(base.Worker): # close the socket util.close(conn.sock) - - def run(self): # init listeners, add them to the event loop for s in self.sockets: s.setblocking(False) - self.poller.register(s, selectors.EVENT_READ, self.acceptor) + self.poller.register(s, selectors.EVENT_READ, + (self.accept,())) while self.alive: # If our parent changed then we shut down. @@ -174,8 +159,8 @@ class ThreadWorker(base.Worker): events = self.poller.select(1.0) for key, mask in events: - callback = key.data - callback(key.fileobj) + (callback, args) = key.data + callback(key.fileobj, *args) # hanle keepalive timeouts self.murder_keepalived() @@ -217,17 +202,13 @@ class ThreadWorker(base.Worker): conn.sock.setblocking(False) # register the connection - heapq.heappush(self._heap, conn) + conn.set_timeout() + self._keep.append(conn) # add the socket to the event loop self.poller.register(conn.sock, selectors.EVENT_READ, - self.handle_client) + (self.handle_client, (conn,))) else: - try: - del self.clients[conn.sock] - except KeyError: - pass - util.close(fs.conn.sock) except: # an exception happened, make sure to close the @@ -239,6 +220,13 @@ class ThreadWorker(base.Worker): self.idle_workers -= 1 def handle(self, conn): + try: + self._keep.remove(conn) + except ValueError: + pass + + conn.maybe_init() + keepalive = False req = None try: