mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-03 11:11:30 +08:00
fix(gthread): prevent thread pool exhaustion from slow clients
Add a timeout when waiting for initial request data in worker threads. If no data arrives within 5 seconds, the connection is deferred back to the main poller instead of blocking the thread indefinitely. This fixes a regression from v24 where connections were submitted directly to the thread pool after accept(). In v23, connections were registered with the poller first and only submitted when data arrived. The new hybrid approach maintains the performance benefits for fast clients (immediate processing) while protecting against slow-client scenarios that can exhaust the thread pool and cause health check timeouts. Changes: - Add _DEFER sentinel and DEFAULT_WORKER_DATA_TIMEOUT constant - Add TConn.wait_for_data() method using selectors - Add TConn.data_ready flag to track data availability - Add pending_conns deque for deferred connections - Add on_pending_socket_readable() callback - Add murder_pending() to clean up timed-out pending connections - Modify handle() to wait for data with timeout before processing - Modify finish_request() to handle _DEFER and register with poller Fixes #3518
This commit is contained in:
parent
2d4310116d
commit
b5f127e99b
@ -30,6 +30,15 @@ from .. import sock
|
||||
from ..http import wsgi
|
||||
|
||||
|
||||
# Sentinel value to indicate connection should be deferred back to poller
|
||||
_DEFER = object()
|
||||
|
||||
# Default timeout (in seconds) for waiting for request data in worker thread.
|
||||
# If no data arrives within this timeout, the connection is deferred back to
|
||||
# the main poller to prevent thread pool exhaustion from slow clients.
|
||||
DEFAULT_WORKER_DATA_TIMEOUT = 5.0
|
||||
|
||||
|
||||
class TConn:
|
||||
|
||||
def __init__(self, cfg, sock, client, server):
|
||||
@ -42,6 +51,8 @@ class TConn:
|
||||
self.parser = None
|
||||
self.initialized = False
|
||||
self.is_http2 = False
|
||||
# Track if we've already waited for data (to avoid waiting again after defer)
|
||||
self.data_ready = False
|
||||
|
||||
# set the socket to non blocking
|
||||
self.sock.setblocking(False)
|
||||
@ -79,6 +90,37 @@ class TConn:
|
||||
# Use monotonic clock for reliability (time.time() can jump due to NTP)
|
||||
self.timeout = time.monotonic() + self.cfg.keepalive
|
||||
|
||||
def wait_for_data(self, timeout):
|
||||
"""Wait for data to be available on the socket.
|
||||
|
||||
Uses selectors to wait for the socket to become readable within
|
||||
the given timeout. This prevents slow clients from blocking
|
||||
thread pool slots indefinitely.
|
||||
|
||||
Args:
|
||||
timeout: Maximum time to wait in seconds.
|
||||
|
||||
Returns:
|
||||
True if data is available, False if timeout expired.
|
||||
"""
|
||||
if self.data_ready:
|
||||
return True
|
||||
|
||||
# Use a temporary selector to wait for data
|
||||
sel = selectors.DefaultSelector()
|
||||
try:
|
||||
sel.register(self.sock, selectors.EVENT_READ)
|
||||
events = sel.select(timeout=timeout)
|
||||
if events:
|
||||
self.data_ready = True
|
||||
return True
|
||||
return False
|
||||
except (OSError, ValueError):
|
||||
# Socket closed or invalid
|
||||
return False
|
||||
finally:
|
||||
sel.close()
|
||||
|
||||
def close(self):
|
||||
util.close(self.sock)
|
||||
|
||||
@ -178,6 +220,8 @@ class ThreadWorker(base.Worker):
|
||||
self.poller = None
|
||||
self.method_queue = PollableMethodQueue()
|
||||
self.keepalived_conns = deque()
|
||||
# Connections waiting for data (deferred from thread pool)
|
||||
self.pending_conns = deque()
|
||||
self.nr_conns = 0
|
||||
self._accepting = False
|
||||
|
||||
@ -254,6 +298,17 @@ class ThreadWorker(base.Worker):
|
||||
# Submit to thread pool for processing
|
||||
self.enqueue_req(conn)
|
||||
|
||||
def on_pending_socket_readable(self, conn, client):
|
||||
"""Handle a pending (deferred) connection becoming readable."""
|
||||
self.poller.unregister(client)
|
||||
self.pending_conns.remove(conn)
|
||||
|
||||
# Mark data as ready so we don't wait again in handle()
|
||||
conn.data_ready = True
|
||||
|
||||
# Submit to thread pool for processing
|
||||
self.enqueue_req(conn)
|
||||
|
||||
def murder_keepalived(self):
|
||||
"""Close expired keepalive connections."""
|
||||
now = time.monotonic()
|
||||
@ -272,6 +327,24 @@ class ThreadWorker(base.Worker):
|
||||
self.nr_conns -= 1
|
||||
conn.close()
|
||||
|
||||
def murder_pending(self):
|
||||
"""Close expired pending connections (waiting for initial data)."""
|
||||
now = time.monotonic()
|
||||
while self.pending_conns:
|
||||
conn = self.pending_conns[0]
|
||||
delta = conn.timeout - now
|
||||
if delta > 0:
|
||||
break
|
||||
|
||||
# Connection has timed out waiting for data
|
||||
self.pending_conns.popleft()
|
||||
try:
|
||||
self.poller.unregister(conn.sock)
|
||||
except (OSError, KeyError, ValueError):
|
||||
pass # Already unregistered
|
||||
self.nr_conns -= 1
|
||||
conn.close()
|
||||
|
||||
def is_parent_alive(self):
|
||||
# If our parent changed then we shut down.
|
||||
if self.ppid != os.getppid():
|
||||
@ -314,8 +387,9 @@ class ThreadWorker(base.Worker):
|
||||
if not self.is_parent_alive():
|
||||
break
|
||||
|
||||
# Handle keepalive timeouts
|
||||
# Handle keepalive and pending connection timeouts
|
||||
self.murder_keepalived()
|
||||
self.murder_pending()
|
||||
|
||||
# Graceful shutdown: stop accepting but handle existing connections
|
||||
self.set_accept_enabled(False)
|
||||
@ -328,6 +402,7 @@ class ThreadWorker(base.Worker):
|
||||
break
|
||||
self.wait_for_and_dispatch_events(timeout=time_remaining)
|
||||
self.murder_keepalived()
|
||||
self.murder_pending()
|
||||
|
||||
# Cleanup
|
||||
self.tpool.shutdown(wait=False)
|
||||
@ -340,9 +415,19 @@ class ThreadWorker(base.Worker):
|
||||
def finish_request(self, conn, fs):
|
||||
"""Handle completion of a request (called via method_queue on main thread)."""
|
||||
try:
|
||||
keepalive = not fs.cancelled() and fs.result()
|
||||
if keepalive and self.alive:
|
||||
# Put connection back in the poller for keepalive
|
||||
result = fs.result() if not fs.cancelled() else False
|
||||
|
||||
if result is _DEFER and self.alive:
|
||||
# Connection deferred - no data arrived within timeout.
|
||||
# Put it on the poller to wait for data without consuming a thread.
|
||||
conn.sock.setblocking(False)
|
||||
# Use keepalive timeout for pending connections too
|
||||
conn.timeout = time.monotonic() + self.cfg.keepalive
|
||||
self.pending_conns.append(conn)
|
||||
self.poller.register(conn.sock, selectors.EVENT_READ,
|
||||
partial(self.on_pending_socket_readable, conn))
|
||||
elif result and self.alive:
|
||||
# Keepalive - put connection back in the poller
|
||||
conn.sock.setblocking(False)
|
||||
conn.set_timeout()
|
||||
self.keepalived_conns.append(conn)
|
||||
@ -359,6 +444,16 @@ class ThreadWorker(base.Worker):
|
||||
"""Handle a request on a connection. Runs in a worker thread."""
|
||||
req = None
|
||||
try:
|
||||
# For new connections (not yet initialized), wait for data with timeout
|
||||
# to prevent slow clients from blocking thread pool slots indefinitely.
|
||||
# Skip this for already-initialized connections (keepalive, deferred)
|
||||
# since they're coming from the poller and data is already available.
|
||||
if not conn.initialized and not conn.data_ready:
|
||||
# Wait for data with timeout before committing this thread
|
||||
if not conn.wait_for_data(DEFAULT_WORKER_DATA_TIMEOUT):
|
||||
# No data within timeout - defer to poller
|
||||
return _DEFER
|
||||
|
||||
# Always ensure blocking mode in worker thread.
|
||||
# Critical for keepalive connections: the socket is set to non-blocking
|
||||
# for the selector in finish_request(), but must be blocking for
|
||||
|
||||
@ -1564,3 +1564,215 @@ class TestHTTP2TrailerCallback:
|
||||
send_trailers_h2([])
|
||||
|
||||
assert len(pending_trailers) == 0
|
||||
|
||||
|
||||
class TestSlowClientResilience:
|
||||
"""Tests for slow client handling to prevent thread pool exhaustion."""
|
||||
|
||||
def create_worker(self, cfg=None):
|
||||
"""Helper to create a ThreadWorker for testing."""
|
||||
if cfg is None:
|
||||
cfg = Config()
|
||||
cfg.set('threads', 4)
|
||||
cfg.set('worker_connections', 1000)
|
||||
cfg.set('keepalive', 5)
|
||||
|
||||
worker = gthread.ThreadWorker(
|
||||
age=1,
|
||||
ppid=os.getpid(),
|
||||
sockets=[],
|
||||
app=mock.Mock(),
|
||||
timeout=30,
|
||||
cfg=cfg,
|
||||
log=mock.Mock(),
|
||||
)
|
||||
return worker
|
||||
|
||||
def test_tconn_wait_for_data_returns_true_when_ready(self):
|
||||
"""Test wait_for_data returns True when data_ready is already set."""
|
||||
cfg = Config()
|
||||
sock = FakeSocket()
|
||||
conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
|
||||
conn.data_ready = True
|
||||
|
||||
# Should return True immediately without waiting
|
||||
assert conn.wait_for_data(5.0) is True
|
||||
|
||||
def test_tconn_wait_for_data_sets_data_ready(self):
|
||||
"""Test wait_for_data sets data_ready flag when data arrives."""
|
||||
import socket as stdlib_socket
|
||||
# Create a real socket pair to test selector behavior
|
||||
server, client = stdlib_socket.socketpair()
|
||||
try:
|
||||
cfg = Config()
|
||||
conn = gthread.TConn(cfg, server, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
|
||||
conn.data_ready = False
|
||||
|
||||
# Send data from client
|
||||
client.send(b'GET / HTTP/1.1\r\n')
|
||||
|
||||
# Should detect data is ready
|
||||
result = conn.wait_for_data(1.0)
|
||||
|
||||
assert result is True
|
||||
assert conn.data_ready is True
|
||||
finally:
|
||||
server.close()
|
||||
client.close()
|
||||
|
||||
def test_tconn_wait_for_data_timeout(self):
|
||||
"""Test wait_for_data returns False on timeout."""
|
||||
import socket as stdlib_socket
|
||||
# Create a real socket pair but don't send any data
|
||||
server, client = stdlib_socket.socketpair()
|
||||
try:
|
||||
cfg = Config()
|
||||
conn = gthread.TConn(cfg, server, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
|
||||
conn.data_ready = False
|
||||
|
||||
# Don't send any data - should timeout
|
||||
start = time.monotonic()
|
||||
result = conn.wait_for_data(0.1) # Short timeout
|
||||
elapsed = time.monotonic() - start
|
||||
|
||||
assert result is False
|
||||
assert conn.data_ready is False
|
||||
assert elapsed >= 0.1
|
||||
finally:
|
||||
server.close()
|
||||
client.close()
|
||||
|
||||
def test_finish_request_handles_defer(self):
|
||||
"""Test finish_request puts deferred connections back on poller."""
|
||||
worker = self.create_worker()
|
||||
worker.poller = mock.Mock()
|
||||
worker.pending_conns = deque()
|
||||
worker.nr_conns = 1
|
||||
worker.alive = True
|
||||
|
||||
sock = FakeSocket()
|
||||
conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
|
||||
|
||||
# Create a future that returns _DEFER
|
||||
fs = mock.Mock()
|
||||
fs.cancelled.return_value = False
|
||||
fs.result.return_value = gthread._DEFER
|
||||
|
||||
worker.finish_request(conn, fs)
|
||||
|
||||
# Connection should be in pending_conns, not closed
|
||||
assert len(worker.pending_conns) == 1
|
||||
assert worker.pending_conns[0] is conn
|
||||
assert worker.nr_conns == 1 # Still counted
|
||||
assert not sock.closed
|
||||
|
||||
# Should be registered with poller
|
||||
worker.poller.register.assert_called_once()
|
||||
|
||||
def test_on_pending_socket_readable_sets_data_ready(self):
|
||||
"""Test on_pending_socket_readable marks connection data as ready."""
|
||||
worker = self.create_worker()
|
||||
worker.poller = mock.Mock()
|
||||
worker.tpool = mock.Mock()
|
||||
worker.method_queue = mock.Mock()
|
||||
worker.pending_conns = deque()
|
||||
|
||||
sock = FakeSocket()
|
||||
conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
|
||||
conn.data_ready = False
|
||||
worker.pending_conns.append(conn)
|
||||
|
||||
# Simulate socket becoming readable
|
||||
worker.on_pending_socket_readable(conn, sock)
|
||||
|
||||
assert conn.data_ready is True
|
||||
assert conn not in worker.pending_conns
|
||||
worker.poller.unregister.assert_called_once_with(sock)
|
||||
worker.tpool.submit.assert_called_once()
|
||||
|
||||
def test_murder_pending_closes_expired_connections(self):
|
||||
"""Test murder_pending closes connections that have timed out."""
|
||||
worker = self.create_worker()
|
||||
worker.poller = mock.Mock()
|
||||
worker.pending_conns = deque()
|
||||
worker.nr_conns = 2
|
||||
|
||||
# Create two connections, one expired, one not
|
||||
sock1 = FakeSocket()
|
||||
conn1 = gthread.TConn(worker.cfg, sock1, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
|
||||
conn1.timeout = time.monotonic() - 10 # Expired
|
||||
|
||||
sock2 = FakeSocket()
|
||||
conn2 = gthread.TConn(worker.cfg, sock2, ('127.0.0.1', 12346), ('127.0.0.1', 8000))
|
||||
conn2.timeout = time.monotonic() + 100 # Not expired
|
||||
|
||||
worker.pending_conns.append(conn1)
|
||||
worker.pending_conns.append(conn2)
|
||||
|
||||
worker.murder_pending()
|
||||
|
||||
# Only expired connection should be closed
|
||||
assert sock1.closed
|
||||
assert not sock2.closed
|
||||
assert len(worker.pending_conns) == 1
|
||||
assert worker.pending_conns[0] is conn2
|
||||
assert worker.nr_conns == 1
|
||||
|
||||
def test_handle_defers_slow_connection(self):
|
||||
"""Test that handle() returns _DEFER for connections without data."""
|
||||
worker = self.create_worker()
|
||||
|
||||
# Create a connection that will timeout waiting for data
|
||||
sock = mock.Mock()
|
||||
conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
|
||||
conn.initialized = False
|
||||
conn.data_ready = False
|
||||
|
||||
# Mock wait_for_data to simulate timeout
|
||||
conn.wait_for_data = mock.Mock(return_value=False)
|
||||
|
||||
result = worker.handle(conn)
|
||||
|
||||
assert result is gthread._DEFER
|
||||
conn.wait_for_data.assert_called_once()
|
||||
|
||||
def test_handle_processes_fast_connection(self):
|
||||
"""Test that handle() processes connections with data immediately."""
|
||||
worker = self.create_worker()
|
||||
worker.wsgi = mock.Mock(return_value=[b'OK'])
|
||||
|
||||
# Create a connection with data ready
|
||||
sock = mock.Mock()
|
||||
conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
|
||||
conn.initialized = False
|
||||
conn.data_ready = True # Data is ready
|
||||
|
||||
# Mock init and parser
|
||||
conn.init = mock.Mock()
|
||||
conn.parser = mock.Mock()
|
||||
conn.parser.__next__ = mock.Mock(return_value=None) # No request parsed
|
||||
|
||||
result = worker.handle(conn)
|
||||
|
||||
# Should not return _DEFER since data was ready
|
||||
assert result is not gthread._DEFER
|
||||
conn.init.assert_called_once()
|
||||
|
||||
def test_handle_skips_wait_for_initialized_connections(self):
|
||||
"""Test handle() skips wait_for_data for already initialized (keepalive) connections."""
|
||||
worker = self.create_worker()
|
||||
worker.wsgi = mock.Mock(return_value=[b'OK'])
|
||||
|
||||
sock = mock.Mock()
|
||||
conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
|
||||
conn.initialized = True # Already initialized (keepalive)
|
||||
conn.data_ready = False
|
||||
conn.wait_for_data = mock.Mock()
|
||||
|
||||
conn.parser = mock.Mock()
|
||||
conn.parser.__next__ = mock.Mock(return_value=None)
|
||||
|
||||
worker.handle(conn)
|
||||
|
||||
# wait_for_data should not be called for initialized connections
|
||||
conn.wait_for_data.assert_not_called()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user