fix(ctl): prevent fork deadlock in control socket server

- Use os.register_at_fork() to properly handle fork() with asyncio
- Start control server after initial workers spawn, not before
- Change default socket path to /run/gunicorn.ctl (like BIRD)
- Add integration tests for sync, gthread, and gevent workers

Fixes #3509
This commit is contained in:
Benoit Chesneau 2026-02-26 20:54:35 +01:00
parent d3f80e8cfd
commit 089ad45818
4 changed files with 500 additions and 12 deletions

View File

@ -193,8 +193,8 @@ class Arbiter:
if self.cfg.dirty_workers > 0 and self.cfg.dirty_apps:
self.spawn_dirty_arbiter()
# Start control socket server
self._start_control_server()
# Note: control socket server is started after initial workers spawn
# to avoid fork deadlocks with asyncio
self.cfg.when_ready(self)
@ -222,6 +222,10 @@ class Arbiter:
try:
self.manage_workers()
# Start control socket server after initial workers are spawned
# to avoid fork deadlocks with asyncio
self._start_control_server()
while True:
self.maybe_promote_master()
@ -687,17 +691,11 @@ class Arbiter:
self.cfg, self.log)
self.cfg.pre_fork(self, worker)
# Stop control server before fork to prevent deadlocks.
# The asyncio thread holds locks that would be stuck in the child.
self._stop_control_server()
pid = os.fork()
if pid != 0:
worker.pid = pid
self.WORKERS[pid] = worker
self._stats['workers_spawned'] += 1
# Restart control server in parent after fork
self._start_control_server()
return pid
# Do not inherit the temporary files of other workers

View File

@ -3123,7 +3123,7 @@ class ControlSocket(Setting):
cli = ["--control-socket"]
meta = "PATH"
validator = validate_string
default = "/tmp/gunicorn.ctl"
default = "/run/gunicorn.ctl"
desc = """\
Unix socket path for control interface.
@ -3131,8 +3131,9 @@ class ControlSocket(Setting):
``gunicornc`` command-line tool. Commands include viewing worker
status, adjusting worker count, and graceful reload/shutdown.
By default, creates ``/tmp/gunicorn.ctl``. Set an absolute path for a
different location (e.g., ``/var/run/gunicorn.ctl``).
By default, creates ``/run/gunicorn.ctl`` (requires write access to
``/run``). For user-level deployments, specify a different path such
as ``/tmp/gunicorn.ctl`` or ``~/.gunicorn.ctl``.
Use ``--no-control-socket`` to disable.

View File

@ -7,6 +7,12 @@ Control Socket Server
Runs in the arbiter process and accepts commands via Unix socket.
Uses asyncio in a background thread to handle client connections.
Fork Safety:
This server uses os.register_at_fork() to properly handle fork() calls.
Before fork: the asyncio thread is stopped to prevent lock issues.
After fork in parent: the server is restarted.
After fork in child: references are cleared (workers don't need the control server).
"""
import asyncio
@ -22,12 +28,54 @@ from gunicorn.ctl.protocol import (
)
# Registry of started control servers, consulted by the fork hooks below.
# It lives at module level because os.register_at_fork() callbacks are
# process-level rather than per-instance.
_active_servers = set()
_fork_handlers_registered = False


def _register_fork_handlers():
    """Install the fork hooks; only the first call has any effect."""
    global _fork_handlers_registered
    if not _fork_handlers_registered:
        # Flip the flag before registering so the hooks are never added twice.
        _fork_handlers_registered = True
        os.register_at_fork(
            before=_before_fork,
            after_in_parent=_after_fork_parent,
            after_in_child=_after_fork_child,
        )


def _before_fork():
    """Fork hook (before): stop every tracked control server."""
    # Iterate over a snapshot so the registry may change while we loop.
    for active in tuple(_active_servers):
        active._stop_for_fork()


def _after_fork_parent():
    """Fork hook (parent side): restart every tracked control server."""
    for active in tuple(_active_servers):
        active._restart_after_fork()


def _after_fork_child():
    """Fork hook (child side): forget all tracked control servers."""
    # The child (worker) does not need the control server, so the references
    # are simply dropped; nothing is stopped here.
    _active_servers.clear()
