feat(dirty): add benchmark suite and fix arbiter concurrency

Add comprehensive benchmark suite for stress testing the dirty pool:
- dirty_bench_app.py: Configurable benchmark app with sleep/cpu/mixed/payload tasks
- dirty_benchmark.py: Main runner with isolated and integrated test modes
- dirty_bench_wsgi.py: WSGI app for HTTP integration testing
- dirty_bench_gunicorn.py: Gunicorn config for integration benchmarks

Fix arbiter concurrency issues:
- Add per-worker locks to serialize requests and prevent read conflicts
- Implement round-robin worker selection for linear throughput scaling

The benchmark suite supports:
- Quick smoke tests (--quick)
- Full isolated benchmarks (--isolated)
- Configuration sweeps (--config-sweep)
- Payload size tests (--payload-tests)
- Integration tests with wrk (--integrated)
This commit is contained in:
Benoit Chesneau 2026-01-24 08:37:49 +01:00
parent 9b0e87deb8
commit 56cc094b68
5 changed files with 1561 additions and 30 deletions

View File

@ -0,0 +1,223 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
Benchmark DirtyApp for stress testing the dirty arbiter pool.
Provides configurable workloads for testing:
- Pure sleep (scheduling overhead)
- CPU-bound work (thread pool utilization)
- Mixed I/O + CPU (realistic workloads)
- Payload generation (serialization overhead)
"""
import time
from gunicorn.dirty import DirtyApp
class BenchmarkApp(DirtyApp):
    """
    Configurable benchmark app for stress testing the dirty pool.

    Each public method is one task type (sleep, CPU, mixed, payload,
    echo) plus a few bookkeeping endpoints. Call counts and accumulated
    sleep/CPU time are kept on the instance and exposed via ``stats``.
    """

    def init(self):
        """Fast initialization - only zeroes the statistics counters."""
        self.call_count = 0
        self.total_sleep_ms = 0
        self.total_cpu_ms = 0

    @staticmethod
    def _burn_cpu(duration_ms, intensity=1.0):
        """
        Busy-loop on floating point work until ``duration_ms`` has elapsed.

        Shared by ``cpu_task`` and ``mixed_task`` so both measure exactly
        the same workload.

        Args:
            duration_ms: Target duration in milliseconds
            intensity: Multiplier for the inner loop length (1.0 = 1000
                inner steps per outer iteration)

        Returns:
            tuple ``(iterations, actual_ms)``: completed outer iterations
            and the measured wall-clock duration in milliseconds
        """
        start = time.perf_counter()
        target_end = start + (duration_ms / 1000.0)
        work_per_iteration = int(1000 * intensity)
        iterations = 0
        while time.perf_counter() < target_end:
            # The value of x is never used; the arithmetic exists only
            # to keep the CPU busy between clock checks.
            x = 0.0
            for i in range(work_per_iteration):
                x += i * 0.001
                x = x * 1.001 if x < 1000000 else x * 0.999
            iterations += 1
        actual_ms = (time.perf_counter() - start) * 1000
        return iterations, actual_ms

    def sleep_task(self, duration_ms):
        """
        Pure sleep task - tests scheduling overhead.

        Simulates I/O-bound work such as waiting for an external API by
        sleeping for the requested time.

        Args:
            duration_ms: Sleep duration in milliseconds

        Returns:
            dict with the requested sleep duration (``slept_ms``)
        """
        self.call_count += 1
        self.total_sleep_ms += duration_ms
        time.sleep(duration_ms / 1000.0)
        return {"slept_ms": duration_ms}

    def cpu_task(self, duration_ms, intensity=1.0):
        """
        CPU-bound work - tests thread pool utilization.

        Performs actual computation for the requested duration to
        simulate CPU-intensive work like model inference or data
        processing.

        Args:
            duration_ms: Target duration in milliseconds
            intensity: Work intensity multiplier (1.0 = normal)

        Returns:
            dict with computed iterations, target and actual duration,
            and the intensity used
        """
        self.call_count += 1
        iterations, actual_ms = self._burn_cpu(duration_ms, intensity)
        self.total_cpu_ms += actual_ms
        return {
            "iterations": iterations,
            "target_ms": duration_ms,
            "actual_ms": round(actual_ms, 2),
            "intensity": intensity
        }

    def mixed_task(self, sleep_ms, cpu_ms, intensity=1.0):
        """
        Mixed I/O + CPU task - simulates realistic workloads.

        First performs simulated I/O (sleep), then does CPU work, the
        "fetch data, then process it" shape common in real apps.

        Args:
            sleep_ms: I/O simulation duration in milliseconds
            cpu_ms: CPU work duration in milliseconds
            intensity: CPU work intensity multiplier

        Returns:
            dict with both sleep and CPU metrics
        """
        self.call_count += 1

        # I/O phase (sleep)
        time.sleep(sleep_ms / 1000.0)
        self.total_sleep_ms += sleep_ms

        # CPU phase
        iterations, actual_cpu_ms = self._burn_cpu(cpu_ms, intensity)
        self.total_cpu_ms += actual_cpu_ms

        return {
            "sleep_ms": sleep_ms,
            "cpu_iterations": iterations,
            "target_cpu_ms": cpu_ms,
            "actual_cpu_ms": round(actual_cpu_ms, 2),
            "total_ms": round(sleep_ms + actual_cpu_ms, 2)
        }

    def payload_task(self, size_bytes, duration_ms=0):
        """
        Generate payload of specified size - tests serialization.

        Builds a deterministic string by repeating a fixed alphanumeric
        pattern and truncating it to ``size_bytes`` characters.

        Args:
            size_bytes: Target payload size in bytes
            duration_ms: Optional sleep before generating payload

        Returns:
            dict with the generated ``data`` and its length (``size``)
        """
        self.call_count += 1

        if duration_ms > 0:
            time.sleep(duration_ms / 1000.0)
            self.total_sleep_ms += duration_ms

        # Use a varied pattern rather than one repeated character so the
        # payload compresses more like realistic data.
        pattern = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
        repeats = (size_bytes // len(pattern)) + 1
        data = (pattern * repeats)[:size_bytes]

        return {
            "data": data,
            "size": len(data)
        }

    def echo_task(self, payload):
        """
        Echo back payload - tests round-trip serialization.

        Args:
            payload: Data to echo back

        Returns:
            dict with the echoed payload and its size; the size is the
            string length for ``str``, the JSON-encoded length for
            ``dict``/``list``, and the length of ``str(payload)`` otherwise
        """
        self.call_count += 1

        # Calculate size based on type
        if isinstance(payload, str):
            size = len(payload)
        elif isinstance(payload, (dict, list)):
            import json
            size = len(json.dumps(payload))
        else:
            size = len(str(payload))

        return {
            "echoed_size": size,
            "payload": payload
        }

    def stats(self):
        """
        Return accumulated statistics.

        Returns:
            dict with the call count and total sleep/CPU milliseconds
        """
        return {
            "call_count": self.call_count,
            "total_sleep_ms": self.total_sleep_ms,
            "total_cpu_ms": round(self.total_cpu_ms, 2)
        }

    def reset_stats(self):
        """Reset accumulated statistics and return ``{"reset": True}``."""
        self.call_count = 0
        self.total_sleep_ms = 0
        self.total_cpu_ms = 0
        return {"reset": True}

    def health(self):
        """Health check endpoint for warmup."""
        return {"status": "ok"}

    def close(self):
        """Cleanup on shutdown - nothing to release."""

View File

@ -0,0 +1,60 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
Gunicorn configuration for dirty pool integration benchmarks.
Usage:
gunicorn -c benchmarks/dirty_bench_gunicorn.py \
benchmarks.dirty_bench_wsgi:app
"""
# Address the HTTP server listens on
bind = "127.0.0.1:8000"

# HTTP side: worker class first, then its process/thread sizing
worker_class = "gthread"
workers = 4
threads = 4
worker_connections = 1000

# Dirty pool: which app to load and how the pool is sized/timed
dirty_apps = ["benchmarks.dirty_bench_app:BenchmarkApp"]
dirty_workers = 4
dirty_threads = 1
dirty_timeout = 300
dirty_graceful_timeout = 30

# Logging: access and error logs share the same "-" target
accesslog = errorlog = "-"
loglevel = "info"

# HTTP worker timeouts and keep-alive
timeout = 120
graceful_timeout = 30
keepalive = 2
# Lifecycle hooks
def on_dirty_starting(arbiter):
    """Print a startup notice containing the dirty arbiter's pid."""
    arbiter_pid = arbiter.pid
    print(f"[dirty] Arbiter starting (pid: {arbiter_pid})")
def dirty_post_fork(arbiter, worker):
    """Print a notice with the pid of the dirty worker that was just forked."""
    message = f"[dirty] Worker {worker.pid} forked"
    print(message)
def dirty_worker_init(worker):
    """Print the worker's pid and the keys of its loaded ``apps`` mapping."""
    app_names = list(worker.apps.keys())
    print(f"[dirty] Worker {worker.pid} initialized with apps: {app_names}")
def dirty_worker_exit(arbiter, worker):
    """Print a notice with the pid of the dirty worker that is exiting."""
    message = f"[dirty] Worker {worker.pid} exiting"
    print(message)

