feat(companion): Spawn and reap the manager from the arbiter

Run the companion manager as a single arbiter child with its own
supervision loop, and host the config model with its loader.

config.py holds CompanionConfig (moved from process.py) and
build_companion_configs(cfg), which expands each companion_workers entry into
a CompanionConfig, filling omitted fields from the global companion_* settings.
It is also the reread config_loader. process.py keeps State and CompanionProcess.

CompanionManager.run() is the forked-child body: installs SIGCHLD/SIGTERM/SIGINT
via a self-pipe, brings up the control socket, starts every companion, then
select-waits on the socket and the pipe. Each tick reaps exits, retries backoff,
promotes past startsecs, and SIGKILLs companions past their stop deadline.
SIGTERM/SIGINT stop all companions and return.

Arbiter gains companion_manager_pid, manage_companion_manager (respawns the
manager when it is gone and companions are configured), spawn_companion_manager
(fork; child runs the loop), and reap detection that clears the pid on exit.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Tanmoy Sarkar 2026-06-09 22:24:53 +05:30
parent 9f3762d6b6
commit 457bc5a69a
8 changed files with 377 additions and 72 deletions

View File

@ -688,7 +688,7 @@ No per-companion logic in Arbiter.
- [x] Implement `stop`.
- [x] Implement `restart`.
- [x] Implement transactional `reread`.
- [ ] Add manager spawn/reap logic in Arbiter.
- [x] Add manager spawn/reap logic in Arbiter.
- [ ] Add manager shutdown handling in Arbiter.
- [ ] Wire Gunicorn reload to manager `reread` or restart.
- [ ] Close Gunicorn-only fds in manager child.

View File

