Merge pull request #3519 from benoitc/fix/gthread-slow-client-resilience

fix(gthread): prevent thread pool exhaustion from slow clients
This commit is contained in:
Benoit Chesneau 2026-02-20 10:18:08 +01:00 committed by GitHub
commit cef6b337d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 311 additions and 4 deletions

View File

@ -30,6 +30,15 @@ from .. import sock
from ..http import wsgi from ..http import wsgi
# Sentinel value to indicate connection should be deferred back to poller
_DEFER = object()
# Default timeout (in seconds) for waiting for request data in worker thread.
# If no data arrives within this timeout, the connection is deferred back to
# the main poller to prevent thread pool exhaustion from slow clients.
DEFAULT_WORKER_DATA_TIMEOUT = 5.0
class TConn: class TConn:
def __init__(self, cfg, sock, client, server): def __init__(self, cfg, sock, client, server):
@ -42,6 +51,8 @@ class TConn:
self.parser = None self.parser = None
self.initialized = False self.initialized = False
self.is_http2 = False self.is_http2 = False
# Track if we've already waited for data (to avoid waiting again after defer)
self.data_ready = False
# set the socket to non blocking # set the socket to non blocking
self.sock.setblocking(False) self.sock.setblocking(False)
@ -79,6 +90,37 @@ class TConn:
# Use monotonic clock for reliability (time.time() can jump due to NTP) # Use monotonic clock for reliability (time.time() can jump due to NTP)
self.timeout = time.monotonic() + self.cfg.keepalive self.timeout = time.monotonic() + self.cfg.keepalive
def wait_for_data(self, timeout):
"""Wait for data to be available on the socket.
Uses selectors to wait for the socket to become readable within
the given timeout. This prevents slow clients from blocking
thread pool slots indefinitely.
Args:
timeout: Maximum time to wait in seconds.
Returns:
True if data is available, False if timeout expired.
"""
if self.data_ready:
return True
# Use a temporary selector to wait for data
sel = selectors.DefaultSelector()
try:
sel.register(self.sock, selectors.EVENT_READ)
events = sel.select(timeout=timeout)
if events:
self.data_ready = True
return True
return False
except (OSError, ValueError):
# Socket closed or invalid
return False
finally:
sel.close()
def close(self): def close(self):
util.close(self.sock) util.close(self.sock)
@ -178,6 +220,8 @@ class ThreadWorker(base.Worker):
self.poller = None self.poller = None
self.method_queue = PollableMethodQueue() self.method_queue = PollableMethodQueue()
self.keepalived_conns = deque() self.keepalived_conns = deque()
# Connections waiting for data (deferred from thread pool)
self.pending_conns = deque()
self.nr_conns = 0 self.nr_conns = 0
self._accepting = False self._accepting = False
@ -254,6 +298,17 @@ class ThreadWorker(base.Worker):
# Submit to thread pool for processing # Submit to thread pool for processing
self.enqueue_req(conn) self.enqueue_req(conn)
def on_pending_socket_readable(self, conn, client):
"""Handle a pending (deferred) connection becoming readable."""
self.poller.unregister(client)
self.pending_conns.remove(conn)
# Mark data as ready so we don't wait again in handle()
conn.data_ready = True
# Submit to thread pool for processing
self.enqueue_req(conn)
def murder_keepalived(self): def murder_keepalived(self):
"""Close expired keepalive connections.""" """Close expired keepalive connections."""
now = time.monotonic() now = time.monotonic()
@ -272,6 +327,24 @@ class ThreadWorker(base.Worker):
self.nr_conns -= 1 self.nr_conns -= 1
conn.close() conn.close()
def murder_pending(self):
"""Close expired pending connections (waiting for initial data)."""
now = time.monotonic()
while self.pending_conns:
conn = self.pending_conns[0]
delta = conn.timeout - now
if delta > 0:
break
# Connection has timed out waiting for data
self.pending_conns.popleft()
try:
self.poller.unregister(conn.sock)
except (OSError, KeyError, ValueError):
pass # Already unregistered
self.nr_conns -= 1
conn.close()
def is_parent_alive(self): def is_parent_alive(self):
# If our parent changed then we shut down. # If our parent changed then we shut down.
if self.ppid != os.getppid(): if self.ppid != os.getppid():
@ -314,8 +387,9 @@ class ThreadWorker(base.Worker):
if not self.is_parent_alive(): if not self.is_parent_alive():
break break
# Handle keepalive timeouts # Handle keepalive and pending connection timeouts
self.murder_keepalived() self.murder_keepalived()
self.murder_pending()
# Graceful shutdown: stop accepting but handle existing connections # Graceful shutdown: stop accepting but handle existing connections
self.set_accept_enabled(False) self.set_accept_enabled(False)
@ -328,6 +402,7 @@ class ThreadWorker(base.Worker):
break break
self.wait_for_and_dispatch_events(timeout=time_remaining) self.wait_for_and_dispatch_events(timeout=time_remaining)
self.murder_keepalived() self.murder_keepalived()
self.murder_pending()
# Cleanup # Cleanup
self.tpool.shutdown(wait=False) self.tpool.shutdown(wait=False)
@ -340,9 +415,19 @@ class ThreadWorker(base.Worker):
def finish_request(self, conn, fs): def finish_request(self, conn, fs):
"""Handle completion of a request (called via method_queue on main thread).""" """Handle completion of a request (called via method_queue on main thread)."""
try: try:
keepalive = not fs.cancelled() and fs.result() result = fs.result() if not fs.cancelled() else False
if keepalive and self.alive:
# Put connection back in the poller for keepalive if result is _DEFER and self.alive:
# Connection deferred - no data arrived within timeout.
# Put it on the poller to wait for data without consuming a thread.
conn.sock.setblocking(False)
# Use keepalive timeout for pending connections too
conn.timeout = time.monotonic() + self.cfg.keepalive
self.pending_conns.append(conn)
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_pending_socket_readable, conn))
elif result and self.alive:
# Keepalive - put connection back in the poller
conn.sock.setblocking(False) conn.sock.setblocking(False)
conn.set_timeout() conn.set_timeout()
self.keepalived_conns.append(conn) self.keepalived_conns.append(conn)
@ -359,6 +444,16 @@ class ThreadWorker(base.Worker):
"""Handle a request on a connection. Runs in a worker thread.""" """Handle a request on a connection. Runs in a worker thread."""
req = None req = None
try: try:
# For new connections (not yet initialized), wait for data with timeout
# to prevent slow clients from blocking thread pool slots indefinitely.
# Skip this for already-initialized connections (keepalive, deferred)
# since they're coming from the poller and data is already available.
if not conn.initialized and not conn.data_ready:
# Wait for data with timeout before committing this thread
if not conn.wait_for_data(DEFAULT_WORKER_DATA_TIMEOUT):
# No data within timeout - defer to poller
return _DEFER
# Always ensure blocking mode in worker thread. # Always ensure blocking mode in worker thread.
# Critical for keepalive connections: the socket is set to non-blocking # Critical for keepalive connections: the socket is set to non-blocking
# for the selector in finish_request(), but must be blocking for # for the selector in finish_request(), but must be blocking for

View File

@ -1564,3 +1564,215 @@ class TestHTTP2TrailerCallback:
send_trailers_h2([]) send_trailers_h2([])
assert len(pending_trailers) == 0 assert len(pending_trailers) == 0
class TestSlowClientResilience:
"""Tests for slow client handling to prevent thread pool exhaustion."""
def create_worker(self, cfg=None):
"""Helper to create a ThreadWorker for testing."""
if cfg is None:
cfg = Config()
cfg.set('threads', 4)
cfg.set('worker_connections', 1000)
cfg.set('keepalive', 5)
worker = gthread.ThreadWorker(
age=1,
ppid=os.getpid(),
sockets=[],
app=mock.Mock(),
timeout=30,
cfg=cfg,
log=mock.Mock(),
)
return worker
def test_tconn_wait_for_data_returns_true_when_ready(self):
"""Test wait_for_data returns True when data_ready is already set."""
cfg = Config()
sock = FakeSocket()
conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
conn.data_ready = True
# Should return True immediately without waiting
assert conn.wait_for_data(5.0) is True
def test_tconn_wait_for_data_sets_data_ready(self):
"""Test wait_for_data sets data_ready flag when data arrives."""
import socket as stdlib_socket
# Create a real socket pair to test selector behavior
server, client = stdlib_socket.socketpair()
try:
cfg = Config()
conn = gthread.TConn(cfg, server, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
conn.data_ready = False
# Send data from client
client.send(b'GET / HTTP/1.1\r\n')
# Should detect data is ready
result = conn.wait_for_data(1.0)
assert result is True
assert conn.data_ready is True
finally:
server.close()
client.close()
def test_tconn_wait_for_data_timeout(self):
"""Test wait_for_data returns False on timeout."""
import socket as stdlib_socket
# Create a real socket pair but don't send any data
server, client = stdlib_socket.socketpair()
try:
cfg = Config()
conn = gthread.TConn(cfg, server, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
conn.data_ready = False
# Don't send any data - should timeout
start = time.monotonic()
result = conn.wait_for_data(0.1) # Short timeout
elapsed = time.monotonic() - start
assert result is False
assert conn.data_ready is False
assert elapsed >= 0.1
finally:
server.close()
client.close()
def test_finish_request_handles_defer(self):
"""Test finish_request puts deferred connections back on poller."""
worker = self.create_worker()
worker.poller = mock.Mock()
worker.pending_conns = deque()
worker.nr_conns = 1
worker.alive = True
sock = FakeSocket()
conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
# Create a future that returns _DEFER
fs = mock.Mock()
fs.cancelled.return_value = False
fs.result.return_value = gthread._DEFER
worker.finish_request(conn, fs)
# Connection should be in pending_conns, not closed
assert len(worker.pending_conns) == 1
assert worker.pending_conns[0] is conn
assert worker.nr_conns == 1 # Still counted
assert not sock.closed
# Should be registered with poller
worker.poller.register.assert_called_once()
def test_on_pending_socket_readable_sets_data_ready(self):
"""Test on_pending_socket_readable marks connection data as ready."""
worker = self.create_worker()
worker.poller = mock.Mock()
worker.tpool = mock.Mock()
worker.method_queue = mock.Mock()
worker.pending_conns = deque()
sock = FakeSocket()
conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
conn.data_ready = False
worker.pending_conns.append(conn)
# Simulate socket becoming readable
worker.on_pending_socket_readable(conn, sock)
assert conn.data_ready is True
assert conn not in worker.pending_conns
worker.poller.unregister.assert_called_once_with(sock)
worker.tpool.submit.assert_called_once()
def test_murder_pending_closes_expired_connections(self):
"""Test murder_pending closes connections that have timed out."""
worker = self.create_worker()
worker.poller = mock.Mock()
worker.pending_conns = deque()
worker.nr_conns = 2
# Create two connections, one expired, one not
sock1 = FakeSocket()
conn1 = gthread.TConn(worker.cfg, sock1, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
conn1.timeout = time.monotonic() - 10 # Expired
sock2 = FakeSocket()
conn2 = gthread.TConn(worker.cfg, sock2, ('127.0.0.1', 12346), ('127.0.0.1', 8000))
conn2.timeout = time.monotonic() + 100 # Not expired
worker.pending_conns.append(conn1)
worker.pending_conns.append(conn2)
worker.murder_pending()
# Only expired connection should be closed
assert sock1.closed
assert not sock2.closed
assert len(worker.pending_conns) == 1
assert worker.pending_conns[0] is conn2
assert worker.nr_conns == 1
def test_handle_defers_slow_connection(self):
"""Test that handle() returns _DEFER for connections without data."""
worker = self.create_worker()
# Create a connection that will timeout waiting for data
sock = mock.Mock()
conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
conn.initialized = False
conn.data_ready = False
# Mock wait_for_data to simulate timeout
conn.wait_for_data = mock.Mock(return_value=False)
result = worker.handle(conn)
assert result is gthread._DEFER
conn.wait_for_data.assert_called_once()
def test_handle_processes_fast_connection(self):
"""Test that handle() processes connections with data immediately."""
worker = self.create_worker()
worker.wsgi = mock.Mock(return_value=[b'OK'])
# Create a connection with data ready
sock = mock.Mock()
conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
conn.initialized = False
conn.data_ready = True # Data is ready
# Mock init and parser
conn.init = mock.Mock()
conn.parser = mock.Mock()
conn.parser.__next__ = mock.Mock(return_value=None) # No request parsed
result = worker.handle(conn)
# Should not return _DEFER since data was ready
assert result is not gthread._DEFER
conn.init.assert_called_once()
def test_handle_skips_wait_for_initialized_connections(self):
"""Test handle() skips wait_for_data for already initialized (keepalive) connections."""
worker = self.create_worker()
worker.wsgi = mock.Mock(return_value=[b'OK'])
sock = mock.Mock()
conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000))
conn.initialized = True # Already initialized (keepalive)
conn.data_ready = False
conn.wait_for_data = mock.Mock()
conn.parser = mock.Mock()
conn.parser.__next__ = mock.Mock(return_value=None)
worker.handle(conn)
# wait_for_data should not be called for initialized connections
conn.wait_for_data.assert_not_called()