mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-01 10:11:30 +08:00
- _safe_kill: a companion can exit between the manager deciding to signal it and the kill landing; swallow ProcessLookupError at the three os.kill sites so the resulting race cannot take the manager down. - _redirect_output: close the opened log fd after dup2 so a long-lived companion does not leak a descriptor per start. - serve_connection: drop a control connection whose line grows past MAX_LINE_BYTES without a newline, so a client cannot pin unbounded memory in the manager. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
186 lines
5.9 KiB
Python
186 lines
5.9 KiB
Python
#
|
|
# This file is part of gunicorn released under the MIT license.
|
|
# See the NOTICE for more information.
|
|
|
|
import json
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
|
|
from gunicorn.companion.config import CompanionConfig
|
|
from gunicorn.companion.control import (
|
|
MAX_LINE_BYTES,
|
|
CommandError,
|
|
ControlServer,
|
|
decode_command,
|
|
encode_response,
|
|
)
|
|
from gunicorn.companion.manager import CompanionManager
|
|
|
|
|
|
def make_manager(*names):
|
|
configs = [CompanionConfig(name=name, target=lambda: None) for name in names]
|
|
return CompanionManager(configs, log=mock.Mock())
|
|
|
|
|
|
def server_for(manager):
|
|
return ControlServer(dispatch=manager.handle_command, path="/tmp/x.sock")
|
|
|
|
|
|
def test_decode_command_valid():
|
|
assert decode_command('{"cmd": "status"}') == {"cmd": "status"}
|
|
|
|
|
|
def test_decode_command_bad_json():
|
|
with pytest.raises(CommandError):
|
|
decode_command("{not json")
|
|
|
|
|
|
def test_decode_command_not_object():
|
|
with pytest.raises(CommandError):
|
|
decode_command("[1, 2, 3]")
|
|
|
|
|
|
def test_decode_command_missing_cmd():
|
|
with pytest.raises(CommandError):
|
|
decode_command('{"name": "rq"}')
|
|
|
|
|
|
def test_encode_response_newline_terminated():
|
|
response = encode_response({"ok": True})
|
|
assert response.endswith(b"\n")
|
|
assert json.loads(response) == {"ok": True}
|
|
|
|
|
|
def test_handle_line_dispatches():
|
|
server = ControlServer(
|
|
dispatch=lambda command: {"ok": True, "echo": command["cmd"]},
|
|
path="/tmp/x.sock")
|
|
response = server.handle_line('{"cmd": "status"}')
|
|
assert json.loads(response) == {"ok": True, "echo": "status"}
|
|
|
|
|
|
def test_handle_line_bad_json_error_envelope():
|
|
server = ControlServer(dispatch=lambda command: {"ok": True}, path="/tmp/x.sock")
|
|
response = json.loads(server.handle_line("garbage"))
|
|
assert response["ok"] is False and "JSON" in response["error"]
|
|
|
|
|
|
def test_handle_line_dispatch_command_error():
|
|
def dispatch(command):
|
|
raise CommandError("unknown command")
|
|
server = ControlServer(dispatch=dispatch, path="/tmp/x.sock")
|
|
response = json.loads(server.handle_line('{"cmd": "bogus"}'))
|
|
assert response["ok"] is False and response["error"] == "unknown command"
|
|
|
|
|
|
def test_handle_line_unexpected_exception_caught():
|
|
def dispatch(command):
|
|
raise ValueError("unknown stop signal 'SIGTRM'")
|
|
server = ControlServer(dispatch=dispatch, path="/tmp/x.sock",
|
|
log=mock.Mock())
|
|
response = json.loads(server.handle_line('{"cmd": "stop", "name": "rq"}'))
|
|
assert response["ok"] is False and "internal error" in response["error"]
|
|
|
|
|
|
def test_create_unlinks_stale_and_chmods():
|
|
server = ControlServer(dispatch=lambda command: {}, path="/tmp/x.sock",
|
|
mode=0o600)
|
|
listener = mock.Mock()
|
|
with mock.patch("os.path.exists", return_value=True), \
|
|
mock.patch("os.unlink") as unlink, \
|
|
mock.patch("socket.socket", return_value=listener), \
|
|
mock.patch("os.chmod") as chmod:
|
|
server.create()
|
|
unlink.assert_called_once_with("/tmp/x.sock")
|
|
listener.bind.assert_called_once_with("/tmp/x.sock")
|
|
chmod.assert_called_once_with("/tmp/x.sock", 0o600)
|
|
listener.listen.assert_called_once()
|
|
|
|
|
|
def test_close_unlinks():
|
|
server = ControlServer(dispatch=lambda command: {}, path="/tmp/x.sock")
|
|
server.listener = mock.Mock()
|
|
with mock.patch("os.path.exists", return_value=True), \
|
|
mock.patch("os.unlink") as unlink:
|
|
server.close()
|
|
unlink.assert_called_once_with("/tmp/x.sock")
|
|
assert server.listener is None
|
|
|
|
|
|
def test_control_status_command_end_to_end():
|
|
manager = make_manager("rq")
|
|
response = json.loads(server_for(manager).handle_line('{"cmd": "status"}'))
|
|
assert response["ok"] is True
|
|
assert response["companions"][0]["name"] == "rq"
|
|
|
|
|
|
def test_control_start_command_end_to_end():
|
|
manager = make_manager("rq")
|
|
with mock.patch("os.fork", return_value=10):
|
|
response = json.loads(
|
|
server_for(manager).handle_line('{"cmd": "start", "name": "rq"}'))
|
|
assert response["ok"] is True
|
|
assert "rq" in response["message"]
|
|
|
|
|
|
def test_control_unknown_command_error_envelope():
|
|
manager = make_manager("rq")
|
|
response = json.loads(
|
|
server_for(manager).handle_line('{"cmd": "bogus", "name": "rq"}'))
|
|
assert response["ok"] is False
|
|
assert "unknown" in response["error"]
|
|
|
|
|
|
def test_control_missing_name_error_envelope():
|
|
manager = make_manager("rq")
|
|
response = json.loads(server_for(manager).handle_line('{"cmd": "start"}'))
|
|
assert response["ok"] is False
|
|
assert "name" in response["error"]
|
|
|
|
|
|
def test_control_reread_without_loader_error_envelope():
|
|
manager = make_manager("rq")
|
|
response = json.loads(server_for(manager).handle_line('{"cmd": "reread"}'))
|
|
assert response["ok"] is False
|
|
assert "reread" in response["error"]
|
|
|
|
|
|
class FakeConnection:
|
|
"""Minimal connection stand-in for serve_connection: yields preset chunks."""
|
|
|
|
def __init__(self, chunks):
|
|
self._chunks = list(chunks)
|
|
self.sent = []
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *exc):
|
|
return False
|
|
|
|
def recv(self, _size):
|
|
return self._chunks.pop(0) if self._chunks else b""
|
|
|
|
def sendall(self, data):
|
|
self.sent.append(data)
|
|
|
|
|
|
def test_serve_connection_drops_oversized_line():
|
|
log = mock.Mock()
|
|
server = ControlServer(dispatch=lambda command: {"ok": True},
|
|
path="/tmp/x.sock", log=log)
|
|
# A flood with no newline: the connection is dropped, nothing dispatched.
|
|
connection = FakeConnection([b"x" * (MAX_LINE_BYTES + 1)])
|
|
server.serve_connection(connection)
|
|
assert connection.sent == []
|
|
log.warning.assert_called_once()
|
|
|
|
|
|
def test_serve_connection_answers_complete_line():
|
|
manager = make_manager("rq")
|
|
connection = FakeConnection([b'{"cmd": "status"}\n'])
|
|
server_for(manager).serve_connection(connection)
|
|
assert len(connection.sent) == 1
|
|
assert json.loads(connection.sent[0])["ok"] is True
|