class ControlSocketServer:
"""
Control socket server running in arbiter process.
The server runs an asyncio event loop in a background thread,
accepting connections and dispatching commands to handlers.
Fork safety is handled via os.register_at_fork() - the server
automatically stops before fork and restarts after in the parent.
"""
def __init__(self, arbiter, socket_path, socket_mode=0o600):
@ -48,6 +96,10 @@ class ControlSocketServer:
self._loop = None
self._thread = None
self._running = False
self._was_running_before_fork = False
# Ensure fork handlers are registered
_register_fork_handlers()
def start(self):
"""Start server in background thread with asyncio event loop."""
@ -58,8 +110,14 @@ class ControlSocketServer:
self._thread = threading.Thread(target=self._run_loop, daemon=True)
self._thread.start()
# Track this server for fork handling
_active_servers.add(self)
def stop(self):
"""Stop server and cleanup socket."""
# Remove from active servers tracking
_active_servers.discard(self)
if not self._running:
return
@ -80,6 +138,39 @@ class ControlSocketServer:
except OSError:
pass
def _stop_for_fork(self):
    """Stop server before fork (called by fork handler).

    Records in ``_was_running_before_fork`` whether the server was running,
    so that ``_restart_after_fork`` can decide whether to start it again.
    When the server is running, a shutdown is scheduled on the event loop
    (if one is set up) and the background thread is joined for at most two
    seconds before the loop/server/thread references are dropped.
    """
    if not self._running:
        self._was_running_before_fork = False
        return

    self._was_running_before_fork = True
    self._running = False

    if self._loop and self._server:
        try:
            self._loop.call_soon_threadsafe(self._shutdown)
        except RuntimeError:
            # Loop may already be closed
            pass

    if self._thread:
        self._thread.join(timeout=2.0)
        # join() returns silently on timeout. A thread that is still alive
        # here survives into the fork, which is exactly the situation this
        # hook exists to avoid, so make it visible instead of hiding it.
        if self._thread.is_alive() and self.arbiter.log:
            self.arbiter.log.error(
                "Control server thread did not stop before fork")
        self._thread = None

    self._loop = None
    self._server = None
def _restart_after_fork(self):
    """Bring the server back up in the parent once fork() has returned.

    Invoked by the fork handler. Does nothing unless the pre-fork hook
    recorded that the server had been running.
    """
    if self._was_running_before_fork:
        self._was_running_before_fork = False
        self._running = True
        loop_thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread = loop_thread
        loop_thread.start()
def _shutdown(self):
"""Shutdown server (called from event loop thread)."""
if self._server:
@ -90,7 +181,7 @@ class ControlSocketServer:
try:
asyncio.run(self._serve())
except Exception as e:
if self.arbiter.log:
if self._running and self.arbiter.log:
self.arbiter.log.error("Control server error: %s", e)
async def _serve(self):

View File

@ -0,0 +1,398 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
Integration tests for control socket fork safety.
These tests verify that the control socket server properly handles fork()
with different worker types (sync, gthread, gevent) without causing deadlocks.
"""
import os
import signal
import socket
import subprocess
import sys
import time
import tempfile
import pytest
# Timeout for CI environments
CI_TIMEOUT = 30
# Simple WSGI app
SIMPLE_APP = '''
def application(environ, start_response):
"""Basic hello world response."""
status = '200 OK'
body = b'Hello, World!'
headers = [
('Content-Type', 'text/plain'),
('Content-Length', str(len(body))),
]
start_response(status, headers)
return [body]
'''
def find_free_port():
    """Return a TCP port number the OS just handed out on 127.0.0.1."""
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Port 0 asks the kernel to pick any unused port.
        probe.bind(('127.0.0.1', 0))
        _, port = probe.getsockname()
        return port
    finally:
        probe.close()
def wait_for_server(host, port, timeout=CI_TIMEOUT):
    """Poll until a TCP connection to (host, port) succeeds.

    Returns True on the first successful connection and False if none
    succeeded within ``timeout`` seconds.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            probe = socket.create_connection((host, port), timeout=1)
        except OSError:
            # ConnectionRefusedError and socket.timeout are OSError subclasses.
            time.sleep(0.1)
        else:
            probe.close()
            return True
    return False