@ -14,6 +14,9 @@ import socket
from gunicorn.errors import HaltServer, AppImportError
from gunicorn.pidfile import Pidfile
from gunicorn import sock, systemd, util
from gunicorn.companion.config import build_companion_configs
from gunicorn.companion.control import ControlServer
from gunicorn.companion.manager import CompanionManager
from gunicorn import __version__, SERVER_SOFTWARE
@ -63,6 +66,7 @@ class Arbiter:
self.reexec_pid = 0
self.master_pid = 0
self.master_name = "Master"
self.companion_manager_pid = 0
cwd = util.getcwd()
@ -201,6 +205,7 @@ class Arbiter:
try:
self.manage_workers()
self.manage_companion_manager()
while True:
self.maybe_promote_master()
@ -210,6 +215,7 @@ class Arbiter:
self.sleep()
self.murder_workers()
self.manage_workers()
self.manage_companion_manager()
continue
if sig not in self.SIG_NAMES:
@ -519,6 +525,12 @@ class Arbiter:
break
if self.reexec_pid == wpid:
self.reexec_pid = 0
elif self.companion_manager_pid == wpid:
# The manager itself exited; clear its pid so the main
# loop respawns it. It owns its companions' lifecycles.
self.companion_manager_pid = 0
self.log.error(
"Companion manager (pid:%s) exited", wpid)
else:
# A worker was terminated. If the termination reason was
# that it could not boot, we'll shut it down to avoid
@ -645,6 +657,52 @@ class Arbiter:
self.spawn_worker()
time.sleep(0.1 * random.random())
def manage_companion_manager(self):
"""Keep the companion manager alive, spawning it if it is not running.
Does nothing unless companions are configured. The manager is a single
child of the arbiter; per-companion supervision lives entirely inside
it, so the arbiter only ensures the one manager process exists.
"""
if self.companion_manager_pid == 0 and self.cfg.companion_workers:
self.spawn_companion_manager()
def spawn_companion_manager(self):
"""Fork the companion manager process.
The parent records the manager pid and returns. The child builds the
configs, runs the manager's supervision loop, and exits when the loop
returns. The manager forks the individual companions itself.
"""
configs = build_companion_configs(self.cfg)
if not configs:
return
manager = CompanionManager(configs, self.log)
manager.config_loader = lambda: build_companion_configs(self.cfg)
if self.cfg.companion_control_socket:
manager.control = ControlServer(
manager.handle_command,
self.cfg.companion_control_socket,
mode=self.cfg.companion_control_socket_mode or 0o600,
log=self.log)
pid = os.fork()
if pid != 0:
self.companion_manager_pid = pid
self.log.info("Companion manager started (pid:%s)", pid)
return pid
# Process Child
try:
util._setproctitle("companion manager [%s]" % self.proc_name)
manager.run()
sys.exit(0)
except SystemExit:
raise
except Exception:
self.log.exception("Exception in companion manager process")
sys.exit(-1)
def kill_workers(self, sig):
"""\
Kill all workers with the signal `sig`

View File

@ -0,0 +1,106 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import hashlib
import json
# Maps each optional companion field to the global setting build_companion_configs
# reads when a spec omits it. ``name`` and ``target`` are required per spec and
# have no global default, so they are filled directly instead of through here.
FIELD_DEFAULTS = {
"cwd": "companion_cwd",
"env": "companion_env",
"stop_signal": "companion_stop_signal",
"stop_timeout": "companion_stop_timeout",
"reload_timeout": "companion_reload_timeout",
"stdout": "companion_stdout",
"stderr": "companion_stderr",
"startsecs": "companion_startsecs",
}
class CompanionConfig:
"""Validated, normalized config for a single companion.
Built from one entry of ``companion_workers`` with global defaults already
applied. ``config_hash`` is a stable digest of every field; the manager
restarts a companion whenever its hash changes on reread.
"""
def __init__(
self,
name,
target,
cwd=None,
env=None,
stop_signal="SIGTERM",
stop_timeout=60,
reload_timeout=60,
stdout=None,
stderr=None,
startsecs=1,
):
self.name = name
self.target = target
self.cwd = cwd
self.env = dict(env or {})
self.stop_signal = stop_signal
self.stop_timeout = stop_timeout
self.reload_timeout = reload_timeout
self.stdout = stdout
self.stderr = stderr
self.startsecs = startsecs
def to_dict(self):
return {
"name": self.name,
"target": self.target,
"cwd": self.cwd,
"env": self.env,
"stop_signal": self.stop_signal,
"stop_timeout": self.stop_timeout,
"reload_timeout": self.reload_timeout,
"stdout": self.stdout,
"stderr": self.stderr,
"startsecs": self.startsecs,
}
@property
def config_hash(self):
# Sort keys so dict ordering never changes the digest. A callable
# target has no stable repr across runs, so use its qualified name.
data = self.to_dict()
data["target"] = self._target_key(self.target)
payload = json.dumps(data, sort_keys=True, default=str)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
@staticmethod
def _target_key(target):
if callable(target):
module = getattr(target, "__module__", "")
qualified_name = getattr(target, "__qualname__", repr(target))
return "%s:%s" % (module, qualified_name)
return str(target)
def __repr__(self):
return "<CompanionConfig %s>" % self.name
def build_companion_configs(cfg):
"""Build a CompanionConfig list from ``cfg.companion_workers``.
A spec missing ``name`` or ``target`` is rejected, since the manager has
nothing to supervise without both.
"""
configs = []
for spec in cfg.companion_workers:
if "name" not in spec or "target" not in spec:
raise ValueError(
"each companion worker needs 'name' and 'target': %s" % spec)
fields = {field: spec.get(field, getattr(cfg, setting))
for field, setting in FIELD_DEFAULTS.items()}
configs.append(
CompanionConfig(name=spec["name"], target=spec["target"], **fields))
return configs

View File

@ -7,15 +7,17 @@ from __future__ import annotations
import importlib
import os
import select
import signal
import time
from typing import TYPE_CHECKING, Callable, Iterable, Union
from gunicorn import util
from gunicorn.companion.control import CommandError
from gunicorn.companion.process import CompanionProcess, State
if TYPE_CHECKING:
from gunicorn.companion.process import CompanionConfig
from gunicorn.companion.config import CompanionConfig
class CompanionManager:
@ -34,6 +36,133 @@ class CompanionManager:
# Set by the arbiter wiring: a no-arg callable that re-reads and
# validates companion config, returning a fresh CompanionConfig list.
self.config_loader = None
# Set by the arbiter wiring: the ControlServer, or None when no control
# socket is configured. Created inside the child by run().
self.control = None
self.stopping = False
self._wakeup_pipe = None
def run(self) -> None:
"""Run the manager's supervision loop. This is the forked child body.
Installs signal handling, brings up the control socket, starts every
companion, then loops servicing the socket and the companions until a
SIGTERM or SIGINT asks it to stop, at which point it shuts the
companions down and returns. Each tick reaps exited companions,
retries any that are backing off, promotes those past ``startsecs``,
and kills any that overran their stop deadline.
"""
self._install_signals()
if self.control is not None:
self.control.create()
for process in self.processes.values():
self.spawn_process(process)
self.log.info("companion manager running (pid %s)", self.pid)
try:
while not self.stopping:
self._tick()
self._wait()
self.stop_all()
finally:
if self.control is not None:
self.control.close()
def _tick(self, now: float = None) -> None:
"""One supervision pass over every companion."""
now = now or time.time()
self.reap_processes()
self.retry_backoff(now)
self.promote_running(now)
self.enforce_deadlines(now)
def _wait(self, timeout: float = 1.0) -> None:
"""Block until a signal or a control request arrives, or we time out.
The self-pipe carries signal wake-ups; the control listener carries
client connections. Either readable (or the timeout) ends the wait, so
the next loop pass reacts promptly without busy-spinning.
"""
readers = [self._wakeup_pipe[0]]
if self.control is not None and self.control.listener is not None:
readers.append(self.control.listener)
try:
ready, _, _ = select.select(readers, [], [], timeout)
except (InterruptedError, OSError):
return
for reader in ready:
if reader is self._wakeup_pipe[0]:
self._drain_wakeup()
else:
self._accept_control()
def _accept_control(self) -> None:
"""Accept one waiting control connection and answer its requests."""
try:
connection, _ = self.control.listener.accept()
except OSError:
return
self.control.serve_connection(connection)
def enforce_deadlines(self, now: float = None) -> None:
"""SIGKILL companions that overran their stop deadline.
A graceful ``stop_signal`` may be ignored or take too long; once the
deadline set by stop/restart passes, the companion is force-killed so
it cannot wedge the manager. The reaper picks up the exit afterwards.
"""
now = now or time.time()
for process in self.processes.values():
if process.state != State.STOPPING or process.stop_deadline is None:
continue
if now >= process.stop_deadline and process.pid is not None:
os.kill(process.pid, signal.SIGKILL)
process.kill_count += 1
process.stop_deadline = None
self.log.warning("companion %s killed after timeout (pid %s)",
process.name, process.pid)
def stop_all(self) -> None:
"""Stop every companion and reap them as they exit.
Sends each one its stop signal, then keeps reaping and enforcing stop
deadlines until they are all gone, so the manager exits without leaving
orphaned companions behind.
"""
for name in list(self.processes):
self.stop_process(name)
while any(process.pid is not None for process in self.processes.values()):
now = time.time()
self.enforce_deadlines(now)
self.reap_processes()
self._wait(timeout=0.2)
def _install_signals(self) -> None:
"""Set up the self-pipe and signal handlers for the supervision loop."""
self._wakeup_pipe = os.pipe()
for fd in self._wakeup_pipe:
util.set_non_blocking(fd)
util.close_on_exec(fd)
signal.signal(signal.SIGCHLD, self._wakeup)
signal.signal(signal.SIGTERM, self._signal_stop)
signal.signal(signal.SIGINT, self._signal_stop)
def _signal_stop(self, signum, frame) -> None:
self.stopping = True
self._wakeup()
def _wakeup(self, signum=None, frame=None) -> None:
"""Wake the loop out of ``select`` by writing to the self-pipe."""
try:
os.write(self._wakeup_pipe[1], b".")
except OSError:
pass
def _drain_wakeup(self) -> None:
try:
while os.read(self._wakeup_pipe[0], 4096):
pass
except OSError:
pass
def handle_command(self, command: dict) -> dict:
"""Route a decoded control command to its action.