View File

@ -0,0 +1,167 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
WSGI app for integration benchmarking of the dirty pool.
This simple WSGI application calls the dirty pool and returns results.
Use with gunicorn for end-to-end benchmarking that includes HTTP overhead.
Example:
gunicorn benchmarks.dirty_bench_wsgi:app \
--workers 4 \
--dirty-app benchmarks.dirty_bench_app:BenchmarkApp \
--dirty-workers 2 \
--bind 127.0.0.1:8000
"""
import json
from urllib.parse import parse_qs
from gunicorn.dirty import get_dirty_client
# Import path ("module:Class") of the dirty app that requests are routed to
# when the ``app`` query parameter is not supplied.
BENCHMARK_APP = "benchmarks.dirty_bench_app:BenchmarkApp"
def _json_response(start_response, status, payload):
    """Encode ``payload`` as JSON, start the response, return the WSGI body."""
    body = json.dumps(payload).encode()
    headers = [
        ('Content-Type', 'application/json'),
        ('Content-Length', str(len(body))),
    ]
    start_response(status, headers)
    return [body]


def _read_echo_payload(environ):
    """Read the request body for /echo: parsed JSON if valid, else text."""
    try:
        content_length = int(environ.get('CONTENT_LENGTH', 0))
    except (ValueError, TypeError):
        content_length = 0

    if content_length <= 0:
        return ""

    body = environ['wsgi.input'].read(content_length)
    try:
        return json.loads(body.decode('utf-8'))
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Not JSON (or not valid UTF-8): echo it back as lossy text instead.
        return body.decode('utf-8', errors='replace')


def app(environ, start_response):
    """
    WSGI application that calls dirty pool tasks.

    Every response is JSON. A successful task returns ``200 OK`` with the
    task result, an unknown path returns ``404 Not Found``, and any
    exception raised while obtaining the client or running the task is
    reported as ``500 Internal Server Error`` with the exception message
    and type name.

    Query parameters (unparseable values fall back to the default):
        duration: Duration in ms (default: 10 for sleep, 100 for cpu,
            0 for the optional pre-sleep of payload)
        sleep: Sleep duration for mixed_task (default: 50)
        cpu: CPU duration for mixed_task (default: 50)
        size: Payload size in bytes for payload_task (default: 100)
        intensity: CPU intensity for cpu/mixed tasks (default: 1.0)
        app: Dirty app path (default: benchmarks.dirty_bench_app:BenchmarkApp)

    Endpoints:
        /        - Default sleep_task
        /sleep   - sleep_task with ?duration=N
        /cpu     - cpu_task with ?duration=N&intensity=N
        /mixed   - mixed_task with ?sleep=N&cpu=N&intensity=N
        /payload - payload_task with ?size=N&duration=N
        /echo    - echo_task (request body echoed)
        /stats   - Get accumulated stats
        /reset   - Reset accumulated stats
        /health  - Health check

    Args:
        environ: WSGI environment dict
        start_response: WSGI start_response callable

    Returns:
        list containing the single JSON-encoded response body
    """
    path = environ.get('PATH_INFO', '/')
    query = parse_qs(environ.get('QUERY_STRING', ''))

    # Helper to get query params with defaults
    def get_param(name, default, type_fn=int):
        values = query.get(name, [])
        if not values:
            return default
        try:
            return type_fn(values[0])
        except (ValueError, TypeError):
            return default

    # Get app path from query or use default
    app_path = query.get('app', [BENCHMARK_APP])[0]

    try:
        client = get_dirty_client()

        # Route based on path
        if path in ('/', '/sleep'):
            duration = get_param('duration', 10)
            result = client.execute(app_path, "sleep_task", duration)
        elif path == '/cpu':
            duration = get_param('duration', 100)
            intensity = get_param('intensity', 1.0, float)
            result = client.execute(app_path, "cpu_task", duration, intensity)
        elif path == '/mixed':
            sleep_ms = get_param('sleep', 50)
            cpu_ms = get_param('cpu', 50)
            intensity = get_param('intensity', 1.0, float)
            result = client.execute(app_path, "mixed_task", sleep_ms, cpu_ms,
                                    intensity)
        elif path == '/payload':
            size = get_param('size', 100)
            duration = get_param('duration', 0)
            result = client.execute(app_path, "payload_task", size, duration)
        elif path == '/echo':
            payload = _read_echo_payload(environ)
            result = client.execute(app_path, "echo_task", payload)
        elif path == '/stats':
            result = client.execute(app_path, "stats")
        elif path == '/reset':
            result = client.execute(app_path, "reset_stats")
        elif path == '/health':
            result = client.execute(app_path, "health")
        else:
            # Unknown path - return 404
            return _json_response(start_response, '404 Not Found',
                                  {"error": f"Unknown path: {path}"})

        # Success response
        return _json_response(start_response, '200 OK', result)

    except Exception as e:
        # Top-level boundary: report any failure to the HTTP client as a
        # JSON 500 instead of letting it propagate to the server.
        return _json_response(
            start_response,
            '500 Internal Server Error',
            {"error": str(e), "type": type(e).__name__},
        )
# Gunicorn settings for integration testing are kept in
# benchmarks/dirty_bench_gunicorn.py; they can be overridden on the command line.
# Example gunicorn invocation:
# gunicorn benchmarks.dirty_bench_wsgi:app \
# -c benchmarks/dirty_bench_gunicorn.py \
# --dirty-app benchmarks.dirty_bench_app:BenchmarkApp \
# --dirty-workers 2
def post_fork(server, worker):
    """Placeholder post-fork hook: takes the server and worker, does nothing."""
    return None

1061
benchmarks/dirty_benchmark.py Executable file

File diff suppressed because it is too large Load Diff

View File

@ -67,6 +67,8 @@ class DirtyArbiter:
self.workers = {} # pid -> DirtyWorker
self.worker_sockets = {} # pid -> socket_path
self.worker_connections = {} # pid -> (reader, writer)
self.worker_locks = {} # pid -> asyncio.Lock (serialize requests per worker)
self._worker_rr_index = 0 # Round-robin index for worker selection
self.worker_age = 0
self.alive = True
@ -224,6 +226,9 @@ class DirtyArbiter:
"""
Route a request to an available dirty worker.
Requests to each worker are serialized using a per-worker lock
to ensure only one request is in flight at a time per worker.
Args:
request: Request message dict
@ -240,43 +245,57 @@ class DirtyArbiter:
DirtyError("No dirty workers available")
)
try:
# Get or establish connection to worker
reader, writer = await self._get_worker_connection(worker_pid)
# Get or create lock for this worker
if worker_pid not in self.worker_locks:
self.worker_locks[worker_pid] = asyncio.Lock()
worker_lock = self.worker_locks[worker_pid]
# Send request to worker
await DirtyProtocol.write_message_async(writer, request)
# Wait for response with timeout
# Serialize requests to this worker
async with worker_lock:
try:
response = await asyncio.wait_for(
DirtyProtocol.read_message_async(reader),
timeout=self.cfg.dirty_timeout
)
return response
except asyncio.TimeoutError:
# Get or establish connection to worker
reader, writer = await self._get_worker_connection(worker_pid)
# Send request to worker
await DirtyProtocol.write_message_async(writer, request)
# Wait for response with timeout
try:
response = await asyncio.wait_for(
DirtyProtocol.read_message_async(reader),
timeout=self.cfg.dirty_timeout
)
return response
except asyncio.TimeoutError:
return make_error_response(
request_id,
DirtyTimeoutError("Worker timeout", self.cfg.dirty_timeout)
)
except Exception as e:
self.log.error("Error routing request to worker %s: %s",
worker_pid, e)
# Remove failed connection
self._close_worker_connection(worker_pid)
return make_error_response(
request_id,
DirtyTimeoutError("Worker timeout", self.cfg.dirty_timeout)
DirtyWorkerError(f"Worker communication failed: {e}",
worker_id=worker_pid)
)
except Exception as e:
self.log.error("Error routing request to worker %s: %s",
worker_pid, e)
# Remove failed connection
self._close_worker_connection(worker_pid)
return make_error_response(
request_id,
DirtyWorkerError(f"Worker communication failed: {e}",
worker_id=worker_pid)
)
async def _get_available_worker(self):
"""Get an available worker PID."""
for pid in list(self.workers.keys()):
# For now, just return first worker
# Future: implement load balancing
return pid
return None
"""
Get an available worker PID using round-robin selection.
Distributes requests across all available workers evenly to
maximize throughput when multiple workers are configured.
"""
worker_pids = list(self.workers.keys())
if not worker_pids:
return None
# Round-robin selection
self._worker_rr_index = (self._worker_rr_index + 1) % len(worker_pids)
return worker_pids[self._worker_rr_index]
async def _get_worker_connection(self, worker_pid):
"""Get or create connection to a worker."""
@ -373,6 +392,7 @@ class DirtyArbiter:
def _cleanup_worker(self, pid):
"""Clean up after a worker exits."""
self._close_worker_connection(pid)
self.worker_locks.pop(pid, None)
worker = self.workers.pop(pid, None)
if worker:
self.cfg.dirty_worker_exit(self, worker)