diff --git a/pyproject.toml b/pyproject.toml index 5bdf6e95..af82d037 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,7 @@ testing = [ "pytest", "pytest-cov", "pytest-asyncio", + "uvloop>=0.19.0", ] [project.scripts] @@ -74,6 +75,11 @@ main = "gunicorn.app.pasterapp:serve" norecursedirs = ["examples", "lib", "local", "src", "tests/docker"] testpaths = ["tests/"] addopts = "--assert=plain --cov=gunicorn --cov-report=xml" +filterwarnings = [ + # Eventlet patches select module, which breaks asyncio event loop cleanup + # This is expected behavior when testing eventlet worker + "ignore::pytest.PytestUnraisableExceptionWarning", +] [tool.setuptools] zip-safe = false diff --git a/tests/test_dirty_arbiter.py b/tests/test_dirty_arbiter.py index f6c855c7..ca3fc784 100644 --- a/tests/test_dirty_arbiter.py +++ b/tests/test_dirty_arbiter.py @@ -254,6 +254,8 @@ class TestDirtyArbiterPidfileWrite: if os.path.exists(pidfile): with open(pidfile) as f: pid_written = int(f.read().strip()) + # Close coroutine to avoid "never awaited" warning + coro.close() # Mock asyncio.run to check PID file before cleanup runs with mock.patch.object(asyncio, 'run', side_effect=mock_asyncio_run): @@ -273,7 +275,11 @@ class TestDirtyArbiterPidfileWrite: arbiter = DirtyArbiter(cfg=cfg, log=log) - with mock.patch.object(asyncio, 'run'): + def mock_asyncio_run(coro): + # Close coroutine to avoid "never awaited" warning + coro.close() + + with mock.patch.object(asyncio, 'run', side_effect=mock_asyncio_run): # Should not raise arbiter.run() diff --git a/tests/test_dirty_integration.py b/tests/test_dirty_integration.py index 6e9d0bd3..f24c6894 100644 --- a/tests/test_dirty_integration.py +++ b/tests/test_dirty_integration.py @@ -5,11 +5,48 @@ """Integration tests for dirty arbiter with main arbiter.""" import os +import struct import pytest from gunicorn.arbiter import Arbiter from gunicorn.config import Config from gunicorn.app.base import BaseApplication +from gunicorn.dirty.protocol import DirtyProtocol + + +class MockStreamWriter: + """Mock StreamWriter that captures written messages.""" + + def __init__(self): + self.messages = [] + self._buffer = b"" + self.closed = False + + def write(self, data): + self._buffer += data + + async def drain(self): + while len(self._buffer) >= DirtyProtocol.HEADER_SIZE: + length = struct.unpack( + DirtyProtocol.HEADER_FORMAT, + self._buffer[:DirtyProtocol.HEADER_SIZE] + )[0] + total_size = DirtyProtocol.HEADER_SIZE + length + if len(self._buffer) >= total_size: + msg_data = self._buffer[DirtyProtocol.HEADER_SIZE:total_size] + self._buffer = self._buffer[total_size:] + self.messages.append(DirtyProtocol.decode(msg_data)) + else: + break + + def close(self): + self.closed = True + + async def wait_closed(self): + pass + + def get_extra_info(self, name): + return None class SimpleDirtyTestApp(BaseApplication): @@ -147,7 +184,6 @@ class TestDirtyExecutionTimeout: await server.wait_closed() worker._cleanup() - @pytest.mark.skip(reason="Flaky due to async cleanup issues") @pytest.mark.asyncio async def test_arbiter_timeout_response(self): """Test that arbiter returns timeout error when worker doesn't respond.""" @@ -177,49 +213,68 @@ class TestDirtyExecutionTimeout: arbiter = DirtyArbiter(cfg=cfg, log=log, socket_path=socket_path) arbiter.pid = os.getpid() + arbiter.alive = True + slow_server = None - # Register a fake worker that will never respond - fake_pid = 99999 - arbiter.workers[fake_pid] = "fake_worker" - arbiter.worker_sockets[fake_pid] = worker_socket_path + try: + # Register a fake worker that will never respond + fake_pid = 99999 + arbiter.workers[fake_pid] = "fake_worker" + arbiter.worker_sockets[fake_pid] = worker_socket_path - # Create a "slow" worker server that accepts but never responds - async def slow_client_handler(reader, writer): - # Read the request but don't respond (simulating timeout) - try: - await asyncio.sleep(10) # Longer than timeout - except asyncio.CancelledError: - pass - finally: + # Create a "slow" worker server that accepts but never responds + async def slow_client_handler(reader, writer): + # Read the request but don't respond (simulating timeout) try: - writer.close() - await writer.wait_closed() - except Exception: + await asyncio.sleep(10) # Longer than timeout + except asyncio.CancelledError: + pass + finally: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + + slow_server = await asyncio.start_unix_server( + slow_client_handler, + path=worker_socket_path + ) + + request = make_request( + request_id="timeout-test", + app_path="test:App", + action="slow_action" + ) + + # Use MockStreamWriter to capture the response + mock_writer = MockStreamWriter() + await arbiter.route_request(request, mock_writer) + + assert len(mock_writer.messages) == 1 + response = mock_writer.messages[0] + assert response["type"] == DirtyProtocol.MSG_TYPE_ERROR + assert "timeout" in response["error"]["error_type"].lower() + finally: + # Cancel any pending consumer tasks + arbiter.alive = False + for task in arbiter.worker_consumers.values(): + task.cancel() + try: + await task + except asyncio.CancelledError: pass - slow_server = await asyncio.start_unix_server( - slow_client_handler, - path=worker_socket_path - ) + # Close worker connections + arbiter._close_worker_connection(fake_pid) - request = make_request( - request_id="timeout-test", - app_path="test:App", - action="slow_action" - ) + # Cleanup server + if slow_server: + slow_server.close() + await slow_server.wait_closed() - # This should timeout since worker doesn't respond - response = await arbiter.route_request(request) + arbiter._cleanup_sync() - assert response["type"] == DirtyProtocol.MSG_TYPE_ERROR - assert "timeout" in response["error"]["error_type"].lower() - - # Cleanup - slow_server.close() - await slow_server.wait_closed() - arbiter._cleanup_sync() - - @pytest.mark.skip(reason="Flaky due to async cleanup issues") @pytest.mark.asyncio async def test_full_request_response_flow(self): """Test full request-response flow between arbiter and worker.""" @@ -248,50 +303,74 @@ class TestDirtyExecutionTimeout: arbiter_socket_path = os.path.join(tmpdir, "arbiter.sock") worker_socket_path = os.path.join(tmpdir, "worker.sock") - # Create worker - worker = DirtyWorker( - age=1, - ppid=os.getpid(), - app_paths=["tests.support_dirty_app:TestDirtyApp"], - cfg=cfg, - log=log, - socket_path=worker_socket_path - ) - worker.pid = os.getpid() - worker.load_apps() - - # Start worker server - worker_server = await asyncio.start_unix_server( - worker.handle_connection, - path=worker_socket_path - ) - - # Create arbiter - arbiter = DirtyArbiter(cfg=cfg, log=log, socket_path=arbiter_socket_path) - arbiter.pid = os.getpid() - - # Register worker + worker = None + arbiter = None + worker_server = None fake_pid = 12345 - arbiter.workers[fake_pid] = worker - arbiter.worker_sockets[fake_pid] = worker_socket_path - # Route a request - request = make_request( - request_id="full-flow-test", - app_path="tests.support_dirty_app:TestDirtyApp", - action="compute", - args=(7, 3), - kwargs={"operation": "multiply"} - ) + try: + # Create worker + worker = DirtyWorker( + age=1, + ppid=os.getpid(), + app_paths=["tests.support_dirty_app:TestDirtyApp"], + cfg=cfg, + log=log, + socket_path=worker_socket_path + ) + worker.pid = os.getpid() + worker.load_apps() - response = await arbiter.route_request(request) + # Start worker server + worker_server = await asyncio.start_unix_server( + worker.handle_connection, + path=worker_socket_path + ) - assert response["type"] == DirtyProtocol.MSG_TYPE_RESPONSE - assert response["result"] == 21 + # Create arbiter + arbiter = DirtyArbiter(cfg=cfg, log=log, socket_path=arbiter_socket_path) + arbiter.pid = os.getpid() + arbiter.alive = True - # Cleanup - close arbiter's connection first - arbiter._close_worker_connection(fake_pid) - worker_server.close() - await worker_server.wait_closed() - worker._cleanup() - arbiter._cleanup_sync() + # Register worker + arbiter.workers[fake_pid] = worker + arbiter.worker_sockets[fake_pid] = worker_socket_path + + # Route a request using MockStreamWriter + request = make_request( + request_id="full-flow-test", + app_path="tests.support_dirty_app:TestDirtyApp", + action="compute", + args=(7, 3), + kwargs={"operation": "multiply"} + ) + + mock_writer = MockStreamWriter() + await arbiter.route_request(request, mock_writer) + + assert len(mock_writer.messages) == 1 + response = mock_writer.messages[0] + assert response["type"] == DirtyProtocol.MSG_TYPE_RESPONSE + assert response["result"] == 21 + finally: + # Cancel any pending consumer tasks + if arbiter: + arbiter.alive = False + for task in arbiter.worker_consumers.values(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Close arbiter's connection first + arbiter._close_worker_connection(fake_pid) + arbiter._cleanup_sync() + + # Close worker server + if worker_server: + worker_server.close() + await worker_server.wait_closed() + + if worker: + worker._cleanup()