test: fix warnings and flaky tests in dirty arbiter tests

- Close coroutines in mocked asyncio.run to prevent "never awaited" warning
- Fix flaky integration tests with proper async cleanup and try/finally
- Add uvloop to testing dependencies so uvloop test runs
- Add pytest warning filter for eventlet/asyncio incompatibility
This commit is contained in:
Benoit Chesneau 2026-01-24 20:43:10 +01:00
parent e21d23bfa6
commit b67ff0b31d
3 changed files with 170 additions and 79 deletions

View File

@ -59,6 +59,7 @@ testing = [
"pytest", "pytest",
"pytest-cov", "pytest-cov",
"pytest-asyncio", "pytest-asyncio",
"uvloop>=0.19.0",
] ]
[project.scripts] [project.scripts]
@ -74,6 +75,11 @@ main = "gunicorn.app.pasterapp:serve"
norecursedirs = ["examples", "lib", "local", "src", "tests/docker"] norecursedirs = ["examples", "lib", "local", "src", "tests/docker"]
testpaths = ["tests/"] testpaths = ["tests/"]
addopts = "--assert=plain --cov=gunicorn --cov-report=xml" addopts = "--assert=plain --cov=gunicorn --cov-report=xml"
filterwarnings = [
# Eventlet patches select module, which breaks asyncio event loop cleanup
# This is expected behavior when testing eventlet worker
"ignore::pytest.PytestUnraisableExceptionWarning",
]
[tool.setuptools] [tool.setuptools]
zip-safe = false zip-safe = false

View File

@ -254,6 +254,8 @@ class TestDirtyArbiterPidfileWrite:
if os.path.exists(pidfile): if os.path.exists(pidfile):
with open(pidfile) as f: with open(pidfile) as f:
pid_written = int(f.read().strip()) pid_written = int(f.read().strip())
# Close coroutine to avoid "never awaited" warning
coro.close()
# Mock asyncio.run to check PID file before cleanup runs # Mock asyncio.run to check PID file before cleanup runs
with mock.patch.object(asyncio, 'run', side_effect=mock_asyncio_run): with mock.patch.object(asyncio, 'run', side_effect=mock_asyncio_run):
@ -273,7 +275,11 @@ class TestDirtyArbiterPidfileWrite:
arbiter = DirtyArbiter(cfg=cfg, log=log) arbiter = DirtyArbiter(cfg=cfg, log=log)
with mock.patch.object(asyncio, 'run'): def mock_asyncio_run(coro):
# Close coroutine to avoid "never awaited" warning
coro.close()
with mock.patch.object(asyncio, 'run', side_effect=mock_asyncio_run):
# Should not raise # Should not raise
arbiter.run() arbiter.run()

View File

@ -5,11 +5,48 @@
"""Integration tests for dirty arbiter with main arbiter.""" """Integration tests for dirty arbiter with main arbiter."""
import os import os
import struct
import pytest import pytest
from gunicorn.arbiter import Arbiter from gunicorn.arbiter import Arbiter
from gunicorn.config import Config from gunicorn.config import Config
from gunicorn.app.base import BaseApplication from gunicorn.app.base import BaseApplication
from gunicorn.dirty.protocol import DirtyProtocol
class MockStreamWriter:
"""Mock StreamWriter that captures written messages."""
def __init__(self):
self.messages = []
self._buffer = b""
self.closed = False
def write(self, data):
self._buffer += data
async def drain(self):
while len(self._buffer) >= DirtyProtocol.HEADER_SIZE:
length = struct.unpack(
DirtyProtocol.HEADER_FORMAT,
self._buffer[:DirtyProtocol.HEADER_SIZE]
)[0]
total_size = DirtyProtocol.HEADER_SIZE + length
if len(self._buffer) >= total_size:
msg_data = self._buffer[DirtyProtocol.HEADER_SIZE:total_size]
self._buffer = self._buffer[total_size:]
self.messages.append(DirtyProtocol.decode(msg_data))
else:
break
def close(self):
self.closed = True
async def wait_closed(self):
pass
def get_extra_info(self, name):
return None
class SimpleDirtyTestApp(BaseApplication): class SimpleDirtyTestApp(BaseApplication):
@ -147,7 +184,6 @@ class TestDirtyExecutionTimeout:
await server.wait_closed() await server.wait_closed()
worker._cleanup() worker._cleanup()
@pytest.mark.skip(reason="Flaky due to async cleanup issues")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_arbiter_timeout_response(self): async def test_arbiter_timeout_response(self):
"""Test that arbiter returns timeout error when worker doesn't respond.""" """Test that arbiter returns timeout error when worker doesn't respond."""
@ -177,7 +213,10 @@ class TestDirtyExecutionTimeout:
arbiter = DirtyArbiter(cfg=cfg, log=log, socket_path=socket_path) arbiter = DirtyArbiter(cfg=cfg, log=log, socket_path=socket_path)
arbiter.pid = os.getpid() arbiter.pid = os.getpid()
arbiter.alive = True
slow_server = None
try:
# Register a fake worker that will never respond # Register a fake worker that will never respond
fake_pid = 99999 fake_pid = 99999
arbiter.workers[fake_pid] = "fake_worker" arbiter.workers[fake_pid] = "fake_worker"
@ -208,18 +247,34 @@ class TestDirtyExecutionTimeout:
action="slow_action" action="slow_action"
) )
# This should timeout since worker doesn't respond # Use MockStreamWriter to capture the response
response = await arbiter.route_request(request) mock_writer = MockStreamWriter()
await arbiter.route_request(request, mock_writer)
assert len(mock_writer.messages) == 1
response = mock_writer.messages[0]
assert response["type"] == DirtyProtocol.MSG_TYPE_ERROR assert response["type"] == DirtyProtocol.MSG_TYPE_ERROR
assert "timeout" in response["error"]["error_type"].lower() assert "timeout" in response["error"]["error_type"].lower()
finally:
# Cancel any pending consumer tasks
arbiter.alive = False
for task in arbiter.worker_consumers.values():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Cleanup # Close worker connections
arbiter._close_worker_connection(fake_pid)
# Cleanup server
if slow_server:
slow_server.close() slow_server.close()
await slow_server.wait_closed() await slow_server.wait_closed()
arbiter._cleanup_sync() arbiter._cleanup_sync()
@pytest.mark.skip(reason="Flaky due to async cleanup issues")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_full_request_response_flow(self): async def test_full_request_response_flow(self):
"""Test full request-response flow between arbiter and worker.""" """Test full request-response flow between arbiter and worker."""
@ -248,6 +303,12 @@ class TestDirtyExecutionTimeout:
arbiter_socket_path = os.path.join(tmpdir, "arbiter.sock") arbiter_socket_path = os.path.join(tmpdir, "arbiter.sock")
worker_socket_path = os.path.join(tmpdir, "worker.sock") worker_socket_path = os.path.join(tmpdir, "worker.sock")
worker = None
arbiter = None
worker_server = None
fake_pid = 12345
try:
# Create worker # Create worker
worker = DirtyWorker( worker = DirtyWorker(
age=1, age=1,
@ -269,13 +330,13 @@ class TestDirtyExecutionTimeout:
# Create arbiter # Create arbiter
arbiter = DirtyArbiter(cfg=cfg, log=log, socket_path=arbiter_socket_path) arbiter = DirtyArbiter(cfg=cfg, log=log, socket_path=arbiter_socket_path)
arbiter.pid = os.getpid() arbiter.pid = os.getpid()
arbiter.alive = True
# Register worker # Register worker
fake_pid = 12345
arbiter.workers[fake_pid] = worker arbiter.workers[fake_pid] = worker
arbiter.worker_sockets[fake_pid] = worker_socket_path arbiter.worker_sockets[fake_pid] = worker_socket_path
# Route a request # Route a request using MockStreamWriter
request = make_request( request = make_request(
request_id="full-flow-test", request_id="full-flow-test",
app_path="tests.support_dirty_app:TestDirtyApp", app_path="tests.support_dirty_app:TestDirtyApp",
@ -284,14 +345,32 @@ class TestDirtyExecutionTimeout:
kwargs={"operation": "multiply"} kwargs={"operation": "multiply"}
) )
response = await arbiter.route_request(request) mock_writer = MockStreamWriter()
await arbiter.route_request(request, mock_writer)
assert len(mock_writer.messages) == 1
response = mock_writer.messages[0]
assert response["type"] == DirtyProtocol.MSG_TYPE_RESPONSE assert response["type"] == DirtyProtocol.MSG_TYPE_RESPONSE
assert response["result"] == 21 assert response["result"] == 21
finally:
# Cancel any pending consumer tasks
if arbiter:
arbiter.alive = False
for task in arbiter.worker_consumers.values():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Cleanup - close arbiter's connection first # Close arbiter's connection first
arbiter._close_worker_connection(fake_pid) arbiter._close_worker_connection(fake_pid)
arbiter._cleanup_sync()
# Close worker server
if worker_server:
worker_server.close() worker_server.close()
await worker_server.wait_closed() await worker_server.wait_closed()
if worker:
worker._cleanup() worker._cleanup()
arbiter._cleanup_sync()