diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 71078c5a..df60951f 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -18,6 +18,7 @@ import os import socket import ssl import sys +from threading import RLock import time from .. import http @@ -65,13 +66,14 @@ class TConn(object): # initialize the parser self.parser = http.RequestParser(self.cfg, self.sock) - return True - return False def set_timeout(self): # set the timeout self.timeout = time.time() + self.cfg.keepalive + def close(self): + util.close(self.sock) + def __lt__(self, other): return self.timeout < other.timeout @@ -83,68 +85,94 @@ class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super(ThreadWorker, self).__init__(*args, **kwargs) self.worker_connections = self.cfg.worker_connections + self.max_keepalived = self.cfg.worker_connections - self.cfg.threads # initialise the pool self.tpool = None self.poller = None + self._lock = None self.futures = deque() self._keep = deque() + def init_process(self): + self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) + self.poller = selectors.DefaultSelector() + self._lock = RLock() + super(ThreadWorker, self).init_process() + def _wrap_future(self, fs, conn): fs.conn = conn self.futures.append(fs) fs.add_done_callback(self.finish_request) - def init_process(self): - self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) - self.poller = selectors.DefaultSelector() - super(ThreadWorker, self).init_process() + def enqueue_req(self, conn): + conn.init() + # submit the connection to a worker + fs = self.tpool.submit(self.handle, conn) + self._wrap_future(fs, conn) def accept(self, listener): - if not self.alive: - return - try: client, addr = listener.accept() + # initialize the connection object conn = TConn(self.cfg, listener, client, addr) - - # wait for the read event to handle the connection - self.poller.register(client, selectors.EVENT_READ, - partial(self.handle_client, conn)) - + self.nr += 1 + # enqueue the job + self.enqueue_req(conn) except socket.error as e: if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise - def handle_client(self, conn, client): - # unregister the client from the poller - self.poller.unregister(client) + def reuse_connection(self, conn, client): + with self._lock: + # unregister the client from the poller + self.poller.unregister(client) + # remove the connection from keepalive + try: + self._keep.remove(conn) + except ValueError: + # race condition + return # submit the connection to a worker - fs = self.tpool.submit(self.handle, conn) - self._wrap_future(fs, conn) + self.enqueue_req(conn) def murder_keepalived(self): now = time.time() while True: - try: - # remove the connection from the queue - conn = self._keep.popleft() - except IndexError: - break + with self._lock: + try: + # remove the connection from the queue + conn = self._keep.popleft() + except IndexError: + break delta = conn.timeout - now if delta > 0: # add the connection back to the queue - self._keep.appendleft(conn) + with self._lock: + self._keep.appendleft(conn) break else: + self.nr -= 1 # remove the socket from the poller - self.poller.unregister(conn.sock) + with self._lock: + try: + self.poller.unregister(conn.sock) + except socket.error as e: + if e.args[0] != errno.EBADF: + raise # close the socket - util.close(conn.sock) + conn.close() + + def is_parent_alive(self): + # If our parent changed then we shut down. + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s", self) + return False + return True def run(self): # init listeners, add them to the event loop @@ -155,55 +183,46 @@ class ThreadWorker(base.Worker): timeout = self.cfg.timeout or 0.5 while self.alive: - # If our parent changed then we shut down. - if self.ppid != os.getppid(): - self.log.info("Parent changed, shutting down: %s", self) - return - # notify the arbiter we are alive self.notify() - events = self.poller.select(0.2) - for key, mask in events: - callback = key.data - callback(key.fileobj) + # can we accept more connections? + if self.nr < self.worker_connections: + # wait for an event + events = self.poller.select(0.02) + for key, mask in events: + callback = key.data + callback(key.fileobj) + + if not self.is_parent_alive(): + break # hanle keepalive timeouts self.murder_keepalived() - # if we more connections than the max number of connections - # accepted on a worker, wait until some complete or exit. - if len(self.futures) >= self.worker_connections: - res = futures.wait(self.futures, timeout=timeout) - if not res: - self.alive = False - self.log.info("max requests achieved") - break + # if the number of connections is < to the max we can handle at + # the same time there is no need to wait for one + if len(self.futures) < self.cfg.threads: + continue - # shutdown the pool - self.poller.close() - self.tpool.shutdown(False) + result = futures.wait(self.futures, timeout=timeout, + return_when=futures.FIRST_COMPLETED) - # wait for the workers - futures.wait(self.futures, timeout=self.cfg.graceful_timeout) - - # if we have still fures running, try to close them - while True: - try: - fs = self.futures.popleft() - except IndexError: + if not result.done: break - sock = fs.conn.sock + else: + [self.futures.remove(f) for f in result.done] - # the future is not running, cancel it - if not fs.done() and not fs.running(): - fs.cancel() + self.tpool.shutdown(False) + self.poller.close() - # make sure we close the sockets after the graceful timeout - util.close(sock) def finish_request(self, fs): + if fs.cancelled(): + fs.conn.close() + return + try: (keepalive, conn) = fs.result() # if the connection should be kept alived add it @@ -214,32 +233,22 @@ class ThreadWorker(base.Worker): # register the connection conn.set_timeout() - self._keep.append(conn) + with self._lock: + self._keep.append(conn) - # add the socket to the event loop - self.poller.register(conn.sock, selectors.EVENT_READ, - partial(self.handle_client, conn)) + # add the socket to the event loop + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.reuse_connection, conn)) else: - util.close(conn.sock) + self.nr -= 1 + conn.close() except: # an exception happened, make sure to close the # socket. - util.close(fs.conn.sock) - finally: - # remove the future from our list - try: - self.futures.remove(fs) - except ValueError: - pass + self.nr -= 1 + fs.conn.close() def handle(self, conn): - if not conn.init(): - # connection kept alive - try: - self._keep.remove(conn) - except ValueError: - pass - keepalive = False req = None try: @@ -287,8 +296,6 @@ class ThreadWorker(base.Worker): conn.listener.getsockname(), self.cfg) environ["wsgi.multithread"] = True - self.nr += 1 - if self.alive and self.nr >= self.max_requests: self.log.info("Autorestarting worker after current request.") resp.force_close() @@ -296,6 +303,8 @@ class ThreadWorker(base.Worker): if not self.cfg.keepalive: resp.force_close() + elif len(self._keep) >= self.max_keepalived: + resp.force_close() respiter = self.wsgi(environ, resp.start_response) try: