diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 3acfd40f..dbcfdab5 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -58,32 +58,112 @@ class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super(ThreadWorker, self).__init__(*args, **kwargs) + self.worker_connections = self.cfg.worker_connections + self.idle_workers = 0 + # initialise the pool self.tpool = None self.poller = None self.futures = set() self._heap = [] - self.keepalived = {} + self.clients = {} - def _wrap_future(self, fs, listener, client, addr): - fs.listener = listener - fs.sock = client - fs.addr = addr + + def _wrap_future(self, fs, conn): + fs.conn = conn self.futures.add(fs) + fs.add_done_callback(self.finish_request) + + def _unregister_keepalive(self, conn): + try: + del self._heap[operator.indexOf(self._heap, conn)] + except (KeyError, IndexError, ValueError): + pass def init_process(self): self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) self.poller = selectors.DefaultSelector() super(ThreadWorker, self).init_process() + + def acceptor(self, listener): + try: + client, addr = listener.accept() + client.setblocking(False) + + # wrap the socket if needed + if self.cfg.is_ssl: + client = ssl.wrap_socket(client, server_side=True, + **self.cfg.ssl_options) + + # initialise the parser + parser = http.RequestParser(self.cfg, client) + + # register the connection + tconn = TConn(self, listener, client, addr, parser) + self.clients[client] = tconn + + # wait for the read event to handle the connection + self.poller.register(client, selectors.EVENT_READ, + self.handle_client) + + except socket.error as e: + if e.args[0] not in (errno.EAGAIN, + errno.ECONNABORTED, errno.EWOULDBLOCK): + raise + + def handle_client(self, client): + # unregister the client from the poller + self.poller.unregister(client) + + try: + conn = self.clients[client] + + # maybe unregister the keepalive from the heap + self._unregister_keepalive(conn) + + # submit the connection to a worker + fs = self.tpool.submit(self.handle, conn) + self.idle_workers += 1 + self._wrap_future(fs, conn) + + except KeyError: + # no connection registered + return + + def murder_keepalived(self): + now = time.time() + while True: + if not len(self._heap): + break + + conn = heapq.heappop(self._heap) + delta = conn.timeout - now + if delta > 0: + heapq.heappush(self._heap, conn) + break + else: + # make sure the connection can't be handled + try: + del self.clients[conn.sock] + except KeyError: + pass + + # remove the socket from the poller + self.poller.unregister(conn.sock) + + # close the socket + util.close(conn.sock) + + + def run(self): # init listeners, add them to the event loop for s in self.sockets: s.setblocking(False) - self.poller.register(s, selectors.EVENT_READ, ACCEPT) + self.poller.register(s, selectors.EVENT_READ, self.acceptor) while self.alive: - # If our parent changed then we shut down. if self.ppid != os.getppid(): self.log.info("Parent changed, shutting down: %s", self) @@ -92,116 +172,32 @@ class ThreadWorker(base.Worker): # notify the arbiter we are alive self.notify() - events = self.poller.select(0.01) - if events: - for key, mask in events: - fs = None - client = None - if key.data == ACCEPT: - listener = key.fileobj - - # start to accept connections - try: - client, addr = listener.accept() - - # add a job to the pool - fs = self.tpool.submit(self.handle, listener, - client, addr, False) - - self._wrap_future(fs, listener, client, addr) - - except socket.error as e: - if e.args[0] not in (errno.EAGAIN, - errno.ECONNABORTED, errno.EWOULDBLOCK): - raise - - else: - # get the client connection - client = key.data - - # remove it from the heap - - try: - del self._heap[operator.indexOf(self._heap, client)] - except (KeyError, IndexError): - pass - - self.poller.unregister(key.fileobj) - - # add a job to the pool - fs = self.tpool.submit(self.handle, client.listener, - client.sock, client.addr, client.parser) - - # wrap the future - self._wrap_future(fs, client.listener, client.sock, - client.addr) - - # handle jobs, we give a chance to all jobs to be executed. - if self.futures: - self.notify() - - res = futures.wait(self.futures, timeout=self.timeout, - return_when=futures.FIRST_COMPLETED) - - for fs in res.done: - try: - (keepalive, parser) = fs.result() - # if the connection should be kept alived add it - # to the eventloop and record it - if keepalive: - # flag the socket as non blocked - fs.sock.setblocking(0) - - tconn = TConn(self, fs.listener, fs.sock, - fs.addr, parser) - - # register the connection - heapq.heappush(self._heap, tconn) - - # add the socket to the event loop - self.poller.register(fs.sock, selectors.EVENT_READ, - tconn) - else: - # at this point the connection should be - # closed but we make sure it is. - util.close(fs.sock) - except: - # an exception happened, make sure to close the - # socket. - util.close(fs.sock) - finally: - # remove the future from our list - self.futures.remove(fs) - + events = self.poller.select(1.0) + for key, mask in events: + callback = key.data + callback(key.fileobj) # hanle keepalive timeouts - now = time.time() - while True: - if not len(self._heap): - break + self.murder_keepalived() - conn = heapq.heappop(self._heap) - delta = conn.timeout - now - if delta > 0: - heapq.heappush(self._heap, conn) + # if we more connections than the max number of connections + # accepted on a worker, wait until some complete or exit. + if self.idle_workers >= self.worker_connections: + futures.wait(self.futures, timeout=self.cfg.timeout) + if not res: + self.log.info("max requests achieved") break - else: - # remove the socket from the poller - self.poller.unregister(conn.sock) - # close the socket - util.close(conn.sock) - # shutdown the pool - self.tpool.shutdown(False) self.poller.close() + self.tpool.shutdown(False) # wait for the workers futures.wait(self.futures, timeout=self.cfg.graceful_timeout) # if we have still fures running, try to close them for fs in self.futures: - sock = fs.sock + sock = fs.conn.sock # the future is not running, cancel it if not fs.done() and not fs.running(): @@ -211,27 +207,51 @@ class ThreadWorker(base.Worker): util.close(sock) - def handle(self, listener, client, addr, parser): + def finish_request(self, fs): + try: + (keepalive, conn) = fs.result() + # if the connection should be kept alived add it + # to the eventloop and record it + if keepalive: + # flag the socket as non blocked + conn.sock.setblocking(False) + + # register the connection + heapq.heappush(self._heap, conn) + + # add the socket to the event loop + self.poller.register(conn.sock, selectors.EVENT_READ, + self.handle_client) + else: + try: + del self.clients[conn.sock] + except KeyError: + pass + + util.close(fs.conn.sock) + except: + # an exception happened, make sure to close the + # socket. + util.close(fs.conn.sock) + finally: + # remove the future from our list + self.futures.remove(fs) + self.idle_workers -= 1 + + def handle(self, conn): keepalive = False req = None try: - client.setblocking(1) + conn.sock.setblocking(1) - # wrap the connection - if not parser: - if self.cfg.is_ssl: - client = ssl.wrap_socket(client, server_side=True, - **self.cfg.ssl_options) - parser = http.RequestParser(self.cfg, client) - - req = six.next(parser) + req = six.next(conn.parser) if not req: return (False, None) # handle the request - keepalive = self.handle_request(listener, req, client, addr) + keepalive = self.handle_request(req, conn) if keepalive: - return (keepalive, parser) + return (keepalive, conn) except http.errors.NoMoreData as e: self.log.debug("Ignored premature client disconnection. %s", e) @@ -240,10 +260,10 @@ class ThreadWorker(base.Worker): except ssl.SSLError as e: if e.args[0] == ssl.SSL_ERROR_EOF: self.log.debug("ssl connection closed") - client.close() + conn.sock.close() else: self.log.debug("Error processing SSL request.") - self.handle_error(req, client, addr, e) + self.handle_error(req, conn.sock, conn.addr, e) except socket.error as e: if e.args[0] not in (errno.EPIPE, errno.ECONNRESET): @@ -252,27 +272,30 @@ class ThreadWorker(base.Worker): if e.args[0] == errno.ECONNRESET: self.log.debug("Ignoring connection reset") else: - self.log.debug("Ignoring EPIPE") + self.log.debug("Ignoring connection epipe") except Exception as e: - self.handle_error(req, client, addr, e) + self.handle_error(req, conn.sock, conn.addr, e) - return (False, None) + return (False, conn) - def handle_request(self, listener, req, client, addr): + def handle_request(self, req, conn): environ = {} resp = None try: self.cfg.pre_request(self, req) request_start = datetime.now() - resp, environ = wsgi.create(req, client, addr, - listener.getsockname(), self.cfg) + resp, environ = wsgi.create(req, conn.sock, conn.addr, + conn.listener.getsockname(), self.cfg) environ["wsgi.multithread"] = True self.nr += 1 - if self.nr >= self.max_requests: + + if self.alive and self.nr >= self.max_requests: self.log.info("Autorestarting worker after current request.") + resp.force_close() self.alive = False + if not self.cfg.keepalive: resp.force_close() @@ -283,6 +306,7 @@ class ThreadWorker(base.Worker): else: for item in respiter: resp.write(item) + resp.close() request_time = datetime.now() - request_start self.log.access(resp, req, environ, request_time) @@ -303,8 +327,8 @@ class ThreadWorker(base.Worker): # connection to indicate the error. self.log.exception("Error handling request") try: - client.shutdown(socket.SHUT_RDWR) - client.close() + conn.sock.shutdown(socket.SHUT_RDWR) + conn.sock.close() except socket.error: pass raise StopIteration()