some quick optimisations

This commit is contained in:
benoitc 2014-05-31 00:17:29 +02:00
parent c8e93a6f21
commit 5ba749e9ca

View File

@ -14,7 +14,7 @@ from collections import deque
import concurrent.futures as futures import concurrent.futures as futures
from datetime import datetime from datetime import datetime
import errno import errno
import heapq from functools import partial
import os import os
import operator import operator
import socket import socket
@ -49,10 +49,10 @@ class TConn():
self.parser = None self.parser = None
# set the socket to non blocking # set the socket to non blocking
#self.sock.setblocking(False) self.sock.setblocking(False)
def maybe_init(self): def init(self):
if self.parser is None: if self.parser is None:
# wrap the socket if needed # wrap the socket if needed
if self.cfg.is_ssl: if self.cfg.is_ssl:
@ -62,6 +62,8 @@ class TConn():
# initialize the parser # initialize the parser
self.parser = http.RequestParser(self.cfg, self.sock) self.parser = http.RequestParser(self.cfg, self.sock)
return True
return False
def set_timeout(self): def set_timeout(self):
# set the timeout # set the timeout
@ -78,18 +80,17 @@ class ThreadWorker(base.Worker):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(ThreadWorker, self).__init__(*args, **kwargs) super(ThreadWorker, self).__init__(*args, **kwargs)
self.worker_connections = self.cfg.worker_connections self.worker_connections = self.cfg.worker_connections
self.idle_workers = 0
# initialise the pool # initialise the pool
self.tpool = None self.tpool = None
self.poller = None self.poller = None
self.futures = set() self.futures = deque()
self._keep = deque() self._keep = deque()
def _wrap_future(self, fs, conn): def _wrap_future(self, fs, conn):
fs.conn = conn fs.conn = conn
self.futures.add(fs) self.futures.append(fs)
fs.add_done_callback(self.finish_request) fs.add_done_callback(self.finish_request)
def init_process(self): def init_process(self):
@ -97,28 +98,26 @@ class ThreadWorker(base.Worker):
self.poller = selectors.DefaultSelector() self.poller = selectors.DefaultSelector()
super(ThreadWorker, self).init_process() super(ThreadWorker, self).init_process()
def accept(self, listener):
def accept(self, listener, *args):
try: try:
client, addr = listener.accept() client, addr = listener.accept()
conn = TConn(self.cfg, listener, client, addr) conn = TConn(self.cfg, listener, client, addr)
# wait for the read event to handle the connection # wait for the read event to handle the connection
self.poller.register(client, selectors.EVENT_READ, self.poller.register(client, selectors.EVENT_READ,
(self.handle_client, (conn,))) partial(self.handle_client, conn))
except socket.error as e: except socket.error as e:
if e.args[0] not in (errno.EAGAIN, if e.args[0] not in (errno.EAGAIN,
errno.ECONNABORTED, errno.EWOULDBLOCK): errno.ECONNABORTED, errno.EWOULDBLOCK):
raise raise
def handle_client(self, client, conn): def handle_client(self, conn, client):
# unregister the client from the poller # unregister the client from the poller
self.poller.unregister(client) self.poller.unregister(client)
# submit the connection to a worker # submit the connection to a worker
fs = self.tpool.submit(self.handle, conn) fs = self.tpool.submit(self.handle, conn)
self.idle_workers += 1
self._wrap_future(fs, conn) self._wrap_future(fs, conn)
@ -145,8 +144,7 @@ class ThreadWorker(base.Worker):
# init listeners, add them to the event loop # init listeners, add them to the event loop
for s in self.sockets: for s in self.sockets:
s.setblocking(False) s.setblocking(False)
self.poller.register(s, selectors.EVENT_READ, self.poller.register(s, selectors.EVENT_READ, self.accept)
(self.accept,()))
while self.alive: while self.alive:
# If our parent changed then we shut down. # If our parent changed then we shut down.
@ -157,17 +155,17 @@ class ThreadWorker(base.Worker):
# notify the arbiter we are alive # notify the arbiter we are alive
self.notify() self.notify()
events = self.poller.select(1.0) events = self.poller.select(0.01)
for key, mask in events: for key, mask in events:
(callback, args) = key.data callback = key.data
callback(key.fileobj, *args) callback(key.fileobj)
# hanle keepalive timeouts # hanle keepalive timeouts
self.murder_keepalived() self.murder_keepalived()
# if we more connections than the max number of connections # if we more connections than the max number of connections
# accepted on a worker, wait until some complete or exit. # accepted on a worker, wait until some complete or exit.
if self.idle_workers >= self.worker_connections: if len(self.futures) >= self.worker_connections:
futures.wait(self.futures, timeout=self.cfg.timeout) futures.wait(self.futures, timeout=self.cfg.timeout)
if not res: if not res:
self.log.info("max requests achieved") self.log.info("max requests achieved")
@ -207,25 +205,27 @@ class ThreadWorker(base.Worker):
# add the socket to the event loop # add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ, self.poller.register(conn.sock, selectors.EVENT_READ,
(self.handle_client, (conn,))) partial(self.handle_client, conn))
else: else:
util.close(fs.conn.sock) util.close(conn.sock)
except: except:
# an exception happened, make sure to close the # an exception happened, make sure to close the
# socket. # socket.
util.close(fs.conn.sock) util.close(fs.conn.sock)
finally: finally:
# remove the future from our list # remove the future from our list
self.futures.remove(fs) try:
self.idle_workers -= 1 self.futures.remove(fs)
except ValueError:
pass
def handle(self, conn): def handle(self, conn):
try: if not conn.init():
self._keep.remove(conn) # connection kept alive
except ValueError: try:
pass self._keep.remove(conn)
except ValueError:
conn.maybe_init() pass
keepalive = False keepalive = False
req = None req = None