refactor(companion): Spell out abbreviated identifiers

No behaviour change.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Tanmoy Sarkar 2026-06-09 22:03:13 +05:30
parent 5db503295c
commit 9f3762d6b6
6 changed files with 321 additions and 317 deletions

View File

@ -25,19 +25,19 @@ def decode_command(line):
JSON object carrying a string ``cmd``; anything else is a ``CommandError``.
"""
try:
obj = json.loads(line)
command = json.loads(line)
except (ValueError, TypeError):
raise CommandError("invalid JSON")
if not isinstance(obj, dict):
if not isinstance(command, dict):
raise CommandError("request must be a JSON object")
if not isinstance(obj.get("cmd"), str):
if not isinstance(command.get("cmd"), str):
raise CommandError("missing 'cmd'")
return obj
return command
def encode_response(obj):
def encode_response(response):
"""Encode a response dict as one newline-terminated JSON line of bytes."""
return (json.dumps(obj) + "\n").encode("utf-8")
return (json.dumps(response) + "\n").encode("utf-8")
class ControlServer:
@ -58,7 +58,7 @@ class ControlServer:
self.mode = mode
self.log = log
self.backlog = backlog
self.sock = None
self.listener = None
def create(self):
"""Bind and listen on the Unix socket, replacing any stale one.
@ -69,18 +69,18 @@ class ControlServer:
"""
if os.path.exists(self.path):
os.unlink(self.path)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(self.path)
listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
listener.bind(self.path)
os.chmod(self.path, self.mode)
sock.listen(self.backlog)
self.sock = sock
return sock
listener.listen(self.backlog)
self.listener = listener
return listener
def close(self):
"""Close the listening socket and remove its file."""
if self.sock is not None:
self.sock.close()
self.sock = None
if self.listener is not None:
self.listener.close()
self.listener = None
if os.path.exists(self.path):
os.unlink(self.path)
@ -93,25 +93,25 @@ class ControlServer:
"""
try:
response = self.dispatch(decode_command(line))
except CommandError as e:
response = {"ok": False, "error": str(e)}
except CommandError as error:
response = {"ok": False, "error": str(error)}
return encode_response(response)
def serve_connection(self, conn):
def serve_connection(self, connection):
"""Serve newline-delimited requests on one accepted connection.
Reads until the client hangs up, buffering partial reads and answering
each complete line as it arrives. A trailing fragment without a newline
is ignored.
"""
buf = b""
with conn:
buffer = b""
with connection:
while True:
chunk = conn.recv(65536)
chunk = connection.recv(65536)
if not chunk:
break
buf += chunk
while b"\n" in buf:
line, buf = buf.split(b"\n", 1)
buffer += chunk
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
if line.strip():
conn.sendall(self.handle_line(line))
connection.sendall(self.handle_line(line))

View File

