gunicorn/tests/test_companion_control.py
Tanmoy Sarkar 672c45a9c7 fix(companion): Harden manager and control against runtime errors
- _safe_kill: a companion can exit between the manager deciding to
  signal it and the kill landing; swallow ProcessLookupError at the three
  os.kill sites so the resulting race cannot take the manager down.
- _redirect_output: close the opened log fd after dup2 so a long-lived
  companion does not leak a descriptor per start.
- serve_connection: drop a control connection whose line grows past
  MAX_LINE_BYTES without a newline, so a client cannot pin unbounded
  memory in the manager.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 23:30:48 +05:30

186 lines
5.9 KiB
Python

#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import json
from unittest import mock
import pytest
from gunicorn.companion.config import CompanionConfig
from gunicorn.companion.control import (
MAX_LINE_BYTES,
CommandError,
ControlServer,
decode_command,
encode_response,
)
from gunicorn.companion.manager import CompanionManager
def make_manager(*names):
configs = [CompanionConfig(name=name, target=lambda: None) for name in names]
return CompanionManager(configs, log=mock.Mock())
def server_for(manager):
return ControlServer(dispatch=manager.handle_command, path="/tmp/x.sock")
def test_decode_command_valid():
assert decode_command('{"cmd": "status"}') == {"cmd": "status"}
def test_decode_command_bad_json():
with pytest.raises(CommandError):
decode_command("{not json")
def test_decode_command_not_object():
with pytest.raises(CommandError):
decode_command("[1, 2, 3]")
def test_decode_command_missing_cmd():
with pytest.raises(CommandError):
decode_command('{"name": "rq"}')
def test_encode_response_newline_terminated():
response = encode_response({"ok": True})
assert response.endswith(b"\n")
assert json.loads(response) == {"ok": True}
def test_handle_line_dispatches():
server = ControlServer(
dispatch=lambda command: {"ok": True, "echo": command["cmd"]},
path="/tmp/x.sock")
response = server.handle_line('{"cmd": "status"}')
assert json.loads(response) == {"ok": True, "echo": "status"}
def test_handle_line_bad_json_error_envelope():
server = ControlServer(dispatch=lambda command: {"ok": True}, path="/tmp/x.sock")
response = json.loads(server.handle_line("garbage"))
assert response["ok"] is False and "JSON" in response["error"]
def test_handle_line_dispatch_command_error():
def dispatch(command):
raise CommandError("unknown command")
server = ControlServer(dispatch=dispatch, path="/tmp/x.sock")
response = json.loads(server.handle_line('{"cmd": "bogus"}'))
assert response["ok"] is False and response["error"] == "unknown command"
def test_handle_line_unexpected_exception_caught():
def dispatch(command):
raise ValueError("unknown stop signal 'SIGTRM'")
server = ControlServer(dispatch=dispatch, path="/tmp/x.sock",
log=mock.Mock())
response = json.loads(server.handle_line('{"cmd": "stop", "name": "rq"}'))
assert response["ok"] is False and "internal error" in response["error"]
def test_create_unlinks_stale_and_chmods():
server = ControlServer(dispatch=lambda command: {}, path="/tmp/x.sock",
mode=0o600)
listener = mock.Mock()
with mock.patch("os.path.exists", return_value=True), \
mock.patch("os.unlink") as unlink, \
mock.patch("socket.socket", return_value=listener), \
mock.patch("os.chmod") as chmod:
server.create()
unlink.assert_called_once_with("/tmp/x.sock")
listener.bind.assert_called_once_with("/tmp/x.sock")
chmod.assert_called_once_with("/tmp/x.sock", 0o600)
listener.listen.assert_called_once()
def test_close_unlinks():
server = ControlServer(dispatch=lambda command: {}, path="/tmp/x.sock")
server.listener = mock.Mock()
with mock.patch("os.path.exists", return_value=True), \
mock.patch("os.unlink") as unlink:
server.close()
unlink.assert_called_once_with("/tmp/x.sock")
assert server.listener is None
def test_control_status_command_end_to_end():
manager = make_manager("rq")
response = json.loads(server_for(manager).handle_line('{"cmd": "status"}'))
assert response["ok"] is True
assert response["companions"][0]["name"] == "rq"
def test_control_start_command_end_to_end():
manager = make_manager("rq")
with mock.patch("os.fork", return_value=10):
response = json.loads(
server_for(manager).handle_line('{"cmd": "start", "name": "rq"}'))
assert response["ok"] is True
assert "rq" in response["message"]
def test_control_unknown_command_error_envelope():
manager = make_manager("rq")
response = json.loads(
server_for(manager).handle_line('{"cmd": "bogus", "name": "rq"}'))
assert response["ok"] is False
assert "unknown" in response["error"]
def test_control_missing_name_error_envelope():
manager = make_manager("rq")
response = json.loads(server_for(manager).handle_line('{"cmd": "start"}'))
assert response["ok"] is False
assert "name" in response["error"]
def test_control_reread_without_loader_error_envelope():
manager = make_manager("rq")
response = json.loads(server_for(manager).handle_line('{"cmd": "reread"}'))
assert response["ok"] is False
assert "reread" in response["error"]
class FakeConnection:
"""Minimal connection stand-in for serve_connection: yields preset chunks."""
def __init__(self, chunks):
self._chunks = list(chunks)
self.sent = []
def __enter__(self):
return self
def __exit__(self, *exc):
return False
def recv(self, _size):
return self._chunks.pop(0) if self._chunks else b""
def sendall(self, data):
self.sent.append(data)
def test_serve_connection_drops_oversized_line():
log = mock.Mock()
server = ControlServer(dispatch=lambda command: {"ok": True},
path="/tmp/x.sock", log=log)
# A flood with no newline: the connection is dropped, nothing dispatched.
connection = FakeConnection([b"x" * (MAX_LINE_BYTES + 1)])
server.serve_connection(connection)
assert connection.sent == []
log.warning.assert_called_once()
def test_serve_connection_answers_complete_line():
manager = make_manager("rq")
connection = FakeConnection([b'{"cmd": "status"}\n'])
server_for(manager).serve_connection(connection)
assert len(connection.sent) == 1
assert json.loads(connection.sent[0])["ok"] is True