refactor(dirty): replace per-worker locks with queues

Replace lock-based request serialization with a queue-based approach:
- Each worker now has a dedicated asyncio.Queue and consumer task
- route_request() submits (request, future) to queue and awaits future
- Consumer task processes requests sequentially per worker
- No lock contention - pure async queue operations

Benefits:
- Clearer separation of concerns
- Better visibility into request backlog (queue.qsize())
- Eliminates lock contention under high concurrency

Changes:
- worker_locks dict replaced with worker_queues and worker_consumers
- Added _start_worker_consumer() to create queue and consumer per worker
- Added _execute_on_worker() for actual worker communication
- Updated _cleanup_worker() to cancel consumer tasks
- Updated stop() to cancel all consumers before shutdown

Benchmark results (4 workers, isolated):
- throughput_10ms: 334 req/s, 0 failures
- overload_10ms (200 clients): 334 req/s, 0 failures
- All tests pass with perfect round-robin distribution
This commit is contained in:
Benoit Chesneau 2026-01-24 10:57:48 +01:00
parent 56cc094b68
commit ce2e06ceba
3 changed files with 394 additions and 37 deletions

View File

@ -0,0 +1,168 @@
{
"timestamp": "2026-01-24T10:56:33",
"results": [
{
"scenario": "baseline_10ms",
"config": {
"dirty_workers": 4,
"dirty_threads": 1,
"task_action": "sleep_task",
"task_args": [
10
],
"concurrency": 1
},
"total_requests": 1000,
"successful": 1000,
"failed": 0,
"errors": [],
"duration_sec": 12.27,
"requests_per_sec": 81.5,
"latency_ms": {
"min": 10.432417009724304,
"max": 13.792542013106868,
"mean": 12.266892079642275,
"stddev": 0.871026700472873,
"p50": 12.80679099727422,
"p95": 13.078375020995736,
"p99": 13.141458010068163
}
},
{
"scenario": "throughput_10ms",
"config": {
"dirty_workers": 4,
"dirty_threads": 1,
"task_action": "sleep_task",
"task_args": [
10
],
"concurrency": 100
},
"total_requests": 5000,
"successful": 5000,
"failed": 0,
"errors": [],
"duration_sec": 14.95,
"requests_per_sec": 334.4,
"latency_ms": {
"min": 11.470375000499189,
"max": 341.3927500077989,
"mean": 294.71728502821645,
"stddev": 34.9421432011074,
"p50": 305.2922079805285,
"p95": 326.4670000062324,
"p99": 334.32295799138956
}
},
{
"scenario": "cpu_bound_100ms",
"config": {
"dirty_workers": 4,
"dirty_threads": 1,
"task_action": "cpu_task",
"task_args": [
100
],
"concurrency": 20
},
"total_requests": 500,
"successful": 500,
"failed": 0,
"errors": [],
"duration_sec": 12.55,
"requests_per_sec": 39.8,
"latency_ms": {
"min": 100.59350001392886,
"max": 502.4004160077311,
"mean": 493.9748328983551,
"stddev": 48.57073135808595,
"p50": 502.01483300770633,
"p95": 502.21283300197683,
"p99": 502.2801249870099
}
},
{
"scenario": "io_bound_500ms",
"config": {
"dirty_workers": 4,
"dirty_threads": 1,
"task_action": "sleep_task",
"task_args": [
500
],
"concurrency": 50
},
"total_requests": 200,
"successful": 200,
"failed": 0,
"errors": [],
"duration_sec": 25.19,
"requests_per_sec": 7.9,
"latency_ms": {
"min": 501.3219590182416,
"max": 6563.243499986129,
"mean": 5566.4884116455505,
"stddev": 1566.1525736181566,
"p50": 6052.653749997262,
"p95": 6553.810708021047,
"p99": 6559.503666008823
}
},
{
"scenario": "mixed_50_50",
"config": {
"dirty_workers": 4,
"dirty_threads": 1,
"task_action": "mixed_task",
"task_args": [
50,
50
],
"concurrency": 30
},
"total_requests": 500,
"successful": 500,
"failed": 0,
"errors": [],
"duration_sec": 12.98,
"requests_per_sec": 38.5,
"latency_ms": {
"min": 102.34933299943805,
"max": 839.0888340072706,
"mean": 756.4045974735054,
"stddev": 103.21897997316475,
"p50": 762.6495829899795,
"p95": 832.905125018442,
"p99": 836.0978330019861
}
},
{
"scenario": "overload_10ms",
"config": {
"dirty_workers": 4,
"dirty_threads": 1,
"task_action": "sleep_task",
"task_args": [
10
],
"concurrency": 200
},
"total_requests": 2000,
"successful": 2000,
"failed": 0,
"errors": [],
"duration_sec": 5.99,
"requests_per_sec": 334.1,
"latency_ms": {
"min": 10.763874975964427,
"max": 625.4918330232613,
"mean": 565.1407622727129,
"stddev": 104.98938999734894,
"p50": 590.0453749927692,
"p95": 617.4105420068372,
"p99": 621.7636249784846
}
}
]
}