View File

@ -3,8 +3,6 @@
# See the NOTICE for more information.
import hashlib
import json
import time
from enum import Enum
@ -26,73 +24,6 @@ class State(str, Enum):
STOPPING = "STOPPING"
class CompanionConfig:
"""Validated, normalized config for a single companion.
Built from one entry of ``companion_workers`` with global defaults already
applied. ``config_hash`` is a stable digest of every field; the manager
restarts a companion whenever its hash changes on reread.
"""
def __init__(
self,
name,
target,
cwd=None,
env=None,
stop_signal="SIGTERM",
stop_timeout=60,
reload_timeout=60,
stdout=None,
stderr=None,
startsecs=1,
):
self.name = name
self.target = target
self.cwd = cwd
self.env = dict(env or {})
self.stop_signal = stop_signal
self.stop_timeout = stop_timeout
self.reload_timeout = reload_timeout
self.stdout = stdout
self.stderr = stderr
self.startsecs = startsecs
def to_dict(self):
return {
"name": self.name,
"target": self.target,
"cwd": self.cwd,
"env": self.env,
"stop_signal": self.stop_signal,
"stop_timeout": self.stop_timeout,
"reload_timeout": self.reload_timeout,
"stdout": self.stdout,
"stderr": self.stderr,
"startsecs": self.startsecs,
}
@property
def config_hash(self):
# Sort keys so dict ordering never changes the digest. A callable
# target has no stable repr across runs, so use its qualified name.
data = self.to_dict()
data["target"] = self._target_key(self.target)
payload = json.dumps(data, sort_keys=True, default=str)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
@staticmethod
def _target_key(target):
if callable(target):
module = getattr(target, "__module__", "")
qualified_name = getattr(target, "__qualname__", repr(target))
return "%s:%s" % (module, qualified_name)
return str(target)
def __repr__(self):
return "<CompanionConfig %s>" % self.name
class CompanionProcess:
"""Runtime state for one companion, separate from its static config.

View File

@ -147,6 +147,39 @@ def test_arbiter_reap_workers(mock_os_waitpid):
arbiter.cfg.child_exit.assert_called_with(arbiter, mock_worker)
def test_arbiter_manage_companion_manager_spawns_when_configured():
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.cfg.set("companion_workers", [{"name": "rq", "target": "pkg:run"}])
arbiter.spawn_companion_manager = mock.Mock()
arbiter.manage_companion_manager()
arbiter.spawn_companion_manager.assert_called_once_with()
def test_arbiter_manage_companion_manager_noop_without_companions():
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.spawn_companion_manager = mock.Mock()
arbiter.manage_companion_manager()
arbiter.spawn_companion_manager.assert_not_called()
def test_arbiter_manage_companion_manager_noop_when_running():
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.cfg.set("companion_workers", [{"name": "rq", "target": "pkg:run"}])
arbiter.companion_manager_pid = 4242
arbiter.spawn_companion_manager = mock.Mock()
arbiter.manage_companion_manager()
arbiter.spawn_companion_manager.assert_not_called()
@mock.patch('os.waitpid')
def test_arbiter_reap_clears_companion_manager_pid(mock_os_waitpid):
mock_os_waitpid.side_effect = [(4242, 0), (0, 0)]
arbiter = gunicorn.arbiter.Arbiter(DummyApplication())
arbiter.companion_manager_pid = 4242
arbiter.reap_workers()
assert arbiter.companion_manager_pid == 0
class PreloadedAppWithEnvSettings(DummyApplication):
"""
Simple application that makes use of the 'preload' feature to

