diff --git a/benchmarks/results/queue_refactor_results.json b/benchmarks/results/queue_refactor_results.json new file mode 100644 index 00000000..c5ca192d --- /dev/null +++ b/benchmarks/results/queue_refactor_results.json @@ -0,0 +1,168 @@ +{ + "timestamp": "2026-01-24T10:56:33", + "results": [ + { + "scenario": "baseline_10ms", + "config": { + "dirty_workers": 4, + "dirty_threads": 1, + "task_action": "sleep_task", + "task_args": [ + 10 + ], + "concurrency": 1 + }, + "total_requests": 1000, + "successful": 1000, + "failed": 0, + "errors": [], + "duration_sec": 12.27, + "requests_per_sec": 81.5, + "latency_ms": { + "min": 10.432417009724304, + "max": 13.792542013106868, + "mean": 12.266892079642275, + "stddev": 0.871026700472873, + "p50": 12.80679099727422, + "p95": 13.078375020995736, + "p99": 13.141458010068163 + } + }, + { + "scenario": "throughput_10ms", + "config": { + "dirty_workers": 4, + "dirty_threads": 1, + "task_action": "sleep_task", + "task_args": [ + 10 + ], + "concurrency": 100 + }, + "total_requests": 5000, + "successful": 5000, + "failed": 0, + "errors": [], + "duration_sec": 14.95, + "requests_per_sec": 334.4, + "latency_ms": { + "min": 11.470375000499189, + "max": 341.3927500077989, + "mean": 294.71728502821645, + "stddev": 34.9421432011074, + "p50": 305.2922079805285, + "p95": 326.4670000062324, + "p99": 334.32295799138956 + } + }, + { + "scenario": "cpu_bound_100ms", + "config": { + "dirty_workers": 4, + "dirty_threads": 1, + "task_action": "cpu_task", + "task_args": [ + 100 + ], + "concurrency": 20 + }, + "total_requests": 500, + "successful": 500, + "failed": 0, + "errors": [], + "duration_sec": 12.55, + "requests_per_sec": 39.8, + "latency_ms": { + "min": 100.59350001392886, + "max": 502.4004160077311, + "mean": 493.9748328983551, + "stddev": 48.57073135808595, + "p50": 502.01483300770633, + "p95": 502.21283300197683, + "p99": 502.2801249870099 + } + }, + { + "scenario": "io_bound_500ms", + "config": { + "dirty_workers": 4, + 
"dirty_threads": 1, + "task_action": "sleep_task", + "task_args": [ + 500 + ], + "concurrency": 50 + }, + "total_requests": 200, + "successful": 200, + "failed": 0, + "errors": [], + "duration_sec": 25.19, + "requests_per_sec": 7.9, + "latency_ms": { + "min": 501.3219590182416, + "max": 6563.243499986129, + "mean": 5566.4884116455505, + "stddev": 1566.1525736181566, + "p50": 6052.653749997262, + "p95": 6553.810708021047, + "p99": 6559.503666008823 + } + }, + { + "scenario": "mixed_50_50", + "config": { + "dirty_workers": 4, + "dirty_threads": 1, + "task_action": "mixed_task", + "task_args": [ + 50, + 50 + ], + "concurrency": 30 + }, + "total_requests": 500, + "successful": 500, + "failed": 0, + "errors": [], + "duration_sec": 12.98, + "requests_per_sec": 38.5, + "latency_ms": { + "min": 102.34933299943805, + "max": 839.0888340072706, + "mean": 756.4045974735054, + "stddev": 103.21897997316475, + "p50": 762.6495829899795, + "p95": 832.905125018442, + "p99": 836.0978330019861 + } + }, + { + "scenario": "overload_10ms", + "config": { + "dirty_workers": 4, + "dirty_threads": 1, + "task_action": "sleep_task", + "task_args": [ + 10 + ], + "concurrency": 200 + }, + "total_requests": 2000, + "successful": 2000, + "failed": 0, + "errors": [], + "duration_sec": 5.99, + "requests_per_sec": 334.1, + "latency_ms": { + "min": 10.763874975964427, + "max": 625.4918330232613, + "mean": 565.1407622727129, + "stddev": 104.98938999734894, + "p50": 590.0453749927692, + "p95": 617.4105420068372, + "p99": 621.7636249784846 + } + } + ] +} \ No newline at end of file diff --git a/gunicorn/dirty/arbiter.py b/gunicorn/dirty/arbiter.py index 29ddcb6f..63dd6269 100644 --- a/gunicorn/dirty/arbiter.py +++ b/gunicorn/dirty/arbiter.py @@ -67,7 +67,8 @@ class DirtyArbiter: self.workers = {} # pid -> DirtyWorker self.worker_sockets = {} # pid -> socket_path self.worker_connections = {} # pid -> (reader, writer) - self.worker_locks = {} # pid -> asyncio.Lock (serialize requests per worker) + 
self.worker_queues = {}  # pid -> asyncio.Queue
+        self.worker_consumers = {}  # pid -> asyncio.Task
         self._worker_rr_index = 0  # Round-robin index for worker selection
         self.worker_age = 0
         self.alive = True
