diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 71078c5a..2f1afc56 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -65,13 +65,14 @@ class TConn(object): # initialize the parser self.parser = http.RequestParser(self.cfg, self.sock) - return True - return False def set_timeout(self): # set the timeout self.timeout = time.time() + self.cfg.keepalive + def close(self): + util.close(self.sock) + def __lt__(self, other): return self.timeout < other.timeout @@ -90,40 +91,48 @@ class ThreadWorker(base.Worker): self.futures = deque() self._keep = deque() - def _wrap_future(self, fs, conn): - fs.conn = conn - self.futures.append(fs) - fs.add_done_callback(self.finish_request) - def init_process(self): self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) self.poller = selectors.DefaultSelector() super(ThreadWorker, self).init_process() - def accept(self, listener): - if not self.alive: - return + def _wrap_future(self, fs, conn): + fs.conn = conn + self.futures.append(fs) + fs.add_done_callback(self.finish_request) + def enqueue_req(self, conn): + conn.init() + # submit the connection to a worker + fs = self.tpool.submit(self.handle, conn) + self._wrap_future(fs, conn) + + def accept(self, listener): try: client, addr = listener.accept() + # initialize the connection object conn = TConn(self.cfg, listener, client, addr) - - # wait for the read event to handle the connection - self.poller.register(client, selectors.EVENT_READ, - partial(self.handle_client, conn)) - + self.nr += 1 + # enqueue the job + self.enqueue_req(conn) except socket.error as e: if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise - def handle_client(self, conn, client): + def reuse_connection(self, conn, client): # unregister the client from the poller self.poller.unregister(client) + # remove the connection from keepalive + try: + self._keep.remove(conn) + except ValueError: + # race condition + return + # submit the connection to a worker - fs = self.tpool.submit(self.handle, conn) - self._wrap_future(fs, conn) + self.enqueue_req(conn) def murder_keepalived(self): now = time.time() @@ -140,11 +149,24 @@ class ThreadWorker(base.Worker): self._keep.appendleft(conn) break else: + self.nr -= 1 # remove the socket from the poller - self.poller.unregister(conn.sock) + try: + self.poller.unregister(conn.sock) + except socket.error as e: + if e.args[0] == errno.EBADF: + pass + raise # close the socket - util.close(conn.sock) + conn.close() + + def is_parent_alive(self): + # If our parent changed then we shut down. + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s", self) + return False + return True def run(self): # init listeners, add them to the event loop @@ -155,55 +177,41 @@ class ThreadWorker(base.Worker): timeout = self.cfg.timeout or 0.5 while self.alive: - # If our parent changed then we shut down. - if self.ppid != os.getppid(): - self.log.info("Parent changed, shutting down: %s", self) - return - # notify the arbiter we are alive self.notify() - events = self.poller.select(0.2) - for key, mask in events: - callback = key.data - callback(key.fileobj) + # can we accept more connections? + if self.nr < self.worker_connections: + # wait for an event + events = self.poller.select(0.02) + for key, mask in events: + callback = key.data + callback(key.fileobj) # hanle keepalive timeouts self.murder_keepalived() - # if we more connections than the max number of connections - # accepted on a worker, wait until some complete or exit. - if len(self.futures) >= self.worker_connections: - res = futures.wait(self.futures, timeout=timeout) - if not res: - self.alive = False - self.log.info("max requests achieved") - break + # if the number of connections is < to the max we can handle at + # the same time there is no need to wait for one + if len(self.futures) < self.cfg.threads: + continue - # shutdown the pool - self.poller.close() - self.tpool.shutdown(False) + result = futures.wait(self.futures, timeout=timeout, + return_when=futures.FIRST_COMPLETED) - # wait for the workers - futures.wait(self.futures, timeout=self.cfg.graceful_timeout) + if not result.done: + self.tpool.shutdown(False) + self.poller.close() + return + else: + [self.futures.remove(f) for f in result.done] - # if we have still fures running, try to close them - while True: - try: - fs = self.futures.popleft() - except IndexError: - break - - sock = fs.conn.sock - - # the future is not running, cancel it - if not fs.done() and not fs.running(): - fs.cancel() - - # make sure we close the sockets after the graceful timeout - util.close(sock) def finish_request(self, fs): + if fs.cancelled(): + fs.conn.close() + return + try: (keepalive, conn) = fs.result() # if the connection should be kept alived add it @@ -218,28 +226,17 @@ class ThreadWorker(base.Worker): # add the socket to the event loop self.poller.register(conn.sock, selectors.EVENT_READ, - partial(self.handle_client, conn)) + partial(self.reuse_connection, conn)) else: - util.close(conn.sock) + self.nr -= 1 + conn.close() except: # an exception happened, make sure to close the # socket. - util.close(fs.conn.sock) - finally: - # remove the future from our list - try: - self.futures.remove(fs) - except ValueError: - pass + self.nr -= 1 + fs.conn.close() def handle(self, conn): - if not conn.init(): - # connection kept alive - try: - self._keep.remove(conn) - except ValueError: - pass - keepalive = False req = None try: @@ -287,8 +284,6 @@ class ThreadWorker(base.Worker): conn.listener.getsockname(), self.cfg) environ["wsgi.multithread"] = True - self.nr += 1 - if self.alive and self.nr >= self.max_requests: self.log.info("Autorestarting worker after current request.") resp.force_close()