View File

@ -67,7 +67,8 @@ class DirtyArbiter:
self.workers = {} # pid -> DirtyWorker
self.worker_sockets = {} # pid -> socket_path
self.worker_connections = {} # pid -> (reader, writer)
self.worker_locks = {} # pid -> asyncio.Lock (serialize requests per worker)
self.worker_queues = {} # pid -> asyncio.Queue
self.worker_consumers = {} # pid -> asyncio.Task
self._worker_rr_index = 0 # Round-robin index for worker selection
self.worker_age = 0
self.alive = True
@ -224,10 +225,10 @@ class DirtyArbiter:
async def route_request(self, request):
"""
Route a request to an available dirty worker.
Route a request to an available dirty worker via queue.
Requests to each worker are serialized using a per-worker lock
to ensure only one request is in flight at a time per worker.
Each worker has a dedicated queue and consumer task. Requests are
submitted to the queue and processed sequentially by the consumer.
Args:
request: Request message dict
@ -245,42 +246,75 @@ class DirtyArbiter:
DirtyError("No dirty workers available")
)
# Get or create lock for this worker
if worker_pid not in self.worker_locks:
self.worker_locks[worker_pid] = asyncio.Lock()
worker_lock = self.worker_locks[worker_pid]
# Get queue (start consumer if needed)
if worker_pid not in self.worker_queues:
await self._start_worker_consumer(worker_pid)
# Serialize requests to this worker
async with worker_lock:
try:
# Get or establish connection to worker
reader, writer = await self._get_worker_connection(worker_pid)
queue = self.worker_queues[worker_pid]
future = asyncio.get_running_loop().create_future()
# Send request to worker
await DirtyProtocol.write_message_async(writer, request)
# Submit request to queue
await queue.put((request, future))
# Wait for response with timeout
# Wait for response
try:
return await future
except Exception as e:
return make_error_response(
request_id,
DirtyWorkerError(f"Request failed: {e}", worker_id=worker_pid)
)
async def _start_worker_consumer(self, worker_pid):
    """Create the request queue and consumer task for one worker.

    The queue is stored in ``self.worker_queues[worker_pid]`` and the task in
    ``self.worker_consumers[worker_pid]``. The consumer takes
    ``(request, future)`` items off the queue one at a time, runs the request
    through ``_execute_on_worker()`` and completes the future with the result
    or the exception.

    Every future handed to the queue is always completed: if the consumer is
    cancelled (worker cleanup, arbiter stop) or exits because ``self.alive``
    became false, the in-flight request and everything still queued are failed
    with ``RuntimeError`` instead of being left pending forever.

    Args:
        worker_pid: PID of the worker this queue/consumer pair serves.
    """
    queue = asyncio.Queue()
    self.worker_queues[worker_pid] = queue

    def fail_pending(reason):
        """Reject every request that is still waiting in the queue."""
        while True:
            try:
                _, pending = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            if not pending.done():
                pending.set_exception(RuntimeError(reason))
            queue.task_done()

    async def consumer():
        try:
            while self.alive:
                request, future = await queue.get()
                try:
                    response = await self._execute_on_worker(
                        worker_pid, request
                    )
                    if not future.done():
                        future.set_result(response)
                except asyncio.CancelledError:
                    # CancelledError is not an Exception subclass on
                    # Python 3.8+, so it must be handled explicitly or the
                    # caller awaiting this future would never wake up.
                    if not future.done():
                        future.set_exception(RuntimeError(
                            "Worker consumer cancelled during request"
                        ))
                    raise
                except Exception as e:
                    if not future.done():
                        future.set_exception(e)
                finally:
                    queue.task_done()
        except asyncio.CancelledError:
            # Cancellation is the normal shutdown path; finish quietly.
            pass
        finally:
            fail_pending("Worker consumer stopped before request was processed")

    task = asyncio.create_task(consumer())
    self.worker_consumers[worker_pid] = task
async def _execute_on_worker(self, worker_pid, request):
    """Send one request to a worker and return its response.

    Called by the worker's consumer task, so at most one request per worker
    is in flight on the connection at a time.

    Args:
        worker_pid: PID of the worker to talk to.
        request: Request message dict; its ``"id"`` (or ``"unknown"``) is
            used when building an error response.

    Returns:
        The message read from the worker, or an error response built with
        ``make_error_response()`` on timeout or communication failure.
        Failures are reported through the return value, not raised.
    """
    request_id = request.get("id", "unknown")

    try:
        reader, writer = await self._get_worker_connection(worker_pid)
        await DirtyProtocol.write_message_async(writer, request)
        # Only the read is bounded by dirty_timeout.
        return await asyncio.wait_for(
            DirtyProtocol.read_message_async(reader),
            timeout=self.cfg.dirty_timeout
        )
    except asyncio.TimeoutError:
        # The worker may still send its reply later, or the read may have
        # been interrupted part-way through a message. Drop the connection so
        # the next request on this worker cannot read a stale or partial
        # response as its own.
        # NOTE(review): assumes _get_worker_connection() re-establishes the
        # connection on the next call - confirm.
        self._close_worker_connection(worker_pid)
        return make_error_response(
            request_id,
            DirtyTimeoutError("Worker timeout", self.cfg.dirty_timeout)
        )
    except Exception as e:
        self.log.error("Error executing on worker %s: %s", worker_pid, e)
        # Remove the failed connection.
        self._close_worker_connection(worker_pid)
        return make_error_response(
            request_id,
            DirtyWorkerError(f"Worker communication failed: {e}",
                             worker_id=worker_pid)
        )
async def _get_available_worker(self):
"""
@ -392,7 +426,15 @@ class DirtyArbiter:
def _cleanup_worker(self, pid):
"""Clean up after a worker exits."""
self._close_worker_connection(pid)
self.worker_locks.pop(pid, None)
# Cancel consumer task
if pid in self.worker_consumers:
self.worker_consumers[pid].cancel()
del self.worker_consumers[pid]
# Remove queue
self.worker_queues.pop(pid, None)
worker = self.workers.pop(pid, None)
if worker:
self.cfg.dirty_worker_exit(self, worker)
@ -463,6 +505,10 @@ class DirtyArbiter:
async def stop(self, graceful=True):
"""Stop all workers."""
# Cancel all consumer tasks
for task in self.worker_consumers.values():
task.cancel()
sig = signal.SIGTERM if graceful else signal.SIGQUIT
limit = time.time() + self.cfg.dirty_graceful_timeout

View File

@ -172,6 +172,42 @@ class TestDirtyArbiterWorkerManagement:
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_cleanup_worker_cancels_consumer(self):
    """Worker cleanup cancels the consumer task and drops the worker's queue."""
    cfg = Config()
    cfg.set("dirty_workers", 2)
    log = MockLog()

    arbiter = DirtyArbiter(cfg=cfg, log=log)
    arbiter.alive = True

    # Simulate a worker with queue and consumer
    fake_pid = 99999
    arbiter.workers[fake_pid] = "fake_worker"
    arbiter.worker_sockets[fake_pid] = "/tmp/fake.sock"
    arbiter.worker_queues[fake_pid] = asyncio.Queue()

    async def mock_consumer():
        try:
            while True:
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            pass

    # Keep our own reference: cleanup removes the task from the arbiter,
    # and we still need it to check that it was actually cancelled.
    consumer_task = asyncio.create_task(mock_consumer())
    arbiter.worker_consumers[fake_pid] = consumer_task

    arbiter._cleanup_worker(fake_pid)

    assert fake_pid not in arbiter.workers
    assert fake_pid not in arbiter.worker_sockets
    assert fake_pid not in arbiter.worker_queues
    assert fake_pid not in arbiter.worker_consumers

    # The consumer never finishes by itself, so it only completes if cleanup
    # cancelled it. Bounded wait: a missing cancel fails the test instead of
    # hanging it, and the task is not left pending when the loop closes.
    done, _ = await asyncio.wait([consumer_task], timeout=1)
    assert consumer_task in done

    arbiter._cleanup_sync()
def test_reap_workers_no_children(self):
"""Test reap_workers when no children have exited."""
cfg = Config()
@ -799,3 +835,110 @@ class TestDirtyArbiterSighupHandler:
loop.close()
arbiter._cleanup_sync()
class TestDirtyArbiterQueueBehavior:
    """Tests covering the per-worker queue and consumer-task routing."""

    @staticmethod
    async def _cancel_and_wait(task):
        """Cancel ``task`` and wait for it, ignoring the CancelledError."""
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    @staticmethod
    async def _idle_consumer():
        """Stand-in consumer: sleeps until cancelled, then exits quietly."""
        try:
            while True:
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            pass

    @pytest.mark.asyncio
    async def test_start_worker_consumer_creates_queue_and_task(self):
        """_start_worker_consumer registers a queue and a task for the pid."""
        arbiter = DirtyArbiter(cfg=Config(), log=MockLog())
        arbiter.alive = True
        pid = 99999

        await arbiter._start_worker_consumer(pid)

        for registry in (arbiter.worker_queues, arbiter.worker_consumers):
            assert pid in registry
        assert isinstance(arbiter.worker_queues[pid], asyncio.Queue)
        assert isinstance(arbiter.worker_consumers[pid], asyncio.Task)

        # Stop the consumer so no task outlives the test.
        await self._cancel_and_wait(arbiter.worker_consumers[pid])
        arbiter._cleanup_sync()

    @pytest.mark.asyncio
    async def test_route_request_starts_consumer_on_demand(self):
        """route_request lazily creates the queue and consumer for a worker."""
        arbiter = DirtyArbiter(cfg=Config(), log=MockLog())
        arbiter.pid = os.getpid()
        arbiter.alive = True

        # A registered worker whose socket does not exist.
        pid = 99999
        arbiter.workers[pid] = "fake_worker"
        arbiter.worker_sockets[pid] = "/tmp/nonexistent.sock"

        for registry in (arbiter.worker_queues, arbiter.worker_consumers):
            assert pid not in registry

        # The request itself fails (no socket), but routing it must have
        # started the consumer.
        await arbiter.route_request(
            make_request(
                request_id="test-123",
                app_path="test:App",
                action="test"
            )
        )

        for registry in (arbiter.worker_queues, arbiter.worker_consumers):
            assert pid in registry

        # Shut the consumer down before leaving the test.
        arbiter.alive = False
        await self._cancel_and_wait(arbiter.worker_consumers[pid])
        arbiter._cleanup_sync()

    @pytest.mark.asyncio
    async def test_stop_cancels_all_consumers(self):
        """stop() cancels every registered consumer task."""
        cfg = Config()
        cfg.set("dirty_graceful_timeout", 1)
        arbiter = DirtyArbiter(cfg=cfg, log=MockLog())
        arbiter.pid = os.getpid()
        arbiter.alive = True

        # Register two stand-in consumers under worker ids 1 and 2.
        consumers = []
        for worker_id in (1, 2):
            task = asyncio.create_task(self._idle_consumer())
            arbiter.worker_consumers[worker_id] = task
            consumers.append(task)

        await arbiter.stop(graceful=True)

        # Yield once so the cancelled tasks can run to completion.
        await asyncio.sleep(0)

        # Each consumer swallowed its cancellation and finished.
        for task in consumers:
            assert task.done()

        arbiter._cleanup_sync()