From fcd9d04515075aaad44dd20d2247f8ff6c4ae8ed Mon Sep 17 00:00:00 2001
From: benoitc
Date: Sat, 20 Dec 2014 11:16:58 +0100
Subject: [PATCH] stop accepting more requests when the maximum is reached

This change makes sure that a worker doesn't handle more requests than
it can process. The new workflow is simpler:

Listeners are registered with the poller. On a read event we try to
accept on them. When a connection is accepted, it is put in the
execution queue.

When a request is done and the socket can be kept alive, we put the
socket back in the poller; on the next read event we will try to handle
the new request. If the socket is not taken out of the poller before
the keepalive timeout, it is closed.

If all the threads are busy, we wait until one request completes. If it
doesn't complete before the timeout, we kill the worker.

fix #908
---
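Note: below is a minimal, self-contained sketch of the workflow the
message describes, not gunicorn's actual code. The names (serve, handle,
max_conns) and the canned HTTP response are made up for illustration:
one selector watches the listener and any kept-alive sockets, a thread
pool runs the requests, and the loop only polls for new events while it
is under the connection limit.

    import selectors
    import socket
    import time
    from concurrent import futures

    def handle(sock):
        # stand-in request handler; the boolean means "keep the socket alive"
        data = sock.recv(1024)
        if not data:
            return False, sock
        sock.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
        return True, sock

    def serve(address, max_conns=10, threads=2, keepalive=2.0):
        selector = selectors.DefaultSelector()
        pool = futures.ThreadPoolExecutor(max_workers=threads)
        listener = socket.create_server(address)  # Python 3.8+
        listener.setblocking(False)
        selector.register(listener, selectors.EVENT_READ, "accept")
        in_flight = set()  # futures for requests currently being handled
        keep = {}          # kept-alive socket -> expiration time

        while True:
            # poll for new events only while under the connection limit
            if len(in_flight) + len(keep) < max_conns:
                for key, _ in selector.select(timeout=0.02):
                    if key.data == "accept":
                        client, _ = key.fileobj.accept()
                    else:
                        # a kept-alive socket became readable again
                        client = key.fileobj
                        selector.unregister(client)
                        del keep[client]
                    client.setblocking(True)
                    in_flight.add(pool.submit(handle, client))

            # close kept-alive sockets that outlived their timeout
            now = time.time()
            for sock in [s for s, t in keep.items() if t < now]:
                selector.unregister(sock)
                del keep[sock]
                sock.close()

            if not in_flight:
                continue

            # block until at least one request finishes before looping again
            done, in_flight = futures.wait(
                in_flight, timeout=1.0, return_when=futures.FIRST_COMPLETED)
            for fs in done:
                alive, sock = fs.result()
                if alive:
                    keep[sock] = time.time() + keepalive
                    selector.register(sock, selectors.EVENT_READ, "reuse")
                else:
                    sock.close()

Calling serve(("127.0.0.1", 8080)) and hitting it with curl should show
one request per read event being answered and idle connections closed
once their keepalive window expires.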
 gunicorn/workers/gthread.py | 145 +++++++++++++++++-------------------
 1 file changed, 70 insertions(+), 75 deletions(-)

diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py
index 71078c5a..2f1afc56 100644
--- a/gunicorn/workers/gthread.py
+++ b/gunicorn/workers/gthread.py
@@ -65,13 +65,14 @@ class TConn(object):
 
             # initialize the parser
             self.parser = http.RequestParser(self.cfg, self.sock)
-            return True
-        return False
 
     def set_timeout(self):
         # set the timeout
         self.timeout = time.time() + self.cfg.keepalive
 
+    def close(self):
+        util.close(self.sock)
+
     def __lt__(self, other):
         return self.timeout < other.timeout
 
@@ -90,40 +91,48 @@ class ThreadWorker(base.Worker):
         self.futures = deque()
         self._keep = deque()
 
-    def _wrap_future(self, fs, conn):
-        fs.conn = conn
-        self.futures.append(fs)
-        fs.add_done_callback(self.finish_request)
-
     def init_process(self):
         self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
         self.poller = selectors.DefaultSelector()
         super(ThreadWorker, self).init_process()
 
-    def accept(self, listener):
-        if not self.alive:
-            return
+    def _wrap_future(self, fs, conn):
+        fs.conn = conn
+        self.futures.append(fs)
+        fs.add_done_callback(self.finish_request)
 
+    def enqueue_req(self, conn):
+        conn.init()
+        # submit the connection to a worker
+        fs = self.tpool.submit(self.handle, conn)
+        self._wrap_future(fs, conn)
+
+    def accept(self, listener):
         try:
             client, addr = listener.accept()
+            # initialize the connection object
             conn = TConn(self.cfg, listener, client, addr)
-
-            # wait for the read event to handle the connection
-            self.poller.register(client, selectors.EVENT_READ,
-                    partial(self.handle_client, conn))
-
+            self.nr += 1
+            # enqueue the job
+            self.enqueue_req(conn)
         except socket.error as e:
             if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED,
                     errno.EWOULDBLOCK):
                 raise
 
-    def handle_client(self, conn, client):
+    def reuse_connection(self, conn, client):
         # unregister the client from the poller
         self.poller.unregister(client)
 
+        # remove the connection from keepalive
+        try:
+            self._keep.remove(conn)
+        except ValueError:
+            # race condition
+            return
+
         # submit the connection to a worker
-        fs = self.tpool.submit(self.handle, conn)
-        self._wrap_future(fs, conn)
+        self.enqueue_req(conn)
 
     def murder_keepalived(self):
         now = time.time()
@@ -140,11 +149,24 @@ class ThreadWorker(base.Worker):
                 self._keep.appendleft(conn)
                 break
             else:
+                self.nr -= 1
                 # remove the socket from the poller
-                self.poller.unregister(conn.sock)
+                try:
+                    self.poller.unregister(conn.sock)
+                except socket.error as e:
+                    if e.args[0] != errno.EBADF:
+                        raise
 
                 # close the socket
-                util.close(conn.sock)
+                conn.close()
+
+    def is_parent_alive(self):
+        # If our parent changed then we shut down.
+        if self.ppid != os.getppid():
+            self.log.info("Parent changed, shutting down: %s", self)
+            return False
+        return True
 
     def run(self):
         # init listeners, add them to the event loop
@@ -155,55 +177,41 @@
         timeout = self.cfg.timeout or 0.5
 
         while self.alive:
-            # If our parent changed then we shut down.
-            if self.ppid != os.getppid():
-                self.log.info("Parent changed, shutting down: %s", self)
-                return
-
             # notify the arbiter we are alive
             self.notify()
 
-            events = self.poller.select(0.2)
-            for key, mask in events:
-                callback = key.data
-                callback(key.fileobj)
+            # can we accept more connections?
+            if self.nr < self.worker_connections:
+                # wait for an event
+                events = self.poller.select(0.02)
+                for key, mask in events:
+                    callback = key.data
+                    callback(key.fileobj)
 
             # hanle keepalive timeouts
             self.murder_keepalived()
 
-            # if we more connections than the max number of connections
-            # accepted on a worker, wait until some complete or exit.
-            if len(self.futures) >= self.worker_connections:
-                res = futures.wait(self.futures, timeout=timeout)
-                if not res:
-                    self.alive = False
-                    self.log.info("max requests achieved")
-                    break
+            # if the number of connections is lower than the max number we
+            # can handle at the same time, there is no need to wait for one
+            # to complete
+            if len(self.futures) < self.cfg.threads:
+                continue
 
-        # shutdown the pool
-        self.poller.close()
-        self.tpool.shutdown(False)
+            result = futures.wait(self.futures, timeout=timeout,
+                    return_when=futures.FIRST_COMPLETED)
 
-        # wait for the workers
-        futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
+            if not result.done:
+                self.tpool.shutdown(False)
+                self.poller.close()
+                return
+            else:
+                for f in result.done:
+                    self.futures.remove(f)
 
-        # if we have still fures running, try to close them
-        while True:
-            try:
-                fs = self.futures.popleft()
-            except IndexError:
-                break
-
-            sock = fs.conn.sock
-
-            # the future is not running, cancel it
-            if not fs.done() and not fs.running():
-                fs.cancel()
-
-            # make sure we close the sockets after the graceful timeout
-            util.close(sock)
 
     def finish_request(self, fs):
+        if fs.cancelled():
+            fs.conn.close()
+            return
+
         try:
             (keepalive, conn) = fs.result()
             # if the connection should be kept alived add it
@@ -218,28 +226,17 @@
 
                 # add the socket to the event loop
                 self.poller.register(conn.sock, selectors.EVENT_READ,
-                        partial(self.handle_client, conn))
+                        partial(self.reuse_connection, conn))
             else:
-                util.close(conn.sock)
+                self.nr -= 1
+                conn.close()
         except:
             # an exception happened, make sure to close the
             # socket.
-            util.close(fs.conn.sock)
-        finally:
-            # remove the future from our list
-            try:
-                self.futures.remove(fs)
-            except ValueError:
-                pass
+            self.nr -= 1
+            fs.conn.close()
 
     def handle(self, conn):
-        if not conn.init():
-            # connection kept alive
-            try:
-                self._keep.remove(conn)
-            except ValueError:
-                pass
-
         keepalive = False
         req = None
         try:
@@ -287,8 +284,6 @@
                 conn.listener.getsockname(), self.cfg)
         environ["wsgi.multithread"] = True
 
-        self.nr += 1
-
         if self.alive and self.nr >= self.max_requests:
             self.log.info("Autorestarting worker after current request.")
             resp.force_close()
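Note on the futures.wait() change in run(): futures.wait() returns a
(done, not_done) named tuple, which is always truthy, so the removed
"if not res:" branch could never fire. Checking result.done together
with return_when=futures.FIRST_COMPLETED is the meaningful "did anything
finish before the timeout?" test. Below is a short, self-contained
illustration of that backpressure rule; the dummy handle() and the
hard-coded limits stand in for self.handle and the worker's config.

    import time
    from concurrent import futures

    def handle(i):
        # stand-in for ThreadWorker.handle
        time.sleep(0.1)
        return i

    pool = futures.ThreadPoolExecutor(max_workers=2)
    pending = set()

    for i in range(10):
        pending.add(pool.submit(handle, i))

        # mirrors: if len(self.futures) < self.cfg.threads: continue
        if len(pending) < 2:
            continue

        result = futures.wait(pending, timeout=1.0,
                              return_when=futures.FIRST_COMPLETED)
        if not result.done:
            # nothing completed in time: shut down and bail out,
            # as the worker does
            pool.shutdown(wait=False)
            raise SystemExit("no request completed before the timeout")

        pending.difference_update(result.done)

    pool.shutdown()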