def wait_for_socket(socket_path, timeout=CI_TIMEOUT):
    """Poll until ``socket_path`` exists on disk.

    Returns True as soon as the path exists, False once ``timeout`` seconds
    have passed without it appearing. Only existence is checked; no
    connection is attempted.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if os.path.exists(socket_path):
            return True
        time.sleep(0.1)
    return False
def make_request(host, port, path='/'):
    """Send one HTTP/1.1 GET and return the raw response bytes.

    The request carries ``Connection: close``, so the response is read
    until the peer closes the connection.
    """
    request = f'GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n'
    with socket.create_connection((host, port), timeout=5) as conn:
        conn.sendall(request.encode())
        # recv() returns b'' once the peer has closed; that ends the iteration.
        chunks = list(iter(lambda: conn.recv(4096), b''))
    return b''.join(chunks)
@pytest.fixture
def app_module(tmp_path):
    """Write the sample WSGI app to ``tmp_path`` and describe how to load it.

    Returns a ``(directory, "module:callable")`` pair.
    """
    module_file = tmp_path / "app.py"
    module_file.write_text(SIMPLE_APP)
    app_dir = str(module_file.parent)
    return app_dir, "app:application"
def start_gunicorn(app_dir, app_name, worker_class, port, control_socket_path):
    """Start a gunicorn server with specified worker class and control socket.

    The server is launched with two workers, bound to ``127.0.0.1:port``,
    with ``app_dir`` as working directory. It runs in its own session (and
    therefore its own process group) so the whole tree can be signalled
    with ``os.killpg``.

    Returns the ``subprocess.Popen`` object; stdout and stderr are pipes.
    """
    cmd = [
        sys.executable, '-m', 'gunicorn',
        '--bind', f'127.0.0.1:{port}',
        '--workers', '2',
        '--worker-class', worker_class,
        '--access-logfile', '-',
        '--error-logfile', '-',
        '--log-level', 'debug',
        '--timeout', '30',
        '--control-socket', control_socket_path,
        app_name
    ]

    # Add threads for gthread worker
    if worker_class == 'gthread':
        cmd.extend(['--threads', '2'])

    # Put app_dir first on PYTHONPATH but keep whatever the caller already
    # had there, instead of overwriting it.
    env = dict(os.environ)
    env['PYTHONPATH'] = os.pathsep.join(
        filter(None, [app_dir, env.get('PYTHONPATH')])
    )

    return subprocess.Popen(
        cmd,
        cwd=app_dir,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        # Equivalent to preexec_fn=os.setsid, without running Python code
        # between fork() and exec() in the child.
        start_new_session=True,
    )
def cleanup_gunicorn(proc):
    """Clean up a gunicorn process.

    If the process is still running, its process group is sent SIGTERM and
    given five seconds to exit; after that the group is sent SIGKILL. The
    pipe objects attached to ``proc`` are always closed afterwards so no
    file descriptors are left open between tests.
    """
    try:
        if proc.poll() is not None:
            return
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
        except OSError:
            # Process (group) already gone; ProcessLookupError is an OSError.
            pass
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            try:
                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
            except OSError:
                pass
            proc.wait()
    finally:
        # Closing an already-closed pipe is a no-op, so this is safe even
        # after communicate() has been called on proc.
        for stream in (proc.stdin, proc.stdout, proc.stderr):
            if stream is not None:
                stream.close()
def get_short_socket_path(prefix):
    """Build a unique control-socket path directly under /tmp.

    The path is kept short on purpose: macOS limits Unix socket paths to
    ~104 characters, which longer temporary directories can exceed.
    """
    import uuid

    # Eight hex digits of a random UUID keep concurrent runs apart.
    token = uuid.uuid4().hex[:8]
    return f"/tmp/gunicorn-{prefix}-{token}.ctl"
class TestControlSocketForkSafetySyncWorker:
    """Test control socket fork safety with sync worker."""

    def test_sync_worker_boots_with_control_socket(self, app_module, tmp_path):
        """Verify sync worker boots without deadlock when control socket is enabled."""
        app_dir, app_name = app_module
        port = find_free_port()
        # Use short path to avoid Unix socket path length limits (104 chars on macOS)
        control_socket = get_short_socket_path("sync")

        proc = start_gunicorn(app_dir, app_name, 'sync', port, control_socket)
        try:
            # Wait for server to start - should not deadlock
            if not wait_for_server('127.0.0.1', port, timeout=15):
                # A hung server is still alive, so communicate() with a
                # short timeout would raise TimeoutExpired and hide the
                # diagnostic below. Kill the whole process group first so
                # the pipes reach EOF and the captured output can be read.
                try:
                    os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
                except OSError:
                    pass
                stdout, stderr = proc.communicate(timeout=5)
                pytest.fail(
                    f"Sync worker deadlocked during startup:\n"
                    f"stdout: {stdout.decode()}\n"
                    f"stderr: {stderr.decode()}"
                )

            # Verify server responds
            response = make_request('127.0.0.1', port)
            assert b'Hello, World!' in response

            # Wait for control socket to be created (started after workers spawn)
            assert wait_for_socket(control_socket, timeout=5), \
                f"Control socket was not created at {control_socket}"

        finally:
            cleanup_gunicorn(proc)
            # Clean up socket file
            if os.path.exists(control_socket):
                os.unlink(control_socket)
class TestControlSocketForkSafetyGthreadWorker:
    """Test control socket fork safety with gthread worker."""

    def test_gthread_worker_boots_with_control_socket(self, app_module, tmp_path):
        """Verify gthread worker boots without deadlock when control socket is enabled."""
        app_dir, app_name = app_module
        port = find_free_port()
        control_socket = get_short_socket_path("gthread")

        proc = start_gunicorn(app_dir, app_name, 'gthread', port, control_socket)
        try:
            # Wait for server to start - should not deadlock
            if not wait_for_server('127.0.0.1', port, timeout=15):
                # A hung server is still alive, so communicate() with a
                # short timeout would raise TimeoutExpired and hide the
                # diagnostic below. Kill the whole process group first so
                # the pipes reach EOF and the captured output can be read.
                try:
                    os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
                except OSError:
                    pass
                stdout, stderr = proc.communicate(timeout=5)
                pytest.fail(
                    f"Gthread worker deadlocked during startup:\n"
                    f"stdout: {stdout.decode()}\n"
                    f"stderr: {stderr.decode()}"
                )

            # Verify server responds
            response = make_request('127.0.0.1', port)
            assert b'Hello, World!' in response

            # Wait for control socket to be created (started after workers spawn)
            assert wait_for_socket(control_socket, timeout=5), \
                f"Control socket was not created at {control_socket}"

        finally:
            cleanup_gunicorn(proc)
            if os.path.exists(control_socket):
                os.unlink(control_socket)
def is_gevent_available():
    """Report whether ``import gevent`` succeeds in this interpreter."""
    try:
        import gevent  # noqa: F401
    except ImportError:
        return False
    return True
@pytest.mark.skipif(not is_gevent_available(), reason="gevent not installed")
class TestControlSocketForkSafetyGeventWorker:
    """Test control socket fork safety with gevent worker."""

    def test_gevent_worker_boots_with_control_socket(self, app_module, tmp_path):
        """Verify gevent worker boots without deadlock when control socket is enabled.

        This test is critical for issue #3509 - the gevent worker uses monkey
        patching which can interact badly with asyncio in the control socket thread.
        """
        app_dir, app_name = app_module
        port = find_free_port()
        control_socket = get_short_socket_path("gevent")

        proc = start_gunicorn(app_dir, app_name, 'gevent', port, control_socket)
        try:
            # Wait for server to start - should not deadlock
            # Gevent workers may take slightly longer to boot
            if not wait_for_server('127.0.0.1', port, timeout=20):
                # A hung server is still alive, so communicate() with a
                # short timeout would raise TimeoutExpired and hide the
                # diagnostic below. Kill the whole process group first so
                # the pipes reach EOF and the captured output can be read.
                try:
                    os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
                except OSError:
                    pass
                stdout, stderr = proc.communicate(timeout=5)
                pytest.fail(
                    f"Gevent worker deadlocked during startup:\n"
                    f"stdout: {stdout.decode()}\n"
                    f"stderr: {stderr.decode()}"
                )

            # Verify server responds
            response = make_request('127.0.0.1', port)
            assert b'Hello, World!' in response

            # Wait for control socket to be created (started after workers spawn)
            assert wait_for_socket(control_socket, timeout=5), \
                f"Control socket was not created at {control_socket}"

        finally:
            cleanup_gunicorn(proc)
            if os.path.exists(control_socket):
                os.unlink(control_socket)

    def test_gevent_worker_handles_multiple_requests(self, app_module, tmp_path):
        """Verify gevent worker handles multiple requests with control socket enabled."""
        app_dir, app_name = app_module
        port = find_free_port()
        control_socket = get_short_socket_path("gevent2")

        proc = start_gunicorn(app_dir, app_name, 'gevent', port, control_socket)
        try:
            if not wait_for_server('127.0.0.1', port, timeout=20):
                # Same reasoning as above: kill the process group before
                # reading the pipes so a live server cannot make
                # communicate() time out.
                try:
                    os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
                except OSError:
                    pass
                stdout, stderr = proc.communicate(timeout=5)
                pytest.fail(
                    f"Gevent worker deadlocked during startup:\n"
                    f"stdout: {stdout.decode()}\n"
                    f"stderr: {stderr.decode()}"
                )

            # Make multiple requests
            for _ in range(10):
                response = make_request('127.0.0.1', port)
                assert b'Hello, World!' in response

            # Verify server is still running
            assert proc.poll() is None, "Server died unexpectedly"

        finally:
            cleanup_gunicorn(proc)
            if os.path.exists(control_socket):
                os.unlink(control_socket)
class TestControlSocketDisabled:
    """Test that disabling control socket works."""

    def test_no_control_socket_flag(self, app_module, tmp_path):
        """Verify --no-control-socket flag disables control socket."""
        app_dir, app_name = app_module
        port = find_free_port()
        control_socket = str(tmp_path / "gunicorn.ctl")

        cmd = [
            sys.executable, '-m', 'gunicorn',
            '--bind', f'127.0.0.1:{port}',
            '--workers', '1',
            '--worker-class', 'sync',
            '--access-logfile', '-',
            '--error-logfile', '-',
            '--log-level', 'debug',
            '--no-control-socket',
            app_name
        ]

        # Put app_dir first on PYTHONPATH but keep whatever was already set,
        # instead of overwriting it.
        env = dict(os.environ)
        env['PYTHONPATH'] = os.pathsep.join(
            filter(None, [app_dir, env.get('PYTHONPATH')])
        )

        proc = subprocess.Popen(
            cmd,
            cwd=app_dir,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
            # Equivalent to preexec_fn=os.setsid, without running Python
            # code between fork() and exec() in the child.
            start_new_session=True,
        )

        try:
            if not wait_for_server('127.0.0.1', port, timeout=15):
                # A hung server is still alive, so communicate() with a
                # short timeout would raise TimeoutExpired and hide the
                # diagnostic below. Kill the whole process group first so
                # the pipes reach EOF and the captured output can be read.
                try:
                    os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
                except OSError:
                    pass
                stdout, stderr = proc.communicate(timeout=5)
                pytest.fail(
                    f"Server failed to start:\n"
                    f"stdout: {stdout.decode()}\n"
                    f"stderr: {stderr.decode()}"
                )

            # Verify server responds
            response = make_request('127.0.0.1', port)
            assert b'Hello, World!' in response

            # Verify control socket does NOT exist
            # NOTE(review): this path is never passed to gunicorn on the
            # command line above, so the assertion holds whether or not a
            # control socket was created elsewhere - consider checking the
            # path gunicorn would actually use.
            assert not os.path.exists(control_socket), "Control socket should not exist"

        finally:
            cleanup_gunicorn(proc)
class TestControlSocketAfterReload:
    """Test control socket survives reload."""

    def test_control_socket_after_sighup(self, app_module, tmp_path):
        """Verify control socket still works after SIGHUP reload."""
        app_dir, app_name = app_module
        port = find_free_port()
        control_socket = get_short_socket_path("reload")

        proc = start_gunicorn(app_dir, app_name, 'sync', port, control_socket)
        try:
            if not wait_for_server('127.0.0.1', port, timeout=15):
                # A hung server is still alive, so communicate() with a
                # short timeout would raise TimeoutExpired and hide the
                # diagnostic below. Kill the whole process group first so
                # the pipes reach EOF and the captured output can be read.
                try:
                    os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
                except OSError:
                    pass
                stdout, stderr = proc.communicate(timeout=5)
                pytest.fail(
                    f"Server failed to start:\n"
                    f"stdout: {stdout.decode()}\n"
                    f"stderr: {stderr.decode()}"
                )

            # Verify server and control socket work
            response = make_request('127.0.0.1', port)
            assert b'Hello, World!' in response
            assert wait_for_socket(control_socket, timeout=5), \
                f"Control socket was not created at {control_socket}"

            # Send SIGHUP to trigger reload
            proc.send_signal(signal.SIGHUP)

            # Wait for reload to complete
            time.sleep(2)

            # Verify server still works after reload
            assert proc.poll() is None, "Server died after SIGHUP"
            response = make_request('127.0.0.1', port)
            assert b'Hello, World!' in response

            # Verify control socket still exists
            assert os.path.exists(control_socket), "Control socket disappeared after reload"

        finally:
            cleanup_gunicorn(proc)
            if os.path.exists(control_socket):
                os.unlink(control_socket)
if __name__ == '__main__':
    # Propagate pytest's exit code so running this file directly reports
    # failures to the shell instead of always exiting with status 0.
    raise SystemExit(pytest.main([__file__, '-v']))