make the code simpler and fix issue with ab

This commit is contained in:
benoitc 2014-05-30 23:26:30 +02:00
parent f8b415496d
commit c8e93a6f21

View File

@ -10,7 +10,7 @@
# If no event happen after the keep alive timeout, the connectoin is # If no event happen after the keep alive timeout, the connectoin is
# closed. # closed.
from collections import deque
import concurrent.futures as futures import concurrent.futures as futures
from datetime import datetime from datetime import datetime
import errno import errno
@ -39,14 +39,33 @@ KEEPALIVED = 1
class TConn(): class TConn():
def __init__(self, worker, listener, sock, addr, parser): def __init__(self, cfg, listener, sock, addr):
self.cfg = cfg
self.listener = listener self.listener = listener
self.sock = sock self.sock = sock
self.addr = addr self.addr = addr
self.parser = parser
self.timeout = None
self.parser = None
# set the socket to non blocking
#self.sock.setblocking(False)
def maybe_init(self):
if self.parser is None:
# wrap the socket if needed
if self.cfg.is_ssl:
client = ssl.wrap_socket(client, server_side=True,
**self.cfg.ssl_options)
# initialize the parser
self.parser = http.RequestParser(self.cfg, self.sock)
def set_timeout(self):
# set the timeout # set the timeout
self.timeout = time.time() + worker.cfg.keepalive self.timeout = time.time() + self.cfg.keepalive
def __lt__(self, other): def __lt__(self, other):
return self.timeout < other.timeout return self.timeout < other.timeout
@ -65,8 +84,7 @@ class ThreadWorker(base.Worker):
self.tpool = None self.tpool = None
self.poller = None self.poller = None
self.futures = set() self.futures = set()
self._heap = [] self._keep = deque()
self.clients = {}
def _wrap_future(self, fs, conn): def _wrap_future(self, fs, conn):
@ -74,80 +92,48 @@ class ThreadWorker(base.Worker):
self.futures.add(fs) self.futures.add(fs)
fs.add_done_callback(self.finish_request) fs.add_done_callback(self.finish_request)
def _unregister_keepalive(self, conn):
try:
del self._heap[operator.indexOf(self._heap, conn)]
except (KeyError, IndexError, ValueError):
pass
def init_process(self): def init_process(self):
self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
self.poller = selectors.DefaultSelector() self.poller = selectors.DefaultSelector()
super(ThreadWorker, self).init_process() super(ThreadWorker, self).init_process()
def acceptor(self, listener): def accept(self, listener, *args):
try: try:
client, addr = listener.accept() client, addr = listener.accept()
client.setblocking(False) conn = TConn(self.cfg, listener, client, addr)
# wrap the socket if needed
if self.cfg.is_ssl:
client = ssl.wrap_socket(client, server_side=True,
**self.cfg.ssl_options)
# initialise the parser
parser = http.RequestParser(self.cfg, client)
# register the connection
tconn = TConn(self, listener, client, addr, parser)
self.clients[client] = tconn
# wait for the read event to handle the connection # wait for the read event to handle the connection
self.poller.register(client, selectors.EVENT_READ, self.poller.register(client, selectors.EVENT_READ,
self.handle_client) (self.handle_client, (conn,)))
except socket.error as e: except socket.error as e:
if e.args[0] not in (errno.EAGAIN, if e.args[0] not in (errno.EAGAIN,
errno.ECONNABORTED, errno.EWOULDBLOCK): errno.ECONNABORTED, errno.EWOULDBLOCK):
raise raise
def handle_client(self, client): def handle_client(self, client, conn):
# unregister the client from the poller # unregister the client from the poller
self.poller.unregister(client) self.poller.unregister(client)
try: # submit the connection to a worker
conn = self.clients[client] fs = self.tpool.submit(self.handle, conn)
self.idle_workers += 1
self._wrap_future(fs, conn)
# maybe unregister the keepalive from the heap
self._unregister_keepalive(conn)
# submit the connection to a worker
fs = self.tpool.submit(self.handle, conn)
self.idle_workers += 1
self._wrap_future(fs, conn)
except KeyError:
# no connection registered
return
def murder_keepalived(self): def murder_keepalived(self):
now = time.time() now = time.time()
while True: while True:
if not len(self._heap): if not len(self._keep):
break break
conn = heapq.heappop(self._heap) delta = self._keep[0].timeout - now
delta = conn.timeout - now
if delta > 0: if delta > 0:
heapq.heappush(self._heap, conn)
break break
else: else:
# make sure the connection can't be handled # remove the connection from the queue
try: conn = self._keep.popleft()
del self.clients[conn.sock]
except KeyError:
pass
# remove the socket from the poller # remove the socket from the poller
self.poller.unregister(conn.sock) self.poller.unregister(conn.sock)
@ -155,13 +141,12 @@ class ThreadWorker(base.Worker):
# close the socket # close the socket
util.close(conn.sock) util.close(conn.sock)
def run(self): def run(self):
# init listeners, add them to the event loop # init listeners, add them to the event loop
for s in self.sockets: for s in self.sockets:
s.setblocking(False) s.setblocking(False)
self.poller.register(s, selectors.EVENT_READ, self.acceptor) self.poller.register(s, selectors.EVENT_READ,
(self.accept,()))
while self.alive: while self.alive:
# If our parent changed then we shut down. # If our parent changed then we shut down.
@ -174,8 +159,8 @@ class ThreadWorker(base.Worker):
events = self.poller.select(1.0) events = self.poller.select(1.0)
for key, mask in events: for key, mask in events:
callback = key.data (callback, args) = key.data
callback(key.fileobj) callback(key.fileobj, *args)
# hanle keepalive timeouts # hanle keepalive timeouts
self.murder_keepalived() self.murder_keepalived()
@ -217,17 +202,13 @@ class ThreadWorker(base.Worker):
conn.sock.setblocking(False) conn.sock.setblocking(False)
# register the connection # register the connection
heapq.heappush(self._heap, conn) conn.set_timeout()
self._keep.append(conn)
# add the socket to the event loop # add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ, self.poller.register(conn.sock, selectors.EVENT_READ,
self.handle_client) (self.handle_client, (conn,)))
else: else:
try:
del self.clients[conn.sock]
except KeyError:
pass
util.close(fs.conn.sock) util.close(fs.conn.sock)
except: except:
# an exception happened, make sure to close the # an exception happened, make sure to close the
@ -239,6 +220,13 @@ class ThreadWorker(base.Worker):
self.idle_workers -= 1 self.idle_workers -= 1
def handle(self, conn): def handle(self, conn):
try:
self._keep.remove(conn)
except ValueError:
pass
conn.maybe_init()
keepalive = False keepalive = False
req = None req = None
try: try: