diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index dcd238f9..490d1ad0 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -30,6 +30,15 @@ from .. import sock from ..http import wsgi +# Sentinel value to indicate connection should be deferred back to poller +_DEFER = object() + +# Default timeout (in seconds) for waiting for request data in worker thread. +# If no data arrives within this timeout, the connection is deferred back to +# the main poller to prevent thread pool exhaustion from slow clients. +DEFAULT_WORKER_DATA_TIMEOUT = 5.0 + + class TConn: def __init__(self, cfg, sock, client, server): @@ -42,6 +51,8 @@ class TConn: self.parser = None self.initialized = False self.is_http2 = False + # Track if we've already waited for data (to avoid waiting again after defer) + self.data_ready = False # set the socket to non blocking self.sock.setblocking(False) @@ -79,6 +90,37 @@ class TConn: # Use monotonic clock for reliability (time.time() can jump due to NTP) self.timeout = time.monotonic() + self.cfg.keepalive + def wait_for_data(self, timeout): + """Wait for data to be available on the socket. + + Uses selectors to wait for the socket to become readable within + the given timeout. This prevents slow clients from blocking + thread pool slots indefinitely. + + Args: + timeout: Maximum time to wait in seconds. + + Returns: + True if data is available, False if timeout expired. + """ + if self.data_ready: + return True + + # Use a temporary selector to wait for data + sel = selectors.DefaultSelector() + try: + sel.register(self.sock, selectors.EVENT_READ) + events = sel.select(timeout=timeout) + if events: + self.data_ready = True + return True + return False + except (OSError, ValueError): + # Socket closed or invalid + return False + finally: + sel.close() + def close(self): util.close(self.sock) @@ -178,6 +220,8 @@ class ThreadWorker(base.Worker): self.poller = None self.method_queue = PollableMethodQueue() self.keepalived_conns = deque() + # Connections waiting for data (deferred from thread pool) + self.pending_conns = deque() self.nr_conns = 0 self._accepting = False @@ -254,6 +298,17 @@ class ThreadWorker(base.Worker): # Submit to thread pool for processing self.enqueue_req(conn) + def on_pending_socket_readable(self, conn, client): + """Handle a pending (deferred) connection becoming readable.""" + self.poller.unregister(client) + self.pending_conns.remove(conn) + + # Mark data as ready so we don't wait again in handle() + conn.data_ready = True + + # Submit to thread pool for processing + self.enqueue_req(conn) + def murder_keepalived(self): """Close expired keepalive connections.""" now = time.monotonic() @@ -272,6 +327,24 @@ class ThreadWorker(base.Worker): self.nr_conns -= 1 conn.close() + def murder_pending(self): + """Close expired pending connections (waiting for initial data).""" + now = time.monotonic() + while self.pending_conns: + conn = self.pending_conns[0] + delta = conn.timeout - now + if delta > 0: + break + + # Connection has timed out waiting for data + self.pending_conns.popleft() + try: + self.poller.unregister(conn.sock) + except (OSError, KeyError, ValueError): + pass # Already unregistered + self.nr_conns -= 1 + conn.close() + def is_parent_alive(self): # If our parent changed then we shut down. if self.ppid != os.getppid(): @@ -314,8 +387,9 @@ class ThreadWorker(base.Worker): if not self.is_parent_alive(): break - # Handle keepalive timeouts + # Handle keepalive and pending connection timeouts self.murder_keepalived() + self.murder_pending() # Graceful shutdown: stop accepting but handle existing connections self.set_accept_enabled(False) @@ -328,6 +402,7 @@ class ThreadWorker(base.Worker): break self.wait_for_and_dispatch_events(timeout=time_remaining) self.murder_keepalived() + self.murder_pending() # Cleanup self.tpool.shutdown(wait=False) @@ -340,9 +415,19 @@ class ThreadWorker(base.Worker): def finish_request(self, conn, fs): """Handle completion of a request (called via method_queue on main thread).""" try: - keepalive = not fs.cancelled() and fs.result() - if keepalive and self.alive: - # Put connection back in the poller for keepalive + result = fs.result() if not fs.cancelled() else False + + if result is _DEFER and self.alive: + # Connection deferred - no data arrived within timeout. + # Put it on the poller to wait for data without consuming a thread. + conn.sock.setblocking(False) + # Use keepalive timeout for pending connections too + conn.timeout = time.monotonic() + self.cfg.keepalive + self.pending_conns.append(conn) + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.on_pending_socket_readable, conn)) + elif result and self.alive: + # Keepalive - put connection back in the poller conn.sock.setblocking(False) conn.set_timeout() self.keepalived_conns.append(conn) @@ -359,6 +444,16 @@ class ThreadWorker(base.Worker): """Handle a request on a connection. Runs in a worker thread.""" req = None try: + # For new connections (not yet initialized), wait for data with timeout + # to prevent slow clients from blocking thread pool slots indefinitely. + # Skip this for already-initialized connections (keepalive, deferred) + # since they're coming from the poller and data is already available. + if not conn.initialized and not conn.data_ready: + # Wait for data with timeout before committing this thread + if not conn.wait_for_data(DEFAULT_WORKER_DATA_TIMEOUT): + # No data within timeout - defer to poller + return _DEFER + # Always ensure blocking mode in worker thread. # Critical for keepalive connections: the socket is set to non-blocking # for the selector in finish_request(), but must be blocking for diff --git a/tests/test_gthread.py b/tests/test_gthread.py index 130e539a..fbaf3810 100644 --- a/tests/test_gthread.py +++ b/tests/test_gthread.py @@ -1564,3 +1564,215 @@ class TestHTTP2TrailerCallback: send_trailers_h2([]) assert len(pending_trailers) == 0 + + +class TestSlowClientResilience: + """Tests for slow client handling to prevent thread pool exhaustion.""" + + def create_worker(self, cfg=None): + """Helper to create a ThreadWorker for testing.""" + if cfg is None: + cfg = Config() + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + cfg.set('keepalive', 5) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + return worker + + def test_tconn_wait_for_data_returns_true_when_ready(self): + """Test wait_for_data returns True when data_ready is already set.""" + cfg = Config() + sock = FakeSocket() + conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + conn.data_ready = True + + # Should return True immediately without waiting + assert conn.wait_for_data(5.0) is True + + def test_tconn_wait_for_data_sets_data_ready(self): + """Test wait_for_data sets data_ready flag when data arrives.""" + import socket as stdlib_socket + # Create a real socket pair to test selector behavior + server, client = stdlib_socket.socketpair() + try: + cfg = Config() + conn = gthread.TConn(cfg, server, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + conn.data_ready = False + + # Send data from client + client.send(b'GET / HTTP/1.1\r\n') + + # Should detect data is ready + result = conn.wait_for_data(1.0) + + assert result is True + assert conn.data_ready is True + finally: + server.close() + client.close() + + def test_tconn_wait_for_data_timeout(self): + """Test wait_for_data returns False on timeout.""" + import socket as stdlib_socket + # Create a real socket pair but don't send any data + server, client = stdlib_socket.socketpair() + try: + cfg = Config() + conn = gthread.TConn(cfg, server, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + conn.data_ready = False + + # Don't send any data - should timeout + start = time.monotonic() + result = conn.wait_for_data(0.1) # Short timeout + elapsed = time.monotonic() - start + + assert result is False + assert conn.data_ready is False + assert elapsed >= 0.1 + finally: + server.close() + client.close() + + def test_finish_request_handles_defer(self): + """Test finish_request puts deferred connections back on poller.""" + worker = self.create_worker() + worker.poller = mock.Mock() + worker.pending_conns = deque() + worker.nr_conns = 1 + worker.alive = True + + sock = FakeSocket() + conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + + # Create a future that returns _DEFER + fs = mock.Mock() + fs.cancelled.return_value = False + fs.result.return_value = gthread._DEFER + + worker.finish_request(conn, fs) + + # Connection should be in pending_conns, not closed + assert len(worker.pending_conns) == 1 + assert worker.pending_conns[0] is conn + assert worker.nr_conns == 1 # Still counted + assert not sock.closed + + # Should be registered with poller + worker.poller.register.assert_called_once() + + def test_on_pending_socket_readable_sets_data_ready(self): + """Test on_pending_socket_readable marks connection data as ready.""" + worker = self.create_worker() + worker.poller = mock.Mock() + worker.tpool = mock.Mock() + worker.method_queue = mock.Mock() + worker.pending_conns = deque() + + sock = FakeSocket() + conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + conn.data_ready = False + worker.pending_conns.append(conn) + + # Simulate socket becoming readable + worker.on_pending_socket_readable(conn, sock) + + assert conn.data_ready is True + assert conn not in worker.pending_conns + worker.poller.unregister.assert_called_once_with(sock) + worker.tpool.submit.assert_called_once() + + def test_murder_pending_closes_expired_connections(self): + """Test murder_pending closes connections that have timed out.""" + worker = self.create_worker() + worker.poller = mock.Mock() + worker.pending_conns = deque() + worker.nr_conns = 2 + + # Create two connections, one expired, one not + sock1 = FakeSocket() + conn1 = gthread.TConn(worker.cfg, sock1, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + conn1.timeout = time.monotonic() - 10 # Expired + + sock2 = FakeSocket() + conn2 = gthread.TConn(worker.cfg, sock2, ('127.0.0.1', 12346), ('127.0.0.1', 8000)) + conn2.timeout = time.monotonic() + 100 # Not expired + + worker.pending_conns.append(conn1) + worker.pending_conns.append(conn2) + + worker.murder_pending() + + # Only expired connection should be closed + assert sock1.closed + assert not sock2.closed + assert len(worker.pending_conns) == 1 + assert worker.pending_conns[0] is conn2 + assert worker.nr_conns == 1 + + def test_handle_defers_slow_connection(self): + """Test that handle() returns _DEFER for connections without data.""" + worker = self.create_worker() + + # Create a connection that will timeout waiting for data + sock = mock.Mock() + conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + conn.initialized = False + conn.data_ready = False + + # Mock wait_for_data to simulate timeout + conn.wait_for_data = mock.Mock(return_value=False) + + result = worker.handle(conn) + + assert result is gthread._DEFER + conn.wait_for_data.assert_called_once() + + def test_handle_processes_fast_connection(self): + """Test that handle() processes connections with data immediately.""" + worker = self.create_worker() + worker.wsgi = mock.Mock(return_value=[b'OK']) + + # Create a connection with data ready + sock = mock.Mock() + conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + conn.initialized = False + conn.data_ready = True # Data is ready + + # Mock init and parser + conn.init = mock.Mock() + conn.parser = mock.Mock() + conn.parser.__next__ = mock.Mock(return_value=None) # No request parsed + + result = worker.handle(conn) + + # Should not return _DEFER since data was ready + assert result is not gthread._DEFER + conn.init.assert_called_once() + + def test_handle_skips_wait_for_initialized_connections(self): + """Test handle() skips wait_for_data for already initialized (keepalive) connections.""" + worker = self.create_worker() + worker.wsgi = mock.Mock(return_value=[b'OK']) + + sock = mock.Mock() + conn = gthread.TConn(worker.cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + conn.initialized = True # Already initialized (keepalive) + conn.data_ready = False + conn.wait_for_data = mock.Mock() + + conn.parser = mock.Mock() + conn.parser.__next__ = mock.Mock(return_value=None) + + worker.handle(conn) + + # wait_for_data should not be called for initialized connections + conn.wait_for_data.assert_not_called()