fix keepalive

This commit is contained in:
benoitc 2014-05-13 12:30:57 +02:00
parent c353eaacee
commit 6aa99e4441
4 changed files with 74 additions and 67 deletions

View File

@ -16,7 +16,7 @@ def app(environ, start_response):
"""Simplest possible application object""" """Simplest possible application object"""
errors = environ['wsgi.errors'] errors = environ['wsgi.errors']
pprint.pprint(('ENVIRON', environ), stream=errors) # pprint.pprint(('ENVIRON', environ), stream=errors)
data = b'Hello, World!\n' data = b'Hello, World!\n'
status = '200 OK' status = '200 OK'

View File

@ -95,7 +95,7 @@ class Config(object):
## are we using a threaded worker? ## are we using a threaded worker?
is_sync = uri.endswith('SyncWorker') or uri == 'sync' is_sync = uri.endswith('SyncWorker') or uri == 'sync'
if is_sync and self.threads > 1: if is_sync and self.threads > 1:
uri = "gunicorn.workers.gthread.ThreadedWorker" uri = "gunicorn.workers.gthread.ThreadWorker"
worker_class = util.load_class(uri) worker_class = util.load_class(uri)
if hasattr(worker_class, "setup"): if hasattr(worker_class, "setup"):

View File

@ -133,6 +133,8 @@ if hasattr(select, 'kqueue'):
self.events = [] self.events = []
def addfd(self, fd, mode, repeat=True): def addfd(self, fd, mode, repeat=True):
fd = fd_(fd)
if mode == 'r': if mode == 'r':
kmode = select.KQ_FILTER_READ kmode = select.KQ_FILTER_READ
else: else:
@ -150,6 +152,8 @@ if hasattr(select, 'kqueue'):
self.kq.control([ev], 0) self.kq.control([ev], 0)
def delfd(self, fd, mode): def delfd(self, fd, mode):
fd = fd_(fd)
if mode == 'r': if mode == 'r':
kmode = select.KQ_FILTER_READ kmode = select.KQ_FILTER_READ
else: else:
@ -205,6 +209,8 @@ if hasattr(select, "epoll"):
self.events = [] self.events = []
def addfd(self, fd, mode, repeat=True): def addfd(self, fd, mode, repeat=True):
fd = fd_(fd)
if mode == 'r': if mode == 'r':
mode = (select.EPOLLIN, repeat) mode = (select.EPOLLIN, repeat)
else: else:
@ -234,6 +240,8 @@ if hasattr(select, "epoll"):
addfd_(fd, mask) addfd_(fd, mask)
def delfd(self, fd, mode): def delfd(self, fd, mode):
fd = fd_(fd)
if mode == 'r': if mode == 'r':
mode = select.POLLIN | select.POLLPRI mode = select.POLLIN | select.POLLPRI
else: else:
@ -273,9 +281,9 @@ if hasattr(select, "epoll"):
if events: if events:
all_events = [] all_events = []
fds = {} fds = {}
for fd, ev in self.events: for fd, ev in events:
fd = fd_(fd) fd = fd_(fd)
if ev == select.EPOLLIN: if ev & select.EPOLLIN:
mode = 'r' mode = 'r'
else: else:
mode = 'w' mode = 'w'
@ -293,7 +301,7 @@ if hasattr(select, "epoll"):
for m, r in self.fds[fd]: for m, r in self.fds[fd]:
if not r: if not r:
continue continue
modes.append(m, r) modes.append((m, r))
if modes != self.fds[fd]: if modes != self.fds[fd]:
self.fds[fd] = modes self.fds[fd] = modes
@ -401,7 +409,7 @@ if hasattr(select, "poll") or hasattr(select, "epoll"):
if fd not in self.fds: if fd not in self.fds:
continue continue
if ev == select.POLLIN or ev == select.POLLPRI: if ev & select.POLLIN or ev & select.POLLPRI:
mode = 'r' mode = 'r'
else: else:
mode = 'w' mode = 'w'

View File