View File

@ -0,0 +1,47 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import pytest
from gunicorn.config import Config
from gunicorn.companion.config import build_companion_configs
def make_config(workers, **overrides):
cfg = Config()
cfg.set("companion_workers", workers)
for key, value in overrides.items():
cfg.set(key, value)
return cfg
def test_build_applies_global_defaults():
cfg = make_config(
[{"name": "rq", "target": "pkg:run"}],
companion_stop_signal="SIGINT",
companion_startsecs=5)
config, = build_companion_configs(cfg)
assert config.name == "rq"
assert config.target == "pkg:run"
assert config.stop_signal == "SIGINT"
assert config.startsecs == 5
def test_build_per_spec_overrides_global():
cfg = make_config(
[{"name": "rq", "target": "pkg:run", "stop_signal": "SIGTERM"}],
companion_stop_signal="SIGINT")
config, = build_companion_configs(cfg)
assert config.stop_signal == "SIGTERM"
def test_build_empty_when_none_configured():
assert build_companion_configs(make_config([])) == []
def test_build_requires_name_and_target():
with pytest.raises(ValueError):
build_companion_configs(make_config([{"name": "rq"}]))
with pytest.raises(ValueError):
build_companion_configs(make_config([{"target": "pkg:run"}]))

View File

@ -10,7 +10,8 @@ import pytest
from gunicorn.companion.control import CommandError
from gunicorn.companion.manager import CompanionManager
from gunicorn.companion.process import CompanionConfig, State
from gunicorn.companion.config import CompanionConfig
from gunicorn.companion.process import State
def make_manager(*names):