@ -35,7 +35,7 @@ class CompanionManager:
# validates companion config, returning a fresh CompanionConfig list.
self.config_loader = None
def handle_command(self, obj: dict) -> dict:
def handle_command(self, command: dict) -> dict:
"""Route a decoded control command to its action.
This is the ``dispatch`` the control socket calls. ``status`` returns a
@ -44,37 +44,37 @@ class CompanionManager:
commands need a string ``name``, and anything else raises ``CommandError`` so the
socket replies with an error envelope.
"""
cmd = obj["cmd"]
if cmd == "status":
command_name = command["cmd"]
if command_name == "status":
return {"ok": True, "companions": self.status()}
if cmd == "reread":
if command_name == "reread":
if self.config_loader is None:
raise CommandError("reread not configured")
try:
new_configs = self.config_loader()
except Exception as e:
return {"ok": False, "error": "invalid config: %s" % e,
except Exception as error:
return {"ok": False, "error": "invalid config: %s" % error,
"kept_old_config": True}
return self.reread_config(new_configs)
# Every remaining command acts on one named companion.
name = obj.get("name")
name = command.get("name")
if not isinstance(name, str):
raise CommandError("'%s' requires a 'name'" % cmd)
if cmd == "start":
raise CommandError("'%s' requires a 'name'" % command_name)
if command_name == "start":
ok, message = self.start_process(name)
elif cmd == "stop":
elif command_name == "stop":
ok, message = self.stop_process(name)
elif cmd == "restart":
elif command_name == "restart":
ok, message = self.restart_process(name)
else:
raise CommandError("unknown command %r" % cmd)
raise CommandError("unknown command %r" % command_name)
return {"ok": ok, "message": message}
def status(self, now: float = None) -> list:
"""Status entry for every companion, for the ``status`` command."""
now = now or time.time()
return [proc.status_dict(now) for proc in self.processes.values()]
return [process.status_dict(now) for process in self.processes.values()]
def reread_config(self, new_configs) -> dict:
"""Transactionally apply a fresh set of companion configs.
@ -101,18 +101,18 @@ class CompanionManager:
removed.append(name)
for name in new_names - old_names:
proc = CompanionProcess(new_by_name[name])
self.processes[name] = proc
self.spawn_process(proc)
process = CompanionProcess(new_by_name[name])
self.processes[name] = process
self.spawn_process(process)
added.append(name)
for name in new_names & old_names:
proc = self.processes[name]
if proc.config.config_hash == new_by_name[name].config_hash:
process = self.processes[name]
if process.config.config_hash == new_by_name[name].config_hash:
unchanged.append(name)
continue
proc.config = new_by_name[name]
if proc.manual_stop:
process.config = new_by_name[name]
if process.manual_stop:
unchanged.append(name)
else:
self.restart_process(name)
@ -132,7 +132,7 @@ class CompanionManager:
by_name[config.name] = config
return by_name
def spawn_process(self, proc: CompanionProcess) -> int:
def spawn_process(self, process: CompanionProcess) -> int:
"""Fork one companion.
Parent records the pid and moves the companion to STARTING. Child
@ -146,21 +146,21 @@ class CompanionManager:
"""
pid = os.fork()
if pid != 0:
proc.pid = pid
proc.state = State.STARTING
proc.started_at = time.time()
self.log.info("companion %s started (pid %s)", proc.name, pid)
process.pid = pid
process.state = State.STARTING
process.started_at = time.time()
self.log.info("companion %s started (pid %s)", process.name, pid)
return pid
try:
self._apply_environment(proc.config)
self._redirect_output(proc.config)
target = self._resolve_target(proc.config.target)
self._apply_environment(process.config)
self._redirect_output(process.config)
target = self._resolve_target(process.config.target)
target()
except SystemExit:
raise
except BaseException:
self.log.exception("companion %s crashed", proc.name)
self.log.exception("companion %s crashed", process.name)
os._exit(1)
os._exit(0)
@ -173,16 +173,16 @@ class CompanionManager:
without doing anything. STOPPING is rejected so the caller polls status
and retries once the old child is gone. Returns ``(ok, message)``.
"""
proc = self.processes.get(name)
if proc is None:
process = self.processes.get(name)
if process is None:
return False, "unknown companion %s" % name
if proc.state in (State.RUNNING, State.STARTING):
return True, "%s already %s" % (name, proc.state.lower())
if proc.state == State.STOPPING:
if process.state in (State.RUNNING, State.STARTING):
return True, "%s already %s" % (name, process.state.lower())
if process.state == State.STOPPING:
return False, "%s is stopping; retry" % name
proc.manual_stop = False
proc.next_retry_at = None
self.spawn_process(proc)
process.manual_stop = False
process.next_retry_at = None
self.spawn_process(process)
return True, "%s started" % name
def stop_process(self, name: str, now: float = None):
@ -195,21 +195,21 @@ class CompanionManager:
settles in STOPPED. STOPPED and STOPPING are already-there success
no-ops. Returns ``(ok, message)``.
"""
proc = self.processes.get(name)
if proc is None:
process = self.processes.get(name)
if process is None:
return False, "unknown companion %s" % name
proc.manual_stop = True
if proc.state in (State.STOPPED, State.STOPPING):
return True, "%s already %s" % (name, proc.state.lower())
if proc.state == State.BACKOFF:
proc.next_retry_at = None
proc.state = State.STOPPED
process.manual_stop = True
if process.state in (State.STOPPED, State.STOPPING):
return True, "%s already %s" % (name, process.state.lower())
if process.state == State.BACKOFF:
process.next_retry_at = None
process.state = State.STOPPED
return True, "%s stopped" % name
now = now or time.time()
os.kill(proc.pid, self._signal_number(proc.config.stop_signal))
proc.state = State.STOPPING
proc.stop_deadline = now + proc.config.stop_timeout
self.log.info("companion %s stopping (pid %s)", name, proc.pid)
os.kill(process.pid, self._signal_number(process.config.stop_signal))
process.state = State.STOPPING
process.stop_deadline = now + process.config.stop_timeout
self.log.info("companion %s stopping (pid %s)", name, process.pid)
return True, "%s stopping" % name
def restart_process(self, name: str, now: float = None):
@ -222,26 +222,26 @@ class CompanionManager:
STOPPED start again immediately. STOPPING is rejected so the caller
retries. This never rereads config. Returns ``(ok, message)``.
"""
proc = self.processes.get(name)
if proc is None:
process = self.processes.get(name)
if process is None:
return False, "unknown companion %s" % name
if proc.state == State.STOPPING:
if process.state == State.STOPPING:
return False, "%s is stopping; retry" % name
proc.manual_stop = False
if proc.state in (State.RUNNING, State.STARTING):
process.manual_stop = False
if process.state in (State.RUNNING, State.STARTING):
now = now or time.time()
proc.restart_pending = True
os.kill(proc.pid, self._signal_number(proc.config.stop_signal))
proc.state = State.STOPPING
proc.stop_deadline = now + proc.config.reload_timeout
self.log.info("companion %s restarting (pid %s)", name, proc.pid)
process.restart_pending = True
os.kill(process.pid, self._signal_number(process.config.stop_signal))
process.state = State.STOPPING
process.stop_deadline = now + process.config.reload_timeout
self.log.info("companion %s restarting (pid %s)", name, process.pid)
return True, "%s restarting" % name
proc.next_retry_at = None
self.spawn_process(proc)
process.next_retry_at = None
self.spawn_process(process)
return True, "%s started" % name
@staticmethod
def _signal_number(sig) -> int:
def _signal_number(stop_signal) -> int:
"""Resolve a stop signal to its number, e.g. ``"SIGTERM"`` -> 15.
Accepts a signal name or a raw number and validates both against the
@ -249,9 +249,11 @@ class CompanionManager:
than silently sending the wrong signal (or none).
"""
try:
return signal.Signals[sig] if isinstance(sig, str) else signal.Signals(sig)
if isinstance(stop_signal, str):
return signal.Signals[stop_signal]
return signal.Signals(stop_signal)
except (KeyError, ValueError):
raise ValueError("unknown stop signal %r" % (sig,))
raise ValueError("unknown stop signal %r" % (stop_signal,))
def reap_processes(self) -> list:
"""Reap any companions that have exited and record their exit info.
@ -282,14 +284,14 @@ class CompanionManager:
break
if pid == 0:
break
proc = self._process_by_pid(pid)
if proc is not None:
self._record_exit(proc, status)
self.handle_exit(proc)
reaped.append(proc)
process = self._process_by_pid(pid)
if process is not None:
self._record_exit(process, status)
self.handle_exit(process)
reaped.append(process)
return reaped
def handle_exit(self, proc: CompanionProcess, now: float = None) -> None:
def handle_exit(self, process: CompanionProcess, now: float = None) -> None:
"""Decide a companion's fate after it exits: restart, stop, or back off.
A pending restart wins: the old child was asked to stop only so a fresh
@ -300,19 +302,19 @@ class CompanionManager:
backoff, no retry cap).
"""
now = now or time.time()
if proc.restart_pending:
proc.restart_pending = False
proc.restart_count += 1
self.spawn_process(proc)
if process.restart_pending:
process.restart_pending = False
process.restart_count += 1
self.spawn_process(process)
return
if proc.manual_stop:
proc.state = State.STOPPED
proc.next_retry_at = None
if process.manual_stop:
process.state = State.STOPPED
process.next_retry_at = None
return
proc.state = State.BACKOFF
proc.next_retry_at = now + proc.restart_delay
process.state = State.BACKOFF
process.next_retry_at = now + process.restart_delay
self.log.info("companion %s exited, retrying in %ss",
proc.name, proc.restart_delay)
process.name, process.restart_delay)
def retry_backoff(self, now: float = None) -> list:
"""Respawn BACKOFF companions whose fixed retry delay has elapsed.
@ -322,14 +324,14 @@ class CompanionManager:
"""
now = now or time.time()
retried = []
for proc in self.processes.values():
if proc.state != State.BACKOFF or proc.next_retry_at is None:
for process in self.processes.values():
if process.state != State.BACKOFF or process.next_retry_at is None:
continue
if now >= proc.next_retry_at:
proc.restart_count += 1
proc.next_retry_at = None
self.spawn_process(proc)
retried.append(proc)
if now >= process.next_retry_at:
process.restart_count += 1
process.next_retry_at = None
self.spawn_process(process)
retried.append(process)
return retried
def promote_running(self, now: float = None) -> list:
@ -341,23 +343,23 @@ class CompanionManager:
"""
now = now or time.time()
promoted = []
for proc in self.processes.values():
if proc.state != State.STARTING or proc.started_at is None:
for process in self.processes.values():
if process.state != State.STARTING or process.started_at is None:
continue
if now - proc.started_at >= proc.config.startsecs:
proc.state = State.RUNNING
self.log.info("companion %s running (pid %s)", proc.name, proc.pid)
promoted.append(proc)
if now - process.started_at >= process.config.startsecs:
process.state = State.RUNNING
self.log.info("companion %s running (pid %s)", process.name, process.pid)
promoted.append(process)
return promoted
def _process_by_pid(self, pid: int):
for proc in self.processes.values():
if proc.pid == pid:
return proc
for process in self.processes.values():
if process.pid == pid:
return process
return None
@staticmethod
def _record_exit(proc: CompanionProcess, status: int) -> None:
def _record_exit(process: CompanionProcess, status: int) -> None:
"""Store how a companion died: signal number or exit code, plus time.
``status`` is the packed value from ``waitpid``. ``WIFSIGNALED`` tells
@ -366,14 +368,14 @@ class CompanionManager:
code. Only one of the two is ever set, so the other is cleared.
"""
if os.WIFSIGNALED(status):
proc.last_exit_signal = os.WTERMSIG(status)
proc.last_exit_code = None
process.last_exit_signal = os.WTERMSIG(status)
process.last_exit_code = None
else:
proc.last_exit_code = os.WEXITSTATUS(status)
proc.last_exit_signal = None
proc.exited_at = time.time()
proc.exit_count += 1
proc.pid = None
process.last_exit_code = os.WEXITSTATUS(status)
process.last_exit_signal = None
process.exited_at = time.time()
process.exit_count += 1
process.pid = None
@staticmethod
def _apply_environment(config: CompanionConfig) -> None:
@ -397,15 +399,15 @@ class CompanionManager:
we append the output there instead. For stderr you can also pass
``"stdout"`` to fold the two streams into one file.
"""
out = CompanionManager._open_output(config.stdout)
if out is not None:
os.dup2(out, 1)
stdout_fd = CompanionManager._open_output(config.stdout)
if stdout_fd is not None:
os.dup2(stdout_fd, 1)
if config.stderr == "stdout":
os.dup2(1, 2)
else:
err = CompanionManager._open_output(config.stderr)
if err is not None:
os.dup2(err, 2)
stderr_fd = CompanionManager._open_output(config.stderr)
if stderr_fd is not None:
os.dup2(stderr_fd, 2)
@staticmethod
def _open_output(value):
@ -424,7 +426,7 @@ class CompanionManager:
"""
if callable(target):
return target
module, sep, attr = target.partition(":")
if not sep:
module_name, separator, attribute = target.partition(":")
if not separator:
raise ValueError("companion target %r must be 'module:callable'" % target)
return getattr(importlib.import_module(module), attr)
return getattr(importlib.import_module(module_name), attribute)

