diff --git a/docs/design/companion-process-manager.md b/docs/design/companion-process-manager.md index 56e3e1fe..cb45e0c4 100644 --- a/docs/design/companion-process-manager.md +++ b/docs/design/companion-process-manager.md @@ -687,7 +687,7 @@ No per-companion logic in Arbiter. - [x] Implement `start`. - [x] Implement `stop`. - [x] Implement `restart`. -- [ ] Implement transactional `reread`. +- [x] Implement transactional `reread`. - [ ] Add manager spawn/reap logic in Arbiter. - [ ] Add manager shutdown handling in Arbiter. - [ ] Wire Gunicorn reload to manager `reread` or restart. diff --git a/gunicorn/companion/manager.py b/gunicorn/companion/manager.py index 53e33ce5..c3fc80bd 100644 --- a/gunicorn/companion/manager.py +++ b/gunicorn/companion/manager.py @@ -31,6 +31,9 @@ class CompanionManager: self.log = log self.pid = os.getpid() self.processes = {c.name: CompanionProcess(c) for c in configs} + # Set by the arbiter wiring: a no-arg callable that re-reads and + # validates companion config, returning a fresh CompanionConfig list. + self.config_loader = None def handle_command(self, obj: dict) -> dict: """Route a decoded control command to its action. @@ -44,6 +47,15 @@ class CompanionManager: cmd = obj["cmd"] if cmd == "status": return {"ok": True, "companions": self.status()} + if cmd == "reread": + if self.config_loader is None: + raise CommandError("reread not configured") + try: + new_configs = self.config_loader() + except Exception as e: + return {"ok": False, "error": "invalid config: %s" % e, + "kept_old_config": True} + return self.reread_config(new_configs) # Every remaining command acts on one named companion. name = obj.get("name") @@ -64,6 +76,62 @@ class CompanionManager: now = now or time.time() return [proc.status_dict(now) for proc in self.processes.values()] + def reread_config(self, new_configs) -> dict: + """Transactionally apply a fresh set of companion configs. + + Each companion is compared with the running set by ``config_hash``: + a new name is added and started, a missing name is stopped and removed, + a changed hash stores the new config and restarts (unless the companion + was manually stopped, which keeps it STOPPED with the new config ready + for its next start), and an unchanged hash is left alone. Validation + runs first, so a bad config touches nothing and the old one stays live. + """ + try: + new_by_name = self._index_configs(new_configs) + except CommandError as e: + return {"ok": False, "error": str(e), "kept_old_config": True} + + added, removed, restarted, unchanged = [], [], [], [] + old_names = set(self.processes) + new_names = set(new_by_name) + + for name in old_names - new_names: + self.stop_process(name) + del self.processes[name] + removed.append(name) + + for name in new_names - old_names: + proc = CompanionProcess(new_by_name[name]) + self.processes[name] = proc + self.spawn_process(proc) + added.append(name) + + for name in new_names & old_names: + proc = self.processes[name] + if proc.config.config_hash == new_by_name[name].config_hash: + unchanged.append(name) + continue + proc.config = new_by_name[name] + if proc.manual_stop: + unchanged.append(name) + else: + self.restart_process(name) + restarted.append(name) + + return {"ok": True, "added": added, "removed": removed, + "restarted": restarted, "unchanged": unchanged} + + @staticmethod + def _index_configs(configs) -> dict: + """Index configs by name, rejecting duplicates.""" + by_name = {} + for config in configs: + if config.name in by_name: + raise CommandError( + "invalid config: duplicate companion name %s" % config.name) + by_name[config.name] = config + return by_name + def spawn_process(self, proc: CompanionProcess) -> int: """Fork one companion. diff --git a/tests/test_companion_manager.py b/tests/test_companion_manager.py index 94e7c130..021d2269 100644 --- a/tests/test_companion_manager.py +++ b/tests/test_companion_manager.py @@ -172,6 +172,90 @@ def test_handle_command_unknown(): mgr.handle_command({"cmd": "reread"}) +def cfg(name, **kw): + return CompanionConfig(name=name, target=lambda: None, **kw) + + +def test_reread_adds_new(): + mgr = make_manager("rq") + new = [cfg("rq"), cfg("scheduler")] + with mock.patch("os.fork", return_value=10): + result = mgr.reread_config(new) + assert result["added"] == ["scheduler"] + assert "scheduler" in mgr.processes + assert mgr.processes["scheduler"].state == State.STARTING + + +def test_reread_removes_missing(): + mgr = make_manager("rq", "scheduler") + mgr.processes["scheduler"].state = State.RUNNING + mgr.processes["scheduler"].pid = 11 + with mock.patch("os.kill"): + result = mgr.reread_config([cfg("rq")]) + assert result["removed"] == ["scheduler"] + assert "scheduler" not in mgr.processes + + +def test_reread_restarts_changed(): + mgr = make_manager("rq") + mgr.processes["rq"].state = State.RUNNING + mgr.processes["rq"].pid = 12 + changed = cfg("rq", env={"X": "1"}) # different hash + with mock.patch("os.kill"): + result = mgr.reread_config([changed]) + assert result["restarted"] == ["rq"] + assert mgr.processes["rq"].config is changed + assert mgr.processes["rq"].state == State.STOPPING + + +def test_reread_changed_manual_stop_keeps_stopped(): + mgr = make_manager("rq") + proc = mgr.processes["rq"] + proc.manual_stop = True + proc.state = State.STOPPED + changed = cfg("rq", env={"X": "1"}) + result = mgr.reread_config([changed]) + assert result["unchanged"] == ["rq"] + assert proc.config is changed and proc.state == State.STOPPED + + +def test_reread_unchanged_noop(): + mgr = make_manager("rq") + same = mgr.processes["rq"].config + result = mgr.reread_config([same]) + assert result["unchanged"] == ["rq"] + assert result["restarted"] == [] + + +def test_reread_duplicate_name_keeps_old(): + mgr = make_manager("rq") + result = mgr.reread_config([cfg("rq"), cfg("rq")]) + assert result["ok"] is False and result["kept_old_config"] is True + assert "duplicate" in result["error"] + + +def test_handle_command_reread_no_loader(): + mgr = make_manager("rq") + with pytest.raises(CommandError): + mgr.handle_command({"cmd": "reread"}) + + +def test_handle_command_reread_runs_loader(): + mgr = make_manager("rq") + mgr.config_loader = lambda: [mgr.processes["rq"].config] + resp = mgr.handle_command({"cmd": "reread"}) + assert resp["ok"] is True and resp["unchanged"] == ["rq"] + + +def test_handle_command_reread_bad_config(): + mgr = make_manager("rq") + def boom(): + raise ValueError("duplicate companion name rq") + mgr.config_loader = boom + resp = mgr.handle_command({"cmd": "reread"}) + assert resp["ok"] is False and resp["kept_old_config"] is True + + def test_start_process_stopped_spawns(): mgr = make_manager("rq") proc = mgr.processes["rq"]