From 2d03d8e6a92d07539ad655be307a36c202551291 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Thu, 22 Jan 2026 09:54:04 +0100 Subject: [PATCH] tests: Add signal handling and liveness tests for gthread worker Add tests for: - Worker liveness reporting to arbiter via WorkerTmp - SIGTERM graceful shutdown behavior - SIGQUIT immediate shutdown behavior - Worker-arbiter integration (parent death detection, timeout) - Signal interaction edge cases (multiple signals, ordering) These tests ensure the gthread worker properly: - Calls notify() in the main loop for arbiter heartbeat - Handles SIGTERM by setting alive=False and waking the poller - Handles SIGQUIT by immediately shutting down the thread pool - Drains connections during graceful shutdown within timeout - Cleans up resources properly on exit --- tests/test_gthread.py | 464 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 464 insertions(+) diff --git a/tests/test_gthread.py b/tests/test_gthread.py index e095bb3b..b8dbea14 100644 --- a/tests/test_gthread.py +++ b/tests/test_gthread.py @@ -818,3 +818,467 @@ class TestConnectionState: # Timeout should be approximately 5 seconds in the future assert before + 4.9 <= conn.timeout <= after + 5.1 + + +class TestWorkerLiveness: + """Tests for worker liveness reporting to the arbiter.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + return worker + + def test_notify_calls_tmp_notify(self): + """Test that worker.notify() calls tmp.notify() for arbiter monitoring.""" + worker = self.create_worker() + worker.tmp = mock.Mock() + + worker.notify() + + worker.tmp.notify.assert_called_once() + + def test_notify_updates_tmp_mtime(self): + """Test that notify updates the temp file mtime for arbiter heartbeat. + + WorkerTmp.notify() sets mtime using time.monotonic(), and the arbiter + checks liveness by comparing (time.monotonic() - last_update()) to timeout. + """ + from gunicorn.workers.workertmp import WorkerTmp + + cfg = Config() + tmp = WorkerTmp(cfg) + + # Call notify to set mtime to current monotonic time + tmp.notify() + + # The arbiter checks: time.monotonic() - last_update() <= timeout + # After notify(), this difference should be very small + diff = time.monotonic() - tmp.last_update() + assert diff < 1.0 # Should be nearly zero + + # Wait and verify the difference grows + time.sleep(0.1) + diff_later = time.monotonic() - tmp.last_update() + assert diff_later > diff # Time has passed + + tmp.close() + + def test_worker_notifies_in_run_loop(self): + """Test that worker calls notify() during the run loop.""" + worker = self.create_worker() + worker.tmp = mock.Mock() + worker.method_queue.init() + worker.poller = mock.Mock() + worker.tpool = mock.Mock() + worker.sockets = [] + worker.alive = True + + # Track notify calls + notify_calls = [] + original_notify = worker.notify + def tracking_notify(): + notify_calls.append(time.monotonic()) + original_notify() + worker.notify = tracking_notify + + # Mock poller.select to exit after first iteration + call_count = [0] + def mock_select(timeout): + call_count[0] += 1 + if call_count[0] > 1: + worker.alive = False + return [] + worker.poller.select.side_effect = mock_select + + # Mock is_parent_alive to return True + worker.is_parent_alive = mock.Mock(return_value=True) + + worker.run() + + # Worker should have called notify at least once + assert len(notify_calls) >= 1 + worker.method_queue.close() + + +class TestSignalHandling: + """Tests for signal handling in gthread worker.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + cfg.set('graceful_timeout', 5) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + return worker + + def test_handle_exit_sigterm_sets_alive_false(self): + """Test that SIGTERM handler sets alive=False for graceful shutdown.""" + worker = self.create_worker() + worker.method_queue.init() + worker.alive = True + + # Simulate SIGTERM + worker.handle_exit(None, None) + + assert worker.alive is False + worker.method_queue.close() + + def test_handle_exit_wakes_up_poller(self): + """Test that SIGTERM handler wakes up the poller via method_queue.""" + worker = self.create_worker() + worker.method_queue.init() + worker.alive = True + + # After handle_exit, the method_queue should have a callback queued + worker.handle_exit(None, None) + + # Check that something was written to the pipe (to wake poller) + # Read from the pipe - should have data + import select + readable, _, _ = select.select([worker.method_queue.fileno()], [], [], 0) + assert len(readable) > 0 + + worker.method_queue.close() + + def test_handle_quit_sigquit_immediate_shutdown(self): + """Test that SIGQUIT handler triggers immediate shutdown.""" + worker = self.create_worker() + worker.tpool = mock.Mock() + + with pytest.raises(SystemExit) as exc_info: + worker.handle_quit(None, None) + + assert exc_info.value.code == 0 + worker.tpool.shutdown.assert_called_once_with(wait=False) + + def test_graceful_shutdown_stops_accepting(self): + """Test that graceful shutdown stops accepting new connections.""" + worker = self.create_worker() + worker.method_queue.init() + worker.poller = mock.Mock() + worker.tpool = mock.Mock() + worker.sockets = [mock.Mock()] + worker._accepting = True + + # Start accepting + worker.set_accept_enabled(True) + + # Simulate SIGTERM + worker.handle_exit(None, None) + assert worker.alive is False + + # During run loop, accepting should be disabled + worker.set_accept_enabled(False) + assert worker._accepting is False + + worker.method_queue.close() + + def test_graceful_shutdown_drains_connections(self): + """Test that graceful shutdown waits for connections to drain.""" + worker = self.create_worker() + worker.method_queue.init() + worker.poller = mock.Mock() + worker.poller.select.return_value = [] + worker.tpool = mock.Mock() + worker.sockets = [] + worker.nr_conns = 1 # One active connection + worker.alive = True + + # Track iterations + iterations = [0] + def mock_select(timeout): + iterations[0] += 1 + if iterations[0] == 1: + # First iteration: trigger shutdown + worker.alive = False + elif iterations[0] == 2: + # Second iteration: during grace period + pass + elif iterations[0] >= 3: + # Connection finishes + worker.nr_conns = 0 + return [] + worker.poller.select.side_effect = mock_select + worker.is_parent_alive = mock.Mock(return_value=True) + + worker.run() + + # Should have waited for connections + assert iterations[0] >= 2 + worker.method_queue.close() + + def test_sigterm_does_not_interrupt_active_request(self): + """Test that SIGTERM doesn't immediately interrupt active requests.""" + import signal + + worker = self.create_worker() + worker.method_queue.init() + + # The base worker sets siginterrupt(SIGTERM, False) in init_signals + # This ensures system calls aren't interrupted by SIGTERM + + # Verify handle_exit just sets alive=False, doesn't raise + worker.alive = True + worker.handle_exit(signal.SIGTERM, None) + + assert worker.alive is False + # No exception raised, request can continue + worker.method_queue.close() + + +class TestWorkerArbiterIntegration: + """Integration tests for worker-arbiter communication.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + cfg.set('graceful_timeout', 2) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + return worker + + def test_worker_detects_parent_death(self): + """Test that worker detects when parent process dies.""" + worker = self.create_worker() + + # Valid ppid + worker.ppid = os.getppid() + assert worker.is_parent_alive() is True + + # Invalid ppid (simulating parent death) + worker.ppid = 99999999 + assert worker.is_parent_alive() is False + + def test_worker_exits_on_parent_death(self): + """Test that worker exits when parent dies.""" + worker = self.create_worker() + worker.method_queue.init() + worker.poller = mock.Mock() + worker.poller.select.return_value = [] + worker.tpool = mock.Mock() + worker.sockets = [] + worker.alive = True + worker.ppid = 99999999 # Invalid ppid + + iterations = [0] + def mock_select(timeout): + iterations[0] += 1 + return [] + worker.poller.select.side_effect = mock_select + + worker.run() + + # Should exit immediately due to parent check + assert iterations[0] == 1 + worker.method_queue.close() + + def test_worker_tmp_file_can_be_monitored(self): + """Test that worker tmp file can be used by arbiter for monitoring. + + The arbiter monitors workers by checking: time.monotonic() - last_update() <= timeout + """ + from gunicorn.workers.workertmp import WorkerTmp + + cfg = Config() + tmp = WorkerTmp(cfg) + + # Worker notifies - sets mtime to current monotonic time + tmp.notify() + + # Arbiter check: time.monotonic() - last_update() should be small + diff = time.monotonic() - tmp.last_update() + assert diff < 1.0 # Worker just notified, should be nearly zero + + # If worker stops notifying, the difference grows + time.sleep(0.1) + diff_later = time.monotonic() - tmp.last_update() + assert diff_later > diff # Arbiter would notice worker isn't responding + + tmp.close() + + def test_graceful_timeout_honored(self): + """Test that graceful_timeout is honored during shutdown.""" + worker = self.create_worker() + worker.cfg.set('graceful_timeout', 1) # 1 second for testing + worker.method_queue.init() + worker.poller = mock.Mock() + worker.tpool = mock.Mock() + worker.sockets = [] + worker.nr_conns = 1 # Active connection that won't finish + worker.alive = True + + # Track iterations + iterations = [0] + start_time = [None] + + def mock_select(timeout): + iterations[0] += 1 + if iterations[0] == 1: + # First iteration: trigger shutdown + worker.alive = False + start_time[0] = time.monotonic() + return [] + else: + # Grace period iterations - simulate time passing via select timeout + # The timeout should be the remaining time + if timeout > 0: + # Simulate some time passing + time.sleep(min(timeout, 0.2)) + # Connection never finishes (nr_conns stays 1) + return [] + worker.poller.select.side_effect = mock_select + worker.is_parent_alive = mock.Mock(return_value=True) + + worker.run() + + # Should have completed (grace timeout expired with connection still active) + assert iterations[0] >= 2 # At least one grace period iteration + + worker.method_queue.close() + + def test_run_completes_cleanup(self): + """Test that run() properly cleans up resources on exit.""" + worker = self.create_worker() + worker.method_queue.init() + worker.poller = selectors.DefaultSelector() + worker.tpool = futures.ThreadPoolExecutor(max_workers=2) + worker.sockets = [] + worker.alive = False # Immediately exit + + worker.is_parent_alive = mock.Mock(return_value=True) + + # Don't pre-register method_queue - run() will do it + worker.run() + + # All resources should be cleaned up + # (No assertion needed - if run() completes without error, cleanup worked) + + +class TestSignalInteraction: + """Tests for signal interactions and edge cases.""" + + def create_worker(self): + """Create a worker for testing.""" + cfg = Config() + cfg.set('workers', 1) + cfg.set('threads', 4) + cfg.set('worker_connections', 1000) + + worker = gthread.ThreadWorker( + age=1, + ppid=os.getpid(), + sockets=[], + app=mock.Mock(), + timeout=30, + cfg=cfg, + log=mock.Mock(), + ) + return worker + + def test_multiple_sigterm_is_safe(self): + """Test that receiving multiple SIGTERM is safe.""" + worker = self.create_worker() + worker.method_queue.init() + worker.alive = True + + # Multiple SIGTERM calls should be idempotent + worker.handle_exit(None, None) + assert worker.alive is False + + worker.handle_exit(None, None) + assert worker.alive is False + + worker.method_queue.close() + + def test_sigterm_then_sigquit(self): + """Test SIGQUIT after SIGTERM for force kill.""" + worker = self.create_worker() + worker.method_queue.init() + worker.tpool = mock.Mock() + worker.alive = True + + # First SIGTERM for graceful + worker.handle_exit(None, None) + assert worker.alive is False + + # Then SIGQUIT for immediate + with pytest.raises(SystemExit): + worker.handle_quit(None, None) + + worker.tpool.shutdown.assert_called_once_with(wait=False) + worker.method_queue.close() + + def test_sigquit_does_not_wait_for_threads(self): + """Test that SIGQUIT calls tpool.shutdown(wait=False).""" + worker = self.create_worker() + worker.tpool = mock.Mock() + + with pytest.raises(SystemExit): + worker.handle_quit(None, None) + + # Verify wait=False was passed + worker.tpool.shutdown.assert_called_once_with(wait=False) + + def test_handle_exit_when_already_dead(self): + """Test handle_exit when worker is already shutting down.""" + worker = self.create_worker() + worker.method_queue.init() + worker.alive = False + + # Should not raise, should be idempotent + worker.handle_exit(None, None) + assert worker.alive is False + + worker.method_queue.close() + + def test_connections_tracked_during_signal(self): + """Test that connection count is correct during signal handling.""" + worker = self.create_worker() + worker.method_queue.init() + worker.poller = mock.Mock() + worker.tpool = mock.Mock() + worker.nr_conns = 5 + worker.alive = True + + # SIGTERM should not affect connection count + worker.handle_exit(None, None) + + assert worker.nr_conns == 5 # Still 5 connections + assert worker.alive is False # But shutting down + + worker.method_queue.close()