@ -16,6 +16,7 @@ from datetime import datetime
import errno import errno
import heapq import heapq
import os import os
import operator
import socket import socket
import ssl import ssl
import sys import sys
@ -31,11 +32,11 @@ from .. import six
class TConn(): class TConn():
def __init__(self, worker, listener, sock, addr): def __init__(self, worker, listener, sock, addr, parser):
self.listener = listener self.listener = listener
self.sock = sock self.sock = sock
self.addr = addr self.addr = addr
self.when = fs.timeout self.parser = parser
# set the timeout # set the timeout
self.timeout = time.time() + worker.cfg.keepalive self.timeout = time.time() + worker.cfg.keepalive
@ -57,15 +58,16 @@ class ThreadWorker(base.Worker):
self._heap = [] self._heap = []
self.keepalived = {} self.keepalived = {}
def _wrap(self, fs, listener, client, addr): def _wrap_future(self, fs, listener, client, addr):
fs.listener = listener fs.listener = listener
fs.sock = client fs.sock = client
fs.addr = addr fs.addr = addr
self.futures.add(fs)
def run(self): def run(self):
for s in self.sockets: for s in self.sockets:
s.setblocking(False) s.setblocking(0)
self.poller.add_fd(s, 'r') self.poller.addfd(s, 'r')
listeners = dict([(s.fileno(), s) for s in self.sockets]) listeners = dict([(s.fileno(), s) for s in self.sockets])
while self.alive: while self.alive:
@ -78,77 +80,70 @@ class ThreadWorker(base.Worker):
# notify the arbiter we are alive # notify the arbiter we are alive
self.notify() self.notify()
events = self.poller.wait(0.1) events = self.poller.wait(0.01)
if events: if events:
for (fd, mode) in events: for (fd, mode) in events:
fs = None fs = None
client = None client = None
if fd in listeners: if fd in listeners:
listener = listeners[fd]
# start to accept connections # start to accept connections
try: try:
client, addr = sock.accept() client, addr = listener.accept()
# add a job to the pool # add a job to the pool
fs = self.tpool.submit(self.handle, listeners[fd], fs = self.tpool.submit(self.handle, listener,
client, addr, False) client, addr, False)
self._wrap_future(fs, listemers[fd], self._wrap_future(fs, listener, client, addr)
client, addr)
except socket.error as e: except socket.error as e:
if e.args[0] not in (errno.EAGAIN, if e.args[0] not in (errno.EAGAIN,
errno.ECONNABORTED, errno.EWOULDBLOCK): errno.ECONNABORTED, errno.EWOULDBLOCK):
raise raise
else: elif fd in self.keepalived:
# keepalive connection # get the client connection
if fd in self.keepalived: client = self.keepalived[fd]
# get the client connection
client = self.keepalived[fd]
# remove it from the heap # remove it from the heap
try: try:
del self._heap[operator.indexOf(self._heap, t)] del self._heap[operator.indexOf(self._heap, client)]
except (KeyError, IndexError): except (KeyError, IndexError):
pass pass
# add a job to the pool # add a job to the pool
fs = self.tpool.submit(self.handle, client.listener, fs = self.tpool.submit(self.handle, client.listener,
client.sock, client.addr, True) client.sock, client.addr, client.parser)
self._wrap_future(fs, client.listener, # wrap the future
client.sock, client.addr) self._wrap_future(fs, client.listener, client.sock,
client.addr)
if fs is not None:
self.futures.add(fs)
# handle jobs, we give a chance to all jobs to be executed. # handle jobs, we give a chance to all jobs to be executed.
if self.futures: if self.futures:
res = futures.wait([fs for fs in self.futures], self.notify()
timeout=self.timeout,
return_when=futures.ALL_COMPLETED)
for fs in res: res = futures.wait(self.futures, timeout=self.timeout,
# remove the future from our list return_when=futures.FIRST_COMPLETED)
self.futures.remove(fs)
for fs in res.done:
try: try:
result = fs.result() (keepalive, parser) = fs.result()
# if the connection should be kept alived add it # if the connection should be kept alived add it
# to the eventloop and record it # to the eventloop and record it
if result and result is not None: if keepalive:
# flag the socket as non blocked # flag the socket as non blocked
fs.sock.setblocking(0) fs.sock.setblocking(0)
util.close_on_exec(fs.sock)
tconn = TConn(self, fs.listener, fs.sock, tconn = TConn(self, fs.listener, fs.sock,
fs.addr) fs.addr, parser)
# register the connection # register the connection
heapq.heappush(self._heap, tconn) heapq.heappush(self._heap, tconn)
self.keepalived[fs.sock.fileno()] = tconn self.keepalived[fs.sock.fileno()] = tconn
# add the socket to the event loop # add the socket to the event loop
self.poller.add_fd(fs.sock.fileno(), 'r') self.poller.addfd(fs.sock, 'r', False)
else: else:
# at this point the connection should be # at this point the connection should be
# closed but we make sure it is. # closed but we make sure it is.
@ -157,32 +152,34 @@ class ThreadWorker(base.Worker):
# an exception happened, make sure to close the # an exception happened, make sure to close the
# socket. # socket.
util.close(fs.sock) util.close(fs.sock)
finally:
# remove the future from our list
self.futures.remove(fs)
# hanle keepalive timeouts # hanle keepalive timeouts
now = time.time() now = time.time()
while True: while True:
if not len(self._heap): if not len(self._heap):
continue break
conn = heapq.heappop(self._heap) conn = heapq.heappop(self._heap)
delta = t.timeout = now delta = conn.timeout - now
if delta > 0: if delta > 0:
heapq.heappush(self._heap, t) heapq.heappush(self._heap, conn)
break break
else: else:
# remove the socket from the poller # remove the socket from the poller
self.poller.del_fd(conn.sock.fileno(), 'r') self.poller.delfd(conn.sock.fileno(), 'r')
# close the socket # close the socket
conn.sock.close() util.close(conn.sock)
# shutdown the pool # shutdown the pool
self.tpool.shutdown(False) self.tpool.shutdown(False)
# wait for the workers # wait for the workers
futures.wait([fs for fs in self.futures], futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
timeout=self.cfg.graceful_timeout)
# if we have still fures running, try to close them # if we have still fures running, try to close them
for fs in self.futures: for fs in self.futures:
@ -196,28 +193,32 @@ class ThreadWorker(base.Worker):
util.close(sock) util.close(sock)
def handle(self, listener, client, addr, keepalived): def handle(self, listener, client, addr, parser):
keepalive = False keepalive = False
req = None
try: try:
client.setblocking(1)
# wrap the connection # wrap the connection
if not keepalived and self.cfg.is_ssl: if not parser:
client = ssl.wrap_socket(client, server_side=True, if self.cfg.is_ssl:
**self.cfg.ssl_options) client = ssl.wrap_socket(client, server_side=True,
**self.cfg.ssl_options)
parser = http.RequestParser(self.cfg, client)
client.setblocking(1)
util.close_on_exec(client)
parser = http.RequestParser(self.cfg, sock)
req = six.next(parser) req = six.next(parser)
if not req:
return (False, None)
# handle the request # handle the request
keepalive = self.handle_request(listener, req, client, addr) keepalive = self.handle_request(listener, req, client, addr)
if keepalive:
return (keepalive, parser)
except http.errors.NoMoreData as e: except http.errors.NoMoreData as e:
self.log.debug("Ignored premature client disconnection. %s", e) self.log.debug("Ignored premature client disconnection. %s", e)
except StopIteration as e: except StopIteration as e:
self.log.debug("Closing connection. %s", e) self.log.debug("Closing connection. %s", e)
except ssl.SSLError as e: except ssl.SSLError as e:
if e.args[0] == ssl.SSL_ERROR_EOF: if e.args[0] == ssl.SSL_ERROR_EOF:
self.log.debug("ssl connection closed") self.log.debug("ssl connection closed")
@ -236,10 +237,8 @@ class ThreadWorker(base.Worker):
self.log.debug("Ignoring EPIPE") self.log.debug("Ignoring EPIPE")
except Exception as e: except Exception as e:
self.handle_error(req, client, addr, e) self.handle_error(req, client, addr, e)
finally:
if not keepalive: return (False, None)
util.close(client)
return keepalive
def handle_request(self, listener, req, client, addr): def handle_request(self, listener, req, client, addr):
environ = {} environ = {}
@ -274,8 +273,8 @@ class ThreadWorker(base.Worker):
respiter.close() respiter.close()
if resp.should_close(): if resp.should_close():
raise StopIteration() self.log.debug("Closing connection.")
return False
except socket.error: except socket.error:
exc_info = sys.exc_info() exc_info = sys.exc_info()
# pass to next try-except level # pass to next try-except level