@@ -224,10 +225,10 @@ class DirtyArbiter:
 
     async def route_request(self, request):
         """
-        Route a request to an available dirty worker.
+        Route a request to an available dirty worker via queue.
 
-        Requests to each worker are serialized using a per-worker lock
-        to ensure only one request is in flight at a time per worker.
+        Each worker has a dedicated queue and consumer task. Requests are
+        submitted to the queue and processed sequentially by the consumer.
 
         Args:
             request: Request message dict
@@ -245,42 +246,126 @@ class DirtyArbiter:
                 DirtyError("No dirty workers available")
             )
 
-        # Get or create lock for this worker
-        if worker_pid not in self.worker_locks:
-            self.worker_locks[worker_pid] = asyncio.Lock()
-        worker_lock = self.worker_locks[worker_pid]
+        # Get queue (start consumer if needed)
+        if worker_pid not in self.worker_queues:
+            await self._start_worker_consumer(worker_pid)
 
-        # Serialize requests to this worker
-        async with worker_lock:
-            try:
-                # Get or establish connection to worker
-                reader, writer = await self._get_worker_connection(worker_pid)
+        queue = self.worker_queues[worker_pid]
+        future = asyncio.get_running_loop().create_future()
 
-                # Send request to worker
-                await DirtyProtocol.write_message_async(writer, request)
+        # Submit request to queue
+        await queue.put((request, future))
 
-                # Wait for response with timeout
+        # Wait for response
+        try:
+            return await future
+        except Exception as e:
+            return make_error_response(
+                request_id,
+                DirtyWorkerError(f"Request failed: {e}", worker_id=worker_pid)
+            )
+
+    async def _start_worker_consumer(self, worker_pid):
+        """Start a consumer task for a worker's request queue.
+
+        The consumer runs requests one at a time. When it is cancelled or
+        the arbiter stops, the request in flight and any still queued are
+        resolved with an error response, so no caller of route_request()
+        is left awaiting a future that nobody will complete.
+        """
+        queue = asyncio.Queue()
+        self.worker_queues[worker_pid] = queue
+
+        async def consumer():
+            while self.alive:
                 try:
-                    response = await asyncio.wait_for(
-                        DirtyProtocol.read_message_async(reader),
-                        timeout=self.cfg.dirty_timeout
-                    )
-                    return response
-                except asyncio.TimeoutError:
-                    return make_error_response(
-                        request_id,
-                        DirtyTimeoutError("Worker timeout", self.cfg.dirty_timeout)
-                    )
-            except Exception as e:
-                self.log.error("Error routing request to worker %s: %s",
-                               worker_pid, e)
-                # Remove failed connection
-                self._close_worker_connection(worker_pid)
-                return make_error_response(
-                    request_id,
-                    DirtyWorkerError(f"Worker communication failed: {e}",
-                                     worker_id=worker_pid)
-                )
+                    request, future = await queue.get()
+                    try:
+                        response = await self._execute_on_worker(worker_pid, request)
+                        if not future.done():
+                            future.set_result(response)
+                    except asyncio.CancelledError:
+                        # Not an Exception subclass on Python 3.8+, so the
+                        # handler below would miss it and the caller of
+                        # route_request() would wait forever.
+                        self._fail_request(worker_pid, request, future)
+                        raise
+                    except Exception as e:
+                        if not future.done():
+                            future.set_exception(e)
+                    finally:
+                        queue.task_done()
+                except asyncio.CancelledError:
+                    break
+
+            # The consumer is exiting: nothing will service this queue anymore
+            self._fail_pending_requests(worker_pid, queue)
+
+        task = asyncio.create_task(consumer())
+        self.worker_consumers[worker_pid] = task
+
+    def _stop_worker_consumer(self, worker_pid):
+        """Cancel a worker's consumer and fail its queued requests."""
+        task = self.worker_consumers.pop(worker_pid, None)
+        if task is not None:
+            task.cancel()
+
+        queue = self.worker_queues.pop(worker_pid, None)
+        if queue is not None:
+            self._fail_pending_requests(worker_pid, queue)
+
+    def _fail_pending_requests(self, worker_pid, queue):
+        """Resolve every request still waiting in queue with an error."""
+        while True:
+            try:
+                request, future = queue.get_nowait()
+            except asyncio.QueueEmpty:
+                return
+            self._fail_request(worker_pid, request, future)
+            queue.task_done()
+
+    def _fail_request(self, worker_pid, request, future):
+        """Complete future with an error response unless already done."""
+        if future.done():
+            return
+        future.set_result(make_error_response(
+            request.get("id", "unknown"),
+            DirtyWorkerError("Worker stopped before request completed",
+                             worker_id=worker_pid)
+        ))
+
+    async def _execute_on_worker(self, worker_pid, request):
+        """Execute request on a specific worker (called by consumer).
+
+        The connection is dropped after a timeout as well as after an
+        error: a late reply left on the stream would otherwise be read
+        as the response to the next request sent to this worker.
+        """
+        request_id = request.get("id", "unknown")
+
+        try:
+            reader, writer = await self._get_worker_connection(worker_pid)
+            await DirtyProtocol.write_message_async(writer, request)
+
+            response = await asyncio.wait_for(
+                DirtyProtocol.read_message_async(reader),
+                timeout=self.cfg.dirty_timeout
+            )
+            return response
+        except asyncio.TimeoutError:
+            self._close_worker_connection(worker_pid)
+            return make_error_response(
+                request_id,
+                DirtyTimeoutError("Worker timeout", self.cfg.dirty_timeout)
+            )
+        except Exception as e:
+            self.log.error("Error executing on worker %s: %s", worker_pid, e)
+            self._close_worker_connection(worker_pid)
+            return make_error_response(
+                request_id,
+                DirtyWorkerError(f"Worker communication failed: {e}",
+                                 worker_id=worker_pid)
+            )
 
     async def _get_available_worker(self):
         """
@@ -392,7 +477,12 @@ class DirtyArbiter:
     def _cleanup_worker(self, pid):
         """Clean up after a worker exits."""
         self._close_worker_connection(pid)
-        self.worker_locks.pop(pid, None)
+
+        # Cancel the consumer, drop the queue and answer queued requests
+        # with an error so their callers do not wait forever on a worker
+        # that is gone
+        self._stop_worker_consumer(pid)
+
         worker = self.workers.pop(pid, None)
         if worker:
             self.cfg.dirty_worker_exit(self, worker)
@@ -463,6 +553,10 @@ class DirtyArbiter:
 
     async def stop(self, graceful=True):
         """Stop all workers."""
+        # Cancel all consumer tasks and fail the requests they still hold
+        for pid in set(self.worker_consumers) | set(self.worker_queues):
+            self._stop_worker_consumer(pid)
+
         sig = signal.SIGTERM if graceful else signal.SIGQUIT
         limit = time.time() + self.cfg.dirty_graceful_timeout
 
diff --git a/tests/test_dirty_arbiter.py b/tests/test_dirty_arbiter.py
index ee3f10f3..c8c9444c 100644
--- a/tests/test_dirty_arbiter.py
+++ b/tests/test_dirty_arbiter.py
@@ -172,6 +172,50 @@ class TestDirtyArbiterWorkerManagement:
 
         arbiter._cleanup_sync()
 
+    @pytest.mark.asyncio
+    async def test_cleanup_worker_cancels_consumer(self):
+        """Test that worker cleanup cancels consumer task and removes queue."""
+        cfg = Config()
+        cfg.set("dirty_workers", 2)
+        log = MockLog()
+
+        arbiter = DirtyArbiter(cfg=cfg, log=log)
+        arbiter.alive = True
+
+        # Simulate a worker with queue and consumer
+        fake_pid = 99999
+        arbiter.workers[fake_pid] = "fake_worker"
+        arbiter.worker_sockets[fake_pid] = "/tmp/fake.sock"
+
+        # Create queue and mock consumer task
+        arbiter.worker_queues[fake_pid] = asyncio.Queue()
+
+        # Queue a request that the consumer never gets to process
+        pending = asyncio.get_running_loop().create_future()
+        arbiter.worker_queues[fake_pid].put_nowait(
+            ({"id": "pending-1"}, pending)
+        )
+
+        async def mock_consumer():
+            try:
+                while True:
+                    await asyncio.sleep(1)
+            except asyncio.CancelledError:
+                pass
+
+        arbiter.worker_consumers[fake_pid] = asyncio.create_task(mock_consumer())
+
+        arbiter._cleanup_worker(fake_pid)
+
+        assert fake_pid not in arbiter.workers
+        assert fake_pid not in arbiter.worker_sockets
+        assert fake_pid not in arbiter.worker_queues
+        assert fake_pid not in arbiter.worker_consumers
+        # The queued request is answered instead of being left hanging
+        assert pending.done()
+
+        arbiter._cleanup_sync()
+
     def test_reap_workers_no_children(self):
         """Test reap_workers when no children have exited."""
         cfg = Config()
@@ -799,3 +843,110 @@
class TestDirtyArbiterSighupHandler:
         loop.close()
 
         arbiter._cleanup_sync()
+
+
+class TestDirtyArbiterQueueBehavior:
+    """Tests for the per-worker request queue and its consumer task."""
+
+    @pytest.mark.asyncio
+    async def test_start_worker_consumer_creates_queue_and_task(self):
+        """_start_worker_consumer registers an asyncio.Queue and a Task."""
+        cfg = Config()
+        log = MockLog()
+
+        arbiter = DirtyArbiter(cfg=cfg, log=log)
+        arbiter.alive = True
+
+        fake_pid = 99999
+
+        await arbiter._start_worker_consumer(fake_pid)
+
+        assert fake_pid in arbiter.worker_queues
+        assert fake_pid in arbiter.worker_consumers
+        assert isinstance(arbiter.worker_queues[fake_pid], asyncio.Queue)
+        assert isinstance(arbiter.worker_consumers[fake_pid], asyncio.Task)
+
+        # Cancel the consumer and await it so no task outlives the test
+        arbiter.worker_consumers[fake_pid].cancel()
+        try:
+            await arbiter.worker_consumers[fake_pid]
+        except asyncio.CancelledError:
+            pass
+
+        arbiter._cleanup_sync()
+
+    @pytest.mark.asyncio
+    async def test_route_request_starts_consumer_on_demand(self):
+        """route_request creates the queue and consumer on first use."""
+        cfg = Config()
+        log = MockLog()
+
+        arbiter = DirtyArbiter(cfg=cfg, log=log)
+        arbiter.pid = os.getpid()
+        arbiter.alive = True
+
+        # Register a fake worker whose socket path does not exist
+        fake_pid = 99999
+        arbiter.workers[fake_pid] = "fake_worker"
+        arbiter.worker_sockets[fake_pid] = "/tmp/nonexistent.sock"
+
+        assert fake_pid not in arbiter.worker_queues
+        assert fake_pid not in arbiter.worker_consumers
+
+        # Build a request; routing it should start the consumer
+        request = make_request(
+            request_id="test-123",
+            app_path="test:App",
+            action="test"
+        )
+
+        # Connecting fails (no socket); the consumer must be started anyway
+        await arbiter.route_request(request)
+
+        assert fake_pid in arbiter.worker_queues
+        assert fake_pid in arbiter.worker_consumers
+
+        # Cleanup: stop the consumer loop and await the cancelled task
+        arbiter.alive = False
+        arbiter.worker_consumers[fake_pid].cancel()
+        try:
+            await arbiter.worker_consumers[fake_pid]
+        except asyncio.CancelledError:
+            pass
+
+        arbiter._cleanup_sync()
+
+    @pytest.mark.asyncio
+    async def test_stop_cancels_all_consumers(self):
+        """stop() cancels every registered consumer task."""
+        cfg = Config()
+        cfg.set("dirty_graceful_timeout", 1)
+        log = MockLog()
+
+        arbiter = DirtyArbiter(cfg=cfg, log=log)
+        arbiter.pid = os.getpid()
+        arbiter.alive = True
+
+        # Stand-in consumers that run until cancelled, then exit quietly
+        async def mock_consumer():
+            try:
+                while True:
+                    await asyncio.sleep(1)
+            except asyncio.CancelledError:
+                pass
+
+        task1 = asyncio.create_task(mock_consumer())
+        task2 = asyncio.create_task(mock_consumer())
+        arbiter.worker_consumers[1] = task1
+        arbiter.worker_consumers[2] = task2
+
+        await arbiter.stop(graceful=True)
+
+        # Yield once so the cancelled tasks get to run and finish
+        await asyncio.sleep(0)
+
+        # Both consumers must be done (they catch the cancellation and return)
+        assert task1.done()
+        assert task2.done()
+
+        arbiter._cleanup_sync()