stop accepting more requests when the maximum accepted is reached

this change makes sure that a worker doesn't handle more requests than it can
achieve.  The new workflow is quite a bit simpler:

listeners are put in the poller. On read we try to accept on them.
When a connection is accepted it is put in the execution queue
When a request is done and the socket can be kept alive, we put it in the
poller; on a read event we will try to handle the new request. If it is not
taken out of the poller before the keepalive timeout, the socket will be closed.
If all threads are busy, we wait until one request completes. If it
doesn't complete before the timeout, we kill the worker.

fix #908
This commit is contained in:
benoitc 2014-12-20 11:16:58 +01:00
parent fd95f66f2d
commit fcd9d04515

View File

@ -65,13 +65,14 @@ class TConn(object):
# initialize the parser # initialize the parser
self.parser = http.RequestParser(self.cfg, self.sock) self.parser = http.RequestParser(self.cfg, self.sock)
return True
return False
def set_timeout(self): def set_timeout(self):
# set the timeout # set the timeout
self.timeout = time.time() + self.cfg.keepalive self.timeout = time.time() + self.cfg.keepalive
def close(self):
util.close(self.sock)
def __lt__(self, other): def __lt__(self, other):
return self.timeout < other.timeout return self.timeout < other.timeout
@ -90,40 +91,48 @@ class ThreadWorker(base.Worker):
self.futures = deque() self.futures = deque()
self._keep = deque() self._keep = deque()
def _wrap_future(self, fs, conn):
fs.conn = conn
self.futures.append(fs)
fs.add_done_callback(self.finish_request)
def init_process(self): def init_process(self):
self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
self.poller = selectors.DefaultSelector() self.poller = selectors.DefaultSelector()
super(ThreadWorker, self).init_process() super(ThreadWorker, self).init_process()
def accept(self, listener): def _wrap_future(self, fs, conn):
if not self.alive: fs.conn = conn
return self.futures.append(fs)
fs.add_done_callback(self.finish_request)
def enqueue_req(self, conn):
conn.init()
# submit the connection to a worker
fs = self.tpool.submit(self.handle, conn)
self._wrap_future(fs, conn)
def accept(self, listener):
try: try:
client, addr = listener.accept() client, addr = listener.accept()
# initialize the connection object
conn = TConn(self.cfg, listener, client, addr) conn = TConn(self.cfg, listener, client, addr)
self.nr += 1
# wait for the read event to handle the connection # enqueue the job
self.poller.register(client, selectors.EVENT_READ, self.enqueue_req(conn)
partial(self.handle_client, conn))
except socket.error as e: except socket.error as e:
if e.args[0] not in (errno.EAGAIN, if e.args[0] not in (errno.EAGAIN,
errno.ECONNABORTED, errno.EWOULDBLOCK): errno.ECONNABORTED, errno.EWOULDBLOCK):
raise raise
def handle_client(self, conn, client): def reuse_connection(self, conn, client):
# unregister the client from the poller # unregister the client from the poller
self.poller.unregister(client) self.poller.unregister(client)
# remove the connection from keepalive
try:
self._keep.remove(conn)
except ValueError:
# race condition
return
# submit the connection to a worker # submit the connection to a worker
fs = self.tpool.submit(self.handle, conn) self.enqueue_req(conn)
self._wrap_future(fs, conn)
def murder_keepalived(self): def murder_keepalived(self):
now = time.time() now = time.time()
@ -140,11 +149,24 @@ class ThreadWorker(base.Worker):
self._keep.appendleft(conn) self._keep.appendleft(conn)
break break
else: else:
self.nr -= 1
# remove the socket from the poller # remove the socket from the poller
self.poller.unregister(conn.sock) try:
self.poller.unregister(conn.sock)
except socket.error as e:
if e.args[0] == errno.EBADF:
pass
raise
# close the socket # close the socket
util.close(conn.sock) conn.close()
def is_parent_alive(self):
# If our parent changed then we shut down.
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s", self)
return False
return True
def run(self): def run(self):
# init listeners, add them to the event loop # init listeners, add them to the event loop
@ -155,55 +177,41 @@ class ThreadWorker(base.Worker):
timeout = self.cfg.timeout or 0.5 timeout = self.cfg.timeout or 0.5
while self.alive: while self.alive:
# If our parent changed then we shut down.
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s", self)
return
# notify the arbiter we are alive # notify the arbiter we are alive
self.notify() self.notify()
events = self.poller.select(0.2) # can we accept more connections?
for key, mask in events: if self.nr < self.worker_connections:
callback = key.data # wait for an event
callback(key.fileobj) events = self.poller.select(0.02)
for key, mask in events:
callback = key.data
callback(key.fileobj)
# hanle keepalive timeouts # hanle keepalive timeouts
self.murder_keepalived() self.murder_keepalived()
# if we more connections than the max number of connections # if the number of connections is < to the max we can handle at
# accepted on a worker, wait until some complete or exit. # the same time there is no need to wait for one
if len(self.futures) >= self.worker_connections: if len(self.futures) < self.cfg.threads:
res = futures.wait(self.futures, timeout=timeout) continue
if not res:
self.alive = False
self.log.info("max requests achieved")
break
# shutdown the pool result = futures.wait(self.futures, timeout=timeout,
self.poller.close() return_when=futures.FIRST_COMPLETED)
self.tpool.shutdown(False)
# wait for the workers if not result.done:
futures.wait(self.futures, timeout=self.cfg.graceful_timeout) self.tpool.shutdown(False)
self.poller.close()
return
else:
[self.futures.remove(f) for f in result.done]
# if we have still fures running, try to close them
while True:
try:
fs = self.futures.popleft()
except IndexError:
break
sock = fs.conn.sock
# the future is not running, cancel it
if not fs.done() and not fs.running():
fs.cancel()
# make sure we close the sockets after the graceful timeout
util.close(sock)
def finish_request(self, fs): def finish_request(self, fs):
if fs.cancelled():
fs.conn.close()
return
try: try:
(keepalive, conn) = fs.result() (keepalive, conn) = fs.result()
# if the connection should be kept alived add it # if the connection should be kept alived add it
@ -218,28 +226,17 @@ class ThreadWorker(base.Worker):
# add the socket to the event loop # add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ, self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.handle_client, conn)) partial(self.reuse_connection, conn))
else: else:
util.close(conn.sock) self.nr -= 1
conn.close()
except: except:
# an exception happened, make sure to close the # an exception happened, make sure to close the
# socket. # socket.
util.close(fs.conn.sock) self.nr -= 1
finally: fs.conn.close()
# remove the future from our list
try:
self.futures.remove(fs)
except ValueError:
pass
def handle(self, conn): def handle(self, conn):
if not conn.init():
# connection kept alive
try:
self._keep.remove(conn)
except ValueError:
pass
keepalive = False keepalive = False
req = None req = None
try: try:
@ -287,8 +284,6 @@ class ThreadWorker(base.Worker):
conn.listener.getsockname(), self.cfg) conn.listener.getsockname(), self.cfg)
environ["wsgi.multithread"] = True environ["wsgi.multithread"] = True
self.nr += 1
if self.alive and self.nr >= self.max_requests: if self.alive and self.nr >= self.max_requests:
self.log.info("Autorestarting worker after current request.") self.log.info("Autorestarting worker after current request.")
resp.force_close() resp.force_close()