gunicorn/benchmarks/dirty_streaming.py
Benoit Chesneau f6418d4eb0 feat(dirty): add streaming support and async client benchmarks
Add support for streaming responses when dirty app actions return
generators (sync or async). This enables real-time delivery of
incremental results for use cases like LLM token generation.

Features:
- Streaming protocol with chunk/end/error message types
- Worker support for sync and async generators
- Arbiter forwarding of streaming messages
- Deadline-based timeout handling
- Async client streaming API

Protocol:
- Chunk messages (type: "chunk") contain partial data
- End messages (type: "end") signal stream completion
- Error messages can occur mid-stream

New files:
- benchmarks/dirty_streaming.py: Streaming benchmark suite
- tests/dirty/test_*_streaming*.py: Streaming test coverage
- docs/content/dirty.md: Streaming documentation with examples
2026-01-25 10:23:25 +01:00

756 lines
23 KiB
Python

#!/usr/bin/env python
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
Benchmark suite for dirty worker streaming functionality.
This script benchmarks the streaming performance of dirty workers
to measure throughput, latency, and memory usage.
Usage:
python benchmarks/dirty_streaming.py [OPTIONS]
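Options:
    --quick         Run quick benchmarks only (the default)
    --full          Run full benchmark suite including stress tests
    -o, --output F  Write JSON results to F (default:
                    results/streaming_benchmark_<timestamp>.json next to this script)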
"""
import argparse
import asyncio
import gc
import json
import os
import struct
import sys
import time
import tracemalloc
from datetime import datetime
from unittest import mock
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from gunicorn.dirty.protocol import (
DirtyProtocol,
make_request,
make_chunk_message,
make_end_message,
make_response,
)
from gunicorn.dirty.worker import DirtyWorker
from gunicorn.dirty.arbiter import DirtyArbiter
from gunicorn.dirty.client import (
DirtyClient,
DirtyStreamIterator,
DirtyAsyncStreamIterator,
)
from gunicorn.config import Config
class MockStreamWriter:
    """Mock StreamWriter that decodes and captures framed protocol messages.

    Bytes passed to ``write`` are buffered and counted in ``bytes_written``.
    ``drain`` splits every complete ``DirtyProtocol`` frame (fixed-size header
    followed by ``length`` payload bytes) off the front of the buffer, decodes
    it with ``DirtyProtocol.decode`` and appends the result to ``messages``.
    An incomplete trailing frame stays buffered until more data arrives.
    """

    def __init__(self):
        self.messages = []
        # A bytearray grows in place and supports cheap deletion from the
        # front, so buffering does not re-copy all pending data on every
        # write/drain (this mock sits inside timed benchmark sections).
        self._buffer = bytearray()
        self.bytes_written = 0

    def write(self, data):
        self._buffer += data
        self.bytes_written += len(data)

    async def drain(self):
        header_size = DirtyProtocol.HEADER_SIZE
        while len(self._buffer) >= header_size:
            length = struct.unpack(
                DirtyProtocol.HEADER_FORMAT,
                self._buffer[:header_size]
            )[0]
            total_size = header_size + length
            if len(self._buffer) < total_size:
                break  # partial frame: wait for the rest to be written
            # Hand decode an immutable bytes object, as before.
            msg_data = bytes(self._buffer[header_size:total_size])
            del self._buffer[:total_size]
            self.messages.append(DirtyProtocol.decode(msg_data))

    def close(self):
        pass

    async def wait_closed(self):
        pass
class MockStreamReader:
    """Mock StreamReader that replays a fixed sequence of protocol messages.

    Every message is encoded with ``DirtyProtocol.encode`` up front; the
    concatenated bytes are then served sequentially by ``readexactly``.
    """

    def __init__(self, messages):
        # One join is linear; the previous ``bytes +=`` loop re-copied the
        # whole accumulated buffer for every message (quadratic).
        self._data = b"".join(DirtyProtocol.encode(msg) for msg in messages)
        self._pos = 0

    async def readexactly(self, n):
        """Return the next ``n`` bytes.

        Raises:
            asyncio.IncompleteReadError: fewer than ``n`` bytes remain; the
                error carries the remaining bytes and the read position is
                not advanced.
        """
        end = self._pos + n
        if end > len(self._data):
            raise asyncio.IncompleteReadError(self._data[self._pos:], n)
        result = self._data[self._pos:end]
        self._pos = end
        return result
class MockLog:
    """Logger stand-in that silently drops everything, keeping benchmarks quiet.

    Exposes the logging methods (``debug``, ``info``, ``warning``, ``error``)
    plus the ``close_on_exec`` / ``reopen_files`` hooks; all return None.
    """

    def _drop_message(self, msg, *args):
        return None

    def _no_op(self):
        return None

    debug = _drop_message
    info = _drop_message
    warning = _drop_message
    error = _drop_message
    close_on_exec = _no_op
    reopen_files = _no_op
def create_worker():
    """Build a DirtyWorker suitable for in-process benchmarking.

    ``gunicorn.dirty.worker.WorkerTmp`` is patched out while the worker is
    constructed. Afterwards the app registry is emptied, the executor is
    cleared and ``tmp`` is replaced with a Mock.
    """
    config = Config()
    config.set("dirty_timeout", 300)
    quiet_log = MockLog()
    with mock.patch('gunicorn.dirty.worker.WorkerTmp'):
        bench_worker = DirtyWorker(
            age=1,
            ppid=os.getpid(),
            app_paths=["benchmark:App"],
            cfg=config,
            log=quiet_log,
            socket_path="/tmp/benchmark.sock",
        )
    bench_worker.apps = {}
    bench_worker._executor = None
    bench_worker.tmp = mock.Mock()
    return bench_worker
def create_arbiter():
    """Build a live DirtyArbiter that tracks a single fake worker (pid 1234)."""
    config = Config()
    config.set("dirty_timeout", 300)
    bench_arbiter = DirtyArbiter(cfg=config, log=MockLog())
    bench_arbiter.alive = True
    bench_arbiter.workers = {1234: mock.Mock()}
    bench_arbiter.worker_sockets = {1234: '/tmp/worker.sock'}
    return bench_arbiter
class BenchmarkResults:
    """Store benchmark results, print them, and persist them as JSON."""

    def __init__(self):
        self.results = []

    def add(self, name, iterations, duration, chunks=None, bytes_total=None,
            memory_start=None, memory_end=None):
        """Record one benchmark result as a dict appended to ``self.results``.

        Args:
            name: Human-readable benchmark label.
            iterations: Operations performed during ``duration``.
            duration: Elapsed seconds.
            chunks: Optional chunk count; when truthy adds ``chunks_per_s``.
            bytes_total: Optional byte count; when truthy adds ``mb_per_s``
                (MiB per second).
            memory_start, memory_end: Optional byte counts; when both are
                given, adds start/end/delta sizes in MiB.

        Every rate is reported as 0 when ``duration`` is not positive,
        rather than raising ZeroDivisionError.
        """
        def per_second(amount):
            # Same guard for every rate, not only throughput.
            return amount / duration if duration > 0 else 0

        mib = 1024 * 1024
        result = {
            "name": name,
            "iterations": iterations,
            "duration_s": round(duration, 4),
            "throughput_per_s": round(per_second(iterations), 2),
        }
        if chunks:
            result["chunks_per_s"] = round(per_second(chunks), 2)
        if bytes_total:
            result["mb_per_s"] = round(per_second(bytes_total / mib), 2)
        if memory_start is not None and memory_end is not None:
            result["memory_start_mb"] = round(memory_start / mib, 2)
            result["memory_end_mb"] = round(memory_end / mib, 2)
            result["memory_delta_mb"] = round((memory_end - memory_start) / mib, 2)
        self.results.append(result)

    def display(self):
        """Print every recorded result as an indented key/value listing."""
        print("\n" + "=" * 70)
        print("BENCHMARK RESULTS")
        print("=" * 70)
        for result in self.results:
            print(f"\n{result['name']}")
            print("-" * 50)
            for key, value in result.items():
                if key != "name":
                    print(f" {key}: {value}")
        print("\n" + "=" * 70)

    def save_json(self, filepath):
        """Write a timestamp and all results to ``filepath`` as indented JSON."""
        with open(filepath, 'w', encoding="utf-8") as f:
            json.dump({
                "timestamp": datetime.now().isoformat(),
                "results": self.results
            }, f, indent=2)
        print(f"Results saved to {filepath}")
async def benchmark_worker_streaming_throughput(results, chunk_size=1024, num_chunks=1000):
    """Measure how fast ``DirtyWorker.handle_request`` streams an async generator.

    ``worker.execute`` is patched to return an async generator that yields
    ``num_chunks`` strings of ``chunk_size`` characters. The output goes to a
    MockStreamWriter. Memory is sampled with tracemalloc, which stays active
    during the timed section, so its overhead is part of the measured duration.
    """
    bench_worker = create_worker()
    sink = MockStreamWriter()
    payload = "x" * chunk_size

    async def produce():
        for _ in range(num_chunks):
            yield payload

    async def fake_execute(app_path, action, args, kwargs):
        return produce()

    gc.collect()
    tracemalloc.start()
    mem_before = tracemalloc.get_traced_memory()[0]
    t0 = time.perf_counter()
    with mock.patch.object(bench_worker, 'execute', side_effect=fake_execute):
        await bench_worker.handle_request(
            make_request("bench-1", "benchmark:App", "stream"), sink
        )
    elapsed = time.perf_counter() - t0
    mem_after = tracemalloc.get_traced_memory()[0]
    tracemalloc.stop()

    results.add(
        f"Worker streaming ({chunk_size}B chunks, {num_chunks} chunks)",
        iterations=1,
        duration=elapsed,
        chunks=num_chunks,
        bytes_total=chunk_size * num_chunks,
        memory_start=mem_before,
        memory_end=mem_after,
    )
async def benchmark_arbiter_forwarding(results, num_chunks=1000):
    """Measure how fast the arbiter relays a worker stream to a client writer.

    The worker connection is faked with a MockStreamReader preloaded with
    ``num_chunks`` chunk messages followed by one end message. Bytes the
    arbiter writes to the client are counted by a MockStreamWriter.
    """
    arbiter = create_arbiter()

    stream = [make_chunk_message(f"bench-{n}", f"data-{n}") for n in range(num_chunks)]
    stream.append(make_end_message(f"bench-{num_chunks}"))
    worker_reader = MockStreamReader(stream)

    async def fake_connection(pid):
        return worker_reader, MockStreamWriter()

    arbiter._get_worker_connection = fake_connection
    client_sink = MockStreamWriter()

    gc.collect()
    t0 = time.perf_counter()
    await arbiter._execute_on_worker(
        1234, make_request("bench-forward", "benchmark:App", "stream"), client_sink
    )
    elapsed = time.perf_counter() - t0

    results.add(
        f"Arbiter forwarding ({num_chunks} chunks)",
        iterations=1,
        duration=elapsed,
        chunks=num_chunks,
        bytes_total=client_sink.bytes_written,
    )
    arbiter._cleanup_sync()
async def benchmark_streaming_latency(results, iterations=100):
    """Benchmark time-to-first-chunk and time-to-last-chunk for a 3-chunk stream.

    Each iteration runs one streaming request through ``handle_request``,
    with ``execute`` patched to return a 3-item async generator.

    * Time-to-first-chunk is measured from the start of the iteration to the
      first ``write`` on the output writer. This assumes the first thing the
      worker writes for a streaming response is the first chunk; confirm
      against DirtyWorker if the protocol changes.
    * Time-to-last-chunk is measured until ``handle_request`` returns.

    Both averages are printed and also stored (in ms) on the recorded result.
    """
    worker = create_worker()
    first_chunk_times = []
    total_times = []

    class _FirstWriteTimer(MockStreamWriter):
        """MockStreamWriter that timestamps the first write it receives."""

        def __init__(self):
            super().__init__()
            self.first_write_at = None

        def write(self, data):
            if self.first_write_at is None:
                self.first_write_at = time.perf_counter()
            super().write(data)

    async def gen_3_chunks():
        yield "first"
        yield "second"
        yield "third"

    async def mock_execute(app_path, action, args, kwargs):
        return gen_3_chunks()

    for _ in range(iterations):
        writer = _FirstWriteTimer()
        start = time.perf_counter()
        with mock.patch.object(worker, 'execute', side_effect=mock_execute):
            request = make_request("bench-latency", "benchmark:App", "stream")
            await worker.handle_request(request, writer)
        finished = time.perf_counter()
        # The old code sampled "first chunk" after handle_request returned,
        # which made it identical to the total time.
        if writer.first_write_at is not None:
            first_chunk_times.append(writer.first_write_at - start)
        total_times.append(finished - start)

    avg_first_chunk = sum(first_chunk_times) / len(first_chunk_times) if first_chunk_times else 0
    avg_total = sum(total_times) / len(total_times) if total_times else 0

    print(f"\nLatency Results ({iterations} iterations):")
    print(f" Avg time-to-first-chunk: {avg_first_chunk * 1000:.3f}ms")
    print(f" Avg time-to-last-chunk: {avg_total * 1000:.3f}ms")

    results.add(
        f"Streaming latency ({iterations} iterations)",
        iterations=iterations,
        duration=sum(total_times),
        chunks=iterations * 3
    )
    # Persist the latency figures too; previously they were only printed.
    latest = results.results[-1]
    latest["avg_first_chunk_ms"] = round(avg_first_chunk * 1000, 3)
    latest["avg_last_chunk_ms"] = round(avg_total * 1000, 3)
async def benchmark_concurrent_streams(results, num_streams=10, chunks_per_stream=100):
    """Benchmark multiple concurrent streams through one arbiter.

    ``num_streams`` requests run concurrently via ``asyncio.gather``. Every
    call to the arbiter's ``_get_worker_connection`` receives its own fresh
    MockStreamReader holding ``chunks_per_stream`` chunk messages and one
    end message.

    The connection factory is installed once, before any stream starts.
    The old code re-assigned the shared ``arbiter._get_worker_connection``
    inside every concurrent coroutine. If any stream suspended between
    assigning the factory and calling it, another stream could replace it,
    and two streams would then read from the same mock reader.
    """
    arbiter = create_arbiter()
    connections_opened = 0

    def build_reader(stream_no):
        request_id = f"stream-{stream_no}"
        messages = [
            make_chunk_message(request_id, f"chunk-{i}")
            for i in range(chunks_per_stream)
        ]
        messages.append(make_end_message(request_id))
        return MockStreamReader(messages)

    async def mock_get_connection(pid):
        nonlocal connections_opened
        reader = build_reader(connections_opened)
        connections_opened += 1
        return reader, MockStreamWriter()

    arbiter._get_worker_connection = mock_get_connection

    async def run_stream(stream_id):
        client_writer = MockStreamWriter()
        request = make_request(f"bench-concurrent-{stream_id}", "benchmark:App", "stream")
        await arbiter._execute_on_worker(1234, request, client_writer)
        # Don't count the terminating end message as a chunk (assumes the
        # arbiter forwards the end message to the client, as the protocol
        # requires for stream completion).
        return max(len(client_writer.messages) - 1, 0)

    try:
        gc.collect()
        start = time.perf_counter()
        results_list = await asyncio.gather(
            *(run_stream(i) for i in range(num_streams))
        )
        duration = time.perf_counter() - start
        total_chunks = sum(results_list)
        results.add(
            f"Concurrent streams ({num_streams} streams, {chunks_per_stream} chunks each)",
            iterations=num_streams,
            duration=duration,
            chunks=total_chunks
        )
    finally:
        arbiter._cleanup_sync()
async def benchmark_memory_stability(results, iterations=10, chunks=1000):
    """Check that traced memory stays flat across repeated streaming requests.

    Runs ``iterations`` requests. Each one streams ``chunks`` small strings
    through ``DirtyWorker.handle_request``. After each request a full
    ``gc.collect()`` runs, then tracemalloc's current traced size is sampled.

    The recorded duration is the summed time spent inside ``handle_request``.
    GC and sampling time are excluded, so ``throughput_per_s`` reflects real
    streaming speed rather than a placeholder duration.
    """
    worker = create_worker()

    async def gen_chunks():
        for j in range(chunks):
            yield f"chunk-{j}"

    async def mock_execute(app_path, action, args, kwargs):
        return gen_chunks()

    gc.collect()
    tracemalloc.start()
    try:
        memory_samples = [tracemalloc.get_traced_memory()[0]]
        streaming_time = 0.0
        for i in range(iterations):
            writer = MockStreamWriter()
            # Patch per iteration so the mock's call history does not
            # accumulate across iterations and skew the memory samples.
            with mock.patch.object(worker, 'execute', side_effect=mock_execute):
                request = make_request(f"bench-mem-{i}", "benchmark:App", "stream")
                started = time.perf_counter()
                await worker.handle_request(request, writer)
                streaming_time += time.perf_counter() - started
            gc.collect()
            memory_samples.append(tracemalloc.get_traced_memory()[0])
    finally:
        tracemalloc.stop()

    memory_start = memory_samples[0]
    memory_end = memory_samples[-1]
    memory_max = max(memory_samples)

    print(f"\nMemory stability ({iterations} iterations of {chunks} chunks):")
    print(f" Start: {memory_start / 1024 / 1024:.2f}MB")
    print(f" End: {memory_end / 1024 / 1024:.2f}MB")
    print(f" Max: {memory_max / 1024 / 1024:.2f}MB")
    print(f" Delta: {(memory_end - memory_start) / 1024 / 1024:.2f}MB")

    results.add(
        f"Memory stability ({iterations} x {chunks} chunks)",
        iterations=iterations * chunks,
        duration=streaming_time,
        memory_start=memory_start,
        memory_end=memory_end
    )
    # The peak sample was printed but never persisted.
    results.results[-1]["memory_max_mb"] = round(memory_max / (1024 * 1024), 2)
class MockClientReader:
    """Mock async reader that replays a complete streaming response.

    The wire data is ``num_chunks`` chunk messages, each carrying
    ``chunk_data``, followed by one end message. All messages are encoded
    with ``DirtyProtocol.encode`` at construction time and served back by
    ``readexactly``.
    """

    def __init__(self, num_chunks, chunk_data):
        self.num_chunks = num_chunks
        self.chunk_data = chunk_data
        self._chunk_idx = 0
        self._messages = []
        self._build_messages()
        self._pos = 0
        # Single join: the previous ``bytes +=`` loop was quadratic, which is
        # very slow for large chunk payloads (e.g. 1000 x 64 KiB).
        self._data = b"".join(DirtyProtocol.encode(msg) for msg in self._messages)

    def _build_messages(self):
        """Append the chunk messages and the final end message to ``_messages``."""
        self._messages.extend(
            make_chunk_message(f"bench-{i}", self.chunk_data)
            for i in range(self.num_chunks)
        )
        self._messages.append(make_end_message("bench-end"))

    async def readexactly(self, n):
        """Return the next ``n`` bytes.

        Raises:
            asyncio.IncompleteReadError: fewer than ``n`` bytes remain; the
                read position is not advanced.
        """
        end = self._pos + n
        if end > len(self._data):
            raise asyncio.IncompleteReadError(self._data[self._pos:], n)
        result = self._data[self._pos:end]
        self._pos = end
        return result
class MockClientWriter:
    """Stand-in for the client's asyncio StreamWriter.

    Written bytes accumulate in ``_buffer`` and ``close()`` sets ``_closed``.
    ``drain`` and ``wait_closed`` are coroutines that do nothing.
    """

    def __init__(self):
        self._closed = False
        self._buffer = b""

    def write(self, data):
        self._buffer = self._buffer + data

    def close(self):
        self._closed = True

    async def drain(self):
        return None

    async def wait_closed(self):
        return None
async def benchmark_async_client_streaming(results, chunk_size=1024, num_chunks=1000):
    """Time DirtyAsyncStreamIterator draining a pre-encoded in-memory stream.

    The client's reader and writer are replaced by MockClientReader and
    MockClientWriter. The iterator's private state is primed so that it skips
    sending the request and goes straight to reading. tracemalloc is active
    during the timed loop, so its overhead is included in the duration.
    """
    payload = "x" * chunk_size

    client = DirtyClient("/tmp/benchmark.sock", timeout=30.0)
    client._reader = MockClientReader(num_chunks, payload)
    client._writer = MockClientWriter()

    gc.collect()
    tracemalloc.start()
    mem_before = tracemalloc.get_traced_memory()[0]
    t0 = time.perf_counter()

    stream = DirtyAsyncStreamIterator(client, "benchmark:App", "stream", (), {})
    # Pretend the request has already been sent.
    stream._started = True
    stream._request_id = "bench-async"
    stream._deadline = time.perf_counter() + 300  # 5 minutes from now
    stream._last_chunk_time = time.perf_counter()

    n_chunks = 0
    n_bytes = 0
    async for piece in stream:
        n_chunks += 1
        n_bytes += len(piece)

    elapsed = time.perf_counter() - t0
    mem_after = tracemalloc.get_traced_memory()[0]
    tracemalloc.stop()

    results.add(
        f"Async client streaming ({chunk_size}B chunks, {num_chunks} chunks)",
        iterations=1,
        duration=elapsed,
        chunks=n_chunks,
        bytes_total=n_bytes,
        memory_start=mem_before,
        memory_end=mem_after,
    )
async def benchmark_sync_client_streaming(results, chunk_size=1024, num_chunks=1000):
    """
    Benchmark DirtyStreamIterator directly (for comparison with async).

    Note: This runs the sync iterator within an async context for comparison.
    The client's socket is replaced by an in-memory object that serves
    ``num_chunks`` pre-encoded chunk messages plus one end message.
    tracemalloc is active during the timed loop, matching the async variant.
    """
    chunk_data = "x" * chunk_size

    # Build the raw wire data with a single join. Repeated ``bytes +=`` is
    # quadratic: with 1000 x 64 KiB chunks it copied tens of GiB.
    frames = [
        DirtyProtocol.encode(make_chunk_message(f"bench-{i}", chunk_data))
        for i in range(num_chunks)
    ]
    frames.append(DirtyProtocol.encode(make_end_message("bench-end")))
    messages_data = b"".join(frames)
    del frames  # release per-frame copies before measurement

    class MockSocket:
        """Socket-like object that returns ``data`` through ``recv``."""

        def __init__(self, data):
            self._data = data
            self._pos = 0
            self._timeout = None

        def recv(self, n, flags=0):
            if self._pos >= len(self._data):
                return b''
            result = self._data[self._pos:self._pos + n]
            self._pos += len(result)
            return result

        def settimeout(self, timeout):
            self._timeout = timeout

    client = DirtyClient("/tmp/benchmark.sock", timeout=30.0)
    client._sock = MockSocket(messages_data)

    gc.collect()
    tracemalloc.start()
    memory_start = tracemalloc.get_traced_memory()[0]
    start = time.perf_counter()

    iterator = DirtyStreamIterator(client, "benchmark:App", "stream", (), {})
    iterator._started = True  # Skip the request sending
    iterator._request_id = "bench-sync"
    iterator._deadline = time.perf_counter() + 300  # 5 min deadline
    iterator._last_chunk_time = time.perf_counter()

    chunks_received = 0
    bytes_received = 0
    for chunk in iterator:
        chunks_received += 1
        bytes_received += len(chunk)

    duration = time.perf_counter() - start
    memory_end = tracemalloc.get_traced_memory()[0]
    tracemalloc.stop()

    results.add(
        f"Sync client streaming ({chunk_size}B chunks, {num_chunks} chunks)",
        iterations=1,
        duration=duration,
        chunks=chunks_received,
        bytes_total=bytes_received,
        memory_start=memory_start,
        memory_end=memory_end
    )
async def benchmark_async_vs_sync_client_streaming(results, chunk_size=1024, num_chunks=1000):
    """
    Compare stream() vs stream_async() performance with the same workload.

    Drives DirtyStreamIterator and DirtyAsyncStreamIterator directly. The
    sync client reads from an in-memory fake socket; the async client reads
    from a MockClientReader. Both receive ``num_chunks`` chunk messages of
    ``chunk_size`` characters plus one end message. Neither run is traced
    with tracemalloc. Prints both durations and their ratio, and records one
    result per client.
    """
    chunk_data = "x" * chunk_size

    # --- Sync test ---
    # One join instead of repeated ``bytes +=`` (quadratic in total size).
    frames = [
        DirtyProtocol.encode(make_chunk_message(f"bench-{i}", chunk_data))
        for i in range(num_chunks)
    ]
    frames.append(DirtyProtocol.encode(make_end_message("bench-end")))
    messages_data = b"".join(frames)
    del frames  # release per-frame copies before measurement

    class MockSocket:
        """Socket-like object that returns ``data`` through ``recv``."""

        def __init__(self, data):
            self._data = data
            self._pos = 0
            self._timeout = None

        def recv(self, n, flags=0):
            if self._pos >= len(self._data):
                return b''
            result = self._data[self._pos:self._pos + n]
            self._pos += len(result)
            return result

        def settimeout(self, timeout):
            self._timeout = timeout

    sync_client = DirtyClient("/tmp/benchmark.sock", timeout=30.0)
    sync_client._sock = MockSocket(messages_data)

    gc.collect()
    sync_start = time.perf_counter()
    sync_iter = DirtyStreamIterator(sync_client, "benchmark:App", "stream", (), {})
    sync_iter._started = True
    sync_iter._request_id = "bench-sync"
    sync_iter._deadline = time.perf_counter() + 300  # 5 min deadline
    sync_iter._last_chunk_time = time.perf_counter()
    sync_chunks = 0
    for _ in sync_iter:
        sync_chunks += 1
    sync_duration = time.perf_counter() - sync_start

    # --- Async test ---
    async_client = DirtyClient("/tmp/benchmark.sock", timeout=30.0)
    async_client._reader = MockClientReader(num_chunks, chunk_data)
    async_client._writer = MockClientWriter()

    gc.collect()
    async_start = time.perf_counter()
    async_iter = DirtyAsyncStreamIterator(async_client, "benchmark:App", "stream", (), {})
    async_iter._started = True
    async_iter._request_id = "bench-async"
    async_iter._deadline = time.perf_counter() + 300  # 5 min deadline
    async_iter._last_chunk_time = time.perf_counter()
    async_chunks = 0
    async for _ in async_iter:
        async_chunks += 1
    async_duration = time.perf_counter() - async_start

    # Report comparison
    print(f"\nSync vs Async Client Streaming Comparison ({num_chunks} x {chunk_size}B chunks):")
    print(f" Sync: {sync_duration * 1000:.3f}ms ({sync_chunks} chunks)")
    print(f" Async: {async_duration * 1000:.3f}ms ({async_chunks} chunks)")
    if sync_duration > 0:
        ratio = async_duration / sync_duration
        print(f" Ratio (async/sync): {ratio:.3f}x")

    results.add(
        f"Sync client streaming comparison ({chunk_size}B, {num_chunks} chunks)",
        iterations=1,
        duration=sync_duration,
        chunks=sync_chunks,
        bytes_total=sync_chunks * chunk_size
    )
    results.add(
        f"Async client streaming comparison ({chunk_size}B, {num_chunks} chunks)",
        iterations=1,
        duration=async_duration,
        chunks=async_chunks,
        bytes_total=async_chunks * chunk_size
    )
async def run_quick_benchmarks():
    """Run the short benchmark set and return the populated BenchmarkResults."""
    results = BenchmarkResults()
    print("Running quick benchmarks...")

    # Worker throughput at a small and a medium chunk size.
    for size in (64, 1024):
        await benchmark_worker_streaming_throughput(results, chunk_size=size, num_chunks=1000)
    await benchmark_arbiter_forwarding(results, num_chunks=1000)
    await benchmark_streaming_latency(results, iterations=50)

    # Client-side stream iterators.
    await benchmark_async_client_streaming(results, chunk_size=1024, num_chunks=1000)
    await benchmark_async_vs_sync_client_streaming(results, chunk_size=1024, num_chunks=1000)
    return results
async def run_full_benchmarks():
    """Run every benchmark, including stress tests; return the BenchmarkResults."""
    results = BenchmarkResults()
    print("Running full benchmark suite...")

    # Worker throughput across chunk sizes, from 1 byte up to 64 KiB.
    for size in (1, 64, 1024, 65536):
        await benchmark_worker_streaming_throughput(
            results, chunk_size=size, num_chunks=1000
        )

    await benchmark_arbiter_forwarding(results, num_chunks=10000)
    await benchmark_streaming_latency(results, iterations=100)

    for stream_count in (10, 50):
        await benchmark_concurrent_streams(
            results, num_streams=stream_count, chunks_per_stream=100
        )

    await benchmark_memory_stability(results, iterations=20, chunks=1000)

    # Async and sync clients, interleaved per chunk size.
    for size in (64, 1024, 65536):
        await benchmark_async_client_streaming(results, chunk_size=size, num_chunks=1000)
        await benchmark_sync_client_streaming(results, chunk_size=size, num_chunks=1000)

    await benchmark_async_vs_sync_client_streaming(results, chunk_size=1024, num_chunks=5000)
    return results
def main():
    """Parse CLI flags, run the chosen suite, print and save the results.

    ``--full`` selects the full suite; otherwise the quick suite runs. Results
    go to ``--output`` when it is given and non-empty. Otherwise they go to a
    timestamped file under ``results/`` next to this script.
    """
    parser = argparse.ArgumentParser(description="Dirty streaming benchmarks")
    parser.add_argument("--quick", action="store_true", help="Run quick benchmarks only")
    parser.add_argument("--full", action="store_true", help="Run full benchmark suite")
    parser.add_argument("--output", "-o", help="Output JSON file path")
    args = parser.parse_args()

    suite = run_full_benchmarks if args.full else run_quick_benchmarks
    results = asyncio.run(suite())
    results.display()

    destination = args.output
    if not destination:
        # Default: <script dir>/results/streaming_benchmark_<timestamp>.json
        script_dir = os.path.dirname(os.path.abspath(__file__))
        results_dir = os.path.join(script_dir, "results")
        os.makedirs(results_dir, exist_ok=True)
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        destination = os.path.join(results_dir, f"streaming_benchmark_{stamp}.json")
    results.save_json(destination)
if __name__ == "__main__":
    # Run the benchmarks only when executed as a script, not when imported.
    main()