From 56cc094b6866a91b309db28c23ddaa256bc43483 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sat, 24 Jan 2026 08:37:49 +0100 Subject: [PATCH] feat(dirty): add benchmark suite and fix arbiter concurrency Add comprehensive benchmark suite for stress testing the dirty pool: - dirty_bench_app.py: Configurable benchmark app with sleep/cpu/mixed/payload tasks - dirty_benchmark.py: Main runner with isolated and integrated test modes - dirty_bench_wsgi.py: WSGI app for HTTP integration testing - dirty_bench_gunicorn.py: Gunicorn config for integration benchmarks Fix arbiter concurrency issues: - Add per-worker locks to serialize requests and prevent read conflicts - Implement round-robin worker selection for linear throughput scaling The benchmark suite supports: - Quick smoke tests (--quick) - Full isolated benchmarks (--isolated) - Configuration sweeps (--config-sweep) - Payload size tests (--payload-tests) - Integration tests with wrk (--integrated) --- benchmarks/dirty_bench_app.py | 223 ++++++ benchmarks/dirty_bench_gunicorn.py | 60 ++ benchmarks/dirty_bench_wsgi.py | 167 +++++ benchmarks/dirty_benchmark.py | 1061 ++++++++++++++++++++++++++++ gunicorn/dirty/arbiter.py | 80 ++- 5 files changed, 1561 insertions(+), 30 deletions(-) create mode 100644 benchmarks/dirty_bench_app.py create mode 100644 benchmarks/dirty_bench_gunicorn.py create mode 100644 benchmarks/dirty_bench_wsgi.py create mode 100755 benchmarks/dirty_benchmark.py diff --git a/benchmarks/dirty_bench_app.py b/benchmarks/dirty_bench_app.py new file mode 100644 index 00000000..8ac6b963 --- /dev/null +++ b/benchmarks/dirty_bench_app.py @@ -0,0 +1,223 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +Benchmark DirtyApp for stress testing the dirty arbiter pool. 
+ +Provides configurable workloads for testing: +- Pure sleep (scheduling overhead) +- CPU-bound work (thread pool utilization) +- Mixed I/O + CPU (realistic workloads) +- Payload generation (serialization overhead) +""" + +import time + +from gunicorn.dirty import DirtyApp + + +class BenchmarkApp(DirtyApp): + """ + Configurable benchmark app for stress testing. + + Provides various task types to test different aspects of the + dirty pool performance. + """ + + def init(self): + """Fast initialization - no heavy resources to load.""" + self.call_count = 0 + self.total_sleep_ms = 0 + self.total_cpu_ms = 0 + + def sleep_task(self, duration_ms): + """ + Pure sleep task - tests scheduling overhead. + + This simulates I/O-bound work like waiting for external APIs. + The thread is blocked but not consuming CPU. + + Args: + duration_ms: Sleep duration in milliseconds + + Returns: + dict with sleep duration + """ + self.call_count += 1 + self.total_sleep_ms += duration_ms + time.sleep(duration_ms / 1000.0) + return {"slept_ms": duration_ms} + + def cpu_task(self, duration_ms, intensity=1.0): + """ + CPU-bound work - tests thread pool utilization. + + Performs actual computation to simulate CPU-intensive work + like model inference or data processing. 
+ + Args: + duration_ms: Target duration in milliseconds + intensity: Work intensity multiplier (1.0 = normal) + + Returns: + dict with computed iterations and actual duration + """ + self.call_count += 1 + start = time.perf_counter() + target_end = start + (duration_ms / 1000.0) + + # Perform CPU work until target duration + iterations = 0 + work_per_iteration = int(1000 * intensity) + + while time.perf_counter() < target_end: + # Do some actual computation + x = 0.0 + for i in range(work_per_iteration): + x += i * 0.001 + x = x * 1.001 if x < 1000000 else x * 0.999 + iterations += 1 + + actual_ms = (time.perf_counter() - start) * 1000 + self.total_cpu_ms += actual_ms + + return { + "iterations": iterations, + "target_ms": duration_ms, + "actual_ms": round(actual_ms, 2), + "intensity": intensity + } + + def mixed_task(self, sleep_ms, cpu_ms, intensity=1.0): + """ + Mixed I/O + CPU task - simulates realistic workloads. + + First performs I/O (sleep), then does CPU work. This is + common in real apps: fetch data, then process it. 
+ + Args: + sleep_ms: I/O simulation duration in milliseconds + cpu_ms: CPU work duration in milliseconds + intensity: CPU work intensity multiplier + + Returns: + dict with both sleep and CPU metrics + """ + self.call_count += 1 + + # I/O phase (sleep) + time.sleep(sleep_ms / 1000.0) + self.total_sleep_ms += sleep_ms + + # CPU phase + start = time.perf_counter() + target_end = start + (cpu_ms / 1000.0) + + iterations = 0 + work_per_iteration = int(1000 * intensity) + + while time.perf_counter() < target_end: + x = 0.0 + for i in range(work_per_iteration): + x += i * 0.001 + x = x * 1.001 if x < 1000000 else x * 0.999 + iterations += 1 + + actual_cpu_ms = (time.perf_counter() - start) * 1000 + self.total_cpu_ms += actual_cpu_ms + + return { + "sleep_ms": sleep_ms, + "cpu_iterations": iterations, + "target_cpu_ms": cpu_ms, + "actual_cpu_ms": round(actual_cpu_ms, 2), + "total_ms": round(sleep_ms + actual_cpu_ms, 2) + } + + def payload_task(self, size_bytes, duration_ms=0): + """ + Generate payload of specified size - tests serialization. + + Creates a deterministic payload to test JSON serialization + overhead for different response sizes. + + Args: + size_bytes: Target payload size in bytes + duration_ms: Optional sleep before generating payload + + Returns: + dict with 'data' field of specified size + """ + self.call_count += 1 + + if duration_ms > 0: + time.sleep(duration_ms / 1000.0) + self.total_sleep_ms += duration_ms + + # Generate payload - use a pattern that compresses differently + # than pure repeated characters for more realistic testing + pattern = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" + repeats = (size_bytes // len(pattern)) + 1 + data = (pattern * repeats)[:size_bytes] + + return { + "data": data, + "size": len(data) + } + + def echo_task(self, payload): + """ + Echo back payload - tests round-trip serialization. + + Useful for testing request/response serialization together. 
+ + Args: + payload: Data to echo back + + Returns: + dict with echoed payload and its size + """ + self.call_count += 1 + + # Calculate size based on type + if isinstance(payload, str): + size = len(payload) + elif isinstance(payload, (dict, list)): + import json + size = len(json.dumps(payload)) + else: + size = len(str(payload)) + + return { + "echoed_size": size, + "payload": payload + } + + def stats(self): + """ + Return accumulated statistics. + + Returns: + dict with call counts and totals + """ + return { + "call_count": self.call_count, + "total_sleep_ms": self.total_sleep_ms, + "total_cpu_ms": round(self.total_cpu_ms, 2) + } + + def reset_stats(self): + """Reset accumulated statistics.""" + self.call_count = 0 + self.total_sleep_ms = 0 + self.total_cpu_ms = 0 + return {"reset": True} + + def health(self): + """Health check endpoint for warmup.""" + return {"status": "ok"} + + def close(self): + """Cleanup on shutdown.""" + pass diff --git a/benchmarks/dirty_bench_gunicorn.py b/benchmarks/dirty_bench_gunicorn.py new file mode 100644 index 00000000..4d49e75c --- /dev/null +++ b/benchmarks/dirty_bench_gunicorn.py @@ -0,0 +1,60 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +Gunicorn configuration for dirty pool integration benchmarks. 
+ +Usage: + gunicorn -c benchmarks/dirty_bench_gunicorn.py \ + benchmarks.dirty_bench_wsgi:app +""" + +# Bind address +bind = "127.0.0.1:8000" + +# HTTP worker configuration +workers = 4 +worker_class = "gthread" +threads = 4 +worker_connections = 1000 + +# Dirty pool configuration +dirty_apps = ["benchmarks.dirty_bench_app:BenchmarkApp"] +dirty_workers = 4 +dirty_threads = 1 +dirty_timeout = 300 +dirty_graceful_timeout = 30 + +# Logging +accesslog = "-" +errorlog = "-" +loglevel = "info" + +# Timeouts +timeout = 120 +graceful_timeout = 30 +keepalive = 2 + + +# Lifecycle hooks + +def on_dirty_starting(arbiter): + """Called when dirty arbiter is starting.""" + print(f"[dirty] Arbiter starting (pid: {arbiter.pid})") + + +def dirty_post_fork(arbiter, worker): + """Called after dirty worker fork.""" + print(f"[dirty] Worker {worker.pid} forked") + + +def dirty_worker_init(worker): + """Called after dirty worker apps are initialized.""" + print(f"[dirty] Worker {worker.pid} initialized with apps: " + f"{list(worker.apps.keys())}") + + +def dirty_worker_exit(arbiter, worker): + """Called when dirty worker exits.""" + print(f"[dirty] Worker {worker.pid} exiting") diff --git a/benchmarks/dirty_bench_wsgi.py b/benchmarks/dirty_bench_wsgi.py new file mode 100644 index 00000000..5324ba7e --- /dev/null +++ b/benchmarks/dirty_bench_wsgi.py @@ -0,0 +1,167 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +WSGI app for integration benchmarking of the dirty pool. + +This simple WSGI application calls the dirty pool and returns results. +Use with gunicorn for end-to-end benchmarking that includes HTTP overhead. 
+ +Example: + gunicorn benchmarks.dirty_bench_wsgi:app \ + --workers 4 \ + --dirty-app benchmarks.dirty_bench_app:BenchmarkApp \ + --dirty-workers 2 \ + --bind 127.0.0.1:8000 +""" + +import json +from urllib.parse import parse_qs + +from gunicorn.dirty import get_dirty_client + + +# Default benchmark app path +BENCHMARK_APP = "benchmarks.dirty_bench_app:BenchmarkApp" + + +def app(environ, start_response): + """ + WSGI application that calls dirty pool tasks. + + Query parameters: + action: Task action to call (default: sleep_task) + duration: Duration in ms for sleep/cpu tasks (default: 10) + sleep: Sleep duration for mixed_task (default: 50) + cpu: CPU duration for mixed_task (default: 50) + size: Payload size in bytes for payload_task (default: 100) + intensity: CPU intensity for cpu/mixed tasks (default: 1.0) + app: Dirty app path (default: benchmarks.dirty_bench_app:BenchmarkApp) + + Endpoints: + / - Default sleep_task + /sleep - sleep_task with ?duration=N + /cpu - cpu_task with ?duration=N&intensity=N + /mixed - mixed_task with ?sleep=N&cpu=N + /payload - payload_task with ?size=N + /echo - echo_task (POST body echoed) + /stats - Get accumulated stats + /health - Health check + """ + path = environ.get('PATH_INFO', '/') + method = environ.get('REQUEST_METHOD', 'GET') + query = parse_qs(environ.get('QUERY_STRING', '')) + + # Helper to get query params with defaults + def get_param(name, default, type_fn=int): + values = query.get(name, []) + if values: + try: + return type_fn(values[0]) + except (ValueError, TypeError): + return default + return default + + # Get app path from query or use default + app_path = query.get('app', [BENCHMARK_APP])[0] + + try: + client = get_dirty_client() + + # Route based on path + if path in ('/', '/sleep'): + duration = get_param('duration', 10) + result = client.execute(app_path, "sleep_task", duration) + + elif path == '/cpu': + duration = get_param('duration', 100) + intensity = get_param('intensity', 1.0, float) + result = 
client.execute(app_path, "cpu_task", duration, intensity) + + elif path == '/mixed': + sleep_ms = get_param('sleep', 50) + cpu_ms = get_param('cpu', 50) + intensity = get_param('intensity', 1.0, float) + result = client.execute(app_path, "mixed_task", sleep_ms, cpu_ms, + intensity) + + elif path == '/payload': + size = get_param('size', 100) + duration = get_param('duration', 0) + result = client.execute(app_path, "payload_task", size, duration) + + elif path == '/echo': + # Read request body for echo + try: + content_length = int(environ.get('CONTENT_LENGTH', 0)) + except (ValueError, TypeError): + content_length = 0 + + if content_length > 0: + body = environ['wsgi.input'].read(content_length) + try: + payload = json.loads(body.decode('utf-8')) + except (json.JSONDecodeError, UnicodeDecodeError): + payload = body.decode('utf-8', errors='replace') + else: + payload = "" + + result = client.execute(app_path, "echo_task", payload) + + elif path == '/stats': + result = client.execute(app_path, "stats") + + elif path == '/reset': + result = client.execute(app_path, "reset_stats") + + elif path == '/health': + result = client.execute(app_path, "health") + + else: + # Unknown path - return 404 + status = '404 Not Found' + body = json.dumps({"error": f"Unknown path: {path}"}).encode() + headers = [ + ('Content-Type', 'application/json'), + ('Content-Length', str(len(body))), + ] + start_response(status, headers) + return [body] + + # Success response + status = '200 OK' + body = json.dumps(result).encode() + headers = [ + ('Content-Type', 'application/json'), + ('Content-Length', str(len(body))), + ] + start_response(status, headers) + return [body] + + except Exception as e: + # Error response + status = '500 Internal Server Error' + error_msg = {"error": str(e), "type": type(e).__name__} + body = json.dumps(error_msg).encode() + headers = [ + ('Content-Type', 'application/json'), + ('Content-Length', str(len(body))), + ] + start_response(status, headers) + return 
[body] + + +# Gunicorn configuration for integration testing +# These can be overridden on the command line + +# Example gunicorn invocation: +# gunicorn benchmarks.dirty_bench_wsgi:app \ +# -c benchmarks/dirty_bench_gunicorn.py \ +# --dirty-app benchmarks.dirty_bench_app:BenchmarkApp \ +# --dirty-workers 2 + + +def post_fork(server, worker): + """Hook called after worker fork.""" + pass diff --git a/benchmarks/dirty_benchmark.py b/benchmarks/dirty_benchmark.py new file mode 100755 index 00000000..f59fdf18 --- /dev/null +++ b/benchmarks/dirty_benchmark.py @@ -0,0 +1,1061 @@ +#!/usr/bin/env python3 +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +Dirty Pool Benchmark Runner + +Stress tests and benchmarks the dirty arbiter pool to find bottlenecks +and optimization opportunities. + +Test Modes: +- Isolated: Direct client -> arbiter -> worker (no HTTP overhead) +- Integrated: HTTP workers calling dirty pool (realistic end-to-end) + +Usage: + # Quick smoke test + python benchmarks/dirty_benchmark.py --quick + + # Full isolated suite + python benchmarks/dirty_benchmark.py --isolated --output results.json + + # Specific scenario + python benchmarks/dirty_benchmark.py \ + --duration 100 \ + --concurrency 50 \ + --workers 4 \ + --threads 2 + + # Payload size tests + python benchmarks/dirty_benchmark.py --payload-tests + + # Integration tests (requires gunicorn running) + python benchmarks/dirty_benchmark.py --integrated --url http://127.0.0.1:8000 +""" + +import argparse +import asyncio +import json +import multiprocessing +import os +import signal +import statistics +import subprocess +import sys +import tempfile +import threading +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Any + +# Add parent to path for imports +BENCHMARK_DIR = Path(__file__).parent +sys.path.insert(0, 
str(BENCHMARK_DIR.parent))
+
+from gunicorn.dirty.client import DirtyClient
+from gunicorn.dirty.arbiter import DirtyArbiter
+
+
+# Default benchmark app path
+BENCHMARK_APP = "benchmarks.dirty_bench_app:BenchmarkApp"
+
+
+@dataclass
+class LatencyStats:
+    """Latency statistics in milliseconds."""
+    min: float = 0.0
+    max: float = 0.0
+    mean: float = 0.0
+    stddev: float = 0.0
+    p50: float = 0.0
+    p95: float = 0.0
+    p99: float = 0.0
+
+    @classmethod
+    def from_samples(cls, samples: list[float]) -> "LatencyStats":
+        """Calculate statistics from a list of latency samples.
+
+        Percentiles use the nearest-rank method: the q-th percentile is the
+        smallest sample such that at least q% of all samples are <= it.
+        With few samples the high percentiles therefore equal the maximum.
+
+        Args:
+            samples: Latencies in milliseconds, in any order.
+
+        Returns:
+            LatencyStats for the samples, or all-zero stats when empty.
+        """
+        if not samples:
+            return cls()
+
+        sorted_samples = sorted(samples)
+        n = len(sorted_samples)
+
+        def percentile(q: int) -> float:
+            # Nearest rank is ceil(q * n / 100); integer arithmetic avoids
+            # float rounding, and -1 converts the rank to a 0-based index.
+            rank = -(-q * n // 100)
+            return sorted_samples[max(rank, 1) - 1]
+
+        return cls(
+            min=sorted_samples[0],
+            max=sorted_samples[-1],
+            mean=statistics.mean(sorted_samples),
+            # stdev() needs at least two data points
+            stddev=statistics.stdev(sorted_samples) if n > 1 else 0.0,
+            p50=percentile(50),
+            p95=percentile(95),
+            p99=percentile(99),
+        )
+
+
+@dataclass
+class BenchmarkResult:
+    """Results from a single benchmark run."""
+    scenario: str
+    config: dict
+    total_requests: int = 0
+    successful: int = 0
+    failed: int = 0
+    errors: list[str] = field(default_factory=list)
+    duration_sec: float = 0.0
+    requests_per_sec: float = 0.0
+    latency_ms: LatencyStats = field(default_factory=LatencyStats)
+
+    def to_dict(self) -> dict:
+        """Convert to dictionary for JSON serialization."""
+        d = asdict(self)
+        d['latency_ms'] = asdict(self.latency_ms)
+        return d
+
+
+class MockConfig:
+    """Mock gunicorn config for standalone arbiter testing."""
+
+    def __init__(
+        self,
+        dirty_apps: list[str],
+        dirty_workers: int = 2,
+        dirty_threads: int = 1,
+        dirty_timeout: int = 300,
+        dirty_graceful_timeout: int = 30,
+    ):
+        self.dirty_apps = dirty_apps
+        self.dirty_workers = dirty_workers
+        self.dirty_threads = dirty_threads
+        self.dirty_timeout = dirty_timeout
+        self.dirty_graceful_timeout = dirty_graceful_timeout
+
# Other required config + self.env = {} + self.uid = os.getuid() + self.gid = os.getgid() + self.initgroups = False + self.proc_name = "dirty-benchmark" + + # WorkerTmp requirements + self.umask = 0 + self.worker_tmp_dir = None + + # Hook stubs + def on_dirty_starting(self, arbiter): + pass + + def dirty_post_fork(self, arbiter, worker): + pass + + def dirty_worker_init(self, worker): + pass + + def dirty_worker_exit(self, arbiter, worker): + pass + + +class MockLogger: + """Mock logger for standalone testing.""" + + def __init__(self, verbose: bool = False): + self.verbose = verbose + + def debug(self, msg, *args): + if self.verbose: + print(f"[DEBUG] {msg % args if args else msg}") + + def info(self, msg, *args): + if self.verbose: + print(f"[INFO] {msg % args if args else msg}") + + def warning(self, msg, *args): + print(f"[WARN] {msg % args if args else msg}") + + def error(self, msg, *args): + print(f"[ERROR] {msg % args if args else msg}") + + def critical(self, msg, *args): + print(f"[CRIT] {msg % args if args else msg}") + + def exception(self, msg, *args): + print(f"[EXC] {msg % args if args else msg}") + + def reopen_files(self): + pass + + def close_on_exec(self): + pass + + +class IsolatedBenchmark: + """ + Run benchmarks directly against the dirty pool without HTTP. + + Spawns a standalone dirty arbiter and workers, then runs concurrent + clients to measure performance. 
+ """ + + def __init__( + self, + dirty_workers: int = 2, + dirty_threads: int = 1, + dirty_timeout: int = 300, + verbose: bool = False, + ): + self.dirty_workers = dirty_workers + self.dirty_threads = dirty_threads + self.dirty_timeout = dirty_timeout + self.verbose = verbose + + self.arbiter = None + self.arbiter_pid = None + self.socket_path = None + self._tmpdir = None + + def start(self): + """Start the dirty arbiter and workers.""" + # Create temp directory for socket + self._tmpdir = tempfile.mkdtemp(prefix="dirty-bench-") + self.socket_path = os.path.join(self._tmpdir, "arbiter.sock") + + # Create config and logger + cfg = MockConfig( + dirty_apps=[BENCHMARK_APP], + dirty_workers=self.dirty_workers, + dirty_threads=self.dirty_threads, + dirty_timeout=self.dirty_timeout, + ) + log = MockLogger(verbose=self.verbose) + + # Fork arbiter process + pid = os.fork() + if pid == 0: + # Child process - run arbiter + try: + arbiter = DirtyArbiter(cfg, log, socket_path=self.socket_path) + arbiter.run() + except Exception as e: + print(f"Arbiter error: {e}") + finally: + os._exit(0) + + # Parent process + self.arbiter_pid = pid + + # Wait for arbiter socket to be ready + for _ in range(50): # 5 seconds max + if os.path.exists(self.socket_path): + break + time.sleep(0.1) + else: + raise RuntimeError("Arbiter socket not ready") + + # Give workers time to start + time.sleep(0.5) + + def stop(self): + """Stop the dirty arbiter.""" + if self.arbiter_pid: + try: + os.kill(self.arbiter_pid, signal.SIGTERM) + os.waitpid(self.arbiter_pid, 0) + except (OSError, ChildProcessError): + pass + self.arbiter_pid = None + + # Cleanup temp directory + if self._tmpdir: + try: + for f in os.listdir(self._tmpdir): + os.unlink(os.path.join(self._tmpdir, f)) + os.rmdir(self._tmpdir) + except OSError: + pass + self._tmpdir = None + + def warmup(self, requests: int = 10): + """Warm up the pool with a few requests.""" + with DirtyClient(self.socket_path, timeout=30.0) as client: + for _ in 
range(requests): + client.execute(BENCHMARK_APP, "health") + + def run_benchmark( + self, + action: str, + args: tuple = (), + kwargs: dict = None, + total_requests: int = 1000, + concurrency: int = 10, + timeout: float = 30.0, + ) -> tuple[list[float], list[str]]: + """ + Run a benchmark with specified parameters. + + Each concurrent worker maintains a persistent connection to the arbiter + and makes sequential requests. This simulates how real HTTP workers + use the dirty client (one connection per worker thread). + + Args: + action: Action to call on the benchmark app + args: Positional arguments for the action + kwargs: Keyword arguments for the action + total_requests: Total number of requests to make + concurrency: Number of concurrent clients + timeout: Timeout per request in seconds + + Returns: + Tuple of (latencies in ms, error messages) + """ + kwargs = kwargs or {} + latencies = [] + errors = [] + lock = threading.Lock() + + # Calculate requests per worker + requests_per_worker = total_requests // concurrency + remainder = total_requests % concurrency + + def worker_task(num_requests: int) -> None: + """Worker that makes sequential requests on a persistent connection.""" + worker_latencies = [] + worker_errors = [] + + try: + client = DirtyClient(self.socket_path, timeout=timeout) + client.connect() + + for _ in range(num_requests): + try: + start = time.perf_counter() + client.execute(BENCHMARK_APP, action, *args, **kwargs) + elapsed = (time.perf_counter() - start) * 1000 + worker_latencies.append(elapsed) + except Exception as e: + worker_errors.append(str(e)) + # Reconnect on error + try: + client.close() + client = DirtyClient(self.socket_path, timeout=timeout) + client.connect() + except Exception: + pass + + client.close() + except Exception as e: + worker_errors.append(f"Connection error: {e}") + + # Add results to shared lists + with lock: + latencies.extend(worker_latencies) + errors.extend(worker_errors) + + # Run concurrent workers + with 
ThreadPoolExecutor(max_workers=concurrency) as executor:
+            futures = []
+            for i in range(concurrency):
+                # Distribute remainder requests among first few workers
+                num = requests_per_worker + (1 if i < remainder else 0)
+                if num > 0:
+                    futures.append(executor.submit(worker_task, num))
+
+            # Wait for all workers to complete
+            for future in as_completed(futures):
+                future.result()  # Raises any exceptions
+
+        return latencies, errors
+
+
+class IntegratedBenchmark:
+    """
+    Run benchmarks against gunicorn with dirty pool via HTTP.
+
+    Uses wrk or ab for load testing, or falls back to Python requests.
+    """
+
+    def __init__(
+        self,
+        url: str = "http://127.0.0.1:8000",
+        verbose: bool = False,
+    ):
+        self.url = url.rstrip('/')
+        self.verbose = verbose
+        self._tool = None
+
+    def check_dependencies(self) -> str | None:
+        """Check for available load testing tools."""
+        for tool in ['wrk', 'ab']:
+            try:
+                subprocess.run([tool, '--version'], capture_output=True,
+                               check=False)
+                return tool
+            except FileNotFoundError:
+                continue
+        return None
+
+    def warmup(self, requests: int = 10):
+        """Warm up the server with sequential GET requests to /health.
+
+        Warmup is best-effort: any failure of an individual request is
+        deliberately ignored.
+
+        Args:
+            requests: Number of warmup requests to send.
+        """
+        import urllib.request
+        for _ in range(requests):
+            try:
+                # Read and close the response so the connection is released
+                # right away instead of lingering until garbage collection.
+                with urllib.request.urlopen(f"{self.url}/health",
+                                            timeout=5) as response:
+                    response.read()
+            except Exception:
+                pass
+
+    def run_wrk(
+        self,
+        path: str,
+        duration: int = 10,
+        threads: int = 4,
+        connections: int = 100,
+    ) -> dict:
+        """Run wrk benchmark and parse results."""
+        url = f"{self.url}{path}"
+        cmd = [
+            'wrk',
+            '-t', str(threads),
+            '-c', str(connections),
+            '-d', f'{duration}s',
+            '--latency',
+            url,
+        ]
+
+        result = subprocess.run(cmd, capture_output=True, text=True,
+                                check=False)
+        return self._parse_wrk_output(result.stdout)
+
+    def _parse_wrk_output(self, output: str) -> dict:
+        """Parse wrk output to extract metrics."""
+        metrics = {
+            'requests_per_sec': 0.0,
+            'latency_ms': {},
+            'errors': 0,
+        }
+
+        for line in output.split('\n'):
+            if 'Requests/sec' in line:
+                try:
+                    metrics['requests_per_sec'] = float(
+
line.split(':')[1].strip())
+                except (ValueError, IndexError):
+                    pass
+            elif 'Latency' in line and 'Distribution' not in line:
+                parts = line.split()
+                if len(parts) >= 2:
+                    metrics['latency_ms']['avg'] = self._parse_duration(
+                        parts[1])
+            elif '50%' in line:
+                parts = line.split()
+                if len(parts) >= 2:
+                    metrics['latency_ms']['p50'] = self._parse_duration(
+                        parts[1])
+            elif '99%' in line:
+                parts = line.split()
+                if len(parts) >= 2:
+                    metrics['latency_ms']['p99'] = self._parse_duration(
+                        parts[1])
+            elif 'Socket errors' in line:
+                # Parse error counts
+                parts = line.split(',')
+                for part in parts:
+                    if any(x in part for x in ['connect', 'read', 'write',
+                                               'timeout']):
+                        try:
+                            metrics['errors'] += int(part.split()[-1])
+                        except (ValueError, IndexError):
+                            pass
+
+        return metrics
+
+    def _parse_duration(self, s: str) -> float:
+        """Parse a wrk duration string to milliseconds.
+
+        wrk picks the unit suffix by magnitude: 'us', 'ms', 's', and for
+        very slow responses 'm' (minutes) or 'h' (hours).
+
+        Args:
+            s: Duration token such as '850.00us', '12.34ms', '1.23s',
+                '1.50m' or '2.00h'. A bare number is taken as milliseconds.
+
+        Returns:
+            The duration in milliseconds.
+
+        Raises:
+            ValueError: If the numeric part cannot be parsed.
+        """
+        s = s.strip()
+        # Two-letter suffixes are tested before 's' so that 'us' and 'ms'
+        # are not mistaken for seconds.
+        if s.endswith('us'):
+            return float(s[:-2]) / 1000
+        elif s.endswith('ms'):
+            return float(s[:-2])
+        elif s.endswith('s'):
+            return float(s[:-1]) * 1000
+        elif s.endswith('m'):
+            return float(s[:-1]) * 60 * 1000
+        elif s.endswith('h'):
+            return float(s[:-1]) * 60 * 60 * 1000
+        else:
+            return float(s)
+
+    def run_python_benchmark(
+        self,
+        path: str,
+        total_requests: int = 1000,
+        concurrency: int = 10,
+        timeout: float = 30.0,
+    ) -> tuple[list[float], list[str]]:
+        """
+        Run benchmark using Python urllib.
+
+        Fallback when wrk/ab not available.
+ """ + import urllib.request + import urllib.error + + url = f"{self.url}{path}" + latencies = [] + errors = [] + + def make_request() -> tuple[float | None, str | None]: + try: + start = time.perf_counter() + urllib.request.urlopen(url, timeout=timeout) + elapsed = (time.perf_counter() - start) * 1000 + return elapsed, None + except Exception as e: + return None, str(e) + + with ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = [executor.submit(make_request) + for _ in range(total_requests)] + + for future in as_completed(futures): + latency, error = future.result() + if latency is not None: + latencies.append(latency) + if error: + errors.append(error) + + return latencies, errors + + +def run_isolated_suite( + workers: int = 2, + threads: int = 1, + verbose: bool = False, +) -> list[BenchmarkResult]: + """Run the full isolated benchmark suite.""" + results = [] + + bench = IsolatedBenchmark( + dirty_workers=workers, + dirty_threads=threads, + verbose=verbose, + ) + + print(f"\nStarting isolated benchmarks (workers={workers}, " + f"threads={threads})...") + + try: + bench.start() + bench.warmup() + + # Define scenarios + scenarios = [ + # Baseline + { + "name": "baseline_10ms", + "action": "sleep_task", + "args": (10,), + "requests": 1000, + "concurrency": 1, + "description": "Single request latency (10ms sleep)", + }, + # Throughput + { + "name": "throughput_10ms", + "action": "sleep_task", + "args": (10,), + "requests": 5000, + "concurrency": 100, + "description": "Max requests/sec (10ms sleep, 100 clients)", + }, + # CPU Bound + { + "name": "cpu_bound_100ms", + "action": "cpu_task", + "args": (100,), + "requests": 500, + "concurrency": 20, + "description": "CPU-bound work (100ms, 20 clients)", + }, + # I/O Bound + { + "name": "io_bound_500ms", + "action": "sleep_task", + "args": (500,), + "requests": 200, + "concurrency": 50, + "description": "I/O-bound work (500ms sleep, 50 clients)", + }, + # Mixed + { + "name": "mixed_50_50", + "action": 
"mixed_task", + "args": (50, 50), + "requests": 500, + "concurrency": 30, + "description": "Mixed workload (50ms sleep + 50ms CPU)", + }, + # Overload + { + "name": "overload_10ms", + "action": "sleep_task", + "args": (10,), + "requests": 2000, + "concurrency": 200, + "description": "Overload test (10ms, 200 clients)", + }, + ] + + for scenario in scenarios: + print(f" Running {scenario['name']}: {scenario['description']}...") + + start_time = time.perf_counter() + latencies, errors = bench.run_benchmark( + action=scenario["action"], + args=scenario.get("args", ()), + kwargs=scenario.get("kwargs"), + total_requests=scenario["requests"], + concurrency=scenario["concurrency"], + ) + duration = time.perf_counter() - start_time + + result = BenchmarkResult( + scenario=scenario["name"], + config={ + "dirty_workers": workers, + "dirty_threads": threads, + "task_action": scenario["action"], + "task_args": scenario.get("args", ()), + "concurrency": scenario["concurrency"], + }, + total_requests=scenario["requests"], + successful=len(latencies), + failed=len(errors), + errors=errors[:10] if errors else [], # First 10 errors + duration_sec=round(duration, 2), + requests_per_sec=round(len(latencies) / duration, 1), + latency_ms=LatencyStats.from_samples(latencies), + ) + results.append(result) + + print(f" Requests/sec: {result.requests_per_sec:.1f}, " + f"p50: {result.latency_ms.p50:.1f}ms, " + f"p99: {result.latency_ms.p99:.1f}ms, " + f"failed: {result.failed}") + + finally: + bench.stop() + + return results + + +def run_payload_suite( + workers: int = 2, + threads: int = 1, + verbose: bool = False, +) -> list[BenchmarkResult]: + """Run payload size benchmark suite.""" + results = [] + + bench = IsolatedBenchmark( + dirty_workers=workers, + dirty_threads=threads, + verbose=verbose, + ) + + print(f"\nStarting payload benchmarks (workers={workers})...") + + try: + bench.start() + bench.warmup() + + # Payload sizes to test + payload_sizes = [ + (100, "100B", "Tiny payload"), + 
(1024, "1KB", "Small payload"), + (10240, "10KB", "Medium payload"), + (102400, "100KB", "Large payload"), + (1048576, "1MB", "Very large payload"), + ] + + for size, size_label, description in payload_sizes: + # Adjust concurrency for larger payloads + concurrency = max(5, 100 // (size // 1024 + 1)) + requests = max(100, 1000 // (size // 1024 + 1)) + + print(f" Running payload_{size_label}: {description}...") + + start_time = time.perf_counter() + latencies, errors = bench.run_benchmark( + action="payload_task", + args=(size,), + total_requests=requests, + concurrency=concurrency, + ) + duration = time.perf_counter() - start_time + + result = BenchmarkResult( + scenario=f"payload_{size_label}", + config={ + "dirty_workers": workers, + "dirty_threads": threads, + "payload_bytes": size, + "concurrency": concurrency, + }, + total_requests=requests, + successful=len(latencies), + failed=len(errors), + errors=errors[:5] if errors else [], + duration_sec=round(duration, 2), + requests_per_sec=round(len(latencies) / duration, 1), + latency_ms=LatencyStats.from_samples(latencies), + ) + results.append(result) + + # Calculate throughput in MB/s + throughput_mb = (len(latencies) * size) / duration / 1024 / 1024 + + print(f" Requests/sec: {result.requests_per_sec:.1f}, " + f"p50: {result.latency_ms.p50:.1f}ms, " + f"throughput: {throughput_mb:.1f} MB/s") + + finally: + bench.stop() + + return results + + +def run_quick_test(verbose: bool = False) -> list[BenchmarkResult]: + """Run a quick smoke test.""" + results = [] + + bench = IsolatedBenchmark(dirty_workers=1, dirty_threads=1, verbose=verbose) + + print("\nRunning quick smoke test...") + + try: + bench.start() + bench.warmup(5) + + # Simple test + start_time = time.perf_counter() + latencies, errors = bench.run_benchmark( + action="sleep_task", + args=(10,), + total_requests=100, + concurrency=10, + ) + duration = time.perf_counter() - start_time + + result = BenchmarkResult( + scenario="quick_test", + 
config={"dirty_workers": 1, "dirty_threads": 1}, + total_requests=100, + successful=len(latencies), + failed=len(errors), + errors=errors[:5] if errors else [], + duration_sec=round(duration, 2), + requests_per_sec=round(len(latencies) / duration, 1), + latency_ms=LatencyStats.from_samples(latencies), + ) + results.append(result) + + print(f" Requests/sec: {result.requests_per_sec:.1f}, " + f"p50: {result.latency_ms.p50:.1f}ms, " + f"failed: {result.failed}") + + if result.failed == 0: + print(" PASS: Quick test successful") + else: + print(f" WARN: {result.failed} requests failed") + + finally: + bench.stop() + + return results + + +def run_config_sweep(verbose: bool = False) -> list[BenchmarkResult]: + """ + Sweep through different configurations to find optimal settings. + + Tests combinations of workers and threads. + """ + results = [] + + configs = [ + (1, 1), # Baseline + (2, 1), # 2 workers, 1 thread each + (4, 1), # 4 workers, 1 thread each + (2, 2), # 2 workers, 2 threads each + (2, 4), # 2 workers, 4 threads each + (4, 2), # 4 workers, 2 threads each + ] + + print("\nRunning configuration sweep...") + + for workers, threads in configs: + print(f"\n Testing workers={workers}, threads={threads}...") + + bench = IsolatedBenchmark( + dirty_workers=workers, + dirty_threads=threads, + verbose=verbose, + ) + + try: + bench.start() + bench.warmup() + + # Run a standard workload + start_time = time.perf_counter() + latencies, errors = bench.run_benchmark( + action="mixed_task", + args=(20, 20), # 20ms sleep + 20ms CPU + total_requests=1000, + concurrency=50, + ) + duration = time.perf_counter() - start_time + + result = BenchmarkResult( + scenario=f"config_w{workers}_t{threads}", + config={ + "dirty_workers": workers, + "dirty_threads": threads, + "task": "mixed_task(20, 20)", + "concurrency": 50, + }, + total_requests=1000, + successful=len(latencies), + failed=len(errors), + errors=errors[:5] if errors else [], + duration_sec=round(duration, 2), + 
def generate_report(results: "list[BenchmarkResult]",
                    output_path: "str | None" = None):
    """
    Print a human-readable summary of *results* and optionally save it as JSON.

    Args:
        results: Benchmark results to report. Each entry is read for its
            ``scenario``, ``config``, request counters, ``duration_sec``,
            ``requests_per_sec``, ``latency_ms`` statistics and ``errors``.
            ``to_dict()`` is only called when *output_path* is given.
        output_path: Optional path of a JSON file to write. When falsy,
            nothing is written to disk.

    Raises:
        OSError: If *output_path* cannot be opened for writing.
    """
    max_errors_shown = 3
    max_error_width = 80

    print("\n" + "=" * 70)
    print("BENCHMARK REPORT")
    print("=" * 70)

    for result in results:
        stats = result.latency_ms

        print(f"\n{result.scenario}")
        print("-" * 40)
        print(f" Config: {json.dumps(result.config, indent=None)}")
        print(f" Requests: {result.successful}/{result.total_requests} "
              f"({result.failed} failed)")
        print(f" Duration: {result.duration_sec}s")
        print(f" Throughput: {result.requests_per_sec:.1f} req/s")
        print(" Latency (ms):")
        print(f" min: {stats.min:.2f}")
        print(f" p50: {stats.p50:.2f}")
        print(f" p95: {stats.p95:.2f}")
        print(f" p99: {stats.p99:.2f}")
        print(f" max: {stats.max:.2f}")
        print(f" mean: {stats.mean:.2f} "
              f"(stddev: {stats.stddev:.2f})")

        if result.errors:
            # The header must count what is actually listed below, not how
            # many errors the result happens to carry.
            shown = result.errors[:max_errors_shown]
            print(f" Errors (first {len(shown)}):")
            for err in shown:
                # str() keeps the report alive if an error is not a string.
                print(f" - {str(err)[:max_error_width]}")

    if output_path:
        output_data = {
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
            "results": [r.to_dict() for r in results],
        }
        # Explicit encoding: the default is platform dependent.
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, indent=2)
        print(f"\nResults saved to: {output_path}")
def _build_arg_parser():
    """Create the command-line parser for the benchmark runner."""
    parser = argparse.ArgumentParser(
        description='Benchmark the gunicorn dirty pool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    # Mode selection: at most one may be given; with none, main() falls
    # back to a single custom sleep_task run.
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument('--quick', action='store_true',
                            help='Run quick smoke test')
    mode_group.add_argument('--isolated', action='store_true',
                            help='Run isolated benchmark suite')
    mode_group.add_argument('--payload-tests', action='store_true',
                            help='Run payload size tests')
    mode_group.add_argument('--config-sweep', action='store_true',
                            help='Sweep through configurations')
    mode_group.add_argument('--integrated', action='store_true',
                            help='Run integrated HTTP benchmarks')

    # Configuration
    parser.add_argument('--workers', type=int, default=2,
                        help='Number of dirty workers (default: 2)')
    parser.add_argument('--threads', type=int, default=1,
                        help='Threads per dirty worker (default: 1)')
    parser.add_argument('--duration', type=int, default=10,
                        help='Task duration in ms for custom run')
    parser.add_argument('--concurrency', type=int, default=10,
                        help='Number of concurrent clients')
    parser.add_argument('--requests', type=int, default=1000,
                        help='Total requests to make')

    # Integration mode options
    parser.add_argument('--url', default='http://127.0.0.1:8000',
                        help='Server URL for integrated tests')

    # Output
    parser.add_argument('--output', '-o',
                        help='Output JSON file for results')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Verbose output')

    return parser


def _run_integrated(args):
    """
    Run the HTTP benchmarks against ``args.url`` and return their results.

    When ``check_dependencies()`` reports ``'wrk'``, three fixed scenarios
    are run through wrk and only their requests/sec is printed; the
    returned list is then empty. Otherwise a single Python-driven
    ``/sleep?duration=10`` run is measured and returned.
    """
    results = []
    bench = IntegratedBenchmark(url=args.url, verbose=args.verbose)
    tool = bench.check_dependencies()

    if tool == 'wrk':
        print("\nRunning integrated benchmarks with wrk...")
        bench.warmup()

        # Run basic scenarios
        scenarios = [
            ("/sleep?duration=10", "sleep_10ms"),
            ("/cpu?duration=100", "cpu_100ms"),
            ("/mixed?sleep=50&cpu=50", "mixed_50_50"),
        ]

        for path, name in scenarios:
            print(f" Running {name}...")
            metrics = bench.run_wrk(path, duration=10, connections=100)
            print(f" Requests/sec: {metrics.get('requests_per_sec', 'N/A')}")

        print("\nNote: For detailed results, use wrk directly:")
        print(f" wrk -t4 -c100 -d30s --latency '{args.url}/sleep?duration=10'")
        return results

    print("\nUsing Python fallback (install wrk for better results)...")
    bench.warmup()

    # Measure wall-clock time like every other scenario. Estimating the
    # duration from sum(latencies) / concurrency divides by zero when no
    # request succeeds and assumes perfectly parallel clients.
    start_time = time.perf_counter()
    latencies, errors = bench.run_python_benchmark(
        "/sleep?duration=10",
        total_requests=args.requests,
        concurrency=args.concurrency,
    )
    duration = time.perf_counter() - start_time

    results.append(BenchmarkResult(
        scenario="integrated_sleep",
        config={"url": args.url, "concurrency": args.concurrency},
        total_requests=args.requests,
        successful=len(latencies),
        failed=len(errors),
        errors=errors[:5],
        duration_sec=round(duration, 2),
        requests_per_sec=round(len(latencies) / duration, 1),
        latency_ms=LatencyStats.from_samples(latencies),
    ))
    return results


def _run_custom(args):
    """Run one ``sleep_task`` benchmark configured entirely from *args*."""
    print(f"\nRunning custom benchmark: "
          f"duration={args.duration}ms, concurrency={args.concurrency}")

    results = []
    bench = IsolatedBenchmark(
        dirty_workers=args.workers,
        dirty_threads=args.threads,
        verbose=args.verbose,
    )

    try:
        bench.start()
        bench.warmup()

        start_time = time.perf_counter()
        latencies, errors = bench.run_benchmark(
            action="sleep_task",
            args=(args.duration,),
            total_requests=args.requests,
            concurrency=args.concurrency,
        )
        duration = time.perf_counter() - start_time

        results.append(BenchmarkResult(
            scenario="custom",
            config={
                "dirty_workers": args.workers,
                "dirty_threads": args.threads,
                "task_duration_ms": args.duration,
                "concurrency": args.concurrency,
            },
            total_requests=args.requests,
            successful=len(latencies),
            failed=len(errors),
            errors=errors[:10],
            duration_sec=round(duration, 2),
            requests_per_sec=round(len(latencies) / duration, 1),
            latency_ms=LatencyStats.from_samples(latencies),
        ))

    finally:
        # Stop the pool even if the run failed part-way.
        bench.stop()

    return results


def main():
    """
    Command-line entry point for the dirty pool benchmark.

    Parses ``sys.argv``, runs the selected mode (quick, isolated, payload
    tests, config sweep, integrated, or a custom single run when no mode
    flag is given) and prints a report, also writing it to ``--output``
    when provided, if the mode produced any results.

    Exits with status 1 on Ctrl-C or on any unexpected error; the traceback
    is printed only with ``--verbose``.
    """
    args = _build_arg_parser().parse_args()

    try:
        if args.quick:
            results = run_quick_test(verbose=args.verbose)
        elif args.isolated:
            results = run_isolated_suite(
                workers=args.workers,
                threads=args.threads,
                verbose=args.verbose,
            )
        elif args.payload_tests:
            results = run_payload_suite(
                workers=args.workers,
                threads=args.threads,
                verbose=args.verbose,
            )
        elif args.config_sweep:
            results = run_config_sweep(verbose=args.verbose)
        elif args.integrated:
            results = _run_integrated(args)
        else:
            # Default: run custom single benchmark
            results = _run_custom(args)

        # Generate report
        if results:
            generate_report(results, args.output)

    except KeyboardInterrupt:
        print("\nBenchmark interrupted")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report tersely, full traceback only on request.
        print(f"\nError: {e}")
        if args.verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)


if __name__ == '__main__':
    main()
+ Args: request: Request message dict @@ -240,43 +245,57 @@ class DirtyArbiter: DirtyError("No dirty workers available") ) - try: - # Get or establish connection to worker - reader, writer = await self._get_worker_connection(worker_pid) + # Get or create lock for this worker + if worker_pid not in self.worker_locks: + self.worker_locks[worker_pid] = asyncio.Lock() + worker_lock = self.worker_locks[worker_pid] - # Send request to worker - await DirtyProtocol.write_message_async(writer, request) - - # Wait for response with timeout + # Serialize requests to this worker + async with worker_lock: try: - response = await asyncio.wait_for( - DirtyProtocol.read_message_async(reader), - timeout=self.cfg.dirty_timeout - ) - return response - except asyncio.TimeoutError: + # Get or establish connection to worker + reader, writer = await self._get_worker_connection(worker_pid) + + # Send request to worker + await DirtyProtocol.write_message_async(writer, request) + + # Wait for response with timeout + try: + response = await asyncio.wait_for( + DirtyProtocol.read_message_async(reader), + timeout=self.cfg.dirty_timeout + ) + return response + except asyncio.TimeoutError: + return make_error_response( + request_id, + DirtyTimeoutError("Worker timeout", self.cfg.dirty_timeout) + ) + except Exception as e: + self.log.error("Error routing request to worker %s: %s", + worker_pid, e) + # Remove failed connection + self._close_worker_connection(worker_pid) return make_error_response( request_id, - DirtyTimeoutError("Worker timeout", self.cfg.dirty_timeout) + DirtyWorkerError(f"Worker communication failed: {e}", + worker_id=worker_pid) ) - except Exception as e: - self.log.error("Error routing request to worker %s: %s", - worker_pid, e) - # Remove failed connection - self._close_worker_connection(worker_pid) - return make_error_response( - request_id, - DirtyWorkerError(f"Worker communication failed: {e}", - worker_id=worker_pid) - ) async def _get_available_worker(self): - """Get an 
available worker PID.""" - for pid in list(self.workers.keys()): - # For now, just return first worker - # Future: implement load balancing - return pid - return None + """ + Get an available worker PID using round-robin selection. + + Distributes requests across all available workers evenly to + maximize throughput when multiple workers are configured. + """ + worker_pids = list(self.workers.keys()) + if not worker_pids: + return None + + # Round-robin selection + self._worker_rr_index = (self._worker_rr_index + 1) % len(worker_pids) + return worker_pids[self._worker_rr_index] async def _get_worker_connection(self, worker_pid): """Get or create connection to a worker.""" @@ -373,6 +392,7 @@ class DirtyArbiter: def _cleanup_worker(self, pid): """Clean up after a worker exits.""" self._close_worker_connection(pid) + self.worker_locks.pop(pid, None) worker = self.workers.pop(pid, None) if worker: self.cfg.dirty_worker_exit(self, worker)