View File

@ -78,15 +78,15 @@ class CompanionConfig:
# target has no stable repr across runs, so use its qualified name.
data = self.to_dict()
data["target"] = self._target_key(self.target)
blob = json.dumps(data, sort_keys=True, default=str)
return hashlib.sha256(blob.encode("utf-8")).hexdigest()
payload = json.dumps(data, sort_keys=True, default=str)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
@staticmethod
def _target_key(target):
if callable(target):
mod = getattr(target, "__module__", "")
qual = getattr(target, "__qualname__", repr(target))
return "%s:%s" % (mod, qual)
module = getattr(target, "__module__", "")
qualified_name = getattr(target, "__qualname__", repr(target))
return "%s:%s" % (module, qualified_name)
return str(target)
def __repr__(self):
@ -146,8 +146,8 @@ class CompanionProcess:
format_uptime(self.uptime(now) or 0),
)
if self.state == State.BACKOFF and self.next_retry_at is not None:
left = max(0, int(self.next_retry_at - now))
return "exited with %s, retrying in %ds" % (self._exit_status(), left)
seconds_left = max(0, int(self.next_retry_at - now))
return "exited with %s, retrying in %ds" % (self._exit_status(), seconds_left)
if self.state == State.BACKOFF:
return "exited with %s" % self._exit_status()
if self.state == State.STOPPED and self.manual_stop:

View File

@ -652,10 +652,10 @@ def unquote_to_wsgi_str(string):
def format_uptime(seconds):
"""Render a duration like supervisor: ``2 days, 03:12:44`` or ``0:05:12``."""
seconds = int(seconds)
days, rem = divmod(seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, secs = divmod(rem, 60)
days, remainder = divmod(seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, remaining_seconds = divmod(remainder, 60)
if days:
unit = "day" if days == 1 else "days"
return "%d %s, %02d:%02d:%02d" % (days, unit, hours, minutes, secs)
return "%d:%02d:%02d" % (hours, minutes, secs)
return "%d %s, %02d:%02d:%02d" % (days, unit, hours, minutes, remaining_seconds)
return "%d:%02d:%02d" % (hours, minutes, remaining_seconds)

View File

@ -35,51 +35,53 @@ def test_decode_command_missing_cmd():
def test_encode_response_newline_terminated():
out = encode_response({"ok": True})
assert out.endswith(b"\n")
assert json.loads(out) == {"ok": True}
response = encode_response({"ok": True})
assert response.endswith(b"\n")
assert json.loads(response) == {"ok": True}
def test_handle_line_dispatches():
server = ControlServer(dispatch=lambda obj: {"ok": True, "echo": obj["cmd"]},
path="/tmp/x.sock")
out = server.handle_line('{"cmd": "status"}')
assert json.loads(out) == {"ok": True, "echo": "status"}
server = ControlServer(
dispatch=lambda command: {"ok": True, "echo": command["cmd"]},
path="/tmp/x.sock")
response = server.handle_line('{"cmd": "status"}')
assert json.loads(response) == {"ok": True, "echo": "status"}
def test_handle_line_bad_json_error_envelope():
server = ControlServer(dispatch=lambda obj: {"ok": True}, path="/tmp/x.sock")
out = json.loads(server.handle_line("garbage"))
assert out["ok"] is False and "JSON" in out["error"]
server = ControlServer(dispatch=lambda command: {"ok": True}, path="/tmp/x.sock")
response = json.loads(server.handle_line("garbage"))
assert response["ok"] is False and "JSON" in response["error"]
def test_handle_line_dispatch_command_error():
def dispatch(obj):
def dispatch(command):
raise CommandError("unknown command")
server = ControlServer(dispatch=dispatch, path="/tmp/x.sock")
out = json.loads(server.handle_line('{"cmd": "bogus"}'))
assert out["ok"] is False and out["error"] == "unknown command"
response = json.loads(server.handle_line('{"cmd": "bogus"}'))
assert response["ok"] is False and response["error"] == "unknown command"
def test_create_unlinks_stale_and_chmods():
server = ControlServer(dispatch=lambda o: {}, path="/tmp/x.sock", mode=0o600)
sock = mock.Mock()
server = ControlServer(dispatch=lambda command: {}, path="/tmp/x.sock",
mode=0o600)
listener = mock.Mock()
with mock.patch("os.path.exists", return_value=True), \
mock.patch("os.unlink") as unlink, \
mock.patch("socket.socket", return_value=sock), \
mock.patch("socket.socket", return_value=listener), \
mock.patch("os.chmod") as chmod:
server.create()
unlink.assert_called_once_with("/tmp/x.sock")
sock.bind.assert_called_once_with("/tmp/x.sock")
listener.bind.assert_called_once_with("/tmp/x.sock")
chmod.assert_called_once_with("/tmp/x.sock", 0o600)
sock.listen.assert_called_once()
listener.listen.assert_called_once()
def test_close_unlinks():
server = ControlServer(dispatch=lambda o: {}, path="/tmp/x.sock")
server.sock = mock.Mock()
server = ControlServer(dispatch=lambda command: {}, path="/tmp/x.sock")
server.listener = mock.Mock()
with mock.patch("os.path.exists", return_value=True), \
mock.patch("os.unlink") as unlink:
server.close()
unlink.assert_called_once_with("/tmp/x.sock")
assert server.sock is None
assert server.listener is None

View File

@ -19,9 +19,9 @@ def make_manager(*names):
def test_manager_builds_one_process_per_config():
mgr = make_manager("rq", "scheduler")
assert set(mgr.processes) == {"rq", "scheduler"}
assert mgr.processes["rq"].state == State.STOPPED
manager = make_manager("rq", "scheduler")
assert set(manager.processes) == {"rq", "scheduler"}
assert manager.processes["rq"].state == State.STOPPED
def test_resolve_target_accepts_callable():
@ -63,10 +63,10 @@ def test_open_output_inherit_returns_none():
def test_open_output_path_opens_append():
with mock.patch("os.open", return_value=9) as op:
with mock.patch("os.open", return_value=9) as open_mock:
fd = CompanionManager._open_output("/var/log/rq.log")
assert fd == 9
flags = op.call_args.args[1]
flags = open_mock.call_args.args[1]
assert flags & os.O_APPEND and flags & os.O_CREAT
@ -98,12 +98,12 @@ def test_redirect_output_inherit_noop():
def test_reap_records_exit_code():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.pid = 4321
# exit code 1 -> status 1<<8; second call drains the queue.
with mock.patch("os.waitpid", side_effect=[(4321, 1 << 8), (0, 0)]):
reaped = mgr.reap_processes()
reaped = manager.reap_processes()
assert reaped == [proc]
assert proc.last_exit_code == 1
assert proc.last_exit_signal is None
@ -112,229 +112,229 @@ def test_reap_records_exit_code():
def test_reap_records_signal():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.pid = 4321
with mock.patch("os.waitpid", side_effect=[(4321, 9), (0, 0)]):
mgr.reap_processes()
manager.reap_processes()
assert proc.last_exit_signal == 9
assert proc.last_exit_code is None
def test_reap_no_children():
mgr = make_manager("rq")
manager = make_manager("rq")
with mock.patch("os.waitpid", side_effect=ChildProcessError):
assert mgr.reap_processes() == []
assert manager.reap_processes() == []
def test_status_lists_all_companions():
mgr = make_manager("rq", "scheduler")
entries = mgr.status(now=100.0)
manager = make_manager("rq", "scheduler")
entries = manager.status(now=100.0)
assert {e["name"] for e in entries} == {"rq", "scheduler"}
assert all("state" in e and "description" in e for e in entries)
def test_handle_command_status():
mgr = make_manager("rq")
resp = mgr.handle_command({"cmd": "status"})
manager = make_manager("rq")
resp = manager.handle_command({"cmd": "status"})
assert resp["ok"] is True
assert resp["companions"][0]["name"] == "rq"
def test_handle_command_start_routes():
mgr = make_manager("rq")
with mock.patch.object(mgr, "start_process",
return_value=(True, "rq started")) as sp:
resp = mgr.handle_command({"cmd": "start", "name": "rq"})
sp.assert_called_once_with("rq")
manager = make_manager("rq")
with mock.patch.object(manager, "start_process",
return_value=(True, "rq started")) as start_mock:
resp = manager.handle_command({"cmd": "start", "name": "rq"})
start_mock.assert_called_once_with("rq")
assert resp == {"ok": True, "message": "rq started"}
def test_handle_command_stop_and_restart_route():
mgr = make_manager("rq")
with mock.patch.object(mgr, "stop_process", return_value=(True, "s")) as st, \
mock.patch.object(mgr, "restart_process", return_value=(True, "r")) as rt:
mgr.handle_command({"cmd": "stop", "name": "rq"})
mgr.handle_command({"cmd": "restart", "name": "rq"})
st.assert_called_once_with("rq")
rt.assert_called_once_with("rq")
manager = make_manager("rq")
with mock.patch.object(manager, "stop_process", return_value=(True, "s")) as stop_mock, \
mock.patch.object(manager, "restart_process", return_value=(True, "r")) as restart_mock:
manager.handle_command({"cmd": "stop", "name": "rq"})
manager.handle_command({"cmd": "restart", "name": "rq"})
stop_mock.assert_called_once_with("rq")
restart_mock.assert_called_once_with("rq")
def test_handle_command_missing_name():
mgr = make_manager("rq")
manager = make_manager("rq")
with pytest.raises(CommandError):
mgr.handle_command({"cmd": "start"})
manager.handle_command({"cmd": "start"})
def test_handle_command_unknown():
mgr = make_manager("rq")
manager = make_manager("rq")
with pytest.raises(CommandError):
mgr.handle_command({"cmd": "reread"})
manager.handle_command({"cmd": "reread"})
def cfg(name, **kw):
return CompanionConfig(name=name, target=lambda: None, **kw)
def make_config(name, **kwargs):
return CompanionConfig(name=name, target=lambda: None, **kwargs)
def test_reread_adds_new():
mgr = make_manager("rq")
new = [cfg("rq"), cfg("scheduler")]
manager = make_manager("rq")
new = [make_config("rq"), make_config("scheduler")]
with mock.patch("os.fork", return_value=10):
result = mgr.reread_config(new)
result = manager.reread_config(new)
assert result["added"] == ["scheduler"]
assert "scheduler" in mgr.processes
assert mgr.processes["scheduler"].state == State.STARTING
assert "scheduler" in manager.processes
assert manager.processes["scheduler"].state == State.STARTING
def test_reread_removes_missing():
mgr = make_manager("rq", "scheduler")
mgr.processes["scheduler"].state = State.RUNNING
mgr.processes["scheduler"].pid = 11
manager = make_manager("rq", "scheduler")
manager.processes["scheduler"].state = State.RUNNING
manager.processes["scheduler"].pid = 11
with mock.patch("os.kill"):
result = mgr.reread_config([cfg("rq")])
result = manager.reread_config([make_config("rq")])
assert result["removed"] == ["scheduler"]
assert "scheduler" not in mgr.processes
assert "scheduler" not in manager.processes
def test_reread_restarts_changed():
mgr = make_manager("rq")
mgr.processes["rq"].state = State.RUNNING
mgr.processes["rq"].pid = 12
changed = cfg("rq", env={"X": "1"}) # different hash
manager = make_manager("rq")
manager.processes["rq"].state = State.RUNNING
manager.processes["rq"].pid = 12
changed = make_config("rq", env={"X": "1"}) # different hash
with mock.patch("os.kill"):
result = mgr.reread_config([changed])
result = manager.reread_config([changed])
assert result["restarted"] == ["rq"]
assert mgr.processes["rq"].config is changed
assert mgr.processes["rq"].state == State.STOPPING
assert manager.processes["rq"].config is changed
assert manager.processes["rq"].state == State.STOPPING
def test_reread_changed_manual_stop_keeps_stopped():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.manual_stop = True
proc.state = State.STOPPED
changed = cfg("rq", env={"X": "1"})
result = mgr.reread_config([changed])
changed = make_config("rq", env={"X": "1"})
result = manager.reread_config([changed])
assert result["unchanged"] == ["rq"]
assert proc.config is changed and proc.state == State.STOPPED
def test_reread_unchanged_noop():
mgr = make_manager("rq")
same = mgr.processes["rq"].config
result = mgr.reread_config([same])
manager = make_manager("rq")
same = manager.processes["rq"].config
result = manager.reread_config([same])
assert result["unchanged"] == ["rq"]
assert result["restarted"] == []
def test_reread_duplicate_name_keeps_old():
mgr = make_manager("rq")
result = mgr.reread_config([cfg("rq"), cfg("rq")])
manager = make_manager("rq")
result = manager.reread_config([make_config("rq"), make_config("rq")])
assert result["ok"] is False and result["kept_old_config"] is True
assert "duplicate" in result["error"]
def test_handle_command_reread_no_loader():
mgr = make_manager("rq")
manager = make_manager("rq")
with pytest.raises(CommandError):
mgr.handle_command({"cmd": "reread"})
manager.handle_command({"cmd": "reread"})
def test_handle_command_reread_runs_loader():
mgr = make_manager("rq")
mgr.config_loader = lambda: [mgr.processes["rq"].config]
resp = mgr.handle_command({"cmd": "reread"})
manager = make_manager("rq")
manager.config_loader = lambda: [manager.processes["rq"].config]
resp = manager.handle_command({"cmd": "reread"})
assert resp["ok"] is True and resp["unchanged"] == ["rq"]
def test_handle_command_reread_bad_config():
mgr = make_manager("rq")
manager = make_manager("rq")
def boom():
raise ValueError("duplicate companion name rq")
mgr.config_loader = boom
resp = mgr.handle_command({"cmd": "reread"})
manager.config_loader = boom
resp = manager.handle_command({"cmd": "reread"})
assert resp["ok"] is False and resp["kept_old_config"] is True
def test_start_process_stopped_spawns():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
with mock.patch("os.fork", return_value=70) as fork:
ok, _ = mgr.start_process("rq")
ok, _ = manager.start_process("rq")
fork.assert_called_once()
assert ok and proc.state == State.STARTING and proc.manual_stop is False
def test_start_process_backoff_cancels_retry():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.state = State.BACKOFF
proc.next_retry_at = 999.0
proc.manual_stop = True
with mock.patch("os.fork", return_value=71):
ok, _ = mgr.start_process("rq")
ok, _ = manager.start_process("rq")
assert ok and proc.state == State.STARTING
assert proc.next_retry_at is None and proc.manual_stop is False
def test_start_process_running_is_noop():
mgr = make_manager("rq")
mgr.processes["rq"].state = State.RUNNING
manager = make_manager("rq")
manager.processes["rq"].state = State.RUNNING
with mock.patch("os.fork") as fork:
ok, _ = mgr.start_process("rq")
ok, _ = manager.start_process("rq")
assert ok
fork.assert_not_called()
def test_start_process_stopping_rejected():
mgr = make_manager("rq")
mgr.processes["rq"].state = State.STOPPING
ok, msg = mgr.start_process("rq")
manager = make_manager("rq")
manager.processes["rq"].state = State.STOPPING
ok, msg = manager.start_process("rq")
assert not ok and "stopping" in msg
def test_start_process_unknown():
mgr = make_manager("rq")
ok, _ = mgr.start_process("nope")
manager = make_manager("rq")
ok, _ = manager.start_process("nope")
assert not ok
def test_stop_process_running_signals_and_stopping():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.state = State.RUNNING
proc.pid = 80
proc.config.stop_timeout = 60
with mock.patch("os.kill") as kill:
ok, _ = mgr.stop_process("rq", now=200.0)
ok, _ = manager.stop_process("rq", now=200.0)
kill.assert_called_once_with(80, signal.SIGTERM)
assert ok and proc.state == State.STOPPING
assert proc.manual_stop is True and proc.stop_deadline == 260.0
def test_stop_process_backoff_to_stopped():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.state = State.BACKOFF
proc.next_retry_at = 999.0
with mock.patch("os.kill") as kill:
ok, _ = mgr.stop_process("rq")
ok, _ = manager.stop_process("rq")
kill.assert_not_called()
assert ok and proc.state == State.STOPPED
assert proc.next_retry_at is None and proc.manual_stop is True
def test_stop_process_already_stopped():
mgr = make_manager("rq")
manager = make_manager("rq")
with mock.patch("os.kill") as kill:
ok, _ = mgr.stop_process("rq")
ok, _ = manager.stop_process("rq")
kill.assert_not_called()
assert ok and mgr.processes["rq"].manual_stop is True
assert ok and manager.processes["rq"].manual_stop is True
def test_stop_process_unknown():
mgr = make_manager("rq")
ok, _ = mgr.stop_process("nope")
manager = make_manager("rq")
ok, _ = manager.stop_process("nope")
assert not ok
@ -349,14 +349,14 @@ def test_signal_number_rejects_bad():
def test_restart_process_running_stops_with_reload_timeout():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.state = State.RUNNING
proc.pid = 90
proc.config.reload_timeout = 30
proc.manual_stop = True
with mock.patch("os.kill") as kill:
ok, _ = mgr.restart_process("rq", now=300.0)
ok, _ = manager.restart_process("rq", now=300.0)
kill.assert_called_once_with(90, signal.SIGTERM)
assert ok and proc.state == State.STOPPING
assert proc.restart_pending is True and proc.stop_deadline == 330.0
@ -364,14 +364,14 @@ def test_restart_process_running_stops_with_reload_timeout():
def test_restart_pending_reap_respawns_immediately():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.state = State.STOPPING
proc.restart_pending = True
proc.pid = 91
with mock.patch("os.waitpid", side_effect=[(91, 0), (0, 0)]), \
mock.patch("os.fork", return_value=92):
mgr.reap_processes()
manager.reap_processes()
assert proc.state == State.STARTING
assert proc.pid == 92
assert proc.restart_pending is False
@ -379,90 +379,90 @@ def test_restart_pending_reap_respawns_immediately():
def test_restart_process_stopped_starts_now():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
with mock.patch("os.fork", return_value=93), mock.patch("os.kill") as kill:
ok, _ = mgr.restart_process("rq")
ok, _ = manager.restart_process("rq")
kill.assert_not_called()
assert ok and proc.state == State.STARTING
def test_restart_process_backoff_starts_now():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.state = State.BACKOFF
proc.next_retry_at = 999.0
with mock.patch("os.fork", return_value=94):
ok, _ = mgr.restart_process("rq")
ok, _ = manager.restart_process("rq")
assert ok and proc.state == State.STARTING and proc.next_retry_at is None
def test_restart_process_stopping_rejected():
mgr = make_manager("rq")
mgr.processes["rq"].state = State.STOPPING
ok, msg = mgr.restart_process("rq")
manager = make_manager("rq")
manager.processes["rq"].state = State.STOPPING
ok, msg = manager.restart_process("rq")
assert not ok and "stopping" in msg
def test_manual_stop_preserved_through_exit():
# stop a running companion, then reap its child: it must settle in STOPPED
# with manual_stop still set so it is not auto-restarted.
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.state = State.RUNNING
proc.pid = 60
with mock.patch("os.kill"):
mgr.stop_process("rq", now=10.0)
manager.stop_process("rq", now=10.0)
with mock.patch("os.waitpid", side_effect=[(60, 0), (0, 0)]), \
mock.patch("os.fork") as fork:
mgr.reap_processes()
manager.reap_processes()
fork.assert_not_called()
assert proc.state == State.STOPPED and proc.manual_stop is True
def test_start_clears_manual_stop():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.manual_stop = True
with mock.patch("os.fork", return_value=61):
mgr.start_process("rq")
manager.start_process("rq")
assert proc.manual_stop is False
def test_spawn_does_not_touch_manual_stop():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.manual_stop = True
with mock.patch("os.fork", return_value=62):
mgr.spawn_process(proc)
manager.spawn_process(proc)
assert proc.manual_stop is True
def test_handle_exit_unexpected_backoff():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.restart_delay = 5
mgr.handle_exit(proc, now=100.0)
manager.handle_exit(proc, now=100.0)
assert proc.state == State.BACKOFF
assert proc.next_retry_at == 105.0
def test_handle_exit_manual_stop_stays_stopped():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.manual_stop = True
mgr.handle_exit(proc, now=100.0)
manager.handle_exit(proc, now=100.0)
assert proc.state == State.STOPPED
assert proc.next_retry_at is None
def test_retry_backoff_respawns_when_due():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.state = State.BACKOFF
proc.next_retry_at = 100.0
with mock.patch("os.fork", return_value=555):
retried = mgr.retry_backoff(now=101.0)
retried = manager.retry_backoff(now=101.0)
assert retried == [proc]
assert proc.restart_count == 1
assert proc.state == State.STARTING
@ -470,59 +470,59 @@ def test_retry_backoff_respawns_when_due():
def test_retry_backoff_waits_until_due():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.state = State.BACKOFF
proc.next_retry_at = 100.0
assert mgr.retry_backoff(now=99.0) == []
assert manager.retry_backoff(now=99.0) == []
assert proc.state == State.BACKOFF
def test_reap_unexpected_exit_enters_backoff():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.pid = 4321
with mock.patch("os.waitpid", side_effect=[(4321, 1 << 8), (0, 0)]):
mgr.reap_processes()
manager.reap_processes()
assert proc.state == State.BACKOFF
assert proc.next_retry_at is not None
def test_promote_running_after_startsecs():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.config.startsecs = 1
proc.state = State.STARTING
proc.started_at = 100.0
promoted = mgr.promote_running(now=101.5)
promoted = manager.promote_running(now=101.5)
assert promoted == [proc]
assert proc.state == State.RUNNING
def test_promote_running_too_early():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.config.startsecs = 5
proc.state = State.STARTING
proc.started_at = 100.0
assert mgr.promote_running(now=102.0) == []
assert manager.promote_running(now=102.0) == []
assert proc.state == State.STARTING
def test_promote_running_ignores_non_starting():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
proc.state = State.BACKOFF
proc.started_at = 100.0
assert mgr.promote_running(now=999.0) == []
assert manager.promote_running(now=999.0) == []
assert proc.state == State.BACKOFF
def test_spawn_parent_records_pid_and_starting():
mgr = make_manager("rq")
proc = mgr.processes["rq"]
manager = make_manager("rq")
proc = manager.processes["rq"]
with mock.patch("os.fork", return_value=4321):
pid = mgr.spawn_process(proc)
pid = manager.spawn_process(proc)
assert pid == 4321
assert proc.pid == 4321
assert proc.state == State.STARTING