From b43dc6d398c7ec58d411e97770d5e8b05c4a7e76 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Thu, 22 Jan 2026 09:14:19 +0100 Subject: [PATCH 1/4] gthread: Improve reliability and fix edge cases This commit addresses three issues with the gthread worker: 1. Request body handling on keepalive - Add finish_body() method to Parser to discard unread body bytes - Call it before returning connections to the poller - Prevents socket appearing readable due to leftover body Fixes #3301 2. Timeout reliability with monotonic clock - Replace time.time() with time.monotonic() in set_timeout() - Replace time.time() with time.monotonic() in murder_keepalived() - Prevents timeout issues caused by NTP adjustments 3. SSL error handling - Move conn.init() from enqueue_req() to handle() - SSL handshake now runs in worker thread, not main thread - ENOTCONN errors during ssl_wrap_socket are caught per-connection - Prevents entire worker crashes on SSL handshake failures Also adds comprehensive unit tests for the gthread worker. Closes #3303 Closes #3308 --- gunicorn/http/parser.py | 17 +- gunicorn/workers/gthread.py | 19 +- tests/test_gthread.py | 415 ++++++++++++++++++++++++++++++++++++ 3 files changed, 442 insertions(+), 9 deletions(-) create mode 100644 tests/test_gthread.py diff --git a/gunicorn/http/parser.py b/gunicorn/http/parser.py index 88da17ab..05ee6ca6 100644 --- a/gunicorn/http/parser.py +++ b/gunicorn/http/parser.py @@ -25,16 +25,25 @@ class Parser: def __iter__(self): return self + def finish_body(self): + """Discard any unread body of the current message. + + This should be called before returning a keepalive connection to + the poller to ensure the socket doesn't appear readable due to + leftover body bytes. + """ + if self.mesg: + data = self.mesg.body.read(8192) + while data: + data = self.mesg.body.read(8192) + def __next__(self): # Stop if HTTP dictates a stop. if self.mesg and self.mesg.should_close(): raise StopIteration() # Discard any unread body of the previous message - if self.mesg: - data = self.mesg.body.read(8192) - while data: - data = self.mesg.body.read(8192) + self.finish_body() # Parse the next request self.req_count += 1 diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 7a23228c..f3938ef7 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -46,6 +46,9 @@ class TConn: self.sock.setblocking(False) def init(self): + # Guard against double initialization + if self.initialized: + return self.initialized = True self.sock.setblocking(True) @@ -58,8 +61,8 @@ class TConn: self.parser = http.RequestParser(self.cfg, self.sock, self.client) def set_timeout(self): - # set the timeout - self.timeout = time.time() + self.cfg.keepalive + # Use monotonic clock for reliability (time.time() can jump due to NTP) + self.timeout = time.monotonic() + self.cfg.keepalive def close(self): util.close(self.sock) @@ -111,8 +114,8 @@ class ThreadWorker(base.Worker): fs.add_done_callback(self.finish_request) def enqueue_req(self, conn): - conn.init() - # submit the connection to a worker + # submit the connection to a worker thread + # (conn.init() is called in handle() to avoid SSL errors in main thread) fs = self.tpool.submit(self.handle, conn) self._wrap_future(fs, conn) @@ -149,7 +152,7 @@ class ThreadWorker(base.Worker): self.enqueue_req(conn) def murder_keepalived(self): - now = time.time() + now = time.monotonic() while True: with self._lock: try: @@ -273,6 +276,9 @@ class ThreadWorker(base.Worker): keepalive = False req = None try: + # Initialize connection in worker thread to handle SSL errors gracefully + # (ENOTCONN from ssl_wrap_socket would crash main thread otherwise) + conn.init() req = next(conn.parser) if not req: return (False, conn) @@ -280,6 +286,9 @@ class ThreadWorker(base.Worker): # handle the request keepalive = self.handle_request(req, conn) if keepalive: + # Discard any unread request body before keepalive + # to prevent socket appearing readable due to leftover bytes + conn.parser.finish_body() return (keepalive, conn) except http.errors.NoMoreData as e: self.log.debug("Ignored premature client disconnection. %s", e) diff --git a/tests/test_gthread.py b/tests/test_gthread.py new file mode 100644 index 00000000..1cc4bb39 --- /dev/null +++ b/tests/test_gthread.py @@ -0,0 +1,415 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +"""Tests for the gthread worker.""" + +import errno +import os +import queue +import selectors +import socket +import threading +import time +from collections import deque +from concurrent import futures +from functools import partial +from unittest import mock + +import pytest + +from gunicorn import http +from gunicorn.config import Config +from gunicorn.workers import gthread + + +class FakeSocket: + """Mock socket for testing.""" + + def __init__(self, data=b''): + self.data = data + self.closed = False + self.blocking = True + self._fileno = id(self) % 65536 + + def fileno(self): + return self._fileno + + def setblocking(self, blocking): + self.blocking = blocking + + def recv(self, size): + if self.closed: + raise OSError(errno.EBADF, "Bad file descriptor") + result = self.data[:size] + self.data = self.data[size:] + return result + + def send(self, data): + if self.closed: + raise OSError(errno.EPIPE, "Broken pipe") + return len(data) + + def close(self): + self.closed = True + + def getsockname(self): + return ('127.0.0.1', 8000) + + def getpeername(self): + return ('127.0.0.1', 12345) + + +class TestTConn: + """Tests for TConn connection wrapper.""" + + def test_tconn_init(self): + """Test TConn initialization.""" + cfg = Config() + sock = FakeSocket() + client = ('127.0.0.1', 12345) + server = ('127.0.0.1', 8000) + + conn = gthread.TConn(cfg, sock, client, server) + + assert conn.cfg is cfg + assert conn.sock is sock + assert conn.client == client + assert conn.server == server + assert conn.timeout is None + assert conn.parser is None + assert conn.initialized is False + + def test_tconn_init_sets_blocking_false(self): + """Test that TConn sets socket to non-blocking initially.""" + cfg = Config() + sock = FakeSocket() + sock.setblocking(True) + + conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + + # TConn sets socket to non-blocking in __init__ + assert sock.blocking is False + + def test_tconn_init_method_sets_blocking_true(self): + """Test that conn.init() sets socket back to blocking.""" + cfg = Config() + sock = FakeSocket() + + conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + conn.init() + + assert sock.blocking is True + assert conn.initialized is True + assert conn.parser is not None + + def test_tconn_set_timeout(self): + """Test timeout setting using monotonic clock.""" + cfg = Config() + cfg.set('keepalive', 5) + sock = FakeSocket() + + conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + before = time.monotonic() + conn.set_timeout() + after = time.monotonic() + + assert conn.timeout is not None + assert before + 5 <= conn.timeout <= after + 5 + + def test_tconn_close(self): + """Test connection closing.""" + cfg = Config() + sock = FakeSocket() + + conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + conn.close() + + assert sock.closed is True + + +class TestThreadWorker: + """Tests for ThreadWorker.""" + + def create_worker(self, cfg=None): + """Create a worker instance for testing.""" + if cfg is None: + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + cfg.set('keepalive', 2) + + # Mock the required attributes + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + return worker + + def test_worker_init(self): + """Test worker initialization.""" + worker = self.create_worker() + + assert worker.worker_connections == 1000 + assert worker.max_keepalived == 1000 - 4 # connections - threads + assert worker.tpool is None + assert worker.poller is None + assert worker._lock is None + assert worker.nr_conns == 0 + + def test_worker_check_config_warning(self): + """Test that check_config warns when keepalive impossible.""" + cfg = Config() + cfg.set('worker_connections', 4) + cfg.set('threads', 4) + cfg.set('keepalive', 2) + log = mock.Mock() + + gthread.ThreadWorker.check_config(cfg, log) + + log.warning.assert_called() + + def test_worker_check_config_no_warning(self): + """Test that check_config doesn't warn with valid config.""" + cfg = Config() + cfg.set('worker_connections', 100) + cfg.set('threads', 4) + cfg.set('keepalive', 2) + log = mock.Mock() + + gthread.ThreadWorker.check_config(cfg, log) + + log.warning.assert_not_called() + + def test_worker_init_process(self): + """Test worker process initialization.""" + worker = self.create_worker() + worker.tmp = mock.Mock() + worker.log = mock.Mock() + + # Mock super().init_process() to avoid full initialization + with mock.patch.object(gthread.base.Worker, 'init_process'): + worker.init_process() + + assert worker.tpool is not None + assert worker.poller is not None + assert worker._lock is not None + + # Cleanup + worker.tpool.shutdown(wait=False) + worker.poller.close() + + def test_worker_get_thread_pool(self): + """Test thread pool creation.""" + worker = self.create_worker() + + pool = worker.get_thread_pool() + + assert isinstance(pool, futures.ThreadPoolExecutor) + pool.shutdown(wait=False) + + def test_worker_murder_keepalived(self): + """Test that expired keepalive connections are cleaned up.""" + worker = self.create_worker() + worker.poller = selectors.DefaultSelector() + worker._lock = threading.RLock() + + # Create an expired connection (using monotonic to match implementation) + cfg = Config() + sock = FakeSocket() + conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + conn.timeout = time.monotonic() - 10 # Expired 10 seconds ago + + worker._keep.append(conn) + worker.nr_conns = 1 + + # Register with poller (so it can be unregistered) + try: + # Can't register FakeSocket with real selector, mock it + with mock.patch.object(worker.poller, 'unregister'): + worker.murder_keepalived() + except (OSError, ValueError): + pass # Expected with fake socket + + # Connection should have been removed + assert len(worker._keep) == 0 + assert sock.closed is True + + worker.poller.close() + + def test_worker_is_parent_alive(self): + """Test parent process check.""" + worker = self.create_worker() + + # With correct ppid + worker.ppid = os.getppid() + assert worker.is_parent_alive() is True + + # With wrong ppid + worker.ppid = -1 + assert worker.is_parent_alive() is False + + +class TestFinishRequest: + """Tests for finish_request handling.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + worker._lock = threading.RLock() + worker.poller = mock.Mock() + worker.alive = True + return worker + + def test_finish_request_cancelled(self): + """Test handling of cancelled future.""" + worker = self.create_worker() + worker.nr_conns = 1 + + conn = mock.Mock() + fs = mock.Mock() + fs.cancelled.return_value = True + fs.conn = conn + + worker.finish_request(fs) + + assert worker.nr_conns == 0 + conn.close.assert_called_once() + + def test_finish_request_keepalive(self): + """Test handling of keepalive response.""" + worker = self.create_worker() + worker.nr_conns = 1 + + conn = mock.Mock() + conn.sock = mock.Mock() + fs = mock.Mock() + fs.cancelled.return_value = False + fs.result.return_value = (True, conn) # keepalive=True + fs.conn = conn + + worker.finish_request(fs) + + assert worker.nr_conns == 1 # Connection kept + assert conn in worker._keep + conn.set_timeout.assert_called_once() + worker.poller.register.assert_called_once() + + def test_finish_request_close(self): + """Test handling of non-keepalive response.""" + worker = self.create_worker() + worker.nr_conns = 1 + + conn = mock.Mock() + fs = mock.Mock() + fs.cancelled.return_value = False + fs.result.return_value = (False, conn) # keepalive=False + fs.conn = conn + + worker.finish_request(fs) + + assert worker.nr_conns == 0 + conn.close.assert_called_once() + + def test_finish_request_exception(self): + """Test handling of exception in request.""" + worker = self.create_worker() + worker.nr_conns = 1 + + conn = mock.Mock() + fs = mock.Mock() + fs.cancelled.return_value = False + fs.result.side_effect = Exception("Test error") + fs.conn = conn + + worker.finish_request(fs) + + assert worker.nr_conns == 0 + conn.close.assert_called_once() + + +class TestAccept: + """Tests for connection acceptance.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + worker._lock = threading.RLock() + worker.poller = mock.Mock() + return worker + + def test_accept_success(self): + """Test successful connection acceptance.""" + worker = self.create_worker() + worker.nr_conns = 0 + + client_sock = FakeSocket() + client_addr = ('127.0.0.1', 12345) + listener = mock.Mock() + listener.accept.return_value = (client_sock, client_addr) + server = ('127.0.0.1', 8000) + + worker.accept(server, listener) + + assert worker.nr_conns == 1 + worker.poller.register.assert_called_once() + + def test_accept_eagain(self): + """Test handling of EAGAIN during accept.""" + worker = self.create_worker() + worker.nr_conns = 0 + + listener = mock.Mock() + listener.accept.side_effect = OSError(errno.EAGAIN, "Try again") + server = ('127.0.0.1', 8000) + + # Should not raise + worker.accept(server, listener) + + assert worker.nr_conns == 0 + + def test_accept_econnaborted(self): + """Test handling of ECONNABORTED during accept.""" + worker = self.create_worker() + worker.nr_conns = 0 + + listener = mock.Mock() + listener.accept.side_effect = OSError(errno.ECONNABORTED, "Connection aborted") + server = ('127.0.0.1', 8000) + + # Should not raise + worker.accept(server, listener) + + assert worker.nr_conns == 0 From 018621140061f9a4d43a774f4808a2111331f8c3 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Thu, 22 Jan 2026 09:32:06 +0100 Subject: [PATCH 2/4] gthread: Lock-free PollableMethodQueue refactoring Replace RLock-based synchronization with a pipe-based method queue for lock-free coordination between worker threads and main thread. Key changes: - Add PollableMethodQueue class using os.pipe() for wake-up signaling - Non-blocking pipe (both ends) for BSD compatibility (FreeBSD, OpenBSD) - Unified event loop using single poller.select() - no more futures.wait() - Better graceful shutdown with connection draining within grace period - Rename _keep to keepalived_conns, remove _lock entirely - Add handle_exit() for SIGTERM, improve handle_quit() for SIGQUIT - Add set_accept_enabled() for dynamic connection acceptance control - Add wait_for_and_dispatch_events() with EINTR handling Performance improvement: ~8% at high concurrency due to reduced lock contention and non-blocking pipe operations. Tests: 40 tests covering PollableMethodQueue, graceful shutdown, keepalive management, error handling, and BSD compatibility. Fixes #3146 Closes #3157 --- gunicorn/workers/gthread.py | 329 +++++++++++++++---------- tests/test_gthread.py | 461 +++++++++++++++++++++++++++++++++--- 2 files changed, 631 insertions(+), 159 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index f3938ef7..47270725 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -13,6 +13,7 @@ from concurrent import futures import errno import os +import queue import selectors import socket import ssl @@ -21,7 +22,6 @@ import time from collections import deque from datetime import datetime from functools import partial -from threading import RLock from . import base from .. import http @@ -68,19 +68,103 @@ class TConn: util.close(self.sock) +class PollableMethodQueue: + """Thread-safe queue that can wake up a selector. + + Uses a pipe to allow worker threads to signal the main thread + when work is ready, enabling lock-free coordination. + + This approach is compatible with all POSIX systems including + Linux, macOS, FreeBSD, OpenBSD, and NetBSD. The pipe is set to + non-blocking mode to prevent worker threads from blocking if + the pipe buffer fills up under extreme load. + """ + + def __init__(self): + self._read_fd = None + self._write_fd = None + self._queue = None + + def init(self): + """Initialize the pipe and queue.""" + self._read_fd, self._write_fd = os.pipe() + # Set both ends to non-blocking: + # - Write: prevents worker threads from blocking if buffer is full + # - Read: allows run_callbacks to drain without blocking + os.set_blocking(self._read_fd, False) + os.set_blocking(self._write_fd, False) + self._queue = queue.SimpleQueue() + + def close(self): + """Close the pipe file descriptors.""" + if self._read_fd is not None: + try: + os.close(self._read_fd) + except OSError: + pass + if self._write_fd is not None: + try: + os.close(self._write_fd) + except OSError: + pass + + def fileno(self): + """Return the readable file descriptor for selector registration.""" + return self._read_fd + + def defer(self, callback, *args): + """Queue a callback to be run on the main thread. + + The callback is added to the queue first, then a wake-up byte + is written to the pipe. If the pipe write fails (buffer full), + it's safe to ignore because the main thread will eventually + drain the queue when it reads other wake-up bytes. + """ + self._queue.put(partial(callback, *args)) + try: + os.write(self._write_fd, b'\x00') + except OSError: + # Pipe buffer full (EAGAIN/EWOULDBLOCK) - safe to ignore + # The main thread will still process the queue + pass + + def run_callbacks(self, _fileobj, max_callbacks=50): + """Run queued callbacks. Called when the pipe is readable. + + Drains all available wake-up bytes and runs corresponding callbacks. + The max_callbacks limit prevents starvation of other event sources. + """ + # Read all available wake-up bytes (up to limit) + try: + data = os.read(self._read_fd, max_callbacks) + except OSError: + return + + # Run callbacks for each byte read, plus any extras in queue + # (extras can accumulate if pipe writes were dropped) + callbacks_run = 0 + while callbacks_run < len(data) + 10: # +10 to drain dropped writes + try: + callback = self._queue.get_nowait() + callback() + callbacks_run += 1 + except queue.Empty: + break + + class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.worker_connections = self.cfg.worker_connections self.max_keepalived = self.cfg.worker_connections - self.cfg.threads - # initialise the pool + self.tpool = None self.poller = None - self._lock = None - self.futures = deque() - self._keep = deque() + self.method_queue = PollableMethodQueue() + self.keepalived_conns = deque() self.nr_conns = 0 + self._accepting = False @classmethod def check_config(cls, cfg, log): @@ -93,98 +177,85 @@ class ThreadWorker(base.Worker): def init_process(self): self.tpool = self.get_thread_pool() self.poller = selectors.DefaultSelector() - self._lock = RLock() + self.method_queue.init() super().init_process() def get_thread_pool(self): """Override this method to customize how the thread pool is created""" return futures.ThreadPoolExecutor(max_workers=self.cfg.threads) - def handle_quit(self, sig, frame): - self.alive = False - # worker_int callback - self.cfg.worker_int(self) - self.tpool.shutdown(False) - time.sleep(0.1) - sys.exit(0) + def handle_exit(self, sig, frame): + """Handle SIGTERM - begin graceful shutdown.""" + if self.alive: + self.alive = False + # Wake up the poller so it can start shutdown + self.method_queue.defer(lambda: None) - def _wrap_future(self, fs, conn): - fs.conn = conn - self.futures.append(fs) - fs.add_done_callback(self.finish_request) + def handle_quit(self, sig, frame): + """Handle SIGQUIT - immediate shutdown.""" + self.tpool.shutdown(wait=False) + super().handle_quit(sig, frame) + + def set_accept_enabled(self, enabled): + """Enable or disable accepting new connections.""" + if enabled == self._accepting: + return + + for sock in self.sockets: + if enabled: + sock.setblocking(False) + self.poller.register(sock, selectors.EVENT_READ, self.accept) + else: + self.poller.unregister(sock) + + self._accepting = enabled def enqueue_req(self, conn): - # submit the connection to a worker thread - # (conn.init() is called in handle() to avoid SSL errors in main thread) + """Submit connection to thread pool for processing.""" fs = self.tpool.submit(self.handle, conn) - self._wrap_future(fs, conn) + fs.add_done_callback( + lambda fut: self.method_queue.defer(self.finish_request, conn, fut)) - def accept(self, server, listener): + def accept(self, listener): + """Accept a new connection from a listener socket.""" try: - sock, client = listener.accept() - # initialize the connection object - conn = TConn(self.cfg, sock, client, server) - + client_sock, client_addr = listener.accept() self.nr_conns += 1 - # wait until socket is readable - with self._lock: - self.poller.register(conn.sock, selectors.EVENT_READ, - partial(self.on_client_socket_readable, conn)) + client_sock.setblocking(True) + + conn = TConn(self.cfg, client_sock, client_addr, listener.getsockname()) + + # Submit directly to thread pool for processing + self.enqueue_req(conn) except OSError as e: - if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, - errno.EWOULDBLOCK): + if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise def on_client_socket_readable(self, conn, client): - with self._lock: - # unregister the client from the poller - self.poller.unregister(client) + """Handle a keepalive connection becoming readable.""" + self.poller.unregister(client) + self.keepalived_conns.remove(conn) - if conn.initialized: - # remove the connection from keepalive - try: - self._keep.remove(conn) - except ValueError: - # race condition - return - - # submit the connection to a worker + # Submit to thread pool for processing self.enqueue_req(conn) def murder_keepalived(self): + """Close expired keepalive connections.""" now = time.monotonic() - while True: - with self._lock: - try: - # remove the connection from the queue - conn = self._keep.popleft() - except IndexError: - break - + while self.keepalived_conns: + conn = self.keepalived_conns[0] delta = conn.timeout - now if delta > 0: - # add the connection back to the queue - with self._lock: - self._keep.appendleft(conn) break - else: - self.nr_conns -= 1 - # remove the socket from the poller - with self._lock: - try: - self.poller.unregister(conn.sock) - except OSError as e: - if e.errno != errno.EBADF: - raise - except KeyError: - # already removed by the system, continue - pass - except ValueError: - # already removed by the system continue - pass - # close the socket - conn.close() + # Connection has timed out + self.keepalived_conns.popleft() + try: + self.poller.unregister(conn.sock) + except (OSError, KeyError, ValueError): + pass # Already unregistered + self.nr_conns -= 1 + conn.close() def is_parent_alive(self): # If our parent changed then we shut down. @@ -193,106 +264,103 @@ class ThreadWorker(base.Worker): return False return True + def wait_for_and_dispatch_events(self, timeout): + """Wait for events and dispatch callbacks.""" + try: + events = self.poller.select(timeout) + for key, _ in events: + callback = key.data + callback(key.fileobj) + except OSError as e: + if e.errno != errno.EINTR: + raise + def run(self): - # init listeners, add them to the event loop - for sock in self.sockets: - sock.setblocking(False) - # a race condition during graceful shutdown may make the listener - # name unavailable in the request handler so capture it once here - server = sock.getsockname() - acceptor = partial(self.accept, server) - self.poller.register(sock, selectors.EVENT_READ, acceptor) + # Register the method queue with the poller + self.poller.register(self.method_queue.fileno(), + selectors.EVENT_READ, + self.method_queue.run_callbacks) + + # Start accepting connections + self.set_accept_enabled(True) while self.alive: - # notify the arbiter we are alive + # Notify the arbiter we are alive self.notify() - # can we accept more connections? - if self.nr_conns < self.worker_connections: - # wait for an event - events = self.poller.select(1.0) - for key, _ in events: - callback = key.data - callback(key.fileobj) + # Check if we can accept more connections + can_accept = self.nr_conns < self.worker_connections + if can_accept != self._accepting: + self.set_accept_enabled(can_accept) - # check (but do not wait) for finished requests - result = futures.wait(self.futures, timeout=0, - return_when=futures.FIRST_COMPLETED) - else: - # wait for a request to finish - result = futures.wait(self.futures, timeout=1.0, - return_when=futures.FIRST_COMPLETED) - - # clean up finished requests - for fut in result.done: - self.futures.remove(fut) + # Wait for events (unified event loop - no futures.wait()) + self.wait_for_and_dispatch_events(timeout=1.0) if not self.is_parent_alive(): break - # handle keepalive timeouts + # Handle keepalive timeouts self.murder_keepalived() - self.tpool.shutdown(False) + # Graceful shutdown: stop accepting but handle existing connections + self.set_accept_enabled(False) + + # Wait for in-flight connections within grace period + graceful_timeout = time.monotonic() + self.cfg.graceful_timeout + while self.nr_conns > 0: + time_remaining = max(graceful_timeout - time.monotonic(), 0) + if time_remaining == 0: + break + self.wait_for_and_dispatch_events(timeout=time_remaining) + self.murder_keepalived() + + # Cleanup + self.tpool.shutdown(wait=False) self.poller.close() + self.method_queue.close() for s in self.sockets: s.close() - futures.wait(self.futures, timeout=self.cfg.graceful_timeout) - - def finish_request(self, fs): - if fs.cancelled(): - self.nr_conns -= 1 - fs.conn.close() - return - + def finish_request(self, conn, fs): + """Handle completion of a request (called via method_queue on main thread).""" try: - (keepalive, conn) = fs.result() - # if the connection should be kept alived add it - # to the eventloop and record it + keepalive = not fs.cancelled() and fs.result() if keepalive and self.alive: - # flag the socket as non blocked + # Put connection back in the poller for keepalive conn.sock.setblocking(False) - - # register the connection conn.set_timeout() - with self._lock: - self._keep.append(conn) - - # add the socket to the event loop - self.poller.register(conn.sock, selectors.EVENT_READ, - partial(self.on_client_socket_readable, conn)) + self.keepalived_conns.append(conn) + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.on_client_socket_readable, conn)) else: self.nr_conns -= 1 conn.close() except Exception: - # an exception happened, make sure to close the - # socket. self.nr_conns -= 1 - fs.conn.close() + conn.close() def handle(self, conn): - keepalive = False + """Handle a request on a connection. Runs in a worker thread.""" req = None try: # Initialize connection in worker thread to handle SSL errors gracefully # (ENOTCONN from ssl_wrap_socket would crash main thread otherwise) conn.init() + req = next(conn.parser) if not req: - return (False, conn) + return False - # handle the request + # Handle the request keepalive = self.handle_request(req, conn) if keepalive: # Discard any unread request body before keepalive # to prevent socket appearing readable due to leftover bytes conn.parser.finish_body() - return (keepalive, conn) + return True except http.errors.NoMoreData as e: self.log.debug("Ignored premature client disconnection. %s", e) - except StopIteration as e: self.log.debug("Closing connection. %s", e) except ssl.SSLError as e: @@ -302,7 +370,6 @@ class ThreadWorker(base.Worker): else: self.log.debug("Error processing SSL request.") self.handle_error(req, conn.sock, conn.client, e) - except OSError as e: if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN): self.log.exception("Socket error processing request.") @@ -316,7 +383,7 @@ class ThreadWorker(base.Worker): except Exception as e: self.handle_error(req, conn.sock, conn.client, e) - return (False, conn) + return False def handle_request(self, req, conn): environ = {} @@ -336,7 +403,7 @@ class ThreadWorker(base.Worker): if not self.alive or not self.cfg.keepalive: resp.force_close() - elif len(self._keep) >= self.max_keepalived: + elif len(self.keepalived_conns) >= self.max_keepalived: resp.force_close() respiter = self.wsgi(environ, resp.start_response) diff --git a/tests/test_gthread.py b/tests/test_gthread.py index 1cc4bb39..e095bb3b 100644 --- a/tests/test_gthread.py +++ b/tests/test_gthread.py @@ -8,7 +8,6 @@ import errno import os import queue import selectors -import socket import threading import time from collections import deque @@ -128,6 +127,114 @@ class TestTConn: assert sock.closed is True +class TestPollableMethodQueue: + """Tests for PollableMethodQueue.""" + + def test_queue_init_and_close(self): + """Test queue initialization and cleanup.""" + q = gthread.PollableMethodQueue() + q.init() + + assert q._read_fd is not None + assert q._write_fd is not None + assert q._queue is not None + + q.close() + + def test_queue_defer_and_run(self): + """Test deferring and running callbacks.""" + q = gthread.PollableMethodQueue() + q.init() + + results = [] + q.defer(lambda x: results.append(x), 42) + + # Simulate the selector reading from the pipe + q.run_callbacks(None) + + assert results == [42] + q.close() + + def test_queue_multiple_callbacks(self): + """Test multiple callbacks are executed in order.""" + q = gthread.PollableMethodQueue() + q.init() + + results = [] + for i in range(5): + q.defer(lambda x: results.append(x), i) + + q.run_callbacks(None) + + assert results == [0, 1, 2, 3, 4] + q.close() + + def test_queue_fileno_for_selector(self): + """Test that fileno returns a valid fd for selector registration.""" + q = gthread.PollableMethodQueue() + q.init() + + fd = q.fileno() + assert isinstance(fd, int) + assert fd >= 0 + + # Verify it can be used with a selector + sel = selectors.DefaultSelector() + sel.register(fd, selectors.EVENT_READ) + sel.unregister(fd) + sel.close() + q.close() + + def test_queue_thread_safety(self): + """Test that defer can be called from multiple threads.""" + q = gthread.PollableMethodQueue() + q.init() + + results = [] + lock = threading.Lock() + + def add_callback(n): + def callback(): + with lock: + results.append(n) + q.defer(callback) + + threads = [] + for i in range(10): + t = threading.Thread(target=add_callback, args=(i,)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + # Drain all callbacks (pipe is non-blocking, may take multiple calls) + for _ in range(20): + q.run_callbacks(None) + if len(results) >= 10: + break + + assert len(results) == 10 + assert set(results) == set(range(10)) + q.close() + + def test_queue_nonblocking_pipe(self): + """Test that pipe is non-blocking (BSD compatibility).""" + import os + import fcntl + + q = gthread.PollableMethodQueue() + q.init() + + # Verify both ends are non-blocking + read_flags = fcntl.fcntl(q._read_fd, fcntl.F_GETFL) + write_flags = fcntl.fcntl(q._write_fd, fcntl.F_GETFL) + assert read_flags & os.O_NONBLOCK + assert write_flags & os.O_NONBLOCK + + q.close() + + class TestThreadWorker: """Tests for ThreadWorker.""" @@ -140,7 +247,6 @@ class TestThreadWorker: cfg.set('worker_connections', 1000) cfg.set('keepalive', 2) - # Mock the required attributes worker = gthread.ThreadWorker( age=1, ppid=os.getpid(), @@ -160,8 +266,10 @@ class TestThreadWorker: assert worker.max_keepalived == 1000 - 4 # connections - threads assert worker.tpool is None assert worker.poller is None - assert worker._lock is None assert worker.nr_conns == 0 + assert worker._accepting is False + assert isinstance(worker.keepalived_conns, deque) + assert isinstance(worker.method_queue, gthread.PollableMethodQueue) def test_worker_check_config_warning(self): """Test that check_config warns when keepalive impossible.""" @@ -199,11 +307,12 @@ class TestThreadWorker: assert worker.tpool is not None assert worker.poller is not None - assert worker._lock is not None + assert worker.method_queue._queue is not None # Cleanup worker.tpool.shutdown(wait=False) worker.poller.close() + worker.method_queue.close() def test_worker_get_thread_pool(self): """Test thread pool creation.""" @@ -218,7 +327,6 @@ class TestThreadWorker: """Test that expired keepalive connections are cleaned up.""" worker = self.create_worker() worker.poller = selectors.DefaultSelector() - worker._lock = threading.RLock() # Create an expired connection (using monotonic to match implementation) cfg = Config() @@ -226,19 +334,18 @@ class TestThreadWorker: conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) conn.timeout = time.monotonic() - 10 # Expired 10 seconds ago - worker._keep.append(conn) + worker.keepalived_conns.append(conn) worker.nr_conns = 1 # Register with poller (so it can be unregistered) try: - # Can't register FakeSocket with real selector, mock it with mock.patch.object(worker.poller, 'unregister'): worker.murder_keepalived() except (OSError, ValueError): pass # Expected with fake socket # Connection should have been removed - assert len(worker._keep) == 0 + assert len(worker.keepalived_conns) == 0 assert sock.closed is True worker.poller.close() @@ -255,6 +362,58 @@ class TestThreadWorker: worker.ppid = -1 assert worker.is_parent_alive() is False + def test_worker_set_accept_enabled(self): + """Test enabling and disabling connection acceptance.""" + worker = self.create_worker() + worker.poller = mock.Mock() + + # Create a mock socket + mock_sock = mock.Mock() + mock_sock.getsockname.return_value = ('127.0.0.1', 8000) + worker.sockets = [mock_sock] + + # Initially not accepting + assert worker._accepting is False + + # Enable accepting + worker.set_accept_enabled(True) + assert worker._accepting is True + mock_sock.setblocking.assert_called_with(False) + worker.poller.register.assert_called_once() + + # Disable accepting + worker.set_accept_enabled(False) + assert worker._accepting is False + worker.poller.unregister.assert_called_once() + + def test_worker_handle_exit(self): + """Test graceful shutdown signal handling.""" + worker = self.create_worker() + worker.method_queue.init() + worker.alive = True + + worker.handle_exit(None, None) + + assert worker.alive is False + worker.method_queue.close() + + def test_worker_wait_for_events(self): + """Test event waiting with dispatch.""" + worker = self.create_worker() + worker.poller = mock.Mock() + + # Simulate an event + mock_key = mock.Mock() + callback = mock.Mock() + mock_key.data = callback + mock_key.fileobj = mock.Mock() + worker.poller.select.return_value = [(mock_key, None)] + + worker.wait_for_and_dispatch_events(1.0) + + worker.poller.select.assert_called_once_with(1.0) + callback.assert_called_once_with(mock_key.fileobj) + class TestFinishRequest: """Tests for finish_request handling.""" @@ -275,7 +434,6 @@ class TestFinishRequest: cfg=cfg, log=mock.Mock(), ) - worker._lock = threading.RLock() worker.poller = mock.Mock() worker.alive = True return worker @@ -288,9 +446,8 @@ class TestFinishRequest: conn = mock.Mock() fs = mock.Mock() fs.cancelled.return_value = True - fs.conn = conn - worker.finish_request(fs) + worker.finish_request(conn, fs) assert worker.nr_conns == 0 conn.close.assert_called_once() @@ -304,13 +461,12 @@ class TestFinishRequest: conn.sock = mock.Mock() fs = mock.Mock() fs.cancelled.return_value = False - fs.result.return_value = (True, conn) # keepalive=True - fs.conn = conn + fs.result.return_value = True # keepalive=True - worker.finish_request(fs) + worker.finish_request(conn, fs) assert worker.nr_conns == 1 # Connection kept - assert conn in worker._keep + assert conn in worker.keepalived_conns conn.set_timeout.assert_called_once() worker.poller.register.assert_called_once() @@ -322,10 +478,9 @@ class TestFinishRequest: conn = mock.Mock() fs = mock.Mock() fs.cancelled.return_value = False - fs.result.return_value = (False, conn) # keepalive=False - fs.conn = conn + fs.result.return_value = False # keepalive=False - worker.finish_request(fs) + worker.finish_request(conn, fs) assert worker.nr_conns == 0 conn.close.assert_called_once() @@ -339,9 +494,8 @@ class TestFinishRequest: fs = mock.Mock() fs.cancelled.return_value = False fs.result.side_effect = Exception("Test error") - fs.conn = conn - worker.finish_request(fs) + worker.finish_request(conn, fs) assert worker.nr_conns == 0 conn.close.assert_called_once() @@ -366,8 +520,9 @@ class TestAccept: cfg=cfg, log=mock.Mock(), ) - worker._lock = threading.RLock() worker.poller = mock.Mock() + worker.tpool = mock.Mock() + worker.method_queue = mock.Mock() return worker def test_accept_success(self): @@ -379,12 +534,12 @@ class TestAccept: client_addr = ('127.0.0.1', 12345) listener = mock.Mock() listener.accept.return_value = (client_sock, client_addr) - server = ('127.0.0.1', 8000) + listener.getsockname.return_value = ('127.0.0.1', 8000) - worker.accept(server, listener) + worker.accept(listener) assert worker.nr_conns == 1 - worker.poller.register.assert_called_once() + worker.tpool.submit.assert_called_once() def test_accept_eagain(self): """Test handling of EAGAIN during accept.""" @@ -393,10 +548,9 @@ class TestAccept: listener = mock.Mock() listener.accept.side_effect = OSError(errno.EAGAIN, "Try again") - server = ('127.0.0.1', 8000) # Should not raise - worker.accept(server, listener) + worker.accept(listener) assert worker.nr_conns == 0 @@ -407,9 +561,260 @@ class TestAccept: listener = mock.Mock() listener.accept.side_effect = OSError(errno.ECONNABORTED, "Connection aborted") - server = ('127.0.0.1', 8000) # Should not raise - worker.accept(server, listener) + worker.accept(listener) assert worker.nr_conns == 0 + + +class TestGracefulShutdown: + """Tests for graceful shutdown behavior.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + cfg.set('graceful_timeout', 5) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + return worker + + def test_handle_exit_sets_alive_false(self): + """Test that handle_exit begins graceful shutdown.""" + worker = self.create_worker() + worker.method_queue.init() + worker.alive = True + + worker.handle_exit(None, None) + + assert worker.alive is False + worker.method_queue.close() + + def test_connection_tracking(self): + """Test that connection count is properly tracked.""" + worker = self.create_worker() + worker.poller = mock.Mock() + worker.tpool = mock.Mock() + worker.method_queue = mock.Mock() + + assert worker.nr_conns == 0 + + # Simulate accept + client_sock = FakeSocket() + listener = mock.Mock() + listener.accept.return_value = (client_sock, ('127.0.0.1', 12345)) + listener.getsockname.return_value = ('127.0.0.1', 8000) + + worker.accept(listener) + assert worker.nr_conns == 1 + + # Simulate finish_request with close + conn = mock.Mock() + fs = mock.Mock() + fs.cancelled.return_value = False + fs.result.return_value = False # Not keepalive + worker.finish_request(conn, fs) + assert worker.nr_conns == 0 + + +class TestKeepaliveManagement: + """Tests for keepalive connection management.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 10) + cfg.set('keepalive', 2) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + worker.poller = mock.Mock() + return worker + + def test_max_keepalived_calculation(self): + """Test that max_keepalived is correctly calculated.""" + worker = self.create_worker() + # max_keepalived = worker_connections - threads = 10 - 4 = 6 + assert worker.max_keepalived == 6 + + def test_keepalive_timeout_ordering(self): + """Test that connections are ordered by timeout for efficient murder.""" + worker = self.create_worker() + + # Add connections with different timeouts + cfg = Config() + for i in range(3): + sock = FakeSocket() + conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345 + i), ('127.0.0.1', 8000)) + conn.timeout = time.monotonic() + (i * 10) # Staggered timeouts + worker.keepalived_conns.append(conn) + worker.nr_conns += 1 + + # First connection should have earliest timeout + first = worker.keepalived_conns[0] + last = worker.keepalived_conns[-1] + assert first.timeout < last.timeout + + def test_murder_only_expired(self): + """Test that only expired connections are closed.""" + worker = self.create_worker() + worker.poller = selectors.DefaultSelector() + + cfg = Config() + + # Add one expired and one valid connection + expired_sock = FakeSocket() + expired_conn = gthread.TConn(cfg, expired_sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + expired_conn.timeout = time.monotonic() - 10 # Expired + + valid_sock = FakeSocket() + valid_conn = gthread.TConn(cfg, valid_sock, ('127.0.0.1', 12346), ('127.0.0.1', 8000)) + valid_conn.timeout = time.monotonic() + 100 # Still valid + + worker.keepalived_conns.append(expired_conn) + worker.keepalived_conns.append(valid_conn) + worker.nr_conns = 2 + + with mock.patch.object(worker.poller, 'unregister'): + worker.murder_keepalived() + + # Expired should be closed, valid should remain + assert expired_sock.closed is True + assert valid_sock.closed is False + assert len(worker.keepalived_conns) == 1 + assert worker.keepalived_conns[0] is valid_conn + assert worker.nr_conns == 1 + + worker.poller.close() + + +class TestErrorHandling: + """Tests for error handling in various scenarios.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + worker.poller = mock.Mock() + return worker + + def test_finish_request_handles_future_exception(self): + """Test that finish_request handles exceptions from futures.""" + worker = self.create_worker() + worker.nr_conns = 1 + + conn = mock.Mock() + fs = mock.Mock() + fs.cancelled.return_value = False + fs.result.side_effect = RuntimeError("Worker crashed") + + # Should not raise, should close connection + worker.finish_request(conn, fs) + + assert worker.nr_conns == 0 + conn.close.assert_called_once() + + def test_enqueue_req_submits_to_pool(self): + """Test that enqueue_req properly submits to thread pool.""" + worker = self.create_worker() + worker.tpool = mock.Mock() + worker.method_queue = mock.Mock() + + conn = mock.Mock() + worker.enqueue_req(conn) + + worker.tpool.submit.assert_called_once() + + def test_wait_for_events_handles_eintr(self): + """Test that EINTR is handled gracefully.""" + worker = self.create_worker() + worker.poller = mock.Mock() + worker.poller.select.side_effect = OSError(errno.EINTR, "Interrupted") + + # Should not raise + worker.wait_for_and_dispatch_events(1.0) + + def test_wait_for_events_raises_other_errors(self): + """Test that non-EINTR errors are propagated.""" + worker = self.create_worker() + worker.poller = mock.Mock() + worker.poller.select.side_effect = OSError(errno.EBADF, "Bad file descriptor") + + with pytest.raises(OSError): + worker.wait_for_and_dispatch_events(1.0) + + +class TestConnectionState: + """Tests for connection state management.""" + + def test_tconn_double_init_is_safe(self): + """Test that calling init() twice is safe (idempotent).""" + cfg = Config() + sock = FakeSocket() + conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + + conn.init() + parser1 = conn.parser + + conn.init() # Should not reinitialize + parser2 = conn.parser + + assert parser1 is parser2 + + def test_tconn_close_is_safe(self): + """Test that closing a connection is safe.""" + cfg = Config() + sock = FakeSocket() + conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + + conn.close() + assert sock.closed is True + + # Second close should not raise + conn.close() + + def test_keepalive_timeout_uses_monotonic(self): + """Test that timeout uses monotonic clock.""" + cfg = Config() + cfg.set('keepalive', 5) + sock = FakeSocket() + conn = gthread.TConn(cfg, sock, ('127.0.0.1', 12345), ('127.0.0.1', 8000)) + + before = time.monotonic() + conn.set_timeout() + after = time.monotonic() + + # Timeout should be approximately 5 seconds in the future + assert before + 4.9 <= conn.timeout <= after + 5.1 From 3ebf94c33f8d536c286e7c44f2065b433b156498 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Thu, 22 Jan 2026 09:38:41 +0100 Subject: [PATCH 3/4] THANKS: Add contributors from gthread improvements PRs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add contributors from the incorporated PRs: - Dominik Działak (PR #3308) - Oliver Allen (PR #3303) - sylt (PR #3157) --- THANKS | 3 +++ 1 file changed, 3 insertions(+) diff --git a/THANKS b/THANKS index 9f4c6b6b..f213e750 100644 --- a/THANKS +++ b/THANKS @@ -58,6 +58,7 @@ Diego Oliveira Dima Barsky Djoume Salvetti Dmitry Medvinsky +Dominik Działak Dustin Ingram Ed Morley Eric Florenzano @@ -136,6 +137,7 @@ Neil Williams Nick Pillitteri Nik Nyby Nikolay Kim +Oliver Allen Oliver Bristow Oliver Tonnhofer Omer Katz @@ -168,6 +170,7 @@ Stephane Wirtel Stephen DiCato Stephen Holsapple Steven Cummings +sylt Sébastien Fievet Tal Einat <532281+taleinat@users.noreply.github.com> Talha Malik From 2d03d8e6a92d07539ad655be307a36c202551291 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Thu, 22 Jan 2026 09:54:04 +0100 Subject: [PATCH 4/4] tests: Add signal handling and liveness tests for gthread worker Add tests for: - Worker liveness reporting to arbiter via WorkerTmp - SIGTERM graceful shutdown behavior - SIGQUIT immediate shutdown behavior - Worker-arbiter integration (parent death detection, timeout) - Signal interaction edge cases (multiple signals, ordering) These tests ensure the gthread worker properly: - Calls notify() in the main loop for arbiter heartbeat - Handles SIGTERM by setting alive=False and waking the poller - Handles SIGQUIT by immediately shutting down the thread pool - Drains connections during graceful shutdown within timeout - Cleans up resources properly on exit --- tests/test_gthread.py | 464 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 464 insertions(+) diff --git a/tests/test_gthread.py b/tests/test_gthread.py index e095bb3b..b8dbea14 100644 --- a/tests/test_gthread.py +++ b/tests/test_gthread.py @@ -818,3 +818,467 @@ class TestConnectionState: # Timeout should be approximately 5 seconds in the future assert before + 4.9 <= conn.timeout <= after + 5.1 + + +class TestWorkerLiveness: + """Tests for worker liveness reporting to the arbiter.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + return worker + + def test_notify_calls_tmp_notify(self): + """Test that worker.notify() calls tmp.notify() for arbiter monitoring.""" + worker = self.create_worker() + worker.tmp = mock.Mock() + + worker.notify() + + worker.tmp.notify.assert_called_once() + + def test_notify_updates_tmp_mtime(self): + """Test that notify updates the temp file mtime for arbiter heartbeat. + + WorkerTmp.notify() sets mtime using time.monotonic(), and the arbiter + checks liveness by comparing (time.monotonic() - last_update()) to timeout. + """ + from gunicorn.workers.workertmp import WorkerTmp + + cfg = Config() + tmp = WorkerTmp(cfg) + + # Call notify to set mtime to current monotonic time + tmp.notify() + + # The arbiter checks: time.monotonic() - last_update() <= timeout + # After notify(), this difference should be very small + diff = time.monotonic() - tmp.last_update() + assert diff < 1.0 # Should be nearly zero + + # Wait and verify the difference grows + time.sleep(0.1) + diff_later = time.monotonic() - tmp.last_update() + assert diff_later > diff # Time has passed + + tmp.close() + + def test_worker_notifies_in_run_loop(self): + """Test that worker calls notify() during the run loop.""" + worker = self.create_worker() + worker.tmp = mock.Mock() + worker.method_queue.init() + worker.poller = mock.Mock() + worker.tpool = mock.Mock() + worker.sockets = [] + worker.alive = True + + # Track notify calls + notify_calls = [] + original_notify = worker.notify + def tracking_notify(): + notify_calls.append(time.monotonic()) + original_notify() + worker.notify = tracking_notify + + # Mock poller.select to exit after first iteration + call_count = [0] + def mock_select(timeout): + call_count[0] += 1 + if call_count[0] > 1: + worker.alive = False + return [] + worker.poller.select.side_effect = mock_select + + # Mock is_parent_alive to return True + worker.is_parent_alive = mock.Mock(return_value=True) + + worker.run() + + # Worker should have called notify at least once + assert len(notify_calls) >= 1 + worker.method_queue.close() + + +class TestSignalHandling: + """Tests for signal handling in gthread worker.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + cfg.set('graceful_timeout', 5) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + return worker + + def test_handle_exit_sigterm_sets_alive_false(self): + """Test that SIGTERM handler sets alive=False for graceful shutdown.""" + worker = self.create_worker() + worker.method_queue.init() + worker.alive = True + + # Simulate SIGTERM + worker.handle_exit(None, None) + + assert worker.alive is False + worker.method_queue.close() + + def test_handle_exit_wakes_up_poller(self): + """Test that SIGTERM handler wakes up the poller via method_queue.""" + worker = self.create_worker() + worker.method_queue.init() + worker.alive = True + + # After handle_exit, the method_queue should have a callback queued + worker.handle_exit(None, None) + + # Check that something was written to the pipe (to wake poller) + # Read from the pipe - should have data + import select + readable, _, _ = select.select([worker.method_queue.fileno()], [], [], 0) + assert len(readable) > 0 + + worker.method_queue.close() + + def test_handle_quit_sigquit_immediate_shutdown(self): + """Test that SIGQUIT handler triggers immediate shutdown.""" + worker = self.create_worker() + worker.tpool = mock.Mock() + + with pytest.raises(SystemExit) as exc_info: + worker.handle_quit(None, None) + + assert exc_info.value.code == 0 + worker.tpool.shutdown.assert_called_once_with(wait=False) + + def test_graceful_shutdown_stops_accepting(self): + """Test that graceful shutdown stops accepting new connections.""" + worker = self.create_worker() + worker.method_queue.init() + worker.poller = mock.Mock() + worker.tpool = mock.Mock() + worker.sockets = [mock.Mock()] + worker._accepting = True + + # Start accepting + worker.set_accept_enabled(True) + + # Simulate SIGTERM + worker.handle_exit(None, None) + assert worker.alive is False + + # During run loop, accepting should be disabled + worker.set_accept_enabled(False) + assert worker._accepting is False + + worker.method_queue.close() + + def test_graceful_shutdown_drains_connections(self): + """Test that graceful shutdown waits for connections to drain.""" + worker = self.create_worker() + worker.method_queue.init() + worker.poller = mock.Mock() + worker.poller.select.return_value = [] + worker.tpool = mock.Mock() + worker.sockets = [] + worker.nr_conns = 1 # One active connection + worker.alive = True + + # Track iterations + iterations = [0] + def mock_select(timeout): + iterations[0] += 1 + if iterations[0] == 1: + # First iteration: trigger shutdown + worker.alive = False + elif iterations[0] == 2: + # Second iteration: during grace period + pass + elif iterations[0] >= 3: + # Connection finishes + worker.nr_conns = 0 + return [] + worker.poller.select.side_effect = mock_select + worker.is_parent_alive = mock.Mock(return_value=True) + + worker.run() + + # Should have waited for connections + assert iterations[0] >= 2 + worker.method_queue.close() + + def test_sigterm_does_not_interrupt_active_request(self): + """Test that SIGTERM doesn't immediately interrupt active requests.""" + import signal + + worker = self.create_worker() + worker.method_queue.init() + + # The base worker sets siginterrupt(SIGTERM, False) in init_signals + # This ensures system calls aren't interrupted by SIGTERM + + # Verify handle_exit just sets alive=False, doesn't raise + worker.alive = True + worker.handle_exit(signal.SIGTERM, None) + + assert worker.alive is False + # No exception raised, request can continue + worker.method_queue.close() + + +class TestWorkerArbiterIntegration: + """Integration tests for worker-arbiter communication.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + cfg.set('graceful_timeout', 2) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + return worker + + def test_worker_detects_parent_death(self): + """Test that worker detects when parent process dies.""" + worker = self.create_worker() + + # Valid ppid + worker.ppid = os.getppid() + assert worker.is_parent_alive() is True + + # Invalid ppid (simulating parent death) + worker.ppid = 99999999 + assert worker.is_parent_alive() is False + + def test_worker_exits_on_parent_death(self): + """Test that worker exits when parent dies.""" + worker = self.create_worker() + worker.method_queue.init() + worker.poller = mock.Mock() + worker.poller.select.return_value = [] + worker.tpool = mock.Mock() + worker.sockets = [] + worker.alive = True + worker.ppid = 99999999 # Invalid ppid + + iterations = [0] + def mock_select(timeout): + iterations[0] += 1 + return [] + worker.poller.select.side_effect = mock_select + + worker.run() + + # Should exit immediately due to parent check + assert iterations[0] == 1 + worker.method_queue.close() + + def test_worker_tmp_file_can_be_monitored(self): + """Test that worker tmp file can be used by arbiter for monitoring. + + The arbiter monitors workers by checking: time.monotonic() - last_update() <= timeout + """ + from gunicorn.workers.workertmp import WorkerTmp + + cfg = Config() + tmp = WorkerTmp(cfg) + + # Worker notifies - sets mtime to current monotonic time + tmp.notify() + + # Arbiter check: time.monotonic() - last_update() should be small + diff = time.monotonic() - tmp.last_update() + assert diff < 1.0 # Worker just notified, should be nearly zero + + # If worker stops notifying, the difference grows + time.sleep(0.1) + diff_later = time.monotonic() - tmp.last_update() + assert diff_later > diff # Arbiter would notice worker isn't responding + + tmp.close() + + def test_graceful_timeout_honored(self): + """Test that graceful_timeout is honored during shutdown.""" + worker = self.create_worker() + worker.cfg.set('graceful_timeout', 1) # 1 second for testing + worker.method_queue.init() + worker.poller = mock.Mock() + worker.tpool = mock.Mock() + worker.sockets = [] + worker.nr_conns = 1 # Active connection that won't finish + worker.alive = True + + # Track iterations + iterations = [0] + start_time = [None] + + def mock_select(timeout): + iterations[0] += 1 + if iterations[0] == 1: + # First iteration: trigger shutdown + worker.alive = False + start_time[0] = time.monotonic() + return [] + else: + # Grace period iterations - simulate time passing via select timeout + # The timeout should be the remaining time + if timeout > 0: + # Simulate some time passing + time.sleep(min(timeout, 0.2)) + # Connection never finishes (nr_conns stays 1) + return [] + worker.poller.select.side_effect = mock_select + worker.is_parent_alive = mock.Mock(return_value=True) + + worker.run() + + # Should have completed (grace timeout expired with connection still active) + assert iterations[0] >= 2 # At least one grace period iteration + + worker.method_queue.close() + + def test_run_completes_cleanup(self): + """Test that run() properly cleans up resources on exit.""" + worker = self.create_worker() + worker.method_queue.init() + worker.poller = selectors.DefaultSelector() + worker.tpool = futures.ThreadPoolExecutor(max_workers=2) + worker.sockets = [] + worker.alive = False # Immediately exit + + worker.is_parent_alive = mock.Mock(return_value=True) + + # Don't pre-register method_queue - run() will do it + worker.run() + + # All resources should be cleaned up + # (No assertion needed - if run() completes without error, cleanup worked) + + +class TestSignalInteraction: + """Tests for signal interactions and edge cases.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + return worker + + def test_multiple_sigterm_is_safe(self): + """Test that receiving multiple SIGTERM is safe.""" + worker = self.create_worker() + worker.method_queue.init() + worker.alive = True + + # Multiple SIGTERM calls should be idempotent + worker.handle_exit(None, None) + assert worker.alive is False + + worker.handle_exit(None, None) + assert worker.alive is False + + worker.method_queue.close() + + def test_sigterm_then_sigquit(self): + """Test SIGQUIT after SIGTERM for force kill.""" + worker = self.create_worker() + worker.method_queue.init() + worker.tpool = mock.Mock() + worker.alive = True + + # First SIGTERM for graceful + worker.handle_exit(None, None) + assert worker.alive is False + + # Then SIGQUIT for immediate + with pytest.raises(SystemExit): + worker.handle_quit(None, None) + + worker.tpool.shutdown.assert_called_once_with(wait=False) + worker.method_queue.close() + + def test_sigquit_does_not_wait_for_threads(self): + """Test that SIGQUIT calls tpool.shutdown(wait=False).""" + worker = self.create_worker() + worker.tpool = mock.Mock() + + with pytest.raises(SystemExit): + worker.handle_quit(None, None) + + # Verify wait=False was passed + worker.tpool.shutdown.assert_called_once_with(wait=False) + + def test_handle_exit_when_already_dead(self): + """Test handle_exit when worker is already shutting down.""" + worker = self.create_worker() + worker.method_queue.init() + worker.alive = False + + # Should not raise, should be idempotent + worker.handle_exit(None, None) + assert worker.alive is False + + worker.method_queue.close() + + def test_connections_tracked_during_signal(self): + """Test that connection count is correct during signal handling.""" + worker = self.create_worker() + worker.method_queue.init() + worker.poller = mock.Mock() + worker.tpool = mock.Mock() + worker.nr_conns = 5 + worker.alive = True + + # SIGTERM should not affect connection count + worker.handle_exit(None, None) + + assert worker.nr_conns == 5 # Still 5 connections + assert worker.alive is False # But shutting down + + worker.method_queue.close()