diff --git a/examples/celery_alternative/Dockerfile b/examples/celery_alternative/Dockerfile index ed160486..a3a49bd1 100644 --- a/examples/celery_alternative/Dockerfile +++ b/examples/celery_alternative/Dockerfile @@ -23,7 +23,7 @@ RUN pip install --no-cache-dir /gunicorn-src # Copy example application COPY examples/celery_alternative /app -RUN pip install --no-cache-dir flask +RUN pip install --no-cache-dir fastapi uvloop requests pytest # Environment variables ENV PYTHONUNBUFFERED=1 @@ -31,7 +31,6 @@ ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONPATH=/gunicorn-src ENV GUNICORN_BIND=0.0.0.0:8000 ENV GUNICORN_WORKERS=4 -ENV GUNICORN_THREADS=4 ENV DIRTY_WORKERS=9 ENV DIRTY_TIMEOUT=300 ENV LOG_LEVEL=info diff --git a/examples/celery_alternative/README.md b/examples/celery_alternative/README.md index 50ac13de..4aa0860c 100644 --- a/examples/celery_alternative/README.md +++ b/examples/celery_alternative/README.md @@ -1,16 +1,73 @@ # Celery Alternative Example -This example demonstrates how to replace Celery with Gunicorn's **dirty arbiters** for background task processing. +This example demonstrates how to replace Celery with Gunicorn's **dirty arbiters** for background task processing, using **async ASGI** for non-blocking HTTP handling. -## Why Replace Celery? +## Why Use This Instead of Celery? -| Aspect | Celery | Dirty Arbiters | -|--------|--------|----------------| -| Dependencies | Redis/RabbitMQ + Celery | None (built into Gunicorn) | -| Deployment | Multiple processes/containers | Single process | -| State | Stateless workers | Stateful workers (keep models loaded) | -| Progress | Polling or WebSocket | Native streaming | -| Configuration | Separate config | Same gunicorn.conf.py | +### The Problem with Celery + +Celery requires: +- An external message broker (Redis or RabbitMQ) +- Separate worker processes (`celery -A app worker`) +- Stateless workers that reload models/connections on every task +- Polling or WebSockets for progress updates + +### What Dirty Arbiters Provide + +| Feature | Celery | Dirty Arbiters | +|---------|--------|----------------| +| **External broker** | Required (Redis/RabbitMQ) | None - uses Unix sockets | +| **Deployment** | Multiple processes | Single `gunicorn` command | +| **Worker state** | Stateless | Stateful - keep ML models, DB connections loaded | +| **Progress updates** | Polling or WebSocket | Native streaming | +| **HTTP blocking** | N/A (separate process) | Non-blocking with async ASGI | + +### When to Use Dirty Arbiters + +**Good fit:** +- Tasks that benefit from keeping state (ML models, DB connection pools, caches) +- Tasks where you want immediate results (not fire-and-forget) +- Real-time progress streaming +- Simpler deployment without external dependencies + +**Not ideal for:** +- True fire-and-forget queuing with persistence +- Distributed task execution across multiple machines +- Tasks that must survive server restarts + +## How It Works + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Gunicorn Master │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────────────────────────────────────┐ │ +│ │ ASGI Workers (uvloop) │ │ +│ │ Non-blocking! One worker handles many requests │ │ +│ │ await client.execute_async() doesn't block │ │ +│ └──────────────────────────┬──────────────────────────┘ │ +│ │ │ +│ Unix Socket IPC │ +│ │ │ +│ ┌──────────────────────────┼──────────────────────────┐ │ +│ │ Dirty Workers (Stateful) │ │ +│ │ │ │ +│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ +│ │ │EmailWorker │ │ImageWorker │ │DataWorker │ ... │ │ +│ │ │ (2 procs) │ │ (2 procs) │ │ (4 procs) │ │ │ +│ │ │ │ │ │ │ │ │ │ +│ │ │ SMTP conn │ │ PIL loaded │ │ DB pool │ │ │ +│ │ │ kept alive │ │ in memory │ │ cached │ │ │ +│ │ └────────────┘ └────────────┘ └────────────┘ │ │ +│ │ │ │ +│ │ Dirty Arbiter │ │ +│ └──────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────┘ +``` + +**Key insight:** The HTTP workers use async I/O, so `await client.execute_async()` doesn't block the event loop. One ASGI worker can handle thousands of concurrent requests while waiting for dirty workers to complete tasks. ## Quick Start @@ -18,7 +75,7 @@ This example demonstrates how to replace Celery with Gunicorn's **dirty arbiters ```bash # Install dependencies -pip install flask requests pytest +pip install fastapi uvloop httpx pytest pytest-asyncio pip install -e ../.. # Install gunicorn from source # Run the application @@ -29,6 +86,9 @@ curl http://localhost:8000/health curl -X POST http://localhost:8000/api/email/send \ -H "Content-Type: application/json" \ -d '{"to": "test@example.com", "subject": "Hello", "body": "World"}' + +# Interactive API docs +open http://localhost:8000/docs ``` ### Docker @@ -41,104 +101,56 @@ docker compose up --build docker compose --profile test up --build --abort-on-container-exit ``` -## Architecture - -``` -┌─────────────────────────────────────────────────────────────┐ -│ Gunicorn Master │ -├─────────────────────────────────────────────────────────────┤ -│ │ -│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ -│ │ HTTP Worker │ │ HTTP Worker │ │ HTTP Worker │ ... │ -│ │ (gthread) │ │ (gthread) │ │ (gthread) │ │ -│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ -│ │ │ │ │ -│ └────────────────┼────────────────┘ │ -│ │ │ -│ Unix Socket IPC │ -│ │ │ -│ ┌────────────────┼────────────────┐ │ -│ │ │ │ │ -│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │ -│ │ EmailWorker │ │ ImageWorker │ │ DataWorker │ ... │ -│ │ (2 procs) │ │ (2 procs) │ │ (4 procs) │ │ -│ └─────────────┘ └─────────────┘ └─────────────┘ │ -│ │ -│ Dirty Arbiter │ -└─────────────────────────────────────────────────────────────┘ -``` - ## Task Workers -### EmailWorker +Each worker class maintains state across requests: + +### EmailWorker (2 workers) +- Keeps SMTP connection alive - `send_email(to, subject, body)` - Send single email -- `send_bulk_emails(recipients, subject, body)` - Bulk send with progress streaming -- `stats()` - Worker statistics +- `send_bulk_emails(recipients, subject, body)` - Bulk send with streaming progress -### ImageWorker +### ImageWorker (2 workers) +- Keeps PIL/image libraries loaded - `resize(image_data, width, height)` - Resize image -- `generate_thumbnail(image_data, size)` - Generate thumbnail -- `process_batch(images, operation, **params)` - Batch process with streaming +- `process_batch(images, operation)` - Batch process with streaming -### DataWorker -- `aggregate(data, group_by, agg_field, agg_func)` - Aggregate data -- `etl_pipeline(source_data, transformations)` - ETL with progress streaming -- `cached_query(query_key, ttl)` - Cached query execution +### DataWorker (4 workers) +- Maintains DB connection pool and query cache +- `aggregate(data, group_by, agg_field)` - Aggregate data +- `etl_pipeline(source_data, transformations)` - ETL with streaming progress +- `cached_query(query_key, ttl)` - Query with in-memory caching -### ScheduledWorker -- `cleanup_old_files(directory, max_age_days)` - File cleanup -- `generate_daily_report()` - Daily report generation -- `sync_external_data(source)` - External data sync - -## API Endpoints - -### Email -- `POST /api/email/send` - Send single email -- `POST /api/email/send-bulk` - Bulk send (SSE streaming) -- `GET /api/email/stats` - Worker stats - -### Image -- `POST /api/image/resize` - Resize image -- `POST /api/image/thumbnail` - Generate thumbnail -- `POST /api/image/process-batch` - Batch process (SSE streaming) -- `GET /api/image/stats` - Worker stats - -### Data -- `POST /api/data/aggregate` - Aggregate data -- `POST /api/data/etl` - ETL pipeline (SSE streaming) -- `POST /api/data/query` - Cached query -- `GET /api/data/stats` - Worker stats - -### Scheduled -- `POST /api/scheduled/cleanup` - Run cleanup -- `POST /api/scheduled/daily-report` - Generate report -- `POST /api/scheduled/sync` - Sync data -- `GET /api/scheduled/stats` - Worker stats +### ScheduledWorker (1 worker) +- For periodic tasks (call from cron) +- `cleanup_old_files(directory, max_age_days)` +- `generate_daily_report()` ## Streaming Progress Example +Real-time progress without polling: + ```python -import requests +import httpx import json -# Start bulk email with streaming progress -resp = requests.post( - "http://localhost:8000/api/email/send-bulk", - json={ - "recipients": ["a@x.com", "b@x.com", "c@x.com"], - "subject": "Newsletter", - "body": "Hello!", - }, - stream=True, -) - -for line in resp.iter_lines(): - if line and line.startswith(b"data: "): - progress = json.loads(line[6:]) - if progress["type"] == "progress": - print(f"Progress: {progress['percent']}%") - elif progress["type"] == "complete": - print(f"Done! Sent: {progress['sent']}, Failed: {progress['failed']}") +async with httpx.AsyncClient() as client: + async with client.stream( + "POST", + "http://localhost:8000/api/email/send-bulk", + json={ + "recipients": ["a@x.com", "b@x.com", "c@x.com"], + "subject": "Newsletter", + "body": "Hello!", + }, + ) as response: + async for line in response.aiter_lines(): + if line.startswith("data: "): + progress = json.loads(line[6:]) + if progress["type"] == "progress": + print(f"Progress: {progress['percent']}%") + elif progress["type"] == "complete": + print(f"Done! Sent: {progress['sent']}") ``` ## Celery Migration Guide @@ -153,23 +165,25 @@ app = Celery('tasks', broker='redis://localhost') @app.task def send_email(to, subject, body): - # Send email + smtp = smtplib.SMTP(...) # New connection every task! + smtp.send(...) return {"status": "sent"} @app.task(bind=True) def send_bulk(self, recipients, subject, body): for i, to in enumerate(recipients): send_email(to, subject, body) - self.update_state(state='PROGRESS', meta={'current': i}) + self.update_state(state='PROGRESS', meta={'current': i}) # Requires polling! ``` ```python -# views.py -from tasks import send_email, send_bulk +# views.py - Flask +from tasks import send_email -def send_view(request): - send_email.delay(to, subject, body) # Async - return {"status": "queued"} +@app.route('/send') +def send_view(): + send_email.delay(to, subject, body) # Fire and forget + return {"status": "queued"} # Can't get result without polling ``` ### After (Dirty Arbiters) @@ -179,15 +193,16 @@ def send_view(request): from gunicorn.dirty.app import DirtyApp class EmailWorker(DirtyApp): - workers = 2 # Limit workers + workers = 2 def init(self): - self.smtp = connect_smtp() # Stateful! + self.smtp = smtplib.SMTP(...) # Connected once, reused! def __call__(self, action, *args, **kwargs): return getattr(self, action)(*args, **kwargs) def send_email(self, to, subject, body): + self.smtp.send(...) # Reuses connection return {"status": "sent"} def send_bulk(self, recipients, subject, body): @@ -197,13 +212,15 @@ class EmailWorker(DirtyApp): ``` ```python -# views.py -from gunicorn.dirty import get_dirty_client +# views.py - FastAPI (async) +from gunicorn.dirty import get_dirty_client_async -def send_view(request): - client = get_dirty_client() - result = client.execute("tasks:EmailWorker", "send_email", to, subject, body) - return result # Sync result, no polling! +@app.post('/send') +async def send_view(data: EmailRequest): + client = await get_dirty_client_async() + # Non-blocking! Other requests handled while waiting + result = await client.execute_async("tasks:EmailWorker", "send_email", ...) + return result # Immediate result, no polling! ``` ## Configuration @@ -211,12 +228,12 @@ def send_view(request): ```python # gunicorn_conf.py -# HTTP workers +# ASGI workers for non-blocking HTTP +worker_class = "asgi" +asgi_loop = "uvloop" workers = 4 -worker_class = "gthread" -threads = 4 -# Task workers (replace Celery) +# Dirty workers (replace Celery) dirty_apps = [ "tasks:EmailWorker", "tasks:ImageWorker", @@ -238,3 +255,19 @@ APP_URL=http://localhost:8000 pytest tests/test_integration.py -v # All tests via Docker docker compose --profile test up --build --abort-on-container-exit ``` + +## API Endpoints + +Visit `/docs` for interactive Swagger documentation. + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/api/email/send` | POST | Send single email | +| `/api/email/send-bulk` | POST | Bulk send (SSE streaming) | +| `/api/image/resize` | POST | Resize image | +| `/api/image/process-batch` | POST | Batch process (SSE streaming) | +| `/api/data/aggregate` | POST | Aggregate data | +| `/api/data/etl` | POST | ETL pipeline (SSE streaming) | +| `/api/data/query` | POST | Cached query | +| `/api/scheduled/*` | POST | Scheduled tasks | +| `/health` | GET | Health check | diff --git a/examples/celery_alternative/app.py b/examples/celery_alternative/app.py index c190c88f..737e4d18 100644 --- a/examples/celery_alternative/app.py +++ b/examples/celery_alternative/app.py @@ -1,22 +1,28 @@ """ -Web Application - Flask app demonstrating Celery replacement. +Web Application - FastAPI app demonstrating Celery replacement. -This shows how to call dirty arbiter tasks from your web application, -replacing Celery's task.delay() and task.apply_async() patterns. +This shows how to call dirty arbiter tasks from your web application +using the async API, which doesn't block the event loop. + +Key difference from sync (Flask/gthread): +- `await client.execute_async()` is non-blocking +- A single worker can handle many concurrent requests +- True async I/O - other requests proceed while waiting for task results """ import json -import os -from flask import Flask, request, jsonify, Response, stream_with_context +from contextlib import asynccontextmanager -from gunicorn.dirty import get_dirty_client +from fastapi import FastAPI, HTTPException +from fastapi.responses import StreamingResponse +from pydantic import BaseModel + +from gunicorn.dirty import get_dirty_client_async from gunicorn.dirty.errors import ( DirtyError, DirtyTimeoutError, - DirtyAppNotFoundError, ) -app = Flask(__name__) # Task worker import paths (like Celery task names) EMAIL_WORKER = "examples.celery_alternative.tasks:EmailWorker" @@ -25,48 +31,115 @@ DATA_WORKER = "examples.celery_alternative.tasks:DataWorker" SCHEDULED_WORKER = "examples.celery_alternative.tasks:ScheduledWorker" -def get_client(): - """Get the dirty client for calling task workers.""" - return get_dirty_client() +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan - startup and shutdown.""" + yield + + +app = FastAPI( + title="Celery Replacement Demo", + description="Demonstrating Gunicorn dirty arbiters as Celery replacement with async ASGI", + lifespan=lifespan, +) + + +# ============================================================================ +# Request/Response Models +# ============================================================================ + +class EmailRequest(BaseModel): + to: str + subject: str + body: str + html: bool = False + + +class BulkEmailRequest(BaseModel): + recipients: list[str] + subject: str + body: str + + +class ImageResizeRequest(BaseModel): + image_data: str = "" + width: int = 800 + height: int = 600 + + +class ThumbnailRequest(BaseModel): + image_data: str = "" + size: int = 150 + + +class ImageBatchRequest(BaseModel): + images: list[dict] + operation: str = "resize" + width: int = 800 + height: int = 600 + size: int = 150 + + +class AggregateRequest(BaseModel): + data: list[dict] + group_by: str + agg_field: str + agg_func: str = "sum" + + +class ETLRequest(BaseModel): + source_data: list[dict] + transformations: list[dict] = [] + + +class QueryRequest(BaseModel): + query_key: str + ttl: int = 300 + + +class CleanupRequest(BaseModel): + directory: str = "/tmp" + max_age_days: int = 7 + + +class SyncRequest(BaseModel): + source: str = "default" # ============================================================================ # Email Tasks - Like Celery email tasks # ============================================================================ -@app.route("/api/email/send", methods=["POST"]) -def send_email(): +@app.post("/api/email/send") +async def send_email(data: EmailRequest): """ Send a single email. Celery equivalent: send_email.delay(to, subject, body) - Request: - POST /api/email/send - {"to": "user@example.com", "subject": "Hello", "body": "World"} + With async dirty client, this doesn't block the event loop! + Other requests can be handled while waiting for the task. """ - data = request.get_json() - try: - client = get_client() - result = client.execute( + client = await get_dirty_client_async() + result = await client.execute_async( EMAIL_WORKER, "send_email", - to=data["to"], - subject=data["subject"], - body=data["body"], - html=data.get("html", False), + to=data.to, + subject=data.subject, + body=data.body, + html=data.html, ) - return jsonify(result) + return result except DirtyTimeoutError: - return jsonify({"error": "Task timed out"}), 504 + raise HTTPException(status_code=504, detail="Task timed out") except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) -@app.route("/api/email/send-bulk", methods=["POST"]) -def send_bulk_emails(): +@app.post("/api/email/send-bulk") +async def send_bulk_emails(data: BulkEmailRequest): """ Send bulk emails with streaming progress. @@ -76,30 +149,24 @@ def send_bulk_emails(): print(result.info) # Progress polling With dirty arbiters, progress is streamed in real-time! - - Request: - POST /api/email/send-bulk - {"recipients": ["a@x.com", "b@x.com"], "subject": "Hi", "body": "Hello"} """ - data = request.get_json() - - def generate(): + async def generate(): try: - client = get_client() - for progress in client.stream( + client = await get_dirty_client_async() + async for progress in client.stream_async( EMAIL_WORKER, "send_bulk_emails", - recipients=data["recipients"], - subject=data["subject"], - body=data["body"], + recipients=data.recipients, + subject=data.subject, + body=data.body, ): yield f"data: {json.dumps(progress)}\n\n" except DirtyError as e: yield f"data: {json.dumps({'error': str(e)})}\n\n" - return Response( - stream_with_context(generate()), - mimetype="text/event-stream", + return StreamingResponse( + generate(), + media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", @@ -107,315 +174,246 @@ def send_bulk_emails(): ) -@app.route("/api/email/stats") -def email_stats(): +@app.get("/api/email/stats") +async def email_stats(): """Get email worker statistics.""" try: - client = get_client() - result = client.execute(EMAIL_WORKER, "stats") - return jsonify(result) + client = await get_dirty_client_async() + result = await client.execute_async(EMAIL_WORKER, "stats") + return result except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) # ============================================================================ # Image Tasks - Like Celery image processing tasks # ============================================================================ -@app.route("/api/image/resize", methods=["POST"]) -def resize_image(): +@app.post("/api/image/resize") +async def resize_image(data: ImageResizeRequest): """ Resize an image. Celery equivalent: resize_image.delay(image_data, width, height) - - Request: - POST /api/image/resize - {"image_data": "base64...", "width": 800, "height": 600} """ - data = request.get_json() - - # Keep image_data as string (base64 encoded) for JSON serialization - image_data = data.get("image_data", "") - try: - client = get_client() - result = client.execute( + client = await get_dirty_client_async() + result = await client.execute_async( IMAGE_WORKER, "resize", - image_data=image_data, - width=data.get("width", 800), - height=data.get("height", 600), + image_data=data.image_data, + width=data.width, + height=data.height, ) - return jsonify(result) + return result except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) -@app.route("/api/image/thumbnail", methods=["POST"]) -def generate_thumbnail(): +@app.post("/api/image/thumbnail") +async def generate_thumbnail(data: ThumbnailRequest): """Generate a thumbnail.""" - data = request.get_json() - image_data = data.get("image_data", "") - try: - client = get_client() - result = client.execute( + client = await get_dirty_client_async() + result = await client.execute_async( IMAGE_WORKER, "generate_thumbnail", - image_data=image_data, - size=data.get("size", 150), + image_data=data.image_data, + size=data.size, ) - return jsonify(result) + return result except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) -@app.route("/api/image/process-batch", methods=["POST"]) -def process_image_batch(): +@app.post("/api/image/process-batch") +async def process_image_batch(data: ImageBatchRequest): """ Process multiple images with progress streaming. - - Request: - POST /api/image/process-batch - { - "images": [{"id": "img1", "data": "..."}, ...], - "operation": "resize", - "width": 800, - "height": 600 - } """ - data = request.get_json() - - def generate(): + async def generate(): try: - client = get_client() - for progress in client.stream( + client = await get_dirty_client_async() + async for progress in client.stream_async( IMAGE_WORKER, "process_batch", - images=data["images"], - operation=data.get("operation", "resize"), - width=data.get("width", 800), - height=data.get("height", 600), - size=data.get("size", 150), + images=data.images, + operation=data.operation, + width=data.width, + height=data.height, + size=data.size, ): yield f"data: {json.dumps(progress)}\n\n" except DirtyError as e: yield f"data: {json.dumps({'error': str(e)})}\n\n" - return Response( - stream_with_context(generate()), - mimetype="text/event-stream", + return StreamingResponse( + generate(), + media_type="text/event-stream", ) -@app.route("/api/image/stats") -def image_stats(): +@app.get("/api/image/stats") +async def image_stats(): """Get image worker statistics.""" try: - client = get_client() - result = client.execute(IMAGE_WORKER, "stats") - return jsonify(result) + client = await get_dirty_client_async() + result = await client.execute_async(IMAGE_WORKER, "stats") + return result except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) # ============================================================================ # Data Tasks - Like Celery data processing tasks # ============================================================================ -@app.route("/api/data/aggregate", methods=["POST"]) -def aggregate_data(): +@app.post("/api/data/aggregate") +async def aggregate_data(data: AggregateRequest): """ Aggregate data. Celery equivalent: aggregate_data.delay(data, group_by, agg_field, agg_func) - - Request: - POST /api/data/aggregate - { - "data": [{"category": "A", "value": 10}, ...], - "group_by": "category", - "agg_field": "value", - "agg_func": "sum" - } """ - data = request.get_json() - try: - client = get_client() - result = client.execute( + client = await get_dirty_client_async() + result = await client.execute_async( DATA_WORKER, "aggregate", - data=data["data"], - group_by=data["group_by"], - agg_field=data["agg_field"], - agg_func=data.get("agg_func", "sum"), + data=data.data, + group_by=data.group_by, + agg_field=data.agg_field, + agg_func=data.agg_func, ) - return jsonify(result) + return result except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) -@app.route("/api/data/etl", methods=["POST"]) -def run_etl(): +@app.post("/api/data/etl") +async def run_etl(data: ETLRequest): """ Run ETL pipeline with streaming progress. Celery equivalent: chain(extract.s(), transform.s(), load.s()).apply_async() - - Request: - POST /api/data/etl - { - "source_data": [...], - "transformations": [ - {"name": "filter_active", "type": "filter", "field": "status", "value": "active"}, - {"name": "uppercase_name", "type": "map", "field": "name", "func": "upper"} - ] - } """ - data = request.get_json() - - def generate(): + async def generate(): try: - client = get_client() - for progress in client.stream( + client = await get_dirty_client_async() + async for progress in client.stream_async( DATA_WORKER, "etl_pipeline", - source_data=data["source_data"], - transformations=data.get("transformations", []), + source_data=data.source_data, + transformations=data.transformations, ): yield f"data: {json.dumps(progress)}\n\n" except DirtyError as e: yield f"data: {json.dumps({'error': str(e)})}\n\n" - return Response( - stream_with_context(generate()), - mimetype="text/event-stream", + return StreamingResponse( + generate(), + media_type="text/event-stream", ) -@app.route("/api/data/query", methods=["POST"]) -def cached_query(): - """ - Execute a cached query. - - Request: - POST /api/data/query - {"query_key": "sales_2024", "ttl": 300} - """ - data = request.get_json() - +@app.post("/api/data/query") +async def cached_query(data: QueryRequest): + """Execute a cached query.""" try: - client = get_client() - result = client.execute( + client = await get_dirty_client_async() + result = await client.execute_async( DATA_WORKER, "cached_query", - query_key=data["query_key"], - ttl=data.get("ttl", 300), + query_key=data.query_key, + ttl=data.ttl, ) - return jsonify(result) + return result except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) -@app.route("/api/data/stats") -def data_stats(): +@app.get("/api/data/stats") +async def data_stats(): """Get data worker statistics.""" try: - client = get_client() - result = client.execute(DATA_WORKER, "stats") - return jsonify(result) + client = await get_dirty_client_async() + result = await client.execute_async(DATA_WORKER, "stats") + return result except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) # ============================================================================ # Scheduled Tasks - Like Celery Beat tasks # ============================================================================ -@app.route("/api/scheduled/cleanup", methods=["POST"]) -def run_cleanup(): - """ - Run cleanup task (normally triggered by cron). - - Request: - POST /api/scheduled/cleanup - {"directory": "/tmp/uploads", "max_age_days": 7} - """ - data = request.get_json() or {} - +@app.post("/api/scheduled/cleanup") +async def run_cleanup(data: CleanupRequest = CleanupRequest()): + """Run cleanup task (normally triggered by cron).""" try: - client = get_client() - result = client.execute( + client = await get_dirty_client_async() + result = await client.execute_async( SCHEDULED_WORKER, "cleanup_old_files", - directory=data.get("directory", "/tmp"), - max_age_days=data.get("max_age_days", 7), + directory=data.directory, + max_age_days=data.max_age_days, ) - return jsonify(result) + return result except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) -@app.route("/api/scheduled/daily-report", methods=["POST"]) -def run_daily_report(): +@app.post("/api/scheduled/daily-report") +async def run_daily_report(): """Generate daily report.""" try: - client = get_client() - result = client.execute(SCHEDULED_WORKER, "generate_daily_report") - return jsonify(result) + client = await get_dirty_client_async() + result = await client.execute_async(SCHEDULED_WORKER, "generate_daily_report") + return result except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) -@app.route("/api/scheduled/sync", methods=["POST"]) -def run_sync(): - """ - Sync external data. - - Request: - POST /api/scheduled/sync - {"source": "external_api"} - """ - data = request.get_json() or {} - +@app.post("/api/scheduled/sync") +async def run_sync(data: SyncRequest = SyncRequest()): + """Sync external data.""" try: - client = get_client() - result = client.execute( + client = await get_dirty_client_async() + result = await client.execute_async( SCHEDULED_WORKER, "sync_external_data", - source=data.get("source", "default"), + source=data.source, ) - return jsonify(result) + return result except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) -@app.route("/api/scheduled/stats") -def scheduled_stats(): +@app.get("/api/scheduled/stats") +async def scheduled_stats(): """Get scheduled worker statistics.""" try: - client = get_client() - result = client.execute(SCHEDULED_WORKER, "stats") - return jsonify(result) + client = await get_dirty_client_async() + result = await client.execute_async(SCHEDULED_WORKER, "stats") + return result except DirtyError as e: - return jsonify({"error": str(e)}), 500 + raise HTTPException(status_code=500, detail=str(e)) # ============================================================================ # Health & Info Endpoints # ============================================================================ -@app.route("/") -def index(): +@app.get("/") +async def index(): """API documentation.""" - return jsonify({ + return { "name": "Celery Replacement Demo", - "description": "Demonstrating Gunicorn dirty arbiters as Celery replacement", + "description": "Demonstrating Gunicorn dirty arbiters as Celery replacement (async ASGI)", + "docs": "/docs", "endpoints": { "email": { "POST /api/email/send": "Send single email", @@ -441,20 +439,19 @@ def index(): "GET /api/scheduled/stats": "Scheduled worker stats", }, }, - }) + } -@app.route("/health") -def health(): +@app.get("/health") +async def health(): """Health check endpoint.""" try: - client = get_client() + client = await get_dirty_client_async() # Quick ping to verify workers are running - client.execute(EMAIL_WORKER, "stats") - return jsonify({"status": "healthy", "workers": "connected"}) + await client.execute_async(EMAIL_WORKER, "stats") + return {"status": "healthy", "workers": "connected"} except DirtyError: - return jsonify({"status": "degraded", "workers": "unavailable"}), 503 - - -if __name__ == "__main__": - app.run(debug=True, port=8000) + raise HTTPException( + status_code=503, + detail={"status": "degraded", "workers": "unavailable"} + ) diff --git a/examples/celery_alternative/docker-compose.yml b/examples/celery_alternative/docker-compose.yml index fda0a7a9..ebf8d303 100644 --- a/examples/celery_alternative/docker-compose.yml +++ b/examples/celery_alternative/docker-compose.yml @@ -18,7 +18,6 @@ services: - "8000:8000" environment: - GUNICORN_WORKERS=4 - - GUNICORN_THREADS=4 - DIRTY_WORKERS=9 - DIRTY_TIMEOUT=300 - LOG_LEVEL=info diff --git a/examples/celery_alternative/gunicorn_conf.py b/examples/celery_alternative/gunicorn_conf.py index 26bea423..925bf496 100644 --- a/examples/celery_alternative/gunicorn_conf.py +++ b/examples/celery_alternative/gunicorn_conf.py @@ -2,12 +2,16 @@ Gunicorn Configuration - Celery Replacement Example This configuration sets up: -1. HTTP workers (gthread) to handle web requests +1. ASGI workers to handle web requests with async I/O (using uvloop) 2. Dirty workers to handle background tasks (replacing Celery workers) +Why ASGI + Dirty Arbiters? +- ASGI: Non-blocking HTTP handling - one worker handles many concurrent requests +- Dirty: Stateful background workers - keep models/connections loaded in memory + Comparison with Celery deployment: -- Celery: gunicorn app:app + celery -A tasks worker -- Dirty: gunicorn -c gunicorn_conf.py app:app (single command!) +- Celery: gunicorn app:app + celery -A tasks worker + redis-server +- Dirty: gunicorn -c gunicorn_conf.py app:app (single command, no broker!) """ import multiprocessing @@ -21,14 +25,18 @@ import os bind = os.environ.get("GUNICORN_BIND", "0.0.0.0:8000") # HTTP workers - handle incoming web requests -# Rule of thumb: 2-4 x CPU cores for I/O bound apps -workers = int(os.environ.get("GUNICORN_WORKERS", multiprocessing.cpu_count() * 2 + 1)) +# With ASGI, fewer workers needed since each handles many concurrent requests +workers = int(os.environ.get("GUNICORN_WORKERS", min(4, multiprocessing.cpu_count() + 1))) -# Use gthread worker for better concurrency -worker_class = "gthread" +# Use gunicorn's native ASGI worker for async support +# This enables: await client.execute_async() without blocking +worker_class = "asgi" -# Threads per worker - good for I/O bound operations -threads = int(os.environ.get("GUNICORN_THREADS", 4)) +# Use uvloop for better async performance +asgi_loop = "uvloop" + +# Maximum concurrent connections per worker +worker_connections = 1000 # ============================================================================= # Dirty Arbiter Settings (Celery Worker Replacement) @@ -96,6 +104,7 @@ def on_starting(server): """Called just before the master process is initialized.""" print("=" * 60) print("Starting Gunicorn with Dirty Arbiters (Celery Replacement)") + print("Using ASGI workers with uvloop for non-blocking HTTP handling") print("=" * 60) diff --git a/examples/celery_alternative/requirements.txt b/examples/celery_alternative/requirements.txt index f49876bb..8040a1e0 100644 --- a/examples/celery_alternative/requirements.txt +++ b/examples/celery_alternative/requirements.txt @@ -1,4 +1,6 @@ # Celery Replacement Example Dependencies -flask>=3.0.0 -requests>=2.31.0 +fastapi>=0.109.0 +uvloop>=0.19.0 +httpx>=0.26.0 pytest>=8.0.0 +pytest-asyncio>=0.23.0 diff --git a/examples/celery_alternative/tests/test_integration.py b/examples/celery_alternative/tests/test_integration.py index bc1ed3ce..e4fd1b26 100644 --- a/examples/celery_alternative/tests/test_integration.py +++ b/examples/celery_alternative/tests/test_integration.py @@ -23,6 +23,24 @@ import requests APP_URL = os.environ.get("APP_URL", "http://localhost:8000") +def read_sse_events(response, max_events=100): + """ + Read SSE events from a streaming response. + + Stops when receiving a 'complete' or 'error' event, or max_events reached. + """ + events = [] + for line in response.iter_lines(decode_unicode=True): + if line.startswith("data: "): + data = json.loads(line[6:]) + events.append(data) + if data.get("type") in ("complete", "error"): + break + if len(events) >= max_events: + break + return events + + def wait_for_app(timeout=30): """Wait for the application to be ready.""" start = time.time() @@ -92,13 +110,7 @@ class TestEmailTasks: ) assert resp.status_code == 200 - events = [] - for line in resp.iter_lines(): - if line: - line = line.decode("utf-8") - if line.startswith("data: "): - data = json.loads(line[6:]) - events.append(data) + events = read_sse_events(resp) # Should have progress for each recipient + complete assert len(events) == len(recipients) + 1 @@ -182,12 +194,7 @@ class TestImageTasks: ) assert resp.status_code == 200 - events = [] - for line in resp.iter_lines(): - if line: - line = line.decode("utf-8") - if line.startswith("data: "): - events.append(json.loads(line[6:])) + events = read_sse_events(resp) assert len(events) == len(images) + 1 assert events[-1]["type"] == "complete" @@ -246,12 +253,7 @@ class TestDataTasks: ) assert resp.status_code == 200 - events = [] - for line in resp.iter_lines(): - if line: - line = line.decode("utf-8") - if line.startswith("data: "): - events.append(json.loads(line[6:])) + events = read_sse_events(resp) # extract + transform + load + complete assert len(events) == 4 @@ -278,13 +280,20 @@ class TestDataTasks: assert resp1.status_code == 200 assert resp1.json()["status"] == "cache_miss" - # Second call - cache hit - resp2 = requests.post( - f"{APP_URL}/api/data/query", - json={"query_key": query_key, "ttl": 300}, - ) - assert resp2.status_code == 200 - assert resp2.json()["status"] == "cache_hit" + # Second call - may be cache hit or miss depending on which worker handles it + # (cache is per-worker, not shared) + # Retry a few times to likely hit the same worker + cache_hit = False + for _ in range(5): + resp2 = requests.post( + f"{APP_URL}/api/data/query", + json={"query_key": query_key, "ttl": 300}, + ) + assert resp2.status_code == 200 + if resp2.json()["status"] == "cache_hit": + cache_hit = True + break + assert cache_hit, "Expected cache_hit after multiple requests to same key" def test_data_stats(self): """Test data worker statistics.""" @@ -418,5 +427,5 @@ class TestErrorHandling: f"{APP_URL}/api/email/send", json={}, # Missing required fields ) - # Should get a 500 or validation error - assert resp.status_code in [400, 500] + # Should get a validation error (FastAPI returns 422) + assert resp.status_code == 422 diff --git a/gunicorn/asgi/protocol.py b/gunicorn/asgi/protocol.py index f962d958..d10a4220 100644 --- a/gunicorn/asgi/protocol.py +++ b/gunicorn/asgi/protocol.py @@ -207,6 +207,7 @@ class ASGIProtocol(asyncio.Protocol): response_started = False response_complete = False exc_to_raise = None + use_chunked = False # Response tracking for access logging response_status = 500 @@ -232,7 +233,7 @@ class ASGIProtocol(asyncio.Protocol): async def send(message): nonlocal response_started, response_complete, exc_to_raise - nonlocal response_status, response_headers, response_sent + nonlocal response_status, response_headers, response_sent, use_chunked msg_type = message["type"] @@ -250,6 +251,19 @@ class ASGIProtocol(asyncio.Protocol): response_started = True response_status = message["status"] response_headers = message.get("headers", []) + + # Check if Content-Length is present + has_content_length = any( + (name.lower() if isinstance(name, str) else name.lower()) == b"content-length" + or (name.lower() if isinstance(name, str) else name.lower()) == "content-length" + for name, _ in response_headers + ) + + # Use chunked encoding for HTTP/1.1 streaming responses without Content-Length + if not has_content_length and request.version >= (1, 1): + use_chunked = True + response_headers = list(response_headers) + [(b"transfer-encoding", b"chunked")] + await self._send_response_start(response_status, response_headers, request) elif msg_type == "http.response.body": @@ -264,10 +278,13 @@ class ASGIProtocol(asyncio.Protocol): more_body = message.get("more_body", False) if body: - await self._send_body(body) + await self._send_body(body, chunked=use_chunked) response_sent += len(body) if not more_body: + if use_chunked: + # Send terminal chunk + self.transport.write(b"0\r\n\r\n") response_complete = True # Build environ for logging @@ -476,10 +493,15 @@ class ASGIProtocol(asyncio.Protocol): response = status_line + "".join(header_lines) + "\r\n" self.transport.write(response.encode("latin-1")) - async def _send_body(self, body): + async def _send_body(self, body, chunked=False): """Send response body chunk.""" if body: - self.transport.write(body) + if chunked: + # Chunked encoding: size in hex + CRLF + data + CRLF + chunk = f"{len(body):x}\r\n".encode("latin-1") + body + b"\r\n" + self.transport.write(chunk) + else: + self.transport.write(body) async def _send_error_response(self, status, message): """Send an error response."""