mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-01 10:11:30 +08:00
fix(companion): Harden manager and control against runtime errors
- _safe_kill: a companion can exit between the manager deciding to signal it and the kill landing; swallow ProcessLookupError at the three os.kill sites so the resulting race cannot take the manager down. - _redirect_output: close the opened log fd after dup2 so a long-lived companion does not leak a descriptor per start. - serve_connection: drop a control connection whose line grows past MAX_LINE_BYTES without a newline, so a client cannot pin unbounded memory in the manager. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
3b972fe310
commit
672c45a9c7
@ -40,6 +40,11 @@ def encode_response(response):
|
||||
return (json.dumps(response) + "\n").encode("utf-8")
|
||||
|
||||
|
||||
# A control request is a single short JSON line. Cap the unframed buffer so a
|
||||
# client that never sends a newline cannot grow it without bound.
|
||||
MAX_LINE_BYTES = 1 << 20
|
||||
|
||||
|
||||
class ControlServer:
|
||||
"""The manager's Unix-socket control endpoint.
|
||||
|
||||
@ -108,7 +113,9 @@ class ControlServer:
|
||||
|
||||
Reads until the client hangs up, buffering partial reads and answering
|
||||
each complete line as it arrives. A trailing fragment without a newline
|
||||
is ignored.
|
||||
is ignored. A single line that grows past ``MAX_LINE_BYTES`` without a
|
||||
newline is treated as abuse: the connection is dropped so it cannot pin
|
||||
unbounded memory.
|
||||
"""
|
||||
buffer = b""
|
||||
with connection:
|
||||
@ -121,3 +128,9 @@ class ControlServer:
|
||||
line, buffer = buffer.split(b"\n", 1)
|
||||
if line.strip():
|
||||
connection.sendall(self.handle_line(line))
|
||||
if len(buffer) > MAX_LINE_BYTES:
|
||||
if self.log is not None:
|
||||
self.log.warning(
|
||||
"companion control line exceeded %d bytes; closing",
|
||||
MAX_LINE_BYTES)
|
||||
break
|
||||
|
||||
@ -178,7 +178,7 @@ class CompanionManager:
|
||||
if process.state != State.STOPPING or process.stop_deadline is None:
|
||||
continue
|
||||
if now >= process.stop_deadline and process.pid is not None:
|
||||
os.kill(process.pid, signal.SIGKILL)
|
||||
self._safe_kill(process.pid, signal.SIGKILL)
|
||||
process.kill_count += 1
|
||||
process.stop_deadline = None
|
||||
self.log.warning("companion %s killed after timeout (pid %s)",
|
||||
@ -429,7 +429,7 @@ class CompanionManager:
|
||||
process.state = State.STOPPED
|
||||
return True, "%s stopped" % name
|
||||
now = now or time.time()
|
||||
os.kill(process.pid, self._signal_number(process.config.stop_signal))
|
||||
self._safe_kill(process.pid, self._signal_number(process.config.stop_signal))
|
||||
process.state = State.STOPPING
|
||||
process.stop_deadline = now + process.config.stop_timeout
|
||||
self.log.info("companion %s stopping (pid %s)", name, process.pid)
|
||||
@ -454,7 +454,7 @@ class CompanionManager:
|
||||
if process.state in (State.RUNNING, State.STARTING):
|
||||
now = now or time.time()
|
||||
process.restart_pending = True
|
||||
os.kill(process.pid, self._signal_number(process.config.stop_signal))
|
||||
self._safe_kill(process.pid, self._signal_number(process.config.stop_signal))
|
||||
process.state = State.STOPPING
|
||||
process.stop_deadline = now + process.config.reload_timeout
|
||||
self.log.info("companion %s restarting (pid %s)", name, process.pid)
|
||||
@ -463,6 +463,19 @@ class CompanionManager:
|
||||
self.spawn_process(process)
|
||||
return True, "%s started" % name
|
||||
|
||||
@staticmethod
|
||||
def _safe_kill(pid: int, sig) -> None:
|
||||
"""Send ``sig`` to ``pid``, ignoring an already-dead target.
|
||||
|
||||
A companion can exit between the manager deciding to signal it and the
|
||||
kill itself (the exit is only reaped on the next tick). Without this the
|
||||
resulting ``ProcessLookupError`` would escape and take the manager down.
|
||||
"""
|
||||
try:
|
||||
os.kill(pid, sig)
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _signal_number(stop_signal) -> int:
|
||||
"""Resolve a stop signal to its number, e.g. ``"SIGTERM"`` -> 15.
|
||||
@ -637,12 +650,14 @@ class CompanionManager:
|
||||
stdout_fd = CompanionManager._open_output(config.stdout)
|
||||
if stdout_fd is not None:
|
||||
os.dup2(stdout_fd, 1)
|
||||
os.close(stdout_fd)
|
||||
if config.stderr == "stdout":
|
||||
os.dup2(1, 2)
|
||||
else:
|
||||
stderr_fd = CompanionManager._open_output(config.stderr)
|
||||
if stderr_fd is not None:
|
||||
os.dup2(stderr_fd, 2)
|
||||
os.close(stderr_fd)
|
||||
|
||||
@staticmethod
|
||||
def _open_output(value):
|
||||
|
||||
@ -9,6 +9,7 @@ import pytest
|
||||
|
||||
from gunicorn.companion.config import CompanionConfig
|
||||
from gunicorn.companion.control import (
|
||||
MAX_LINE_BYTES,
|
||||
CommandError,
|
||||
ControlServer,
|
||||
decode_command,
|
||||
@ -143,3 +144,42 @@ def test_control_reread_without_loader_error_envelope():
|
||||
response = json.loads(server_for(manager).handle_line('{"cmd": "reread"}'))
|
||||
assert response["ok"] is False
|
||||
assert "reread" in response["error"]
|
||||
|
||||
|
||||
class FakeConnection:
|
||||
"""Minimal connection stand-in for serve_connection: yields preset chunks."""
|
||||
|
||||
def __init__(self, chunks):
|
||||
self._chunks = list(chunks)
|
||||
self.sent = []
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc):
|
||||
return False
|
||||
|
||||
def recv(self, _size):
|
||||
return self._chunks.pop(0) if self._chunks else b""
|
||||
|
||||
def sendall(self, data):
|
||||
self.sent.append(data)
|
||||
|
||||
|
||||
def test_serve_connection_drops_oversized_line():
|
||||
log = mock.Mock()
|
||||
server = ControlServer(dispatch=lambda command: {"ok": True},
|
||||
path="/tmp/x.sock", log=log)
|
||||
# A flood with no newline: the connection is dropped, nothing dispatched.
|
||||
connection = FakeConnection([b"x" * (MAX_LINE_BYTES + 1)])
|
||||
server.serve_connection(connection)
|
||||
assert connection.sent == []
|
||||
log.warning.assert_called_once()
|
||||
|
||||
|
||||
def test_serve_connection_answers_complete_line():
|
||||
manager = make_manager("rq")
|
||||
connection = FakeConnection([b'{"cmd": "status"}\n'])
|
||||
server_for(manager).serve_connection(connection)
|
||||
assert len(connection.sent) == 1
|
||||
assert json.loads(connection.sent[0])["ok"] is True
|
||||
|
||||
@ -139,20 +139,26 @@ def test_redirect_output_files():
|
||||
config = CompanionConfig(name="rq", target=lambda: None,
|
||||
stdout="/o.log", stderr="/e.log")
|
||||
with mock.patch("os.open", side_effect=[10, 11]), \
|
||||
mock.patch("os.dup2") as dup2:
|
||||
mock.patch("os.dup2") as dup2, \
|
||||
mock.patch("os.close") as close:
|
||||
CompanionManager._redirect_output(config)
|
||||
dup2.assert_any_call(10, 1)
|
||||
dup2.assert_any_call(11, 2)
|
||||
# The opened fds are closed after being duped onto 1/2, no leak.
|
||||
close.assert_any_call(10)
|
||||
close.assert_any_call(11)
|
||||
|
||||
|
||||
def test_redirect_output_stderr_to_stdout():
|
||||
config = CompanionConfig(name="rq", target=lambda: None,
|
||||
stdout="/o.log", stderr="stdout")
|
||||
with mock.patch("os.open", return_value=10), \
|
||||
mock.patch("os.dup2") as dup2:
|
||||
mock.patch("os.dup2") as dup2, \
|
||||
mock.patch("os.close") as close:
|
||||
CompanionManager._redirect_output(config)
|
||||
dup2.assert_any_call(10, 1)
|
||||
dup2.assert_any_call(1, 2)
|
||||
close.assert_called_once_with(10)
|
||||
|
||||
|
||||
def test_redirect_output_inherit_noop():
|
||||
@ -437,6 +443,21 @@ def test_stop_during_restart_cancels_pending_restart():
|
||||
assert proc.state == State.STOPPED
|
||||
|
||||
|
||||
def test_safe_kill_ignores_dead_process():
|
||||
with mock.patch("os.kill", side_effect=ProcessLookupError):
|
||||
CompanionManager._safe_kill(123, signal.SIGTERM) # must not raise
|
||||
|
||||
|
||||
def test_stop_process_survives_dead_companion():
|
||||
manager = make_manager("rq")
|
||||
proc = manager.processes["rq"]
|
||||
proc.state = State.RUNNING
|
||||
proc.pid = 80
|
||||
with mock.patch("os.kill", side_effect=ProcessLookupError):
|
||||
ok, _ = manager.stop_process("rq", now=1.0)
|
||||
assert ok and proc.state == State.STOPPING
|
||||
|
||||
|
||||
def test_signal_number_resolves_name():
|
||||
assert CompanionManager._signal_number("SIGKILL") == signal.SIGKILL
|
||||
assert CompanionManager._signal_number(9) == 9
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user