feat(companion): Implement transactional reread

Add CompanionManager.reread_config(new_configs): diffs the running set against
a fresh, validated config list by config_hash -- a new name is added and
started, a missing name stopped and removed, a changed hash stores the config
and restarts (a manually stopped companion keeps STOPPED with the new config
ready), and an unchanged hash is left alone. Returns {ok, added, removed,
restarted, unchanged}. Validation runs first via _index_configs (duplicate-name
check), so a bad config mutates nothing and returns {ok: false, error,
kept_old_config: true}.

Wire the reread command to a config_loader hook on the manager -- the seam
between process supervision and config-file loading, set by the arbiter.
While the hook is left at its default of None, reread raises CommandError.
A loader that raises returns the kept-old-config error envelope.

Add tests for add/remove/restart-changed/manual-stop/unchanged/duplicate and
the reread no-loader, runs-loader, and bad-config paths.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Tanmoy Sarkar 2026-06-09 21:55:51 +05:30
parent ef6e42ecc1
commit 5db503295c
3 changed files with 153 additions and 1 deletions

View File

@ -687,7 +687,7 @@ No per-companion logic in Arbiter.
- [x] Implement `start`.
- [x] Implement `stop`.
- [x] Implement `restart`.
- [ ] Implement transactional `reread`.
- [x] Implement transactional `reread`.
- [ ] Add manager spawn/reap logic in Arbiter.
- [ ] Add manager shutdown handling in Arbiter.
- [ ] Wire Gunicorn reload to manager `reread` or restart.

View File

@ -31,6 +31,9 @@ class CompanionManager:
self.log = log
self.pid = os.getpid()
self.processes = {c.name: CompanionProcess(c) for c in configs}
# Set by the arbiter wiring: a no-arg callable that re-reads and
# validates companion config, returning a fresh CompanionConfig list.
self.config_loader = None
def handle_command(self, obj: dict) -> dict:
"""Route a decoded control command to its action.
@ -44,6 +47,15 @@ class CompanionManager:
cmd = obj["cmd"]
if cmd == "status":
return {"ok": True, "companions": self.status()}
if cmd == "reread":
if self.config_loader is None:
raise CommandError("reread not configured")
try:
new_configs = self.config_loader()
except Exception as e:
return {"ok": False, "error": "invalid config: %s" % e,
"kept_old_config": True}
return self.reread_config(new_configs)
# Every remaining command acts on one named companion.
name = obj.get("name")
@ -64,6 +76,62 @@ class CompanionManager:
now = now or time.time()
return [proc.status_dict(now) for proc in self.processes.values()]
def reread_config(self, new_configs) -> dict:
    """Apply a fresh set of companion configs, validating before mutating.

    Each companion is compared with the running set by ``config_hash``:

    * a name only in ``new_configs`` is added and spawned;
    * a name missing from ``new_configs`` is stopped and removed;
    * a changed hash stores the new config and restarts the companion,
      unless it was manually stopped, in which case the new config is
      stored for its next start and the name is reported as unchanged;
    * an unchanged hash is left alone.

    Validation (the duplicate-name check) runs first, so a rejected config
    touches nothing and the old one stays live.

    Args:
        new_configs: iterable of config objects exposing ``name`` and
            ``config_hash``.

    Returns:
        ``{"ok": True, "added", "removed", "restarted", "unchanged"}`` with
        every list in declaration order, or ``{"ok": False, "error",
        "kept_old_config": True}`` when validation fails.

    Exceptions raised by stop/spawn/restart are not caught here; changes
    applied before such a failure are not rolled back.
    """
    try:
        new_by_name = self._index_configs(new_configs)
    except CommandError as e:
        # Nothing has been mutated yet, so the old config is still live.
        return {"ok": False, "error": str(e), "kept_old_config": True}

    # Snapshot the three groups before mutating anything. Dict order (the
    # order companions were declared) is used instead of set order, which
    # varies between runs under string hash randomization and would make
    # both the reported lists and the stop/start sequence unpredictable.
    stale = [name for name in self.processes if name not in new_by_name]
    fresh = [name for name in new_by_name if name not in self.processes]
    common = [name for name in new_by_name if name in self.processes]

    added, removed, restarted, unchanged = [], [], [], []

    for name in stale:
        self.stop_process(name)
        # NOTE(review): the entry is dropped right after the stop request;
        # if stop_process only signals the child, confirm the reaper can
        # still account for its pid once it is gone from self.processes.
        del self.processes[name]
        removed.append(name)

    for name in fresh:
        proc = CompanionProcess(new_by_name[name])
        self.processes[name] = proc
        self.spawn_process(proc)
        added.append(name)

    for name in common:
        proc = self.processes[name]
        incoming = new_by_name[name]
        if proc.config.config_hash == incoming.config_hash:
            unchanged.append(name)
            continue
        # Store the new config first so a restart (now or later) uses it.
        proc.config = incoming
        if proc.manual_stop:
            # Respect the operator's stop: keep it down, config ready.
            unchanged.append(name)
        else:
            self.restart_process(name)
            restarted.append(name)

    return {"ok": True, "added": added, "removed": removed,
            "restarted": restarted, "unchanged": unchanged}
@staticmethod
def _index_configs(configs) -> dict:
    """Build a name -> config mapping, refusing repeated names.

    Raises:
        CommandError: if two configs share the same ``name``.
    """
    indexed = {}
    for entry in configs:
        name = entry.name
        if name in indexed:
            raise CommandError(
                "invalid config: duplicate companion name %s" % name)
        indexed[name] = entry
    return indexed
def spawn_process(self, proc: CompanionProcess) -> int:
"""Fork one companion.

View File

@ -172,6 +172,90 @@ def test_handle_command_unknown():
mgr.handle_command({"cmd": "reread"})
def cfg(name, **kw):
    """Build a CompanionConfig called *name* with a no-op target.

    Extra keyword arguments are forwarded to CompanionConfig unchanged.
    """
    config = CompanionConfig(name=name, target=lambda: None, **kw)
    return config
def test_reread_adds_new():
    """A name present only in the new config is added and spawned."""
    manager = make_manager("rq")
    with mock.patch("os.fork", return_value=10):
        outcome = manager.reread_config([cfg("rq"), cfg("scheduler")])
    assert outcome["added"] == ["scheduler"]
    assert "scheduler" in manager.processes
    assert manager.processes["scheduler"].state == State.STARTING
def test_reread_removes_missing():
    """A running companion absent from the new config is removed."""
    manager = make_manager("rq", "scheduler")
    doomed = manager.processes["scheduler"]
    doomed.state = State.RUNNING
    doomed.pid = 11
    with mock.patch("os.kill"):
        outcome = manager.reread_config([cfg("rq")])
    assert outcome["removed"] == ["scheduler"]
    assert "scheduler" not in manager.processes
def test_reread_restarts_changed():
    """A running companion whose config changed is restarted with it."""
    manager = make_manager("rq")
    running = manager.processes["rq"]
    running.state = State.RUNNING
    running.pid = 12
    updated = cfg("rq", env={"X": "1"})  # different hash
    with mock.patch("os.kill"):
        outcome = manager.reread_config([updated])
    assert outcome["restarted"] == ["rq"]
    assert manager.processes["rq"].config is updated
    assert manager.processes["rq"].state == State.STOPPING
def test_reread_changed_manual_stop_keeps_stopped():
    """A manually stopped companion takes the new config but stays STOPPED."""
    manager = make_manager("rq")
    stopped = manager.processes["rq"]
    stopped.manual_stop = True
    stopped.state = State.STOPPED
    updated = cfg("rq", env={"X": "1"})
    outcome = manager.reread_config([updated])
    assert outcome["unchanged"] == ["rq"]
    assert stopped.config is updated
    assert stopped.state == State.STOPPED
def test_reread_unchanged_noop():
    """Re-reading the identical config reports unchanged and restarts nothing."""
    manager = make_manager("rq")
    current = manager.processes["rq"].config
    outcome = manager.reread_config([current])
    assert outcome["unchanged"] == ["rq"]
    assert outcome["restarted"] == []
def test_reread_duplicate_name_keeps_old():
    """Duplicate names are rejected with the kept-old-config envelope."""
    manager = make_manager("rq")
    duplicated = [cfg("rq"), cfg("rq")]
    outcome = manager.reread_config(duplicated)
    assert outcome["ok"] is False
    assert outcome["kept_old_config"] is True
    assert "duplicate" in outcome["error"]
def test_handle_command_reread_no_loader():
    """The reread command raises CommandError when no loader is set."""
    manager = make_manager("rq")
    request = {"cmd": "reread"}
    with pytest.raises(CommandError):
        manager.handle_command(request)
def test_handle_command_reread_runs_loader():
    """The reread command feeds the loader's output to reread_config."""
    manager = make_manager("rq")

    def same_config():
        return [manager.processes["rq"].config]

    manager.config_loader = same_config
    reply = manager.handle_command({"cmd": "reread"})
    assert reply["ok"] is True
    assert reply["unchanged"] == ["rq"]
def test_handle_command_reread_bad_config():
    """A loader that raises yields the kept-old-config error envelope."""
    manager = make_manager("rq")

    def failing_loader():
        raise ValueError("duplicate companion name rq")

    manager.config_loader = failing_loader
    reply = manager.handle_command({"cmd": "reread"})
    assert reply["ok"] is False
    assert reply["kept_old_config"] is True
def test_start_process_stopped_spawns():
mgr = make_manager("rq")
proc = mgr.processes["rq"]