diff --git a/docs/design/companion-process-manager.md b/docs/design/companion-process-manager.md index cb45e0c4..856656cc 100644 --- a/docs/design/companion-process-manager.md +++ b/docs/design/companion-process-manager.md @@ -688,7 +688,7 @@ No per-companion logic in Arbiter. - [x] Implement `stop`. - [x] Implement `restart`. - [x] Implement transactional `reread`. -- [ ] Add manager spawn/reap logic in Arbiter. +- [x] Add manager spawn/reap logic in Arbiter. - [ ] Add manager shutdown handling in Arbiter. - [ ] Wire Gunicorn reload to manager `reread` or restart. - [ ] Close Gunicorn-only fds in manager child. diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 646d684e..1ccba395 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -14,6 +14,9 @@ import socket from gunicorn.errors import HaltServer, AppImportError from gunicorn.pidfile import Pidfile from gunicorn import sock, systemd, util +from gunicorn.companion.config import build_companion_configs +from gunicorn.companion.control import ControlServer +from gunicorn.companion.manager import CompanionManager from gunicorn import __version__, SERVER_SOFTWARE @@ -63,6 +66,7 @@ class Arbiter: self.reexec_pid = 0 self.master_pid = 0 self.master_name = "Master" + self.companion_manager_pid = 0 cwd = util.getcwd() @@ -201,6 +205,7 @@ class Arbiter: try: self.manage_workers() + self.manage_companion_manager() while True: self.maybe_promote_master() @@ -210,6 +215,7 @@ class Arbiter: self.sleep() self.murder_workers() self.manage_workers() + self.manage_companion_manager() continue if sig not in self.SIG_NAMES: @@ -519,6 +525,12 @@ class Arbiter: break if self.reexec_pid == wpid: self.reexec_pid = 0 + elif self.companion_manager_pid == wpid: + # The manager itself exited; clear its pid so the main + # loop respawns it. It owns its companions' lifecycles. + self.companion_manager_pid = 0 + self.log.error( + "Companion manager (pid:%s) exited", wpid) else: # A worker was terminated. 
If the termination reason was # that it could not boot, we'll shut it down to avoid @@ -645,6 +657,52 @@ class Arbiter: self.spawn_worker() time.sleep(0.1 * random.random()) + def manage_companion_manager(self): + """Keep the companion manager alive, spawning it if it is not running. + + Does nothing unless companions are configured. The manager is a single + child of the arbiter; per-companion supervision lives entirely inside + it, so the arbiter only ensures the one manager process exists. + """ + if self.companion_manager_pid == 0 and self.cfg.companion_workers: + self.spawn_companion_manager() + + def spawn_companion_manager(self): + """Fork the companion manager process. + + The configs and the manager are built before forking. The parent records + the manager pid and returns it; the child runs the manager's supervision + loop and exits when it returns. The manager forks the companions itself. + """ + configs = build_companion_configs(self.cfg) + if not configs: + return + manager = CompanionManager(configs, self.log) + manager.config_loader = lambda: build_companion_configs(self.cfg) + if self.cfg.companion_control_socket: + manager.control = ControlServer( + manager.handle_command, + self.cfg.companion_control_socket, + mode=self.cfg.companion_control_socket_mode or 0o600, + log=self.log) + + pid = os.fork() + if pid != 0: + self.companion_manager_pid = pid + self.log.info("Companion manager started (pid:%s)", pid) + return pid + + # Process Child + try: + util._setproctitle("companion manager [%s]" % self.proc_name) + manager.run() + sys.exit(0) + except SystemExit: + raise + except Exception: + self.log.exception("Exception in companion manager process") + sys.exit(-1) + def kill_workers(self, sig): """\ Kill all workers with the signal `sig` diff --git a/gunicorn/companion/config.py b/gunicorn/companion/config.py new file mode 100644 index 00000000..12cb22c2 --- /dev/null +++ b/gunicorn/companion/config.py @@ -0,0 +1,106 @@ +# +# This file is part of gunicorn released 
under the MIT license. +# See the NOTICE for more information. + +import hashlib +import json + + +# Maps each optional companion field to the global setting build_companion_configs +# reads when a spec omits it. ``name`` and ``target`` are required per spec and +# have no global default, so they are filled directly instead of through here. +FIELD_DEFAULTS = { + "cwd": "companion_cwd", + "env": "companion_env", + "stop_signal": "companion_stop_signal", + "stop_timeout": "companion_stop_timeout", + "reload_timeout": "companion_reload_timeout", + "stdout": "companion_stdout", + "stderr": "companion_stderr", + "startsecs": "companion_startsecs", +} + + +class CompanionConfig: + """Validated, normalized config for a single companion. + + Built from one entry of ``companion_workers`` with global defaults already + applied. ``config_hash`` is a stable digest of every field; the manager + restarts a companion whenever its hash changes on reread. + """ + + def __init__( + self, + name, + target, + cwd=None, + env=None, + stop_signal="SIGTERM", + stop_timeout=60, + reload_timeout=60, + stdout=None, + stderr=None, + startsecs=1, + ): + self.name = name + self.target = target + self.cwd = cwd + self.env = dict(env or {}) + self.stop_signal = stop_signal + self.stop_timeout = stop_timeout + self.reload_timeout = reload_timeout + self.stdout = stdout + self.stderr = stderr + self.startsecs = startsecs + + def to_dict(self): + return { + "name": self.name, + "target": self.target, + "cwd": self.cwd, + "env": self.env, + "stop_signal": self.stop_signal, + "stop_timeout": self.stop_timeout, + "reload_timeout": self.reload_timeout, + "stdout": self.stdout, + "stderr": self.stderr, + "startsecs": self.startsecs, + } + + @property + def config_hash(self): + # Sort keys so dict ordering never changes the digest. A callable + # target has no stable repr across runs, so use its qualified name. 
+ data = self.to_dict() + data["target"] = self._target_key(self.target) + payload = json.dumps(data, sort_keys=True, default=str) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + @staticmethod + def _target_key(target): + if callable(target): + module = getattr(target, "__module__", "") + qualified_name = getattr(target, "__qualname__", repr(target)) + return "%s:%s" % (module, qualified_name) + return str(target) + + def __repr__(self): + return "<CompanionConfig %s>" % self.name + + +def build_companion_configs(cfg): + """Build a CompanionConfig list from ``cfg.companion_workers``. + + A spec missing ``name`` or ``target`` is rejected, since the manager has + nothing to supervise without both. + """ + configs = [] + for spec in cfg.companion_workers: + if "name" not in spec or "target" not in spec: + raise ValueError( + "each companion worker needs 'name' and 'target': %s" % spec) + fields = {field: spec.get(field, getattr(cfg, setting)) + for field, setting in FIELD_DEFAULTS.items()} + configs.append( + CompanionConfig(name=spec["name"], target=spec["target"], **fields)) + return configs diff --git a/gunicorn/companion/manager.py b/gunicorn/companion/manager.py index fced43b4..7ccbe6d6 100644 --- a/gunicorn/companion/manager.py +++ b/gunicorn/companion/manager.py @@ -7,15 +7,17 @@ from __future__ import annotations import importlib import os +import select import signal import time from typing import TYPE_CHECKING, Callable, Iterable, Union +from gunicorn import util from gunicorn.companion.control import CommandError from gunicorn.companion.process import CompanionProcess, State if TYPE_CHECKING: - from gunicorn.companion.process import CompanionConfig + from gunicorn.companion.config import CompanionConfig class CompanionManager: @@ -34,6 +36,133 @@ class CompanionManager: # Set by the arbiter wiring: a no-arg callable that re-reads and # validates companion config, returning a fresh CompanionConfig list. 
self.config_loader = None + # Set by the arbiter wiring: the ControlServer, or None when no control + # socket is configured. run() calls its create() inside the child. + self.control = None + self.stopping = False + self._wakeup_pipe = None + + def run(self) -> None: + """Run the manager's supervision loop. This is the forked child body. + + Installs signal handling, brings up the control socket, starts every + companion, then loops servicing the socket and the companions until a + SIGTERM or SIGINT asks it to stop, at which point it shuts the + companions down and returns. Each tick reaps exited companions, + retries any that are backing off, promotes those past ``startsecs``, + and kills any that overran their stop deadline. + """ + self._install_signals() + if self.control is not None: + self.control.create() + for process in self.processes.values(): + self.spawn_process(process) + self.log.info("companion manager running (pid %s)", self.pid) + try: + while not self.stopping: + self._tick() + self._wait() + self.stop_all() + finally: + if self.control is not None: + self.control.close() + + def _tick(self, now: float = None) -> None: + """One supervision pass over every companion.""" + now = now or time.time() + self.reap_processes() + self.retry_backoff(now) + self.promote_running(now) + self.enforce_deadlines(now) + + def _wait(self, timeout: float = 1.0) -> None: + """Block until a signal or a control request arrives, or we time out. + + The self-pipe carries signal wake-ups; the control listener carries + client connections. Either readable (or the timeout) ends the wait, so + the next loop pass reacts promptly without busy-spinning. 
+ """ + readers = [self._wakeup_pipe[0]] + if self.control is not None and self.control.listener is not None: + readers.append(self.control.listener) + try: + ready, _, _ = select.select(readers, [], [], timeout) + except (InterruptedError, OSError): + return + for reader in ready: + if reader is self._wakeup_pipe[0]: + self._drain_wakeup() + else: + self._accept_control() + + def _accept_control(self) -> None: + """Accept one waiting control connection and answer its requests.""" + try: + connection, _ = self.control.listener.accept() + except OSError: + return + self.control.serve_connection(connection) + + def enforce_deadlines(self, now: float = None) -> None: + """SIGKILL companions that overran their stop deadline. + + A graceful ``stop_signal`` may be ignored or take too long; once the + deadline set by stop/restart passes, the companion is force-killed so + it cannot wedge the manager. The reaper picks up the exit afterwards. + """ + now = now or time.time() + for process in self.processes.values(): + if process.state != State.STOPPING or process.stop_deadline is None: + continue + if now >= process.stop_deadline and process.pid is not None: + os.kill(process.pid, signal.SIGKILL) + process.kill_count += 1 + process.stop_deadline = None + self.log.warning("companion %s killed after timeout (pid %s)", + process.name, process.pid) + + def stop_all(self) -> None: + """Stop every companion and reap them as they exit. + + Sends each one its stop signal, then keeps reaping and enforcing stop + deadlines until they are all gone, so the manager exits without leaving + orphaned companions behind. 
+ """ + for name in list(self.processes): + self.stop_process(name) + while any(process.pid is not None for process in self.processes.values()): + now = time.time() + self.enforce_deadlines(now) + self.reap_processes() + self._wait(timeout=0.2) + + def _install_signals(self) -> None: + """Set up the self-pipe and signal handlers for the supervision loop.""" + self._wakeup_pipe = os.pipe() + for fd in self._wakeup_pipe: + util.set_non_blocking(fd) + util.close_on_exec(fd) + signal.signal(signal.SIGCHLD, self._wakeup) + signal.signal(signal.SIGTERM, self._signal_stop) + signal.signal(signal.SIGINT, self._signal_stop) + + def _signal_stop(self, signum, frame) -> None: + self.stopping = True + self._wakeup() + + def _wakeup(self, signum=None, frame=None) -> None: + """Wake the loop out of ``select`` by writing to the self-pipe.""" + try: + os.write(self._wakeup_pipe[1], b".") + except OSError: + pass + + def _drain_wakeup(self) -> None: + try: + while os.read(self._wakeup_pipe[0], 4096): + pass + except OSError: + pass def handle_command(self, command: dict) -> dict: """Route a decoded control command to its action. diff --git a/gunicorn/companion/process.py b/gunicorn/companion/process.py index 16f24b85..4f213c66 100644 --- a/gunicorn/companion/process.py +++ b/gunicorn/companion/process.py @@ -3,8 +3,6 @@ # See the NOTICE for more information. -import hashlib -import json import time from enum import Enum @@ -26,73 +24,6 @@ class State(str, Enum): STOPPING = "STOPPING" -class CompanionConfig: - """Validated, normalized config for a single companion. - - Built from one entry of ``companion_workers`` with global defaults already - applied. ``config_hash`` is a stable digest of every field; the manager - restarts a companion whenever its hash changes on reread. 
- """ - - def __init__( - self, - name, - target, - cwd=None, - env=None, - stop_signal="SIGTERM", - stop_timeout=60, - reload_timeout=60, - stdout=None, - stderr=None, - startsecs=1, - ): - self.name = name - self.target = target - self.cwd = cwd - self.env = dict(env or {}) - self.stop_signal = stop_signal - self.stop_timeout = stop_timeout - self.reload_timeout = reload_timeout - self.stdout = stdout - self.stderr = stderr - self.startsecs = startsecs - - def to_dict(self): - return { - "name": self.name, - "target": self.target, - "cwd": self.cwd, - "env": self.env, - "stop_signal": self.stop_signal, - "stop_timeout": self.stop_timeout, - "reload_timeout": self.reload_timeout, - "stdout": self.stdout, - "stderr": self.stderr, - "startsecs": self.startsecs, - } - - @property - def config_hash(self): - # Sort keys so dict ordering never changes the digest. A callable - # target has no stable repr across runs, so use its qualified name. - data = self.to_dict() - data["target"] = self._target_key(self.target) - payload = json.dumps(data, sort_keys=True, default=str) - return hashlib.sha256(payload.encode("utf-8")).hexdigest() - - @staticmethod - def _target_key(target): - if callable(target): - module = getattr(target, "__module__", "") - qualified_name = getattr(target, "__qualname__", repr(target)) - return "%s:%s" % (module, qualified_name) - return str(target) - - def __repr__(self): - return "" % self.name - - class CompanionProcess: """Runtime state for one companion, separate from its static config. 
diff --git a/tests/test_arbiter.py b/tests/test_arbiter.py index 8c1527e2..36983bcf 100644 --- a/tests/test_arbiter.py +++ b/tests/test_arbiter.py @@ -147,6 +147,39 @@ def test_arbiter_reap_workers(mock_os_waitpid): arbiter.cfg.child_exit.assert_called_with(arbiter, mock_worker) +def test_arbiter_manage_companion_manager_spawns_when_configured(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_workers", [{"name": "rq", "target": "pkg:run"}]) + arbiter.spawn_companion_manager = mock.Mock() + arbiter.manage_companion_manager() + arbiter.spawn_companion_manager.assert_called_once_with() + + +def test_arbiter_manage_companion_manager_noop_without_companions(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.spawn_companion_manager = mock.Mock() + arbiter.manage_companion_manager() + arbiter.spawn_companion_manager.assert_not_called() + + +def test_arbiter_manage_companion_manager_noop_when_running(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_workers", [{"name": "rq", "target": "pkg:run"}]) + arbiter.companion_manager_pid = 4242 + arbiter.spawn_companion_manager = mock.Mock() + arbiter.manage_companion_manager() + arbiter.spawn_companion_manager.assert_not_called() + + +@mock.patch('os.waitpid') +def test_arbiter_reap_clears_companion_manager_pid(mock_os_waitpid): + mock_os_waitpid.side_effect = [(4242, 0), (0, 0)] + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.companion_manager_pid = 4242 + arbiter.reap_workers() + assert arbiter.companion_manager_pid == 0 + + class PreloadedAppWithEnvSettings(DummyApplication): """ Simple application that makes use of the 'preload' feature to diff --git a/tests/test_companion_config.py b/tests/test_companion_config.py new file mode 100644 index 00000000..6ed2d979 --- /dev/null +++ b/tests/test_companion_config.py @@ -0,0 +1,47 @@ +# +# This file is part of gunicorn released under the MIT license. 
+# See the NOTICE for more information. + +import pytest + +from gunicorn.config import Config +from gunicorn.companion.config import build_companion_configs + + +def make_config(workers, **overrides): + cfg = Config() + cfg.set("companion_workers", workers) + for key, value in overrides.items(): + cfg.set(key, value) + return cfg + + +def test_build_applies_global_defaults(): + cfg = make_config( + [{"name": "rq", "target": "pkg:run"}], + companion_stop_signal="SIGINT", + companion_startsecs=5) + config, = build_companion_configs(cfg) + assert config.name == "rq" + assert config.target == "pkg:run" + assert config.stop_signal == "SIGINT" + assert config.startsecs == 5 + + +def test_build_per_spec_overrides_global(): + cfg = make_config( + [{"name": "rq", "target": "pkg:run", "stop_signal": "SIGTERM"}], + companion_stop_signal="SIGINT") + config, = build_companion_configs(cfg) + assert config.stop_signal == "SIGTERM" + + +def test_build_empty_when_none_configured(): + assert build_companion_configs(make_config([])) == [] + + +def test_build_requires_name_and_target(): + with pytest.raises(ValueError): + build_companion_configs(make_config([{"name": "rq"}])) + with pytest.raises(ValueError): + build_companion_configs(make_config([{"target": "pkg:run"}])) diff --git a/tests/test_companion_manager.py b/tests/test_companion_manager.py index e8951f59..ac1688ec 100644 --- a/tests/test_companion_manager.py +++ b/tests/test_companion_manager.py @@ -10,7 +10,8 @@ import pytest from gunicorn.companion.control import CommandError from gunicorn.companion.manager import CompanionManager -from gunicorn.companion.process import CompanionConfig, State +from gunicorn.companion.config import CompanionConfig +from gunicorn.companion.process import State def make_manager(*names):