feat(companion): Reap exited companion processes

Add reap_processes to CompanionManager: drains waitpid(WNOHANG), matches each
dead pid back to its companion, and records the exit via _record_exit (signal
number or exit code, exited_at, exit_count) while freeing the pid. Returns the
reaped companions; the restart decision stays with the run loop.

Add tests for exit-code, signal, and no-children cases.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Tanmoy Sarkar 2026-06-09 17:49:27 +05:30
parent 2bf7e1b1fb
commit bd8a91f656
3 changed files with 82 additions and 1 deletions

View File

@ -674,7 +674,7 @@ No per-companion logic in Arbiter.
- [x] Spawn one companion process from the manager.
- [x] Apply `cwd` and `env` before target.
- [x] Redirect `stdout` and `stderr`.
- [ ] Reap exited companion processes.
- [x] Reap exited companion processes.
- [ ] Implement `STARTING -> RUNNING` using `startsecs`.
- [ ] Implement `BACKOFF` with fixed `companion_restart_delay`.
- [ ] Implement `start_process`.

View File

@ -58,6 +58,57 @@ class CompanionManager:
os._exit(1)
os._exit(0)
def reap_processes(self) -> list:
"""Reap any companions that have exited and record their exit info.
``waitpid(-1, WNOHANG)`` asks the kernel for any dead child without
blocking: it returns ``(pid, status)`` for one reaped child, or
``(0, 0)`` when none are waiting. We loop so a single call clears the
whole backlog (several children can die between ticks), and stop on
``ChildProcessError`` which means there are no children left at all.
Each reaped pid is matched back to its companion. Deciding whether to
restart belongs to the run loop; here we only record the exit and free
the pid, returning the companions that were reaped.
"""
reaped = []
while True:
try:
pid, status = os.waitpid(-1, os.WNOHANG)
except ChildProcessError:
break
if pid == 0:
break
proc = self._process_by_pid(pid)
if proc is not None:
self._record_exit(proc, status)
reaped.append(proc)
return reaped
def _process_by_pid(self, pid: int):
for proc in self.processes.values():
if proc.pid == pid:
return proc
return None
@staticmethod
def _record_exit(proc: CompanionProcess, status: int) -> None:
"""Store how a companion died: signal number or exit code, plus time.
``status`` is the packed value from ``waitpid``. ``WIFSIGNALED`` tells
us a signal killed it, in which case ``WTERMSIG`` gives the signal
number; otherwise it exited normally and ``WEXITSTATUS`` gives its exit
code. Only one of the two is ever set, so the other is cleared.
"""
if os.WIFSIGNALED(status):
proc.last_exit_signal = os.WTERMSIG(status)
proc.last_exit_code = None
else:
proc.last_exit_code = os.WEXITSTATUS(status)
proc.last_exit_signal = None
proc.exited_at = time.time()
proc.exit_count += 1
proc.pid = None
@staticmethod
def _apply_environment(config: CompanionConfig) -> None:
"""Apply ``cwd`` and ``env`` in the child before running the target.

View File

@ -95,6 +95,36 @@ def test_redirect_output_inherit_noop():
dup2.assert_not_called()
def test_reap_records_exit_code():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
proc.pid = 4321
# exit code 1 -> status 1<<8; second call drains the queue.
with mock.patch("os.waitpid", side_effect=[(4321, 1 << 8), (0, 0)]):
reaped = mgr.reap_processes()
assert reaped == [proc]
assert proc.last_exit_code == 1
assert proc.last_exit_signal is None
assert proc.exit_count == 1
assert proc.pid is None
def test_reap_records_signal():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
proc.pid = 4321
with mock.patch("os.waitpid", side_effect=[(4321, 9), (0, 0)]):
mgr.reap_processes()
assert proc.last_exit_signal == 9
assert proc.last_exit_code is None
def test_reap_no_children():
mgr = make_manager("rq")
with mock.patch("os.waitpid", side_effect=ChildProcessError):
assert mgr.reap_processes() == []
def test_spawn_parent_records_pid_and_starting():
mgr = make_manager("rq")
proc = mgr.processes["rq"]