feat(companion): Implement BACKOFF with fixed restart delay

Reaping now transitions each exited companion via handle_exit: a manually
stopped one settles in STOPPED, any other exit enters BACKOFF with
next_retry_at = now + restart_delay (fixed, no exponential backoff or cap).
Add retry_backoff to re-fork BACKOFF companions once their delay elapses,
bumping restart_count and returning them to STARTING.

Add tests for backoff on unexpected exit, manual-stop staying stopped, retry
timing, and reap-to-backoff.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Tanmoy Sarkar 2026-06-09 17:56:22 +05:30
parent 84d69c46fd
commit 87bc4cf70e
3 changed files with 105 additions and 9 deletions

View File

@ -676,7 +676,7 @@ No per-companion logic in Arbiter.
- [x] Redirect `stdout` and `stderr`.
- [x] Reap exited companion processes.
- [x] Implement `STARTING -> RUNNING` using `startsecs`.
- [ ] Implement `BACKOFF` with fixed `companion_restart_delay`.
- [x] Implement `BACKOFF` with fixed `companion_restart_delay`.
- [ ] Implement `start_process`.
- [ ] Implement `stop_process`.
- [ ] Implement `restart_process`.

View File

@ -61,14 +61,23 @@ class CompanionManager:
def reap_processes(self) -> list:
"""Reap any companions that have exited and record their exit info.
``waitpid(-1, WNOHANG)`` asks the kernel for any dead child without
blocking: it returns ``(pid, status)`` for one reaped child, or
``(0, 0)`` when none are waiting. We loop so a single call clears the
whole backlog (several children can die between ticks), and stop on
``ChildProcessError`` which means there are no children left at all.
Each reaped pid is matched back to its companion. Deciding whether to
restart belongs to the run loop; here we only record the exit and free
the pid, returning the companions that were reaped.
A companion runs as a forked child of the manager, so when it dies the
kernel hands its exit status back to us as a zombie until we collect it
with ``waitpid``. This method does that collecting, and is meant to be
called once per run-loop tick (typically after a ``SIGCHLD``).
``waitpid(-1, WNOHANG)`` asks the kernel for any one dead child without
blocking. It returns ``(pid, status)`` for a child it reaped, or
``(0, 0)`` when children are still alive but none have exited. Several
companions can die between two ticks, so we loop until one of those two
stop conditions is hit: ``(0, 0)`` (nothing more to reap right now) or
``ChildProcessError`` (no child processes exist at all).
For each reaped pid we look up its companion, then in order: record the
exit (signal or code, time, count), free the pid, and move it to its
next public state via :meth:`handle_exit` -- STOPPED if it was stopped
on purpose, otherwise BACKOFF for a later restart. Pids we don't
recognise are ignored. Returns the list of companions reaped this call.
"""
reaped = []
while True:
@ -81,9 +90,46 @@ class CompanionManager:
proc = self._process_by_pid(pid)
if proc is not None:
self._record_exit(proc, status)
self.handle_exit(proc)
reaped.append(proc)
return reaped
def handle_exit(self, proc: CompanionProcess, now: float = None) -> None:
"""Decide a companion's fate after it exits: stay stopped or back off.
A companion that was stopped on purpose settles in STOPPED and stays
there. Any other exit is unexpected, so it enters BACKOFF and is
scheduled to restart after a fixed ``restart_delay`` (no exponential
backoff, no retry cap).
"""
now = now or time.time()
if proc.manual_stop:
proc.state = State.STOPPED
proc.next_retry_at = None
return
proc.state = State.BACKOFF
proc.next_retry_at = now + proc.restart_delay
self.log.info("companion %s exited, retrying in %ss",
proc.name, proc.restart_delay)
def retry_backoff(self, now: float = None) -> list:
"""Respawn BACKOFF companions whose fixed retry delay has elapsed.
Each retry bumps ``restart_count`` and re-forks the companion, which
puts it back into STARTING. Returns the companions that were retried.
"""
now = now or time.time()
retried = []
for proc in self.processes.values():
if proc.state != State.BACKOFF or proc.next_retry_at is None:
continue
if now >= proc.next_retry_at:
proc.restart_count += 1
proc.next_retry_at = None
self.spawn_process(proc)
retried.append(proc)
return retried
def promote_running(self, now: float = None) -> list:
"""Move companions that survived ``startsecs`` from STARTING to RUNNING.

View File

@ -125,6 +125,56 @@ def test_reap_no_children():
assert mgr.reap_processes() == []
def test_handle_exit_unexpected_backoff():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
proc.restart_delay = 5
mgr.handle_exit(proc, now=100.0)
assert proc.state == State.BACKOFF
assert proc.next_retry_at == 105.0
def test_handle_exit_manual_stop_stays_stopped():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
proc.manual_stop = True
mgr.handle_exit(proc, now=100.0)
assert proc.state == State.STOPPED
assert proc.next_retry_at is None
def test_retry_backoff_respawns_when_due():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
proc.state = State.BACKOFF
proc.next_retry_at = 100.0
with mock.patch("os.fork", return_value=555):
retried = mgr.retry_backoff(now=101.0)
assert retried == [proc]
assert proc.restart_count == 1
assert proc.state == State.STARTING
assert proc.pid == 555
def test_retry_backoff_waits_until_due():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
proc.state = State.BACKOFF
proc.next_retry_at = 100.0
assert mgr.retry_backoff(now=99.0) == []
assert proc.state == State.BACKOFF
def test_reap_unexpected_exit_enters_backoff():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
proc.pid = 4321
with mock.patch("os.waitpid", side_effect=[(4321, 1 << 8), (0, 0)]):
mgr.reap_processes()
assert proc.state == State.BACKOFF
assert proc.next_retry_at is not None
def test_promote_running_after_startsecs():
mgr = make_manager("rq")
proc = mgr.processes["rq"]