fix(asgi): add chunked transfer encoding for streaming responses

Add chunked transfer encoding support to the ASGI worker for HTTP/1.1
streaming responses that don't have a Content-Length header. This fixes
SSE (Server-Sent Events) connections not closing properly.

Without chunked encoding or Content-Length, HTTP/1.1 clients wait for
the connection to close to determine end-of-response, causing streaming
endpoints to hang.
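
For reference, the framing that chunked transfer encoding puts on the wire can be sketched as below. This is an illustration of the HTTP/1.1 format only, not the code added to the ASGI worker; `encode_chunk`, `encode_body`, and `LAST_CHUNK` are hypothetical names.

```python
def encode_chunk(data: bytes) -> bytes:
    """Frame one body chunk: size in hex, CRLF, payload, CRLF."""
    return f"{len(data):x}".encode("ascii") + b"\r\n" + data + b"\r\n"


# A zero-length chunk marks end-of-response, so the client does not
# have to wait for the connection to close.
LAST_CHUNK = b"0\r\n\r\n"


def encode_body(parts):
    """Yield wire bytes for each body part, then the terminating chunk."""
    for part in parts:
        if part:  # an empty part would look like the terminator, so skip it
            yield encode_chunk(part)
    yield LAST_CHUNK
```

The terminating zero-length chunk is what lets a keep-alive client detect the end of an SSE response that has no Content-Length.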

Also updates the celery_alternative example to use FastAPI with the
native ASGI worker and uvloop, demonstrating async task execution with
proper SSE streaming.
Benoit Chesneau 2026-02-02 10:13:33 +01:00
parent be54850ff8
commit ce352dc230
8 changed files with 465 additions and 395 deletions


@ -23,7 +23,7 @@ RUN pip install --no-cache-dir /gunicorn-src
# Copy example application # Copy example application
COPY examples/celery_alternative /app COPY examples/celery_alternative /app
RUN pip install --no-cache-dir flask RUN pip install --no-cache-dir fastapi uvloop requests pytest
# Environment variables # Environment variables
ENV PYTHONUNBUFFERED=1 ENV PYTHONUNBUFFERED=1
@ -31,7 +31,6 @@ ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONPATH=/gunicorn-src ENV PYTHONPATH=/gunicorn-src
ENV GUNICORN_BIND=0.0.0.0:8000 ENV GUNICORN_BIND=0.0.0.0:8000
ENV GUNICORN_WORKERS=4 ENV GUNICORN_WORKERS=4
ENV GUNICORN_THREADS=4
ENV DIRTY_WORKERS=9 ENV DIRTY_WORKERS=9
ENV DIRTY_TIMEOUT=300 ENV DIRTY_TIMEOUT=300
ENV LOG_LEVEL=info ENV LOG_LEVEL=info


@ -1,16 +1,73 @@
# Celery Alternative Example # Celery Alternative Example
This example demonstrates how to replace Celery with Gunicorn's **dirty arbiters** for background task processing. This example demonstrates how to replace Celery with Gunicorn's **dirty arbiters** for background task processing, using **async ASGI** for non-blocking HTTP handling.
## Why Replace Celery? ## Why Use This Instead of Celery?
| Aspect | Celery | Dirty Arbiters | ### The Problem with Celery
|--------|--------|----------------|
| Dependencies | Redis/RabbitMQ + Celery | None (built into Gunicorn) | Celery requires:
| Deployment | Multiple processes/containers | Single process | - An external message broker (Redis or RabbitMQ)
| State | Stateless workers | Stateful workers (keep models loaded) | - Separate worker processes (`celery -A app worker`)
| Progress | Polling or WebSocket | Native streaming | - Stateless workers that reload models/connections on every task
| Configuration | Separate config | Same gunicorn.conf.py | - Polling or WebSockets for progress updates
### What Dirty Arbiters Provide
| Feature | Celery | Dirty Arbiters |
|---------|--------|----------------|
| **External broker** | Required (Redis/RabbitMQ) | None - uses Unix sockets |
| **Deployment** | Multiple processes | Single `gunicorn` command |
| **Worker state** | Stateless | Stateful - keep ML models, DB connections loaded |
| **Progress updates** | Polling or WebSocket | Native streaming |
| **HTTP blocking** | N/A (separate process) | Non-blocking with async ASGI |
### When to Use Dirty Arbiters
**Good fit:**
- Tasks that benefit from keeping state (ML models, DB connection pools, caches)
- Tasks where you want immediate results (not fire-and-forget)
- Real-time progress streaming
- Simpler deployment without external dependencies
**Not ideal for:**
- True fire-and-forget queuing with persistence
- Distributed task execution across multiple machines
- Tasks that must survive server restarts
## How It Works
```
┌─────────────────────────────────────────────────────────────┐
│ Gunicorn Master │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ ASGI Workers (uvloop) │ │
│ │ Non-blocking! One worker handles many requests │ │
│ │ await client.execute_async() doesn't block │ │
│ └──────────────────────────┬──────────────────────────┘ │
│ │ │
│ Unix Socket IPC │
│ │ │
│ ┌──────────────────────────┼──────────────────────────┐ │
│ │ Dirty Workers (Stateful) │ │
│ │ │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │EmailWorker │ │ImageWorker │ │DataWorker │ ... │ │
│ │ │ (2 procs) │ │ (2 procs) │ │ (4 procs) │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ SMTP conn │ │ PIL loaded │ │ DB pool │ │ │
│ │ │ kept alive │ │ in memory │ │ cached │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ │ │
│ │ │ │
│ │ Dirty Arbiter │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
```
**Key insight:** The HTTP workers use async I/O, so `await client.execute_async()` doesn't block the event loop. One ASGI worker can handle thousands of concurrent requests while waiting for dirty workers to complete tasks.
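
The non-blocking behaviour described above can be illustrated with plain `asyncio`, without Gunicorn. This is a sketch under assumptions: `fake_execute_async` is a hypothetical stand-in that simulates waiting on a dirty worker with `asyncio.sleep`; it is not the real client API.

```python
import asyncio
import time


async def fake_execute_async(task_id: int, delay: float = 0.1) -> dict:
    """Hypothetical stand-in for awaiting a dirty worker result."""
    await asyncio.sleep(delay)  # control returns to the event loop while waiting
    return {"task": task_id, "status": "done"}


async def handle_many(n: int) -> tuple[list[dict], float]:
    """Run n simulated requests concurrently on a single event loop."""
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_execute_async(i) for i in range(n)))
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(handle_many(100))
    print(f"{len(results)} tasks finished in {elapsed:.2f}s")
```

Because every coroutine yields while it waits, the total time stays close to a single task's delay instead of growing with the number of requests.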
## Quick Start ## Quick Start
@ -18,7 +75,7 @@ This example demonstrates how to replace Celery with Gunicorn's **dirty arbiters
```bash ```bash
# Install dependencies # Install dependencies
pip install flask requests pytest pip install fastapi uvloop httpx pytest pytest-asyncio
pip install -e ../.. # Install gunicorn from source pip install -e ../.. # Install gunicorn from source
# Run the application # Run the application
@ -29,6 +86,9 @@ curl http://localhost:8000/health
curl -X POST http://localhost:8000/api/email/send \ curl -X POST http://localhost:8000/api/email/send \
-H "Content-Type: application/json" \ -H "Content-Type: application/json" \
-d '{"to": "test@example.com", "subject": "Hello", "body": "World"}' -d '{"to": "test@example.com", "subject": "Hello", "body": "World"}'
# Interactive API docs
open http://localhost:8000/docs
``` ```
### Docker ### Docker
@ -41,104 +101,56 @@ docker compose up --build
docker compose --profile test up --build --abort-on-container-exit docker compose --profile test up --build --abort-on-container-exit
``` ```
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ Gunicorn Master │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ HTTP Worker │ │ HTTP Worker │ │ HTTP Worker │ ... │
│ │ (gthread) │ │ (gthread) │ │ (gthread) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └────────────────┼────────────────┘ │
│ │ │
│ Unix Socket IPC │
│ │ │
│ ┌────────────────┼────────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ EmailWorker │ │ ImageWorker │ │ DataWorker │ ... │
│ │ (2 procs) │ │ (2 procs) │ │ (4 procs) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Dirty Arbiter │
└─────────────────────────────────────────────────────────────┘
```
## Task Workers ## Task Workers
### EmailWorker Each worker class maintains state across requests:
### EmailWorker (2 workers)
- Keeps SMTP connection alive
- `send_email(to, subject, body)` - Send single email - `send_email(to, subject, body)` - Send single email
- `send_bulk_emails(recipients, subject, body)` - Bulk send with progress streaming - `send_bulk_emails(recipients, subject, body)` - Bulk send with streaming progress
- `stats()` - Worker statistics
### ImageWorker ### ImageWorker (2 workers)
- Keeps PIL/image libraries loaded
- `resize(image_data, width, height)` - Resize image - `resize(image_data, width, height)` - Resize image
- `generate_thumbnail(image_data, size)` - Generate thumbnail - `process_batch(images, operation)` - Batch process with streaming
- `process_batch(images, operation, **params)` - Batch process with streaming
### DataWorker ### DataWorker (4 workers)
- `aggregate(data, group_by, agg_field, agg_func)` - Aggregate data - Maintains DB connection pool and query cache
- `etl_pipeline(source_data, transformations)` - ETL with progress streaming - `aggregate(data, group_by, agg_field)` - Aggregate data
- `cached_query(query_key, ttl)` - Cached query execution - `etl_pipeline(source_data, transformations)` - ETL with streaming progress
- `cached_query(query_key, ttl)` - Query with in-memory caching
### ScheduledWorker ### ScheduledWorker (1 worker)
- `cleanup_old_files(directory, max_age_days)` - File cleanup - For periodic tasks (call from cron)
- `generate_daily_report()` - Daily report generation - `cleanup_old_files(directory, max_age_days)`
- `sync_external_data(source)` - External data sync - `generate_daily_report()`
## API Endpoints
### Email
- `POST /api/email/send` - Send single email
- `POST /api/email/send-bulk` - Bulk send (SSE streaming)
- `GET /api/email/stats` - Worker stats
### Image
- `POST /api/image/resize` - Resize image
- `POST /api/image/thumbnail` - Generate thumbnail
- `POST /api/image/process-batch` - Batch process (SSE streaming)
- `GET /api/image/stats` - Worker stats
### Data
- `POST /api/data/aggregate` - Aggregate data
- `POST /api/data/etl` - ETL pipeline (SSE streaming)
- `POST /api/data/query` - Cached query
- `GET /api/data/stats` - Worker stats
### Scheduled
- `POST /api/scheduled/cleanup` - Run cleanup
- `POST /api/scheduled/daily-report` - Generate report
- `POST /api/scheduled/sync` - Sync data
- `GET /api/scheduled/stats` - Worker stats
## Streaming Progress Example ## Streaming Progress Example
Real-time progress without polling:
```python ```python
import requests import httpx
import json import json
# Start bulk email with streaming progress async with httpx.AsyncClient() as client:
resp = requests.post( async with client.stream(
"http://localhost:8000/api/email/send-bulk", "POST",
json={ "http://localhost:8000/api/email/send-bulk",
"recipients": ["a@x.com", "b@x.com", "c@x.com"], json={
"subject": "Newsletter", "recipients": ["a@x.com", "b@x.com", "c@x.com"],
"body": "Hello!", "subject": "Newsletter",
}, "body": "Hello!",
stream=True, },
) ) as response:
async for line in response.aiter_lines():
for line in resp.iter_lines(): if line.startswith("data: "):
if line and line.startswith(b"data: "): progress = json.loads(line[6:])
progress = json.loads(line[6:]) if progress["type"] == "progress":
if progress["type"] == "progress": print(f"Progress: {progress['percent']}%")
print(f"Progress: {progress['percent']}%") elif progress["type"] == "complete":
elif progress["type"] == "complete": print(f"Done! Sent: {progress['sent']}")
print(f"Done! Sent: {progress['sent']}, Failed: {progress['failed']}")
``` ```
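
The line handling in the example above can be isolated into a small helper and exercised without a running server. A minimal sketch: `parse_sse_data` is a hypothetical name, and it assumes each event carries a single `data: ` line with a JSON payload, as in the example.

```python
import json


def parse_sse_data(lines):
    """Yield decoded JSON payloads from an iterable of SSE text lines."""
    prefix = "data: "
    for line in lines:
        # Blank lines separate events; other SSE fields are ignored here.
        if line.startswith(prefix):
            yield json.loads(line[len(prefix):])


sample = [
    'data: {"type": "progress", "percent": 50}',
    "",
    'data: {"type": "complete", "sent": 3}',
    "",
]
for event in parse_sse_data(sample):
    print(event["type"])
```

Note that `async with` and `async for` are only valid inside a coroutine, so the httpx snippet has to be placed in an `async def` function and run with `asyncio.run()`.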
## Celery Migration Guide ## Celery Migration Guide
@ -153,23 +165,25 @@ app = Celery('tasks', broker='redis://localhost')
@app.task @app.task
def send_email(to, subject, body): def send_email(to, subject, body):
# Send email smtp = smtplib.SMTP(...) # New connection every task!
smtp.send(...)
return {"status": "sent"} return {"status": "sent"}
@app.task(bind=True) @app.task(bind=True)
def send_bulk(self, recipients, subject, body): def send_bulk(self, recipients, subject, body):
for i, to in enumerate(recipients): for i, to in enumerate(recipients):
send_email(to, subject, body) send_email(to, subject, body)
self.update_state(state='PROGRESS', meta={'current': i}) self.update_state(state='PROGRESS', meta={'current': i}) # Requires polling!
``` ```
```python ```python
# views.py # views.py - Flask
from tasks import send_email, send_bulk from tasks import send_email
def send_view(request): @app.route('/send')
send_email.delay(to, subject, body) # Async def send_view():
return {"status": "queued"} send_email.delay(to, subject, body) # Fire and forget
return {"status": "queued"} # Can't get result without polling
``` ```
### After (Dirty Arbiters) ### After (Dirty Arbiters)
@ -179,15 +193,16 @@ def send_view(request):
from gunicorn.dirty.app import DirtyApp from gunicorn.dirty.app import DirtyApp
class EmailWorker(DirtyApp): class EmailWorker(DirtyApp):
workers = 2 # Limit workers workers = 2
def init(self): def init(self):
self.smtp = connect_smtp() # Stateful! self.smtp = smtplib.SMTP(...) # Connected once, reused!
def __call__(self, action, *args, **kwargs): def __call__(self, action, *args, **kwargs):
return getattr(self, action)(*args, **kwargs) return getattr(self, action)(*args, **kwargs)
def send_email(self, to, subject, body): def send_email(self, to, subject, body):
self.smtp.send(...) # Reuses connection
return {"status": "sent"} return {"status": "sent"}
def send_bulk(self, recipients, subject, body): def send_bulk(self, recipients, subject, body):
@ -197,13 +212,15 @@ class EmailWorker(DirtyApp):
``` ```
```python ```python
# views.py # views.py - FastAPI (async)
from gunicorn.dirty import get_dirty_client from gunicorn.dirty import get_dirty_client_async
def send_view(request): @app.post('/send')
client = get_dirty_client() async def send_view(data: EmailRequest):
result = client.execute("tasks:EmailWorker", "send_email", to, subject, body) client = await get_dirty_client_async()
return result # Sync result, no polling! # Non-blocking! Other requests handled while waiting
result = await client.execute_async("tasks:EmailWorker", "send_email", ...)
return result # Immediate result, no polling!
``` ```
## Configuration ## Configuration
@ -211,12 +228,12 @@ def send_view(request):
```python ```python
# gunicorn_conf.py # gunicorn_conf.py
# HTTP workers # ASGI workers for non-blocking HTTP
worker_class = "asgi"
asgi_loop = "uvloop"
workers = 4 workers = 4
worker_class = "gthread"
threads = 4
# Task workers (replace Celery) # Dirty workers (replace Celery)
dirty_apps = [ dirty_apps = [
"tasks:EmailWorker", "tasks:EmailWorker",
"tasks:ImageWorker", "tasks:ImageWorker",
@ -238,3 +255,19 @@ APP_URL=http://localhost:8000 pytest tests/test_integration.py -v
# All tests via Docker # All tests via Docker
docker compose --profile test up --build --abort-on-container-exit docker compose --profile test up --build --abort-on-container-exit
``` ```
## API Endpoints
Visit `/docs` for interactive Swagger documentation.
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/email/send` | POST | Send single email |
| `/api/email/send-bulk` | POST | Bulk send (SSE streaming) |
| `/api/image/resize` | POST | Resize image |
| `/api/image/process-batch` | POST | Batch process (SSE streaming) |
| `/api/data/aggregate` | POST | Aggregate data |
| `/api/data/etl` | POST | ETL pipeline (SSE streaming) |
| `/api/data/query` | POST | Cached query |
| `/api/scheduled/*` | POST | Scheduled tasks |
| `/health` | GET | Health check |


@ -1,22 +1,28 @@
""" """
Web Application - Flask app demonstrating Celery replacement. Web Application - FastAPI app demonstrating Celery replacement.
This shows how to call dirty arbiter tasks from your web application, This shows how to call dirty arbiter tasks from your web application
replacing Celery's task.delay() and task.apply_async() patterns. using the async API, which doesn't block the event loop.
Key difference from sync (Flask/gthread):
- `await client.execute_async()` is non-blocking
- A single worker can handle many concurrent requests
- True async I/O - other requests proceed while waiting for task results
""" """
import json import json
import os from contextlib import asynccontextmanager
from flask import Flask, request, jsonify, Response, stream_with_context
from gunicorn.dirty import get_dirty_client from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from gunicorn.dirty import get_dirty_client_async
from gunicorn.dirty.errors import ( from gunicorn.dirty.errors import (
DirtyError, DirtyError,
DirtyTimeoutError, DirtyTimeoutError,
DirtyAppNotFoundError,
) )
app = Flask(__name__)
# Task worker import paths (like Celery task names) # Task worker import paths (like Celery task names)
EMAIL_WORKER = "examples.celery_alternative.tasks:EmailWorker" EMAIL_WORKER = "examples.celery_alternative.tasks:EmailWorker"
@ -25,48 +31,115 @@ DATA_WORKER = "examples.celery_alternative.tasks:DataWorker"
SCHEDULED_WORKER = "examples.celery_alternative.tasks:ScheduledWorker" SCHEDULED_WORKER = "examples.celery_alternative.tasks:ScheduledWorker"
def get_client(): @asynccontextmanager
"""Get the dirty client for calling task workers.""" async def lifespan(app: FastAPI):
return get_dirty_client() """Application lifespan - startup and shutdown."""
yield
app = FastAPI(
title="Celery Replacement Demo",
description="Demonstrating Gunicorn dirty arbiters as Celery replacement with async ASGI",
lifespan=lifespan,
)
# ============================================================================
# Request/Response Models
# ============================================================================
class EmailRequest(BaseModel):
to: str
subject: str
body: str
html: bool = False
class BulkEmailRequest(BaseModel):
recipients: list[str]
subject: str
body: str
class ImageResizeRequest(BaseModel):
image_data: str = ""
width: int = 800
height: int = 600
class ThumbnailRequest(BaseModel):
image_data: str = ""
size: int = 150
class ImageBatchRequest(BaseModel):
images: list[dict]
operation: str = "resize"
width: int = 800
height: int = 600
size: int = 150
class AggregateRequest(BaseModel):
data: list[dict]
group_by: str
agg_field: str
agg_func: str = "sum"
class ETLRequest(BaseModel):
source_data: list[dict]
transformations: list[dict] = []
class QueryRequest(BaseModel):
query_key: str
ttl: int = 300
class CleanupRequest(BaseModel):
directory: str = "/tmp"
max_age_days: int = 7
class SyncRequest(BaseModel):
source: str = "default"
# ============================================================================ # ============================================================================
# Email Tasks - Like Celery email tasks # Email Tasks - Like Celery email tasks
# ============================================================================ # ============================================================================
@app.route("/api/email/send", methods=["POST"]) @app.post("/api/email/send")
def send_email(): async def send_email(data: EmailRequest):
""" """
Send a single email. Send a single email.
Celery equivalent: Celery equivalent:
send_email.delay(to, subject, body) send_email.delay(to, subject, body)
Request: With async dirty client, this doesn't block the event loop!
POST /api/email/send Other requests can be handled while waiting for the task.
{"to": "user@example.com", "subject": "Hello", "body": "World"}
""" """
data = request.get_json()
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute( result = await client.execute_async(
EMAIL_WORKER, EMAIL_WORKER,
"send_email", "send_email",
to=data["to"], to=data.to,
subject=data["subject"], subject=data.subject,
body=data["body"], body=data.body,
html=data.get("html", False), html=data.html,
) )
return jsonify(result) return result
except DirtyTimeoutError: except DirtyTimeoutError:
return jsonify({"error": "Task timed out"}), 504 raise HTTPException(status_code=504, detail="Task timed out")
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
@app.route("/api/email/send-bulk", methods=["POST"]) @app.post("/api/email/send-bulk")
def send_bulk_emails(): async def send_bulk_emails(data: BulkEmailRequest):
""" """
Send bulk emails with streaming progress. Send bulk emails with streaming progress.
@ -76,30 +149,24 @@ def send_bulk_emails():
print(result.info) # Progress polling print(result.info) # Progress polling
With dirty arbiters, progress is streamed in real-time! With dirty arbiters, progress is streamed in real-time!
Request:
POST /api/email/send-bulk
{"recipients": ["a@x.com", "b@x.com"], "subject": "Hi", "body": "Hello"}
""" """
data = request.get_json() async def generate():
def generate():
try: try:
client = get_client() client = await get_dirty_client_async()
for progress in client.stream( async for progress in client.stream_async(
EMAIL_WORKER, EMAIL_WORKER,
"send_bulk_emails", "send_bulk_emails",
recipients=data["recipients"], recipients=data.recipients,
subject=data["subject"], subject=data.subject,
body=data["body"], body=data.body,
): ):
yield f"data: {json.dumps(progress)}\n\n" yield f"data: {json.dumps(progress)}\n\n"
except DirtyError as e: except DirtyError as e:
yield f"data: {json.dumps({'error': str(e)})}\n\n" yield f"data: {json.dumps({'error': str(e)})}\n\n"
return Response( return StreamingResponse(
stream_with_context(generate()), generate(),
mimetype="text/event-stream", media_type="text/event-stream",
headers={ headers={
"Cache-Control": "no-cache", "Cache-Control": "no-cache",
"X-Accel-Buffering": "no", "X-Accel-Buffering": "no",
@ -107,315 +174,246 @@ def send_bulk_emails():
) )
@app.route("/api/email/stats") @app.get("/api/email/stats")
def email_stats(): async def email_stats():
"""Get email worker statistics.""" """Get email worker statistics."""
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute(EMAIL_WORKER, "stats") result = await client.execute_async(EMAIL_WORKER, "stats")
return jsonify(result) return result
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
# ============================================================================ # ============================================================================
# Image Tasks - Like Celery image processing tasks # Image Tasks - Like Celery image processing tasks
# ============================================================================ # ============================================================================
@app.route("/api/image/resize", methods=["POST"]) @app.post("/api/image/resize")
def resize_image(): async def resize_image(data: ImageResizeRequest):
""" """
Resize an image. Resize an image.
Celery equivalent: Celery equivalent:
resize_image.delay(image_data, width, height) resize_image.delay(image_data, width, height)
Request:
POST /api/image/resize
{"image_data": "base64...", "width": 800, "height": 600}
""" """
data = request.get_json()
# Keep image_data as string (base64 encoded) for JSON serialization
image_data = data.get("image_data", "")
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute( result = await client.execute_async(
IMAGE_WORKER, IMAGE_WORKER,
"resize", "resize",
image_data=image_data, image_data=data.image_data,
width=data.get("width", 800), width=data.width,
height=data.get("height", 600), height=data.height,
) )
return jsonify(result) return result
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
@app.route("/api/image/thumbnail", methods=["POST"]) @app.post("/api/image/thumbnail")
def generate_thumbnail(): async def generate_thumbnail(data: ThumbnailRequest):
"""Generate a thumbnail.""" """Generate a thumbnail."""
data = request.get_json()
image_data = data.get("image_data", "")
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute( result = await client.execute_async(
IMAGE_WORKER, IMAGE_WORKER,
"generate_thumbnail", "generate_thumbnail",
image_data=image_data, image_data=data.image_data,
size=data.get("size", 150), size=data.size,
) )
return jsonify(result) return result
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
@app.route("/api/image/process-batch", methods=["POST"]) @app.post("/api/image/process-batch")
def process_image_batch(): async def process_image_batch(data: ImageBatchRequest):
""" """
Process multiple images with progress streaming. Process multiple images with progress streaming.
Request:
POST /api/image/process-batch
{
"images": [{"id": "img1", "data": "..."}, ...],
"operation": "resize",
"width": 800,
"height": 600
}
""" """
data = request.get_json() async def generate():
def generate():
try: try:
client = get_client() client = await get_dirty_client_async()
for progress in client.stream( async for progress in client.stream_async(
IMAGE_WORKER, IMAGE_WORKER,
"process_batch", "process_batch",
images=data["images"], images=data.images,
operation=data.get("operation", "resize"), operation=data.operation,
width=data.get("width", 800), width=data.width,
height=data.get("height", 600), height=data.height,
size=data.get("size", 150), size=data.size,
): ):
yield f"data: {json.dumps(progress)}\n\n" yield f"data: {json.dumps(progress)}\n\n"
except DirtyError as e: except DirtyError as e:
yield f"data: {json.dumps({'error': str(e)})}\n\n" yield f"data: {json.dumps({'error': str(e)})}\n\n"
return Response( return StreamingResponse(
stream_with_context(generate()), generate(),
mimetype="text/event-stream", media_type="text/event-stream",
) )
@app.route("/api/image/stats") @app.get("/api/image/stats")
def image_stats(): async def image_stats():
"""Get image worker statistics.""" """Get image worker statistics."""
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute(IMAGE_WORKER, "stats") result = await client.execute_async(IMAGE_WORKER, "stats")
return jsonify(result) return result
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
# ============================================================================ # ============================================================================
# Data Tasks - Like Celery data processing tasks # Data Tasks - Like Celery data processing tasks
# ============================================================================ # ============================================================================
@app.route("/api/data/aggregate", methods=["POST"]) @app.post("/api/data/aggregate")
def aggregate_data(): async def aggregate_data(data: AggregateRequest):
""" """
Aggregate data. Aggregate data.
Celery equivalent: Celery equivalent:
aggregate_data.delay(data, group_by, agg_field, agg_func) aggregate_data.delay(data, group_by, agg_field, agg_func)
Request:
POST /api/data/aggregate
{
"data": [{"category": "A", "value": 10}, ...],
"group_by": "category",
"agg_field": "value",
"agg_func": "sum"
}
""" """
data = request.get_json()
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute( result = await client.execute_async(
DATA_WORKER, DATA_WORKER,
"aggregate", "aggregate",
data=data["data"], data=data.data,
group_by=data["group_by"], group_by=data.group_by,
agg_field=data["agg_field"], agg_field=data.agg_field,
agg_func=data.get("agg_func", "sum"), agg_func=data.agg_func,
) )
return jsonify(result) return result
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
@app.route("/api/data/etl", methods=["POST"]) @app.post("/api/data/etl")
def run_etl(): async def run_etl(data: ETLRequest):
""" """
Run ETL pipeline with streaming progress. Run ETL pipeline with streaming progress.
Celery equivalent: Celery equivalent:
chain(extract.s(), transform.s(), load.s()).apply_async() chain(extract.s(), transform.s(), load.s()).apply_async()
Request:
POST /api/data/etl
{
"source_data": [...],
"transformations": [
{"name": "filter_active", "type": "filter", "field": "status", "value": "active"},
{"name": "uppercase_name", "type": "map", "field": "name", "func": "upper"}
]
}
""" """
data = request.get_json() async def generate():
def generate():
try: try:
client = get_client() client = await get_dirty_client_async()
for progress in client.stream( async for progress in client.stream_async(
DATA_WORKER, DATA_WORKER,
"etl_pipeline", "etl_pipeline",
source_data=data["source_data"], source_data=data.source_data,
transformations=data.get("transformations", []), transformations=data.transformations,
): ):
yield f"data: {json.dumps(progress)}\n\n" yield f"data: {json.dumps(progress)}\n\n"
except DirtyError as e: except DirtyError as e:
yield f"data: {json.dumps({'error': str(e)})}\n\n" yield f"data: {json.dumps({'error': str(e)})}\n\n"
return Response( return StreamingResponse(
stream_with_context(generate()), generate(),
mimetype="text/event-stream", media_type="text/event-stream",
) )
@app.route("/api/data/query", methods=["POST"]) @app.post("/api/data/query")
def cached_query(): async def cached_query(data: QueryRequest):
""" """Execute a cached query."""
Execute a cached query.
Request:
POST /api/data/query
{"query_key": "sales_2024", "ttl": 300}
"""
data = request.get_json()
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute( result = await client.execute_async(
DATA_WORKER, DATA_WORKER,
"cached_query", "cached_query",
query_key=data["query_key"], query_key=data.query_key,
ttl=data.get("ttl", 300), ttl=data.ttl,
) )
return jsonify(result) return result
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
@app.route("/api/data/stats") @app.get("/api/data/stats")
def data_stats(): async def data_stats():
"""Get data worker statistics.""" """Get data worker statistics."""
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute(DATA_WORKER, "stats") result = await client.execute_async(DATA_WORKER, "stats")
return jsonify(result) return result
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
# ============================================================================ # ============================================================================
# Scheduled Tasks - Like Celery Beat tasks # Scheduled Tasks - Like Celery Beat tasks
# ============================================================================ # ============================================================================
@app.route("/api/scheduled/cleanup", methods=["POST"]) @app.post("/api/scheduled/cleanup")
def run_cleanup(): async def run_cleanup(data: CleanupRequest = CleanupRequest()):
""" """Run cleanup task (normally triggered by cron)."""
Run cleanup task (normally triggered by cron).
Request:
POST /api/scheduled/cleanup
{"directory": "/tmp/uploads", "max_age_days": 7}
"""
data = request.get_json() or {}
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute( result = await client.execute_async(
SCHEDULED_WORKER, SCHEDULED_WORKER,
"cleanup_old_files", "cleanup_old_files",
directory=data.get("directory", "/tmp"), directory=data.directory,
max_age_days=data.get("max_age_days", 7), max_age_days=data.max_age_days,
) )
return jsonify(result) return result
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
@app.route("/api/scheduled/daily-report", methods=["POST"]) @app.post("/api/scheduled/daily-report")
def run_daily_report(): async def run_daily_report():
"""Generate daily report.""" """Generate daily report."""
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute(SCHEDULED_WORKER, "generate_daily_report") result = await client.execute_async(SCHEDULED_WORKER, "generate_daily_report")
return jsonify(result) return result
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
@app.route("/api/scheduled/sync", methods=["POST"]) @app.post("/api/scheduled/sync")
def run_sync(): async def run_sync(data: SyncRequest = SyncRequest()):
""" """Sync external data."""
Sync external data.
Request:
POST /api/scheduled/sync
{"source": "external_api"}
"""
data = request.get_json() or {}
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute( result = await client.execute_async(
SCHEDULED_WORKER, SCHEDULED_WORKER,
"sync_external_data", "sync_external_data",
source=data.get("source", "default"), source=data.source,
) )
return jsonify(result) return result
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
@app.route("/api/scheduled/stats") @app.get("/api/scheduled/stats")
def scheduled_stats(): async def scheduled_stats():
"""Get scheduled worker statistics.""" """Get scheduled worker statistics."""
try: try:
client = get_client() client = await get_dirty_client_async()
result = client.execute(SCHEDULED_WORKER, "stats") result = await client.execute_async(SCHEDULED_WORKER, "stats")
return jsonify(result) return result
except DirtyError as e: except DirtyError as e:
return jsonify({"error": str(e)}), 500 raise HTTPException(status_code=500, detail=str(e))
# ============================================================================ # ============================================================================
# Health & Info Endpoints # Health & Info Endpoints
# ============================================================================ # ============================================================================
@app.route("/") @app.get("/")
def index(): async def index():
"""API documentation.""" """API documentation."""
return jsonify({ return {
"name": "Celery Replacement Demo", "name": "Celery Replacement Demo",
"description": "Demonstrating Gunicorn dirty arbiters as Celery replacement", "description": "Demonstrating Gunicorn dirty arbiters as Celery replacement (async ASGI)",
"docs": "/docs",
"endpoints": { "endpoints": {
"email": { "email": {
"POST /api/email/send": "Send single email", "POST /api/email/send": "Send single email",
@ -441,20 +439,19 @@ def index():
"GET /api/scheduled/stats": "Scheduled worker stats", "GET /api/scheduled/stats": "Scheduled worker stats",
}, },
}, },
}) }
@app.route("/health") @app.get("/health")
def health(): async def health():
"""Health check endpoint.""" """Health check endpoint."""
try: try:
client = get_client() client = await get_dirty_client_async()
# Quick ping to verify workers are running # Quick ping to verify workers are running
client.execute(EMAIL_WORKER, "stats") await client.execute_async(EMAIL_WORKER, "stats")
return jsonify({"status": "healthy", "workers": "connected"}) return {"status": "healthy", "workers": "connected"}
except DirtyError: except DirtyError:
return jsonify({"status": "degraded", "workers": "unavailable"}), 503 raise HTTPException(
status_code=503,
detail={"status": "degraded", "workers": "unavailable"}
if __name__ == "__main__": )
app.run(debug=True, port=8000)


@ -18,7 +18,6 @@ services:
- "8000:8000" - "8000:8000"
environment: environment:
- GUNICORN_WORKERS=4 - GUNICORN_WORKERS=4
- GUNICORN_THREADS=4
- DIRTY_WORKERS=9 - DIRTY_WORKERS=9
- DIRTY_TIMEOUT=300 - DIRTY_TIMEOUT=300
- LOG_LEVEL=info - LOG_LEVEL=info


@ -2,12 +2,16 @@
Gunicorn Configuration - Celery Replacement Example Gunicorn Configuration - Celery Replacement Example
This configuration sets up: This configuration sets up:
1. HTTP workers (gthread) to handle web requests 1. ASGI workers to handle web requests with async I/O (using uvloop)
2. Dirty workers to handle background tasks (replacing Celery workers) 2. Dirty workers to handle background tasks (replacing Celery workers)
Why ASGI + Dirty Arbiters?
- ASGI: Non-blocking HTTP handling - one worker handles many concurrent requests
- Dirty: Stateful background workers - keep models/connections loaded in memory
Comparison with Celery deployment: Comparison with Celery deployment:
- Celery: gunicorn app:app + celery -A tasks worker - Celery: gunicorn app:app + celery -A tasks worker + redis-server
- Dirty: gunicorn -c gunicorn_conf.py app:app (single command!) - Dirty: gunicorn -c gunicorn_conf.py app:app (single command, no broker!)
""" """
import multiprocessing import multiprocessing
@ -21,14 +25,18 @@ import os
bind = os.environ.get("GUNICORN_BIND", "0.0.0.0:8000") bind = os.environ.get("GUNICORN_BIND", "0.0.0.0:8000")
# HTTP workers - handle incoming web requests # HTTP workers - handle incoming web requests
# Rule of thumb: 2-4 x CPU cores for I/O bound apps # With ASGI, fewer workers needed since each handles many concurrent requests
workers = int(os.environ.get("GUNICORN_WORKERS", multiprocessing.cpu_count() * 2 + 1)) workers = int(os.environ.get("GUNICORN_WORKERS", min(4, multiprocessing.cpu_count() + 1)))
# Use gthread worker for better concurrency # Use gunicorn's native ASGI worker for async support
worker_class = "gthread" # This enables: await client.execute_async() without blocking
worker_class = "asgi"
# Threads per worker - good for I/O bound operations # Use uvloop for better async performance
threads = int(os.environ.get("GUNICORN_THREADS", 4)) asgi_loop = "uvloop"
# Maximum concurrent connections per worker
worker_connections = 1000
# ============================================================================= # =============================================================================
# Dirty Arbiter Settings (Celery Worker Replacement) # Dirty Arbiter Settings (Celery Worker Replacement)
@ -96,6 +104,7 @@ def on_starting(server):
"""Called just before the master process is initialized.""" """Called just before the master process is initialized."""
print("=" * 60) print("=" * 60)
print("Starting Gunicorn with Dirty Arbiters (Celery Replacement)") print("Starting Gunicorn with Dirty Arbiters (Celery Replacement)")
print("Using ASGI workers with uvloop for non-blocking HTTP handling")
print("=" * 60) print("=" * 60)


@ -1,4 +1,6 @@
# Celery Replacement Example Dependencies # Celery Replacement Example Dependencies
flask>=3.0.0 fastapi>=0.109.0
requests>=2.31.0 uvloop>=0.19.0
httpx>=0.26.0
pytest>=8.0.0 pytest>=8.0.0
pytest-asyncio>=0.23.0


@ -23,6 +23,24 @@ import requests
APP_URL = os.environ.get("APP_URL", "http://localhost:8000") APP_URL = os.environ.get("APP_URL", "http://localhost:8000")
def read_sse_events(response, max_events=100):
"""
Read SSE events from a streaming response.
Stops when receiving a 'complete' or 'error' event, or max_events reached.
"""
events = []
for line in response.iter_lines(decode_unicode=True):
if line.startswith("data: "):
data = json.loads(line[6:])
events.append(data)
if data.get("type") in ("complete", "error"):
break
if len(events) >= max_events:
break
return events
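The helper above depends on a live `requests` streaming response. A minimal offline sketch of the same parsing rule, assuming events arrive as `data: <json>` lines and that a terminal event carries a `type` of `complete` or `error`; the name `parse_sse_lines` is hypothetical and not part of the commit:

```python
import json


def parse_sse_lines(lines, max_events=100):
    """Collect JSON payloads from 'data: ' lines until a terminal event."""
    events = []
    for line in lines:
        # Skip blank separators, comments, and other SSE fields.
        if not line.startswith("data: "):
            continue
        data = json.loads(line[len("data: "):])
        events.append(data)
        # Stop on a terminal event or once the cap is reached.
        if data.get("type") in ("complete", "error") or len(events) >= max_events:
            break
    return events


sample = [
    'data: {"type": "progress", "index": 0}',
    "",
    'data: {"type": "complete"}',
    "",
    'data: {"type": "progress", "index": 1}',  # never reached
]
events = parse_sse_lines(sample)
```

Stopping on the terminal event keeps the test from depending on the server closing the connection, so the reader returns even when the stream stays open.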
def wait_for_app(timeout=30): def wait_for_app(timeout=30):
"""Wait for the application to be ready.""" """Wait for the application to be ready."""
start = time.time() start = time.time()
@ -92,13 +110,7 @@ class TestEmailTasks:
) )
assert resp.status_code == 200 assert resp.status_code == 200
events = [] events = read_sse_events(resp)
for line in resp.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
data = json.loads(line[6:])
events.append(data)
# Should have progress for each recipient + complete # Should have progress for each recipient + complete
assert len(events) == len(recipients) + 1 assert len(events) == len(recipients) + 1
@ -182,12 +194,7 @@ class TestImageTasks:
) )
assert resp.status_code == 200 assert resp.status_code == 200
events = [] events = read_sse_events(resp)
for line in resp.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
events.append(json.loads(line[6:]))
assert len(events) == len(images) + 1 assert len(events) == len(images) + 1
assert events[-1]["type"] == "complete" assert events[-1]["type"] == "complete"
@ -246,12 +253,7 @@ class TestDataTasks:
) )
assert resp.status_code == 200 assert resp.status_code == 200
events = [] events = read_sse_events(resp)
for line in resp.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
events.append(json.loads(line[6:]))
# extract + transform + load + complete # extract + transform + load + complete
assert len(events) == 4 assert len(events) == 4
@ -278,13 +280,20 @@ class TestDataTasks:
assert resp1.status_code == 200 assert resp1.status_code == 200
assert resp1.json()["status"] == "cache_miss" assert resp1.json()["status"] == "cache_miss"
# Second call - cache hit # Second call - may be cache hit or miss depending on which worker handles it
resp2 = requests.post( # (cache is per-worker, not shared)
f"{APP_URL}/api/data/query", # Retry a few times to likely hit the same worker
json={"query_key": query_key, "ttl": 300}, cache_hit = False
) for _ in range(5):
assert resp2.status_code == 200 resp2 = requests.post(
assert resp2.json()["status"] == "cache_hit" f"{APP_URL}/api/data/query",
json={"query_key": query_key, "ttl": 300},
)
assert resp2.status_code == 200
if resp2.json()["status"] == "cache_hit":
cache_hit = True
break
assert cache_hit, "Expected cache_hit after multiple requests to same key"
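The retry loop above exists because each worker keeps its own cache. A minimal simulation of that effect, assuming round-robin dispatch across workers; `FakeWorker` and `run_requests` are hypothetical names, and the real dispatch policy of the dirty arbiter may differ:

```python
from itertools import cycle


class FakeWorker:
    """Stand-in for one worker process with its own private cache."""

    def __init__(self):
        self.cache = {}

    def query(self, key):
        if key in self.cache:
            return "cache_hit"
        self.cache[key] = True
        return "cache_miss"


def run_requests(num_workers, num_requests, key="q1"):
    # Assumption: requests are dispatched round-robin; real dispatch may differ.
    workers = [FakeWorker() for _ in range(num_workers)]
    picker = cycle(workers)
    return [next(picker).query(key) for _ in range(num_requests)]


statuses = run_requests(num_workers=3, num_requests=6)
```

Under this assumed policy the first hit appears only after every worker has seen the key once, so one initial request plus five retries would find a hit only when at most five workers serve the route.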
def test_data_stats(self): def test_data_stats(self):
"""Test data worker statistics.""" """Test data worker statistics."""
@ -418,5 +427,5 @@ class TestErrorHandling:
f"{APP_URL}/api/email/send", f"{APP_URL}/api/email/send",
json={}, # Missing required fields json={}, # Missing required fields
) )
# Should get a 500 or validation error # Should get a validation error (FastAPI returns 422)
assert resp.status_code in [400, 500] assert resp.status_code == 422

View File

@ -207,6 +207,7 @@ class ASGIProtocol(asyncio.Protocol):
response_started = False response_started = False
response_complete = False response_complete = False
exc_to_raise = None exc_to_raise = None
use_chunked = False
# Response tracking for access logging # Response tracking for access logging
response_status = 500 response_status = 500
@ -232,7 +233,7 @@ class ASGIProtocol(asyncio.Protocol):
async def send(message): async def send(message):
nonlocal response_started, response_complete, exc_to_raise nonlocal response_started, response_complete, exc_to_raise
nonlocal response_status, response_headers, response_sent nonlocal response_status, response_headers, response_sent, use_chunked
msg_type = message["type"] msg_type = message["type"]
@ -250,6 +251,19 @@ class ASGIProtocol(asyncio.Protocol):
response_started = True response_started = True
response_status = message["status"] response_status = message["status"]
response_headers = message.get("headers", []) response_headers = message.get("headers", [])
# Check if Content-Length is present
has_content_length = any(
name.lower() in (b"content-length", "content-length")
for name, _ in response_headers
)
# Use chunked encoding for HTTP/1.1 streaming responses without Content-Length
if not has_content_length and request.version >= (1, 1):
use_chunked = True
response_headers = list(response_headers) + [(b"transfer-encoding", b"chunked")]
await self._send_response_start(response_status, response_headers, request) await self._send_response_start(response_status, response_headers, request)
elif msg_type == "http.response.body": elif msg_type == "http.response.body":
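The decision made in this hunk can be isolated as a pure function. A minimal sketch, assuming header names may arrive as `bytes` or `str` and that the HTTP version is a tuple such as `(1, 1)`; `choose_framing` is a hypothetical name, not a function in the worker:

```python
def choose_framing(headers, http_version):
    """Return (use_chunked, headers) for an ASGI response start message."""

    def norm(name):
        # Normalize header names to lowercase bytes for comparison.
        if isinstance(name, str):
            name = name.encode("latin-1")
        return name.lower()

    has_content_length = any(norm(name) == b"content-length" for name, _ in headers)
    if not has_content_length and http_version >= (1, 1):
        # No length known up front: frame the body with chunked encoding.
        return True, list(headers) + [(b"transfer-encoding", b"chunked")]
    return False, list(headers)


sse_headers = [(b"content-type", b"text/event-stream")]
chunked_11, out_11 = choose_framing(sse_headers, (1, 1))
chunked_10, out_10 = choose_framing(sse_headers, (1, 0))
chunked_len, _ = choose_framing([("Content-Length", b"5")], (1, 1))
```

The sketch mirrors the diff and does not treat responses that cannot carry a body (for example status 204, on which HTTP forbids `Transfer-Encoding`) as a special case. Whether the worker handles those elsewhere is not visible in this hunk.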
@ -264,10 +278,13 @@ class ASGIProtocol(asyncio.Protocol):
more_body = message.get("more_body", False) more_body = message.get("more_body", False)
if body: if body:
await self._send_body(body) await self._send_body(body, chunked=use_chunked)
response_sent += len(body) response_sent += len(body)
if not more_body: if not more_body:
if use_chunked:
# Send terminal chunk
self.transport.write(b"0\r\n\r\n")
response_complete = True response_complete = True
# Build environ for logging # Build environ for logging
@ -476,10 +493,15 @@ class ASGIProtocol(asyncio.Protocol):
response = status_line + "".join(header_lines) + "\r\n" response = status_line + "".join(header_lines) + "\r\n"
self.transport.write(response.encode("latin-1")) self.transport.write(response.encode("latin-1"))
async def _send_body(self, body): async def _send_body(self, body, chunked=False):
"""Send response body chunk.""" """Send response body chunk."""
if body: if body:
self.transport.write(body) if chunked:
# Chunked encoding: size in hex + CRLF + data + CRLF
chunk = f"{len(body):x}\r\n".encode("latin-1") + body + b"\r\n"
self.transport.write(chunk)
else:
self.transport.write(body)
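The framing used by `_send_body` and the terminal chunk can be checked with a round trip. A minimal sketch, assuming a complete body with no chunk extensions or trailers; `encode_chunk` and `decode_chunked` are hypothetical helpers for illustration, not worker APIs:

```python
def encode_chunk(body: bytes) -> bytes:
    """Frame one chunk: hex size, CRLF, data, CRLF."""
    return f"{len(body):x}\r\n".encode("latin-1") + body + b"\r\n"


# A zero-length chunk followed by an empty trailer section ends the body.
TERMINAL_CHUNK = b"0\r\n\r\n"


def decode_chunked(stream: bytes) -> bytes:
    """Decode a complete chunked body (no extensions or trailers)."""
    out = bytearray()
    pos = 0
    while True:
        eol = stream.index(b"\r\n", pos)
        size = int(stream[pos:eol], 16)
        pos = eol + 2
        if size == 0:
            break
        out += stream[pos:pos + size]
        pos += size + 2  # skip the data and its trailing CRLF
    return bytes(out)


wire = encode_chunk(b"data: hello\n\n") + encode_chunk(b"x" * 26) + TERMINAL_CHUNK
```

The terminal chunk is what lets an HTTP/1.1 client detect end-of-response on a connection that stays open, which is the SSE hang the commit message describes.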
async def _send_error_response(self, status, message): async def _send_error_response(self, status, message):
"""Send an error response.""" """Send an error response."""