make sure we can alwas accept connection.

This is done by limiting the number of kept alived requests to MAX WORKER
CONNECTIONS - N Threads so at any time we can accept N connection.
This commit is contained in:
benoitc 2014-12-20 16:52:22 +01:00
parent fcd9d04515
commit 1f92511430

View File

@ -18,6 +18,7 @@ import os
import socket import socket
import ssl import ssl
import sys import sys
from threading import RLock
import time import time
from .. import http from .. import http
@ -84,16 +85,19 @@ class ThreadWorker(base.Worker):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(ThreadWorker, self).__init__(*args, **kwargs) super(ThreadWorker, self).__init__(*args, **kwargs)
self.worker_connections = self.cfg.worker_connections self.worker_connections = self.cfg.worker_connections
self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
# initialise the pool # initialise the pool
self.tpool = None self.tpool = None
self.poller = None self.poller = None
self._lock = None
self.futures = deque() self.futures = deque()
self._keep = deque() self._keep = deque()
def init_process(self): def init_process(self):
self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads) self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
self.poller = selectors.DefaultSelector() self.poller = selectors.DefaultSelector()
self._lock = RLock()
super(ThreadWorker, self).init_process() super(ThreadWorker, self).init_process()
def _wrap_future(self, fs, conn): def _wrap_future(self, fs, conn):
@ -121,9 +125,9 @@ class ThreadWorker(base.Worker):
raise raise
def reuse_connection(self, conn, client): def reuse_connection(self, conn, client):
with self._lock:
# unregister the client from the poller # unregister the client from the poller
self.poller.unregister(client) self.poller.unregister(client)
# remove the connection from keepalive # remove the connection from keepalive
try: try:
self._keep.remove(conn) self._keep.remove(conn)
@ -137,6 +141,7 @@ class ThreadWorker(base.Worker):
def murder_keepalived(self): def murder_keepalived(self):
now = time.time() now = time.time()
while True: while True:
with self._lock:
try: try:
# remove the connection from the queue # remove the connection from the queue
conn = self._keep.popleft() conn = self._keep.popleft()
@ -146,16 +151,17 @@ class ThreadWorker(base.Worker):
delta = conn.timeout - now delta = conn.timeout - now
if delta > 0: if delta > 0:
# add the connection back to the queue # add the connection back to the queue
with self._lock:
self._keep.appendleft(conn) self._keep.appendleft(conn)
break break
else: else:
self.nr -= 1 self.nr -= 1
# remove the socket from the poller # remove the socket from the poller
with self._lock:
try: try:
self.poller.unregister(conn.sock) self.poller.unregister(conn.sock)
except socket.error as e: except socket.error as e:
if e.args[0] == errno.EBADF: if e.args[0] != errno.EBADF:
pass
raise raise
# close the socket # close the socket
@ -200,12 +206,14 @@ class ThreadWorker(base.Worker):
return_when=futures.FIRST_COMPLETED) return_when=futures.FIRST_COMPLETED)
if not result.done: if not result.done:
self.tpool.shutdown(False) break
self.poller.close()
return
else: else:
[self.futures.remove(f) for f in result.done] [self.futures.remove(f) for f in result.done]
self.tpool.shutdown(False)
self.poller.close()
def finish_request(self, fs): def finish_request(self, fs):
if fs.cancelled(): if fs.cancelled():
@ -222,6 +230,7 @@ class ThreadWorker(base.Worker):
# register the connection # register the connection
conn.set_timeout() conn.set_timeout()
with self._lock:
self._keep.append(conn) self._keep.append(conn)
# add the socket to the event loop # add the socket to the event loop
@ -291,6 +300,8 @@ class ThreadWorker(base.Worker):
if not self.cfg.keepalive: if not self.cfg.keepalive:
resp.force_close() resp.force_close()
elif len(self._keep) >= self.max_keepalived:
resp.force_close()
respiter = self.wsgi(environ, resp.start_response) respiter = self.wsgi(environ, resp.start_response)
try: try: