From 6aa99e44415a2a43f0f9476966a782a8e5d42a71 Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 13 May 2014 12:30:57 +0200 Subject: [PATCH] fix keepalive --- examples/test.py | 2 +- gunicorn/config.py | 2 +- gunicorn/fdevents.py | 16 +++-- gunicorn/workers/gthread.py | 121 ++++++++++++++++++------------------ 4 files changed, 74 insertions(+), 67 deletions(-) diff --git a/examples/test.py b/examples/test.py index 610cf5b1..77bef952 100644 --- a/examples/test.py +++ b/examples/test.py @@ -16,7 +16,7 @@ def app(environ, start_response): """Simplest possible application object""" errors = environ['wsgi.errors'] - pprint.pprint(('ENVIRON', environ), stream=errors) +# pprint.pprint(('ENVIRON', environ), stream=errors) data = b'Hello, World!\n' status = '200 OK' diff --git a/gunicorn/config.py b/gunicorn/config.py index 58ccc26f..cd4d1784 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -95,7 +95,7 @@ class Config(object): ## are we using a threaded worker? is_sync = uri.endswith('SyncWorker') or uri == 'sync' if is_sync and self.threads > 1: - uri = "gunicorn.workers.gthread.ThreadedWorker" + uri = "gunicorn.workers.gthread.ThreadWorker" worker_class = util.load_class(uri) if hasattr(worker_class, "setup"): diff --git a/gunicorn/fdevents.py b/gunicorn/fdevents.py index 0bddb41e..1fee335a 100644 --- a/gunicorn/fdevents.py +++ b/gunicorn/fdevents.py @@ -133,6 +133,8 @@ if hasattr(select, 'kqueue'): self.events = [] def addfd(self, fd, mode, repeat=True): + fd = fd_(fd) + if mode == 'r': kmode = select.KQ_FILTER_READ else: @@ -150,6 +152,8 @@ if hasattr(select, 'kqueue'): self.kq.control([ev], 0) def delfd(self, fd, mode): + fd = fd_(fd) + if mode == 'r': kmode = select.KQ_FILTER_READ else: @@ -205,6 +209,8 @@ if hasattr(select, "epoll"): self.events = [] def addfd(self, fd, mode, repeat=True): + fd = fd_(fd) + if mode == 'r': mode = (select.EPOLLIN, repeat) else: @@ -234,6 +240,8 @@ if hasattr(select, "epoll"): addfd_(fd, mask) def delfd(self, fd, mode): + fd = fd_(fd) + if mode == 'r': mode = select.POLLIN | select.POLLPRI else: @@ -273,9 +281,9 @@ if hasattr(select, "epoll"): if events: all_events = [] fds = {} - for fd, ev in self.events: + for fd, ev in events: fd = fd_(fd) - if ev == select.EPOLLIN: + if ev & select.EPOLLIN: mode = 'r' else: mode = 'w' @@ -293,7 +301,7 @@ if hasattr(select, "epoll"): for m, r in self.fds[fd]: if not r: continue - modes.append(m, r) + modes.append((m, r)) if modes != self.fds[fd]: self.fds[fd] = modes @@ -401,7 +409,7 @@ if hasattr(select, "poll") or hasattr(select, "epoll"): if fd not in self.fds: continue - if ev == select.POLLIN or ev == select.POLLPRI: + if ev & select.POLLIN or ev & select.POLLPRI: mode = 'r' else: mode = 'w' diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index a0aebf2f..6336e6de 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -16,6 +16,7 @@ from datetime import datetime import errno import heapq import os +import operator import socket import ssl import sys @@ -31,11 +32,11 @@ from .. import six class TConn(): - def __init__(self, worker, listener, sock, addr): + def __init__(self, worker, listener, sock, addr, parser): self.listener = listener self.sock = sock self.addr = addr - self.when = fs.timeout + self.parser = parser # set the timeout self.timeout = time.time() + worker.cfg.keepalive @@ -57,15 +58,16 @@ class ThreadWorker(base.Worker): self._heap = [] self.keepalived = {} - def _wrap(self, fs, listener, client, addr): + def _wrap_future(self, fs, listener, client, addr): fs.listener = listener fs.sock = client fs.addr = addr + self.futures.add(fs) def run(self): for s in self.sockets: - s.setblocking(False) - self.poller.add_fd(s, 'r') + s.setblocking(0) + self.poller.addfd(s, 'r') listeners = dict([(s.fileno(), s) for s in self.sockets]) while self.alive: @@ -78,77 +80,70 @@ class ThreadWorker(base.Worker): # notify the arbiter we are alive self.notify() - events = self.poller.wait(0.1) + events = self.poller.wait(0.01) if events: for (fd, mode) in events: fs = None client = None if fd in listeners: + listener = listeners[fd] # start to accept connections try: - client, addr = sock.accept() + client, addr = listener.accept() # add a job to the pool - fs = self.tpool.submit(self.handle, listeners[fd], + fs = self.tpool.submit(self.handle, listener, client, addr, False) - self._wrap_future(fs, listemers[fd], - client, addr) + self._wrap_future(fs, listener, client, addr) except socket.error as e: if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise - else: - # keepalive connection - if fd in self.keepalived: - # get the client connection - client = self.keepalived[fd] + elif fd in self.keepalived: + # get the client connection + client = self.keepalived[fd] - # remove it from the heap - try: - del self._heap[operator.indexOf(self._heap, t)] - except (KeyError, IndexError): - pass + # remove it from the heap + try: + del self._heap[operator.indexOf(self._heap, client)] + except (KeyError, IndexError): + pass - # add a job to the pool - fs = self.tpool.submit(self.handle, client.listener, - client.sock, client.addr, True) + # add a job to the pool + fs = self.tpool.submit(self.handle, client.listener, + client.sock, client.addr, client.parser) - self._wrap_future(fs, client.listener, - client.sock, client.addr) - - if fs is not None: - self.futures.add(fs) + # wrap the future + self._wrap_future(fs, client.listener, client.sock, + client.addr) # handle jobs, we give a chance to all jobs to be executed. if self.futures: - res = futures.wait([fs for fs in self.futures], - timeout=self.timeout, - return_when=futures.ALL_COMPLETED) + self.notify() - for fs in res: - # remove the future from our list - self.futures.remove(fs) + res = futures.wait(self.futures, timeout=self.timeout, + return_when=futures.FIRST_COMPLETED) + for fs in res.done: try: - result = fs.result() + (keepalive, parser) = fs.result() # if the connection should be kept alived add it # to the eventloop and record it - if result and result is not None: + if keepalive: # flag the socket as non blocked fs.sock.setblocking(0) - util.close_on_exec(fs.sock) tconn = TConn(self, fs.listener, fs.sock, - fs.addr) + fs.addr, parser) # register the connection heapq.heappush(self._heap, tconn) self.keepalived[fs.sock.fileno()] = tconn # add the socket to the event loop - self.poller.add_fd(fs.sock.fileno(), 'r') + self.poller.addfd(fs.sock, 'r', False) else: # at this point the connection should be # closed but we make sure it is. @@ -157,32 +152,34 @@ class ThreadWorker(base.Worker): # an exception happened, make sure to close the # socket. util.close(fs.sock) + finally: + # remove the future from our list + self.futures.remove(fs) # hanle keepalive timeouts now = time.time() while True: if not len(self._heap): - continue + break conn = heapq.heappop(self._heap) - delta = t.timeout = now + delta = conn.timeout - now if delta > 0: - heapq.heappush(self._heap, t) + heapq.heappush(self._heap, conn) break else: # remove the socket from the poller - self.poller.del_fd(conn.sock.fileno(), 'r') + self.poller.delfd(conn.sock.fileno(), 'r') # close the socket - conn.sock.close() + util.close(conn.sock) # shutdown the pool self.tpool.shutdown(False) # wait for the workers - futures.wait([fs for fs in self.futures], - timeout=self.cfg.graceful_timeout) + futures.wait(self.futures, timeout=self.cfg.graceful_timeout) # if we have still fures running, try to close them for fs in self.futures: @@ -196,28 +193,32 @@ class ThreadWorker(base.Worker): util.close(sock) - def handle(self, listener, client, addr, keepalived): + def handle(self, listener, client, addr, parser): keepalive = False + req = None try: + client.setblocking(1) + # wrap the connection - if not keepalived and self.cfg.is_ssl: - client = ssl.wrap_socket(client, server_side=True, - **self.cfg.ssl_options) + if not parser: + if self.cfg.is_ssl: + client = ssl.wrap_socket(client, server_side=True, + **self.cfg.ssl_options) + parser = http.RequestParser(self.cfg, client) - client.setblocking(1) - util.close_on_exec(client) - - parser = http.RequestParser(self.cfg, sock) req = six.next(parser) + if not req: + return (False, None) # handle the request keepalive = self.handle_request(listener, req, client, addr) + if keepalive: + return (keepalive, parser) except http.errors.NoMoreData as e: self.log.debug("Ignored premature client disconnection. %s", e) except StopIteration as e: self.log.debug("Closing connection. %s", e) - except ssl.SSLError as e: if e.args[0] == ssl.SSL_ERROR_EOF: self.log.debug("ssl connection closed") @@ -236,10 +237,8 @@ class ThreadWorker(base.Worker): self.log.debug("Ignoring EPIPE") except Exception as e: self.handle_error(req, client, addr, e) - finally: - if not keepalive: - util.close(client) - return keepalive + + return (False, None) def handle_request(self, listener, req, client, addr): environ = {} @@ -274,8 +273,8 @@ class ThreadWorker(base.Worker): respiter.close() if resp.should_close(): - raise StopIteration() - + self.log.debug("Closing connection.") + return False except socket.error: exc_info = sys.exc_info() # pass to next try-except level