Arbiter signal handling improvements (#3441)

* tests: Add tests for current signal handling behavior

Add tests for arbiter signal handling:
- TestSignalHandlerRegistration (4 tests): Verify signal handler
  registration, pipe creation, SIGCHLD separate handler, and
  expected signals list
- TestSignalQueue (4 tests): Test signal queueing, max queue size,
  wakeup writes to pipe, and sleep returns on pipe data
- TestReapWorkers (6 tests): Test worker reaping for normal exit,
  error exit codes, WORKER_BOOT_ERROR, APP_LOAD_ERROR, signal
  termination, and SIGKILL OOM hint

These tests establish baseline coverage before refactoring the
signal handling code for safety and reliability improvements.

* tests: Add tests for SIGHUP reload and worker lifecycle

Add tests for reload and worker management:
- TestSighupReload (3 tests): Verify reload spawns configured number
  of workers, calls manage_workers, and logs hang up message
- TestWorkerLifecycle (4 tests): Test spawn_worker adds to WORKERS
  dict, kill_worker sends correct signal, murder_workers sends
  SIGABRT first then SIGKILL on subsequent timeout

* arbiter: Fix waitpid status parsing using POSIX macros

Use os.WIFEXITED/WEXITSTATUS and os.WIFSIGNALED/WTERMSIG instead
of manual bit shifting for waitpid status interpretation. This
correctly distinguishes between normal exits and signal termination.

The previous code used 'status >> 8' which only worked for normal
exits, and used raw status values for signal detection which was
incorrect.

Fixes part of #3435 and #3056 (signal name display issues)

* arbiter: Change SIGTERM log level to warning

Log signal termination at warning level for expected signals
(SIGTERM, SIGQUIT) since these typically occur during normal
graceful shutdown. SIGKILL remains at error level with the
OOM hint since it indicates abnormal termination.

Fixes #3311, #3050 (SIGTERM logged as error)

* arbiter: Remove logging from SIGCHLD signal handler

Move reap_workers() call from signal handler context to main loop.
The signal handler (now signal_chld) only queues the signal and
wakes up the main loop. The actual reap_workers() is called from
handle_chld() in the main loop where logging is safe.

This fixes potential deadlocks caused by logging from signal
handler context when holding the logging lock.

Fixes #3198, #3004 (logging in signal handlers unsafe, deadlock)

* arbiter: Replace PIPE+select with queue.SimpleQueue

Use queue.SimpleQueue for signal handling instead of PIPE+select.
SimpleQueue is reentrant-safe and can be used from signal handlers.

Changes:
- Remove PIPE-based wakeup mechanism
- Add SIG_QUEUE as SimpleQueue instance
- Add WAKEUP_REQUEST sentinel for non-signal wakeups
- Replace sleep() with wait_for_signals() using queue.get()
- Simplify signal handler to just put_nowait()
- Update main loop to iterate over wait_for_signals()
- Add reap_workers() call in stop() to properly clean up workers
  since SIGCHLD is no longer processed during shutdown

This simplifies the code and removes the dependency on select().

Also adds integration tests for signal handling that verify:
- Basic request/response
- Graceful shutdown with SIGTERM/SIGINT
- SIGHUP reload
- Multiple concurrent requests

* arbiter: Wait for old workers on SIGHUP reload

After spawning new workers during reload, wait for old workers to
terminate before returning from reload(). This prevents the issue
where old workers could receive double SIGTERM - once from
manage_workers() and again from the arbiter loop.

The reload now tracks worker_age before spawning, then waits up to
graceful_timeout for workers older than that age to exit.

Fixes #3312, #3274 (SIGHUP can send double SIGTERM)

* arbiter: Log SIGCHLD at debug level

SIGCHLD is received frequently (whenever a worker exits) and doesn't
need to be logged at info level. Log it at debug level to reduce
noise in the logs while still making it available for debugging.

* tests: Fix lint warnings in test_arbiter.py
This commit is contained in:
Benoit Chesneau 2026-01-22 11:56:23 +01:00 committed by GitHub
parent 7af8cccb4a
commit b650332c70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 666 additions and 84 deletions

View File

@ -3,8 +3,8 @@
# See the NOTICE for more information.
import errno
import os
import queue
import random
import select
import signal
import sys
import time
@ -37,10 +37,10 @@ class Arbiter:
LISTENERS = []
WORKERS = {}
PIPE = []
# I love dynamic languages
SIG_QUEUE = []
# Sentinel value for non-signal wakeups
WAKEUP_REQUEST = signal.NSIG
SIGNALS = [getattr(signal, "SIG%s" % x)
for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
SIG_NAMES = dict(
@ -55,6 +55,9 @@ class Arbiter:
self._last_logged_active_worker_count = None
self.log = None
# Signal queue - SimpleQueue is reentrant-safe for signal handlers
self.SIG_QUEUE = queue.SimpleQueue()
self.setup(app)
self.pidfile = None
@ -172,27 +175,16 @@ class Arbiter:
Initialize master signal handling. Most of the signals
are queued. Child signals only wake up the master.
"""
# close old PIPE
for p in self.PIPE:
os.close(p)
# initialize the pipe
self.PIPE = pair = os.pipe()
for p in pair:
util.set_non_blocking(p)
util.close_on_exec(p)
self.log.close_on_exec()
# initialize all signals
for s in self.SIGNALS:
signal.signal(s, self.signal)
signal.signal(signal.SIGCHLD, self.handle_chld)
signal.signal(signal.SIGCHLD, self.signal_chld)
def signal(self, sig, frame):
if len(self.SIG_QUEUE) < 5:
self.SIG_QUEUE.append(sig)
self.wakeup()
"""Signal handler - NO LOGGING, just queue the signal."""
self.SIG_QUEUE.put_nowait(sig)
def run(self):
"Main master loop."
@ -205,25 +197,24 @@ class Arbiter:
while True:
self.maybe_promote_master()
sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
if sig is None:
self.sleep()
self.murder_workers()
self.manage_workers()
continue
# Wait for and process signals
for sig in self.wait_for_signals(timeout=1.0):
if sig not in self.SIG_NAMES:
self.log.info("Ignoring unknown signal: %s", sig)
continue
if sig not in self.SIG_NAMES:
self.log.info("Ignoring unknown signal: %s", sig)
continue
signame = self.SIG_NAMES.get(sig)
handler = getattr(self, "handle_%s" % signame, None)
if not handler:
self.log.error("Unhandled signal: %s", signame)
continue
# Log SIGCHLD at debug level since it's frequent
log_level = self.log.debug if sig == signal.SIGCHLD else self.log.info
log_level("Handling signal: %s", signame)
handler()
signame = self.SIG_NAMES.get(sig)
handler = getattr(self, "handle_%s" % signame, None)
if not handler:
self.log.error("Unhandled signal: %s", signame)
continue
self.log.info("Handling signal: %s", signame)
handler()
self.wakeup()
self.murder_workers()
self.manage_workers()
except (StopIteration, KeyboardInterrupt):
self.halt()
except HaltServer as inst:
@ -238,10 +229,13 @@ class Arbiter:
self.pidfile.unlink()
sys.exit(-1)
def handle_chld(self, sig, frame):
"SIGCHLD handling"
def signal_chld(self, sig, frame):
"""SIGCHLD signal handler - NO LOGGING, just queue the signal."""
self.SIG_QUEUE.put_nowait(sig)
def handle_chld(self):
"""SIGCHLD handling - called from main loop, safe to log."""
self.reap_workers()
self.wakeup()
def handle_hup(self):
"""\
@ -329,14 +323,8 @@ class Arbiter:
util._setproctitle("master [%s]" % self.proc_name)
def wakeup(self):
"""\
Wake up the arbiter by writing to the PIPE
"""
try:
os.write(self.PIPE[1], b'.')
except OSError as e:
if e.errno not in [errno.EAGAIN, errno.EINTR]:
raise
"""Wake up the arbiter's main loop."""
self.SIG_QUEUE.put_nowait(self.WAKEUP_REQUEST)
def halt(self, reason=None, exit_status=0):
""" halt arbiter """
@ -352,24 +340,30 @@ class Arbiter:
self.cfg.on_exit(self)
sys.exit(exit_status)
def sleep(self):
def wait_for_signals(self, timeout=1.0):
"""\
Sleep until PIPE is readable or we timeout.
A readable PIPE means a signal occurred.
Wait for signals with timeout.
Returns a list of signals that were received.
"""
signals = []
try:
ready = select.select([self.PIPE[0]], [], [], 1.0)
if not ready[0]:
return
while os.read(self.PIPE[0], 1):
pass
except OSError as e:
# TODO: select.error is a subclass of OSError since Python 3.3.
error_number = getattr(e, 'errno', e.args[0])
if error_number not in [errno.EAGAIN, errno.EINTR]:
raise
# Block until we get a signal or timeout
sig = self.SIG_QUEUE.get(block=True, timeout=timeout)
if sig != self.WAKEUP_REQUEST:
signals.append(sig)
# Drain any additional queued signals
while True:
try:
sig = self.SIG_QUEUE.get_nowait()
if sig != self.WAKEUP_REQUEST:
signals.append(sig)
except queue.Empty:
break
except queue.Empty:
pass
except KeyboardInterrupt:
sys.exit()
return signals
def stop(self, graceful=True):
"""\
@ -394,9 +388,12 @@ class Arbiter:
self.kill_workers(sig)
# wait until the graceful timeout
while self.WORKERS and time.time() < limit:
self.reap_workers()
time.sleep(0.1)
self.kill_workers(signal.SIGKILL)
# Final reap to clean up any remaining zombies
self.reap_workers()
def reexec(self):
"""\
@ -480,13 +477,28 @@ class Arbiter:
# set new proc_name
util._setproctitle("master [%s]" % self.proc_name)
# Remember current worker age before spawning new workers
last_worker_age = self.worker_age
# spawn new workers
for _ in range(self.cfg.workers):
self.spawn_worker()
# manage workers
# manage workers - this will kill old workers beyond num_workers
self.manage_workers()
# wait for old workers to terminate to prevent double SIGTERM
deadline = time.monotonic() + self.cfg.graceful_timeout
while time.monotonic() < deadline:
if not self.WORKERS:
break
# Check if all remaining workers are newer than last_worker_age
oldest = min(w.age for w in self.WORKERS.values())
if oldest > last_worker_age:
break
self.reap_workers()
time.sleep(0.1)
def murder_workers(self):
"""\
Kill unused/idle workers
@ -523,9 +535,30 @@ class Arbiter:
# A worker was terminated. If the termination reason was
# that it could not boot, we'll shut it down to avoid
# infinite start/stop cycles.
exitcode = status >> 8
if exitcode != 0:
self.log.error('Worker (pid:%s) exited with code %s', wpid, exitcode)
exitcode = None
if os.WIFEXITED(status):
exitcode = os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
try:
sig_name = signal.Signals(sig).name
except ValueError:
sig_name = "signal {}".format(sig)
msg = "Worker (pid:{}) was sent {}!".format(
wpid, sig_name)
# SIGKILL suggests OOM, log as error
if sig == signal.SIGKILL:
msg += " Perhaps out of memory?"
self.log.error(msg)
else:
# SIGTERM/SIGQUIT are expected during shutdown
self.log.warning(msg)
if exitcode is not None and exitcode != 0:
self.log.error("Worker (pid:%s) exited with code %s.",
wpid, exitcode)
if exitcode == self.WORKER_BOOT_ERROR:
reason = "Worker failed to boot."
raise HaltServer(reason, self.WORKER_BOOT_ERROR)
@ -533,27 +566,6 @@ class Arbiter:
reason = "App failed to load."
raise HaltServer(reason, self.APP_LOAD_ERROR)
if exitcode > 0:
# If the exit code of the worker is greater than 0,
# let the user know.
self.log.error("Worker (pid:%s) exited with code %s.",
wpid, exitcode)
elif status > 0:
# If the exit code of the worker is 0 and the status
# is greater than 0, then it was most likely killed
# via a signal.
try:
sig_name = signal.Signals(status).name
except ValueError:
sig_name = "code {}".format(status)
msg = "Worker (pid:{}) was sent {}!".format(
wpid, sig_name)
# Additional hint for SIGKILL
if status == signal.SIGKILL:
msg += " Perhaps out of memory?"
self.log.error(msg)
worker = self.WORKERS.pop(wpid, None)
if not worker:
continue

View File

@ -3,10 +3,14 @@
# See the NOTICE for more information.
import os
import signal
from unittest import mock
import pytest
import gunicorn.app.base
import gunicorn.arbiter
import gunicorn.errors
from gunicorn.config import ReusePort
@ -185,3 +189,366 @@ def test_env_vars_available_during_preload():
# Note that we aren't making any assertions here, they are made in the
# dummy application object being loaded here instead.
gunicorn.arbiter.Arbiter(PreloadedAppWithEnvSettings())
# ============================================================================
# Signal Handler Registration Tests
# ============================================================================
class TestSignalHandlerRegistration:
"""Tests for signal handler registration during arbiter initialization."""
def test_init_signals_registers_all_signals(self):
"""Verify that init_signals registers handlers for all expected signals."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
with mock.patch('signal.signal') as mock_signal:
arbiter.init_signals()
# Verify all expected signals are registered
registered_signals = {call[0][0] for call in mock_signal.call_args_list}
expected_signals = set(arbiter.SIGNALS)
expected_signals.add(signal.SIGCHLD)
assert expected_signals.issubset(registered_signals), \
f"Missing signals: {expected_signals - registered_signals}"
def test_init_signals_creates_queue(self):
"""Verify that arbiter has a SimpleQueue for signals."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
# Verify SimpleQueue was created
import queue
assert isinstance(arbiter.SIG_QUEUE, queue.SimpleQueue)
def test_sigchld_has_separate_handler(self):
"""Verify that SIGCHLD uses a separate signal handler from other signals."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
with mock.patch('signal.signal') as mock_signal:
arbiter.init_signals()
# Find the handler for SIGCHLD - uses signal_chld for async-signal-safety
sigchld_calls = [c for c in mock_signal.call_args_list
if c[0][0] == signal.SIGCHLD]
assert len(sigchld_calls) == 1
assert sigchld_calls[0][0][1] == arbiter.signal_chld
# Find handlers for other signals
other_calls = [c for c in mock_signal.call_args_list
if c[0][0] in arbiter.SIGNALS]
for call in other_calls:
assert call[0][1] == arbiter.signal
def test_signals_list_contains_expected(self):
"""Verify that SIGNALS list contains all expected signal types."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
expected = ['HUP', 'QUIT', 'INT', 'TERM', 'TTIN', 'TTOU',
'USR1', 'USR2', 'WINCH']
for name in expected:
sig = getattr(signal, f'SIG{name}')
assert sig in arbiter.SIGNALS, f"SIG{name} not in SIGNALS list"
# ============================================================================
# Signal Queue Tests
# ============================================================================
class TestSignalQueue:
"""Tests for signal queueing and wakeup mechanism using SimpleQueue."""
def test_signal_queued_on_receipt(self):
"""Verify that signals are queued when the signal handler is called."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.signal(signal.SIGHUP, None)
# Get the signal from the queue
sig = arbiter.SIG_QUEUE.get_nowait()
assert sig == signal.SIGHUP
def test_multiple_signals_queued(self):
"""Verify that multiple signals can be queued."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
# Queue multiple signals
arbiter.signal(signal.SIGHUP, None)
arbiter.signal(signal.SIGTERM, None)
arbiter.signal_chld(signal.SIGCHLD, None)
signals = []
while True:
try:
signals.append(arbiter.SIG_QUEUE.get_nowait())
except Exception:
break
assert signal.SIGHUP in signals
assert signal.SIGTERM in signals
assert signal.SIGCHLD in signals
def test_wakeup_puts_sentinel(self):
"""Verify that wakeup puts the WAKEUP_REQUEST sentinel to the queue."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.wakeup()
sig = arbiter.SIG_QUEUE.get_nowait()
assert sig == arbiter.WAKEUP_REQUEST
def test_wait_for_signals_returns_signals(self):
"""Verify that wait_for_signals returns queued signals."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
# Queue some signals
arbiter.SIG_QUEUE.put_nowait(signal.SIGHUP)
arbiter.SIG_QUEUE.put_nowait(signal.SIGTERM)
signals = arbiter.wait_for_signals(timeout=0.1)
assert signal.SIGHUP in signals
assert signal.SIGTERM in signals
def test_wait_for_signals_filters_wakeup_request(self):
"""Verify that WAKEUP_REQUEST sentinel is filtered from results."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
# Queue a wakeup request and a real signal
arbiter.SIG_QUEUE.put_nowait(arbiter.WAKEUP_REQUEST)
arbiter.SIG_QUEUE.put_nowait(signal.SIGHUP)
signals = arbiter.wait_for_signals(timeout=0.1)
assert arbiter.WAKEUP_REQUEST not in signals
assert signal.SIGHUP in signals
# ============================================================================
# Reap Workers Tests
# ============================================================================
class TestReapWorkers:
"""Tests for worker reaping and exit status handling."""
@mock.patch('os.waitpid')
def test_reap_normal_exit(self, mock_waitpid):
"""Verify that a worker with normal exit (code 0) is properly reaped."""
mock_waitpid.side_effect = [(42, 0), (0, 0)]
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.cfg.settings['child_exit'] = mock.Mock()
mock_worker = mock.Mock()
arbiter.WORKERS = {42: mock_worker}
arbiter.reap_workers()
mock_worker.tmp.close.assert_called_once()
arbiter.cfg.child_exit.assert_called_once_with(arbiter, mock_worker)
assert 42 not in arbiter.WORKERS
@mock.patch('os.waitpid')
def test_reap_exit_with_error_code(self, mock_waitpid):
"""Verify that a worker exiting with non-zero code is logged."""
# Exit code 1 (status = 1 << 8 = 256)
mock_waitpid.side_effect = [(42, 256), (0, 0)]
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.cfg.settings['child_exit'] = mock.Mock()
mock_worker = mock.Mock()
arbiter.WORKERS = {42: mock_worker}
with mock.patch.object(arbiter.log, 'error') as mock_log:
arbiter.reap_workers()
# Should log the error exit
assert any('exited with code' in str(call) for call in mock_log.call_args_list)
@mock.patch('os.waitpid')
def test_reap_worker_boot_error(self, mock_waitpid):
"""Verify that WORKER_BOOT_ERROR causes HaltServer."""
# Exit code 3 (WORKER_BOOT_ERROR) = status 3 << 8 = 768
mock_waitpid.side_effect = [(42, 768), (0, 0)]
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.cfg.settings['child_exit'] = mock.Mock()
mock_worker = mock.Mock()
arbiter.WORKERS = {42: mock_worker}
with pytest.raises(gunicorn.errors.HaltServer) as exc_info:
arbiter.reap_workers()
assert exc_info.value.exit_status == gunicorn.arbiter.Arbiter.WORKER_BOOT_ERROR
@mock.patch('os.waitpid')
def test_reap_app_load_error(self, mock_waitpid):
"""Verify that APP_LOAD_ERROR causes HaltServer."""
# Exit code 4 (APP_LOAD_ERROR) = status 4 << 8 = 1024
mock_waitpid.side_effect = [(42, 1024), (0, 0)]
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.cfg.settings['child_exit'] = mock.Mock()
mock_worker = mock.Mock()
arbiter.WORKERS = {42: mock_worker}
with pytest.raises(gunicorn.errors.HaltServer) as exc_info:
arbiter.reap_workers()
assert exc_info.value.exit_status == gunicorn.arbiter.Arbiter.APP_LOAD_ERROR
@mock.patch('os.waitpid')
def test_reap_killed_by_signal(self, mock_waitpid):
"""Verify that a worker killed by signal is properly identified."""
# Status for SIGTERM (15) killed process
mock_waitpid.side_effect = [(42, signal.SIGTERM), (0, 0)]
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.cfg.settings['child_exit'] = mock.Mock()
mock_worker = mock.Mock()
arbiter.WORKERS = {42: mock_worker}
# SIGTERM should be logged as warning (not error)
with mock.patch.object(arbiter.log, 'warning') as mock_log:
arbiter.reap_workers()
# Should log the signal
assert any('SIGTERM' in str(call) for call in mock_log.call_args_list)
@mock.patch('os.waitpid')
def test_reap_killed_by_sigkill_oom_hint(self, mock_waitpid):
"""Verify that SIGKILL adds OOM hint to log message."""
# Status for SIGKILL (9) killed process
mock_waitpid.side_effect = [(42, signal.SIGKILL), (0, 0)]
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.cfg.settings['child_exit'] = mock.Mock()
mock_worker = mock.Mock()
arbiter.WORKERS = {42: mock_worker}
with mock.patch.object(arbiter.log, 'error') as mock_log:
arbiter.reap_workers()
# Should include OOM hint
log_messages = ' '.join(str(call) for call in mock_log.call_args_list)
assert 'out of memory' in log_messages.lower()
# ============================================================================
# SIGHUP Reload Tests
# ============================================================================
class TestSighupReload:
"""Tests for SIGHUP (reload) handling."""
@mock.patch('gunicorn.arbiter.Arbiter.spawn_worker')
@mock.patch('gunicorn.arbiter.Arbiter.manage_workers')
def test_reload_spawns_new_workers(self, mock_manage, mock_spawn):
"""Verify that reload spawns the configured number of workers."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.cfg.set('workers', 3)
arbiter.LISTENERS = [mock.Mock()]
arbiter.pidfile = None
# Mock app.reload to prevent it from resetting config
arbiter.app.reload = mock.Mock()
# Mock setup to prevent it from resetting num_workers
arbiter.setup = mock.Mock()
arbiter.reload()
assert mock_spawn.call_count == 3
@mock.patch('gunicorn.arbiter.Arbiter.spawn_worker')
@mock.patch('gunicorn.arbiter.Arbiter.manage_workers')
def test_reload_calls_manage_workers(self, mock_manage, mock_spawn):
"""Verify that reload calls manage_workers after spawning."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.cfg.set('workers', 1)
arbiter.LISTENERS = [mock.Mock()]
arbiter.pidfile = None
arbiter.reload()
mock_manage.assert_called_once()
@mock.patch('gunicorn.arbiter.Arbiter.spawn_worker')
@mock.patch('gunicorn.arbiter.Arbiter.manage_workers')
def test_reload_logs_hang_up(self, mock_manage, mock_spawn):
"""Verify that handle_hup logs the hang up message."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.LISTENERS = [mock.Mock()]
arbiter.pidfile = None
with mock.patch.object(arbiter.log, 'info') as mock_log:
arbiter.handle_hup()
# Check that "Hang up" was logged
assert any('Hang up' in str(call) for call in mock_log.call_args_list)
# ============================================================================
# Worker Lifecycle Tests
# ============================================================================
class TestWorkerLifecycle:
"""Tests for worker spawning, killing, and lifecycle management."""
@mock.patch('os.fork')
def test_spawn_worker_adds_to_workers_dict(self, mock_fork):
"""Verify that spawn_worker adds the worker to WORKERS dict."""
mock_fork.return_value = 12345 # Non-zero = parent process
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.WORKERS = {}
arbiter.pid = os.getpid()
arbiter.LISTENERS = []
pid = arbiter.spawn_worker()
assert pid == 12345
assert 12345 in arbiter.WORKERS
assert arbiter.WORKERS[12345].age == arbiter.worker_age
def test_kill_worker_sends_signal(self):
"""Verify that kill_worker sends the specified signal."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
mock_worker = mock.Mock()
arbiter.WORKERS = {42: mock_worker}
with mock.patch('os.kill') as mock_kill:
arbiter.kill_worker(42, signal.SIGTERM)
mock_kill.assert_called_once_with(42, signal.SIGTERM)
def test_murder_workers_sends_sigabrt_first(self):
"""Verify that murder_workers sends SIGABRT on first timeout."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.timeout = 30
mock_worker = mock.Mock()
mock_worker.aborted = False
# Simulate timeout by returning a very old update time
mock_worker.tmp.last_update.return_value = 0
arbiter.WORKERS = {42: mock_worker}
with mock.patch('time.monotonic', return_value=100), \
mock.patch.object(arbiter, 'kill_worker') as mock_kill:
arbiter.murder_workers()
mock_kill.assert_called_once_with(42, signal.SIGABRT)
assert mock_worker.aborted is True
def test_murder_workers_sends_sigkill_second(self):
"""Verify that murder_workers sends SIGKILL on second timeout."""
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.timeout = 30
mock_worker = mock.Mock()
mock_worker.aborted = True # Already aborted once
mock_worker.tmp.last_update.return_value = 0
arbiter.WORKERS = {42: mock_worker}
with mock.patch('time.monotonic', return_value=100), \
mock.patch.object(arbiter, 'kill_worker') as mock_kill:
arbiter.murder_workers()
mock_kill.assert_called_once_with(42, signal.SIGKILL)

View File

@ -0,0 +1,203 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
Integration tests for arbiter signal handling.
These tests start a real gunicorn process and verify signal handling
works correctly with actual requests and signals.
"""
import os
import signal
import socket
import subprocess
import sys
import time
import pytest
# Simple WSGI app inline
SIMPLE_APP = '''
def application(environ, start_response):
"""Basic hello world response."""
status = '200 OK'
body = b'Hello, World!'
headers = [
('Content-Type', 'text/plain'),
('Content-Length', str(len(body))),
]
start_response(status, headers)
return [body]
'''
def find_free_port():
"""Find a free port to bind to."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 0))
return s.getsockname()[1]
def wait_for_server(host, port, timeout=10):
"""Wait until server is accepting connections."""
start = time.monotonic()
while time.monotonic() - start < timeout:
try:
with socket.create_connection((host, port), timeout=1):
return True
except (ConnectionRefusedError, socket.timeout, OSError):
time.sleep(0.1)
return False
def make_request(host, port, path='/'):
"""Make a simple HTTP request and return the response body."""
with socket.create_connection((host, port), timeout=5) as sock:
request = f'GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n'
sock.sendall(request.encode())
response = b''
while True:
chunk = sock.recv(4096)
if not chunk:
break
response += chunk
return response
@pytest.fixture
def app_module(tmp_path):
"""Create a temporary app module."""
app_file = tmp_path / "app.py"
app_file.write_text(SIMPLE_APP)
return str(app_file.parent), "app:application"
@pytest.fixture
def gunicorn_server(app_module):
"""Start and stop a gunicorn server."""
app_dir, app_name = app_module
port = find_free_port()
# Start gunicorn
cmd = [
sys.executable, '-m', 'gunicorn',
'--bind', f'127.0.0.1:{port}',
'--workers', '2',
'--worker-class', 'sync',
'--access-logfile', '-',
'--error-logfile', '-',
'--log-level', 'info',
app_name
]
proc = subprocess.Popen(
cmd,
cwd=app_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env={**os.environ, 'PYTHONPATH': app_dir}
)
# Wait for server to start
if not wait_for_server('127.0.0.1', port):
proc.terminate()
proc.wait()
stdout, stderr = proc.communicate()
pytest.fail(f"Gunicorn failed to start:\nstdout: {stdout.decode()}\nstderr: {stderr.decode()}")
yield proc, port
# Cleanup
if proc.poll() is None:
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
class TestSignalHandlingIntegration:
"""Integration tests for signal handling."""
def test_basic_request(self, gunicorn_server):
"""Verify the server responds to basic requests."""
proc, port = gunicorn_server
response = make_request('127.0.0.1', port)
assert b'Hello, World!' in response
def test_graceful_shutdown_sigterm(self, gunicorn_server):
"""Verify SIGTERM causes graceful shutdown."""
proc, port = gunicorn_server
# Verify server is working
response = make_request('127.0.0.1', port)
assert b'Hello, World!' in response
# Send SIGTERM
proc.send_signal(signal.SIGTERM)
# Wait for process to exit
try:
exit_code = proc.wait(timeout=10)
assert exit_code == 0, f"Expected exit code 0, got {exit_code}"
except subprocess.TimeoutExpired:
proc.kill()
pytest.fail("Gunicorn did not exit within timeout after SIGTERM")
def test_graceful_shutdown_sigint(self, gunicorn_server):
"""Verify SIGINT causes graceful shutdown."""
proc, port = gunicorn_server
# Verify server is working
response = make_request('127.0.0.1', port)
assert b'Hello, World!' in response
# Send SIGINT
proc.send_signal(signal.SIGINT)
# Wait for process to exit
try:
exit_code = proc.wait(timeout=10)
assert exit_code == 0, f"Expected exit code 0, got {exit_code}"
except subprocess.TimeoutExpired:
proc.kill()
pytest.fail("Gunicorn did not exit within timeout after SIGINT")
def test_sighup_reload(self, gunicorn_server):
"""Verify SIGHUP triggers reload."""
proc, port = gunicorn_server
# Verify server is working
response = make_request('127.0.0.1', port)
assert b'Hello, World!' in response
# Send SIGHUP
proc.send_signal(signal.SIGHUP)
# Wait a moment for reload
time.sleep(2)
# Verify server still works after reload
assert proc.poll() is None, "Server died after SIGHUP"
response = make_request('127.0.0.1', port)
assert b'Hello, World!' in response
def test_multiple_requests_under_load(self, gunicorn_server):
"""Verify server handles multiple concurrent requests."""
proc, port = gunicorn_server
# Make several requests in sequence
for _ in range(10):
response = make_request('127.0.0.1', port)
assert b'Hello, World!' in response
# Verify server is still running
assert proc.poll() is None
if __name__ == '__main__':
pytest.main([__file__, '-v'])