mirror of
https://github.com/frappe/gunicorn.git
synced 2026-01-14 11:09:11 +08:00
Merge pull request #962 from benoitc/fix/908
stop to accept more requests when maximum accepted is achieved
This commit is contained in:
commit
fa1b7cc828
@ -18,6 +18,7 @@ import os
|
|||||||
import socket
|
import socket
|
||||||
import ssl
|
import ssl
|
||||||
import sys
|
import sys
|
||||||
|
from threading import RLock
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from .. import http
|
from .. import http
|
||||||
@ -65,13 +66,14 @@ class TConn(object):
|
|||||||
|
|
||||||
# initialize the parser
|
# initialize the parser
|
||||||
self.parser = http.RequestParser(self.cfg, self.sock)
|
self.parser = http.RequestParser(self.cfg, self.sock)
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def set_timeout(self):
|
def set_timeout(self):
|
||||||
# set the timeout
|
# set the timeout
|
||||||
self.timeout = time.time() + self.cfg.keepalive
|
self.timeout = time.time() + self.cfg.keepalive
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
util.close(self.sock)
|
||||||
|
|
||||||
def __lt__(self, other):
|
def __lt__(self, other):
|
||||||
return self.timeout < other.timeout
|
return self.timeout < other.timeout
|
||||||
|
|
||||||
@ -83,68 +85,94 @@ class ThreadWorker(base.Worker):
|
|||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super(ThreadWorker, self).__init__(*args, **kwargs)
|
super(ThreadWorker, self).__init__(*args, **kwargs)
|
||||||
self.worker_connections = self.cfg.worker_connections
|
self.worker_connections = self.cfg.worker_connections
|
||||||
|
self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
|
||||||
|
|
||||||
# initialise the pool
|
# initialise the pool
|
||||||
self.tpool = None
|
self.tpool = None
|
||||||
self.poller = None
|
self.poller = None
|
||||||
|
self._lock = None
|
||||||
self.futures = deque()
|
self.futures = deque()
|
||||||
self._keep = deque()
|
self._keep = deque()
|
||||||
|
|
||||||
|
def init_process(self):
|
||||||
|
self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
|
||||||
|
self.poller = selectors.DefaultSelector()
|
||||||
|
self._lock = RLock()
|
||||||
|
super(ThreadWorker, self).init_process()
|
||||||
|
|
||||||
def _wrap_future(self, fs, conn):
|
def _wrap_future(self, fs, conn):
|
||||||
fs.conn = conn
|
fs.conn = conn
|
||||||
self.futures.append(fs)
|
self.futures.append(fs)
|
||||||
fs.add_done_callback(self.finish_request)
|
fs.add_done_callback(self.finish_request)
|
||||||
|
|
||||||
def init_process(self):
|
def enqueue_req(self, conn):
|
||||||
self.tpool = futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
|
conn.init()
|
||||||
self.poller = selectors.DefaultSelector()
|
# submit the connection to a worker
|
||||||
super(ThreadWorker, self).init_process()
|
fs = self.tpool.submit(self.handle, conn)
|
||||||
|
self._wrap_future(fs, conn)
|
||||||
|
|
||||||
def accept(self, listener):
|
def accept(self, listener):
|
||||||
if not self.alive:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
client, addr = listener.accept()
|
client, addr = listener.accept()
|
||||||
|
# initialize the connection object
|
||||||
conn = TConn(self.cfg, listener, client, addr)
|
conn = TConn(self.cfg, listener, client, addr)
|
||||||
|
self.nr += 1
|
||||||
# wait for the read event to handle the connection
|
# enqueue the job
|
||||||
self.poller.register(client, selectors.EVENT_READ,
|
self.enqueue_req(conn)
|
||||||
partial(self.handle_client, conn))
|
|
||||||
|
|
||||||
except socket.error as e:
|
except socket.error as e:
|
||||||
if e.args[0] not in (errno.EAGAIN,
|
if e.args[0] not in (errno.EAGAIN,
|
||||||
errno.ECONNABORTED, errno.EWOULDBLOCK):
|
errno.ECONNABORTED, errno.EWOULDBLOCK):
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def handle_client(self, conn, client):
|
def reuse_connection(self, conn, client):
|
||||||
# unregister the client from the poller
|
with self._lock:
|
||||||
self.poller.unregister(client)
|
# unregister the client from the poller
|
||||||
|
self.poller.unregister(client)
|
||||||
|
# remove the connection from keepalive
|
||||||
|
try:
|
||||||
|
self._keep.remove(conn)
|
||||||
|
except ValueError:
|
||||||
|
# race condition
|
||||||
|
return
|
||||||
|
|
||||||
# submit the connection to a worker
|
# submit the connection to a worker
|
||||||
fs = self.tpool.submit(self.handle, conn)
|
self.enqueue_req(conn)
|
||||||
self._wrap_future(fs, conn)
|
|
||||||
|
|
||||||
def murder_keepalived(self):
|
def murder_keepalived(self):
|
||||||
now = time.time()
|
now = time.time()
|
||||||
while True:
|
while True:
|
||||||
try:
|
with self._lock:
|
||||||
# remove the connection from the queue
|
try:
|
||||||
conn = self._keep.popleft()
|
# remove the connection from the queue
|
||||||
except IndexError:
|
conn = self._keep.popleft()
|
||||||
break
|
except IndexError:
|
||||||
|
break
|
||||||
|
|
||||||
delta = conn.timeout - now
|
delta = conn.timeout - now
|
||||||
if delta > 0:
|
if delta > 0:
|
||||||
# add the connection back to the queue
|
# add the connection back to the queue
|
||||||
self._keep.appendleft(conn)
|
with self._lock:
|
||||||
|
self._keep.appendleft(conn)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
|
self.nr -= 1
|
||||||
# remove the socket from the poller
|
# remove the socket from the poller
|
||||||
self.poller.unregister(conn.sock)
|
with self._lock:
|
||||||
|
try:
|
||||||
|
self.poller.unregister(conn.sock)
|
||||||
|
except socket.error as e:
|
||||||
|
if e.args[0] != errno.EBADF:
|
||||||
|
raise
|
||||||
|
|
||||||
# close the socket
|
# close the socket
|
||||||
util.close(conn.sock)
|
conn.close()
|
||||||
|
|
||||||
|
def is_parent_alive(self):
|
||||||
|
# If our parent changed then we shut down.
|
||||||
|
if self.ppid != os.getppid():
|
||||||
|
self.log.info("Parent changed, shutting down: %s", self)
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
# init listeners, add them to the event loop
|
# init listeners, add them to the event loop
|
||||||
@ -155,55 +183,46 @@ class ThreadWorker(base.Worker):
|
|||||||
timeout = self.cfg.timeout or 0.5
|
timeout = self.cfg.timeout or 0.5
|
||||||
|
|
||||||
while self.alive:
|
while self.alive:
|
||||||
# If our parent changed then we shut down.
|
|
||||||
if self.ppid != os.getppid():
|
|
||||||
self.log.info("Parent changed, shutting down: %s", self)
|
|
||||||
return
|
|
||||||
|
|
||||||
# notify the arbiter we are alive
|
# notify the arbiter we are alive
|
||||||
self.notify()
|
self.notify()
|
||||||
|
|
||||||
events = self.poller.select(0.2)
|
# can we accept more connections?
|
||||||
for key, mask in events:
|
if self.nr < self.worker_connections:
|
||||||
callback = key.data
|
# wait for an event
|
||||||
callback(key.fileobj)
|
events = self.poller.select(0.02)
|
||||||
|
for key, mask in events:
|
||||||
|
callback = key.data
|
||||||
|
callback(key.fileobj)
|
||||||
|
|
||||||
|
if not self.is_parent_alive():
|
||||||
|
break
|
||||||
|
|
||||||
# hanle keepalive timeouts
|
# hanle keepalive timeouts
|
||||||
self.murder_keepalived()
|
self.murder_keepalived()
|
||||||
|
|
||||||
# if we more connections than the max number of connections
|
# if the number of connections is < to the max we can handle at
|
||||||
# accepted on a worker, wait until some complete or exit.
|
# the same time there is no need to wait for one
|
||||||
if len(self.futures) >= self.worker_connections:
|
if len(self.futures) < self.cfg.threads:
|
||||||
res = futures.wait(self.futures, timeout=timeout)
|
continue
|
||||||
if not res:
|
|
||||||
self.alive = False
|
|
||||||
self.log.info("max requests achieved")
|
|
||||||
break
|
|
||||||
|
|
||||||
# shutdown the pool
|
result = futures.wait(self.futures, timeout=timeout,
|
||||||
self.poller.close()
|
return_when=futures.FIRST_COMPLETED)
|
||||||
self.tpool.shutdown(False)
|
|
||||||
|
|
||||||
# wait for the workers
|
if not result.done:
|
||||||
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
|
|
||||||
|
|
||||||
# if we have still fures running, try to close them
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
fs = self.futures.popleft()
|
|
||||||
except IndexError:
|
|
||||||
break
|
break
|
||||||
|
|
||||||
sock = fs.conn.sock
|
else:
|
||||||
|
[self.futures.remove(f) for f in result.done]
|
||||||
|
|
||||||
# the future is not running, cancel it
|
self.tpool.shutdown(False)
|
||||||
if not fs.done() and not fs.running():
|
self.poller.close()
|
||||||
fs.cancel()
|
|
||||||
|
|
||||||
# make sure we close the sockets after the graceful timeout
|
|
||||||
util.close(sock)
|
|
||||||
|
|
||||||
def finish_request(self, fs):
|
def finish_request(self, fs):
|
||||||
|
if fs.cancelled():
|
||||||
|
fs.conn.close()
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
(keepalive, conn) = fs.result()
|
(keepalive, conn) = fs.result()
|
||||||
# if the connection should be kept alived add it
|
# if the connection should be kept alived add it
|
||||||
@ -214,32 +233,22 @@ class ThreadWorker(base.Worker):
|
|||||||
|
|
||||||
# register the connection
|
# register the connection
|
||||||
conn.set_timeout()
|
conn.set_timeout()
|
||||||
self._keep.append(conn)
|
with self._lock:
|
||||||
|
self._keep.append(conn)
|
||||||
|
|
||||||
# add the socket to the event loop
|
# add the socket to the event loop
|
||||||
self.poller.register(conn.sock, selectors.EVENT_READ,
|
self.poller.register(conn.sock, selectors.EVENT_READ,
|
||||||
partial(self.handle_client, conn))
|
partial(self.reuse_connection, conn))
|
||||||
else:
|
else:
|
||||||
util.close(conn.sock)
|
self.nr -= 1
|
||||||
|
conn.close()
|
||||||
except:
|
except:
|
||||||
# an exception happened, make sure to close the
|
# an exception happened, make sure to close the
|
||||||
# socket.
|
# socket.
|
||||||
util.close(fs.conn.sock)
|
self.nr -= 1
|
||||||
finally:
|
fs.conn.close()
|
||||||
# remove the future from our list
|
|
||||||
try:
|
|
||||||
self.futures.remove(fs)
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def handle(self, conn):
|
def handle(self, conn):
|
||||||
if not conn.init():
|
|
||||||
# connection kept alive
|
|
||||||
try:
|
|
||||||
self._keep.remove(conn)
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
keepalive = False
|
keepalive = False
|
||||||
req = None
|
req = None
|
||||||
try:
|
try:
|
||||||
@ -287,8 +296,6 @@ class ThreadWorker(base.Worker):
|
|||||||
conn.listener.getsockname(), self.cfg)
|
conn.listener.getsockname(), self.cfg)
|
||||||
environ["wsgi.multithread"] = True
|
environ["wsgi.multithread"] = True
|
||||||
|
|
||||||
self.nr += 1
|
|
||||||
|
|
||||||
if self.alive and self.nr >= self.max_requests:
|
if self.alive and self.nr >= self.max_requests:
|
||||||
self.log.info("Autorestarting worker after current request.")
|
self.log.info("Autorestarting worker after current request.")
|
||||||
resp.force_close()
|
resp.force_close()
|
||||||
@ -296,6 +303,8 @@ class ThreadWorker(base.Worker):
|
|||||||
|
|
||||||
if not self.cfg.keepalive:
|
if not self.cfg.keepalive:
|
||||||
resp.force_close()
|
resp.force_close()
|
||||||
|
elif len(self._keep) >= self.max_keepalived:
|
||||||
|
resp.force_close()
|
||||||
|
|
||||||
respiter = self.wsgi(environ, resp.start_response)
|
respiter = self.wsgi(environ, resp.start_response)
|
||||||
try:
|
try:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user