diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py
index 417c00f9..a07da55b 100644
--- a/gunicorn/workers/gthread.py
+++ b/gunicorn/workers/gthread.py
@@ -14,7 +14,7 @@ from collections import deque
 import concurrent.futures as futures
 from datetime import datetime
 import errno
-import heapq
+from functools import partial
 import os
 import operator
 import socket
@@ -49,10 +49,10 @@ class TConn():
         self.parser = None
 
         # set the socket to non blocking
-        #self.sock.setblocking(False)
+        self.sock.setblocking(False)
 
-    def maybe_init(self):
+    def init(self):
         if self.parser is None:
             # wrap the socket if needed
             if self.cfg.is_ssl:
@@ -62,6 +62,8 @@ class TConn():
 
             # initialize the parser
             self.parser = http.RequestParser(self.cfg, self.sock)
+            return True
+        return False
 
     def set_timeout(self):
         # set the timeout
@@ -78,18 +80,17 @@ class ThreadWorker(base.Worker):
 
     def __init__(self, *args, **kwargs):
         super(ThreadWorker, self).__init__(*args, **kwargs)
         self.worker_connections = self.cfg.worker_connections
-        self.idle_workers = 0
 
         # initialise the pool
         self.tpool = None
         self.poller = None
-        self.futures = set()
+        self.futures = deque()
         self._keep = deque()
 
     def _wrap_future(self, fs, conn):
         fs.conn = conn
-        self.futures.add(fs)
+        self.futures.append(fs)
         fs.add_done_callback(self.finish_request)
 
     def init_process(self):
@@ -97,28 +98,26 @@ class ThreadWorker(base.Worker):
         self.poller = selectors.DefaultSelector()
         super(ThreadWorker, self).init_process()
 
-
-    def accept(self, listener, *args):
+    def accept(self, listener):
         try:
             client, addr = listener.accept()
             conn = TConn(self.cfg, listener, client, addr)
 
             # wait for the read event to handle the connection
             self.poller.register(client, selectors.EVENT_READ,
-                    (self.handle_client, (conn,)))
+                    partial(self.handle_client, conn))
         except socket.error as e:
             if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED,
                     errno.EWOULDBLOCK):
                 raise
 
-    def handle_client(self, client, conn):
+    def handle_client(self, conn, client):
         # unregister the client from the poller
         self.poller.unregister(client)
 
         # submit the connection to a worker
         fs = self.tpool.submit(self.handle, conn)
-        self.idle_workers += 1
         self._wrap_future(fs, conn)
@@ -145,8 +144,7 @@ class ThreadWorker(base.Worker):
         # init listeners, add them to the event loop
         for s in self.sockets:
             s.setblocking(False)
-            self.poller.register(s, selectors.EVENT_READ,
-                    (self.accept,()))
+            self.poller.register(s, selectors.EVENT_READ, self.accept)
 
         while self.alive:
             # If our parent changed then we shut down.
@@ -157,17 +155,17 @@ class ThreadWorker(base.Worker):
             # notify the arbiter we are alive
            self.notify()
 
-            events = self.poller.select(1.0)
+            events = self.poller.select(0.01)
             for key, mask in events:
-                (callback, args) = key.data
-                callback(key.fileobj, *args)
+                callback = key.data
+                callback(key.fileobj)
 
             # hanle keepalive timeouts
             self.murder_keepalived()
 
             # if we more connections than the max number of connections
             # accepted on a worker, wait until some complete or exit.
-            if self.idle_workers >= self.worker_connections:
+            if len(self.futures) >= self.worker_connections:
                 futures.wait(self.futures, timeout=self.cfg.timeout)
                 if not res:
                     self.log.info("max requests achieved")
@@ -207,25 +205,27 @@ class ThreadWorker(base.Worker):
 
                 # add the socket to the event loop
                 self.poller.register(conn.sock, selectors.EVENT_READ,
-                        (self.handle_client, (conn,)))
+                        partial(self.handle_client, conn))
             else:
-                util.close(fs.conn.sock)
+                util.close(conn.sock)
         except:
             # an exception happened, make sure to close the
             # socket.
             util.close(fs.conn.sock)
         finally:
             # remove the future from our list
-            self.futures.remove(fs)
-            self.idle_workers -= 1
+            try:
+                self.futures.remove(fs)
+            except ValueError:
+                pass
 
     def handle(self, conn):
-        try:
-            self._keep.remove(conn)
-        except ValueError:
-            pass
-
-        conn.maybe_init()
+        if not conn.init():
+            # connection kept alive
+            try:
+                self._keep.remove(conn)
+            except ValueError:
+                pass
 
         keepalive = False
         req = None
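
Side note on the callback style adopted above: registering partial(self.handle_client, conn) as key.data lets the run loop call key.data(key.fileobj) directly instead of unpacking a (callback, args) tuple. Below is a minimal, standalone sketch of that selectors + functools.partial pattern; it is an illustrative echo server under assumed names (serve_forever, handle_client), not gunicorn code or part of this patch.

# Illustrative sketch only: key.data holds a plain callable, invoked as
# key.data(key.fileobj), with per-connection state bound via functools.partial.
import selectors
import socket
from functools import partial

sel = selectors.DefaultSelector()


def accept(listener):
    # called when the listening socket is readable
    client, addr = listener.accept()
    client.setblocking(False)
    # bind the connection state (addr) into the callback itself
    sel.register(client, selectors.EVENT_READ, partial(handle_client, addr))


def handle_client(addr, client):
    # partial supplies addr; the event loop supplies the file object
    data = client.recv(1024)
    if data:
        client.sendall(data)
    else:
        sel.unregister(client)
        client.close()


def serve_forever(host="127.0.0.1", port=8000):
    listener = socket.socket()
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((host, port))
    listener.listen()
    listener.setblocking(False)
    sel.register(listener, selectors.EVENT_READ, accept)
    while True:
        for key, _mask in sel.select(timeout=1.0):
            callback = key.data      # a callable, no (callback, args) tuple
            callback(key.fileobj)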