examples: add celery_alternative example using dirty arbiters

Demonstrates replacing Celery with Gunicorn's dirty arbiters for
background task processing. Includes:

- 4 task workers: Email, Image, Data, Scheduled
- Stateful workers with persistent connections/caches
- Streaming progress for long-running tasks
- Per-app worker allocation
- Flask API with 15+ endpoints
- Docker deployment (single container vs Celery's 4+)
- Unit tests (19 tests) and integration tests
- Migration guide from Celery
Benoit Chesneau 2026-02-01 16:51:48 +01:00
parent 9ece4a6873
commit 17ac6a5254
12 changed files with 2306 additions and 0 deletions


@ -0,0 +1,47 @@
# Dockerfile for Celery Replacement Example
#
# This demonstrates running a production-ready application with
# Gunicorn dirty arbiters replacing Celery for background tasks.
#
# Key difference from Celery deployment:
# - Celery: Needs separate web + worker containers + Redis/RabbitMQ
# - Dirty: Single container handles both HTTP and background tasks
FROM python:3.12-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Copy gunicorn source and install (from build context root)
COPY . /gunicorn-src
RUN pip install --no-cache-dir /gunicorn-src
# Copy example application
COPY examples/celery_alternative /app
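# Install example dependencies (requests and pytest are needed by the test service)
RUN pip install --no-cache-dir flask requests pytest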
# Environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONPATH=/gunicorn-src
ENV GUNICORN_BIND=0.0.0.0:8000
ENV GUNICORN_WORKERS=4
ENV GUNICORN_THREADS=4
ENV DIRTY_WORKERS=9
ENV DIRTY_TIMEOUT=300
ENV LOG_LEVEL=info
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1
# Run gunicorn with dirty arbiters
CMD ["gunicorn", "-c", "gunicorn_conf.py", "app:app"]


@ -0,0 +1,240 @@
# Celery Replacement Example
This example demonstrates how to replace Celery with Gunicorn's **dirty arbiters** for background task processing.
## Why Replace Celery?
| Aspect | Celery | Dirty Arbiters |
|--------|--------|----------------|
| Dependencies | Redis/RabbitMQ + Celery | None (built into Gunicorn) |
| Deployment | Multiple processes/containers | Single container, single `gunicorn` command |
| State | Stateless workers | Stateful workers (keep models loaded) |
| Progress | Polling or WebSocket | Native streaming |
| Configuration | Separate config | Same Gunicorn config file (`gunicorn_conf.py`) |
## Quick Start
### Local Development
```bash
# Install dependencies
pip install flask requests pytest
pip install -e ../.. # Install gunicorn from source
# Run the application
gunicorn -c gunicorn_conf.py app:app
# In another terminal, test it
curl http://localhost:8000/health
curl -X POST http://localhost:8000/api/email/send \
    -H "Content-Type: application/json" \
    -d '{"to": "test@example.com", "subject": "Hello", "body": "World"}'
```
### Docker
```bash
# Build and run
docker compose up --build
# Run with tests
docker compose --profile test up --build --abort-on-container-exit
```
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ Gunicorn Master │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ HTTP Worker │ │ HTTP Worker │ │ HTTP Worker │ ... │
│ │ (gthread) │ │ (gthread) │ │ (gthread) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └────────────────┼────────────────┘ │
│ │ │
│ Unix Socket IPC │
│ │ │
│ ┌────────────────┼────────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ EmailWorker │ │ ImageWorker │ │ DataWorker │ ... │
│ │ (2 procs) │ │ (2 procs) │ │ (4 procs) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Dirty Arbiter │
└─────────────────────────────────────────────────────────────┘
```
## Task Workers
### EmailWorker
- `send_email(to, subject, body)` - Send single email
- `send_bulk_emails(recipients, subject, body)` - Bulk send with progress streaming
- `stats()` - Worker statistics
### ImageWorker
- `resize(image_data, width, height)` - Resize image
- `generate_thumbnail(image_data, size)` - Generate thumbnail
- `process_batch(images, operation, **params)` - Batch process with streaming
### DataWorker
- `aggregate(data, group_by, agg_field, agg_func)` - Aggregate data
- `etl_pipeline(source_data, transformations)` - ETL with progress streaming
- `cached_query(query_key, ttl)` - Cached query execution
### ScheduledWorker
- `cleanup_old_files(directory, max_age_days)` - File cleanup
- `generate_daily_report()` - Daily report generation
- `sync_external_data(source)` - External data sync
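
Each worker above exposes its tasks as plain methods, and the caller selects one by name. The sketch below isolates that name-based dispatch. `FakeDirtyApp` and `CounterWorker` are hypothetical stand-ins so the snippet runs without Gunicorn installed; they are not part of the example code or of Gunicorn's API.

```python
from typing import Any


class FakeDirtyApp:
    """Stand-in for gunicorn.dirty.app.DirtyApp (assumption: illustration only)."""


class CounterWorker(FakeDirtyApp):
    """Hypothetical worker that keeps state between calls."""

    def __init__(self) -> None:
        self.calls = 0

    def __call__(self, action: str, *args: Any, **kwargs: Any) -> Any:
        # Refuse unknown and underscore-prefixed names so callers
        # cannot reach internal helpers through the dispatcher.
        method = getattr(self, action, None)
        if method is None or action.startswith("_"):
            raise ValueError(f"Unknown action: {action}")
        return method(*args, **kwargs)

    def add(self, a: int, b: int) -> dict:
        self.calls += 1
        return {"result": a + b, "calls": self.calls}

    def _secret(self) -> str:
        return "not reachable through __call__"


worker = CounterWorker()
first = worker("add", 1, 2)
second = worker("add", a=10, b=5)
```

Because the same instance handles both calls, `second["calls"]` reflects the earlier call as well, which is the stateful behaviour the workers in this example rely on.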
## API Endpoints
### Email
- `POST /api/email/send` - Send single email
- `POST /api/email/send-bulk` - Bulk send (SSE streaming)
- `GET /api/email/stats` - Worker stats
### Image
- `POST /api/image/resize` - Resize image
- `POST /api/image/thumbnail` - Generate thumbnail
- `POST /api/image/process-batch` - Batch process (SSE streaming)
- `GET /api/image/stats` - Worker stats
### Data
- `POST /api/data/aggregate` - Aggregate data
- `POST /api/data/etl` - ETL pipeline (SSE streaming)
- `POST /api/data/query` - Cached query
- `GET /api/data/stats` - Worker stats
### Scheduled
- `POST /api/scheduled/cleanup` - Run cleanup
- `POST /api/scheduled/daily-report` - Generate report
- `POST /api/scheduled/sync` - Sync data
- `GET /api/scheduled/stats` - Worker stats
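
The non-streaming endpoints return JSON, and the `/api/email/send` handler in `app.py` maps a task timeout to HTTP 504 and any other task error to HTTP 500. A minimal sketch of that mapping follows. `TaskError` and `TaskTimeout` are hypothetical stand-ins for `DirtyError` and `DirtyTimeoutError`, and the subclass relationship between them is an assumption made for this illustration.

```python
class TaskError(Exception):
    """Stand-in for gunicorn.dirty.errors.DirtyError (assumption)."""


class TaskTimeout(TaskError):
    """Stand-in for gunicorn.dirty.errors.DirtyTimeoutError (assumption)."""


def call_task(task, *args, **kwargs):
    """Run a task and return a (payload, http_status) pair."""
    try:
        return task(*args, **kwargs), 200
    except TaskTimeout:
        # The more specific error must be caught first.
        return {"error": "Task timed out"}, 504
    except TaskError as exc:
        return {"error": str(exc)}, 500


def slow_task():
    raise TaskTimeout("no reply in time")


def broken_task():
    raise TaskError("worker crashed")


results = [
    call_task(lambda: {"status": "sent"}),
    call_task(slow_task),
    call_task(broken_task),
]
```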
## Streaming Progress Example
```python
import json

import requests

# Start bulk email with streaming progress
resp = requests.post(
    "http://localhost:8000/api/email/send-bulk",
    json={
        "recipients": ["a@x.com", "b@x.com", "c@x.com"],
        "subject": "Newsletter",
        "body": "Hello!",
    },
    stream=True,
)

for line in resp.iter_lines():
    if line and line.startswith(b"data: "):
        progress = json.loads(line[6:])
        # Error events carry an "error" key and no "type" key
        if "error" in progress:
            print(f"Error: {progress['error']}")
        elif progress.get("type") == "progress":
            print(f"Progress: {progress['percent']}%")
        elif progress.get("type") == "complete":
            print(f"Done! Sent: {progress['sent']}, Failed: {progress['failed']}")
```
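
The streaming endpoints frame each event as a `data: <json>` line followed by a blank line. The round trip can be sketched offline, without a running server. `format_sse` and `parse_sse` are hypothetical helper names introduced here for illustration; they do not exist in the example code.

```python
import json
from typing import Iterable, Iterator


def format_sse(event: dict) -> str:
    """Frame one event as 'data: <json>' plus a blank separator line."""
    return f"data: {json.dumps(event)}\n\n"


def parse_sse(lines: Iterable[bytes]) -> Iterator[dict]:
    """Decode 'data: ' lines, skipping blank separators and other fields."""
    prefix = b"data: "
    for line in lines:
        if line.startswith(prefix):
            yield json.loads(line[len(prefix):])


# Simulate the lines a streaming HTTP client would hand back for two events.
body = format_sse({"type": "progress", "percent": 50}) + format_sse(
    {"type": "complete", "sent": 2, "failed": 0}
)
events = list(parse_sse(body.encode().split(b"\n")))
```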
## Celery Migration Guide
### Before (Celery)
```python
# tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost')


@app.task
def send_email(to, subject, body):
    # Send email
    return {"status": "sent"}


@app.task(bind=True)
def send_bulk(self, recipients, subject, body):
    for i, to in enumerate(recipients):
        send_email(to, subject, body)
        self.update_state(state='PROGRESS', meta={'current': i})
```
```python
# views.py
from tasks import send_email, send_bulk


def send_view(request):
    send_email.delay(to, subject, body)  # Async
    return {"status": "queued"}
```
### After (Dirty Arbiters)
```python
# tasks.py
from gunicorn.dirty.app import DirtyApp


class EmailWorker(DirtyApp):
    workers = 2  # Limit workers

    def init(self):
        self.smtp = connect_smtp()  # Stateful!

    def __call__(self, action, *args, **kwargs):
        return getattr(self, action)(*args, **kwargs)

    def send_email(self, to, subject, body):
        return {"status": "sent"}

    def send_bulk(self, recipients, subject, body):
        for i, to in enumerate(recipients):
            self.send_email(to, subject, body)
            yield {"type": "progress", "current": i}  # Native streaming!
```
```python
# views.py
from gunicorn.dirty import get_dirty_client


def send_view(request):
    client = get_dirty_client()
    result = client.execute("tasks:EmailWorker", "send_email", to, subject, body)
    return result  # Sync result, no polling!
```
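
The central change in this migration is that progress becomes values yielded by a generator rather than state written with `update_state`. The sketch below isolates that idea in plain Python. `bulk_with_progress`, `send_one`, and `flaky_send` are hypothetical names introduced here; the event shape mirrors the `progress` and `complete` dictionaries used in this example.

```python
from typing import Callable, Iterator


def bulk_with_progress(
    recipients: list, send_one: Callable[[str], None]
) -> Iterator[dict]:
    """Yield one progress event per recipient, then a final summary."""
    total = len(recipients)
    sent = failed = 0
    for i, to in enumerate(recipients, start=1):
        try:
            send_one(to)
            sent += 1
            status = "sent"
        except Exception:
            failed += 1
            status = "failed"
        yield {
            "type": "progress",
            "current": i,
            "total": total,
            "percent": int(i / total * 100),
            "status": status,
        }
    yield {"type": "complete", "total": total, "sent": sent, "failed": failed}


def flaky_send(to: str) -> None:
    # Hypothetical sender that rejects one address to exercise the failure path.
    if to == "bad@x.com":
        raise RuntimeError("rejected")


events = list(bulk_with_progress(["a@x.com", "bad@x.com", "c@x.com"], flaky_send))
```

The consumer sees each event as soon as it is yielded, so no polling loop or result backend is involved in this sketch.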
## Configuration
```python
# gunicorn_conf.py

# HTTP workers
workers = 4
worker_class = "gthread"
threads = 4

# Task workers (replace Celery)
dirty_apps = [
    "tasks:EmailWorker",
    "tasks:ImageWorker",
    "tasks:DataWorker",
    "tasks:ScheduledWorker",
]
dirty_workers = 9  # 2 + 2 + 4 + 1 (sum of the per-app worker counts)
dirty_timeout = 300
```
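
The value `dirty_workers = 9` matches the per-app worker counts used in this example (2 + 2 + 4 + 1). A small sanity check along those lines is sketched below. The counts are hard-coded as an assumption rather than read from the real worker classes, and `required_dirty_workers` is a hypothetical helper, not a Gunicorn function.

```python
# Per-app worker counts as declared by each class's `workers` attribute in
# this example (hard-coded here as an assumption; not read from Gunicorn).
APP_WORKERS = {
    "tasks:EmailWorker": 2,
    "tasks:ImageWorker": 2,
    "tasks:DataWorker": 4,
    "tasks:ScheduledWorker": 1,
}


def required_dirty_workers(app_workers: dict) -> int:
    """Return the number of dirty workers needed to satisfy every app."""
    return sum(app_workers.values())


total = required_dirty_workers(APP_WORKERS)
```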
## Running Tests
```bash
# Unit tests (no server needed)
pytest tests/test_tasks.py -v
# Integration tests (server must be running)
APP_URL=http://localhost:8000 pytest tests/test_integration.py -v
# All tests via Docker
docker compose --profile test up --build --abort-on-container-exit
```


@ -0,0 +1,460 @@
"""
Web Application - Flask app demonstrating Celery replacement.

This shows how to call dirty arbiter tasks from your web application,
replacing Celery's task.delay() and task.apply_async() patterns.
"""

import json
import os

from flask import Flask, request, jsonify, Response, stream_with_context

from gunicorn.dirty import get_dirty_client
from gunicorn.dirty.errors import (
    DirtyError,
    DirtyTimeoutError,
    DirtyAppNotFoundError,
)

app = Flask(__name__)

# Task worker import paths (like Celery task names)
EMAIL_WORKER = "examples.celery_alternative.tasks:EmailWorker"
IMAGE_WORKER = "examples.celery_alternative.tasks:ImageWorker"
DATA_WORKER = "examples.celery_alternative.tasks:DataWorker"
SCHEDULED_WORKER = "examples.celery_alternative.tasks:ScheduledWorker"


def get_client():
    """Get the dirty client for calling task workers."""
    return get_dirty_client()


# ============================================================================
# Email Tasks - Like Celery email tasks
# ============================================================================

@app.route("/api/email/send", methods=["POST"])
def send_email():
    """
    Send a single email.

    Celery equivalent:
        send_email.delay(to, subject, body)

    Request:
        POST /api/email/send
        {"to": "user@example.com", "subject": "Hello", "body": "World"}
    """
    data = request.get_json()

    try:
        client = get_client()
        result = client.execute(
            EMAIL_WORKER,
            "send_email",
            to=data["to"],
            subject=data["subject"],
            body=data["body"],
            html=data.get("html", False),
        )
        return jsonify(result)
    except DirtyTimeoutError:
        return jsonify({"error": "Task timed out"}), 504
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/email/send-bulk", methods=["POST"])
def send_bulk_emails():
    """
    Send bulk emails with streaming progress.

    Celery equivalent:
        result = send_bulk.apply_async([recipients, subject, body])
        while not result.ready():
            print(result.info)  # Progress polling

    With dirty arbiters, progress is streamed in real-time!

    Request:
        POST /api/email/send-bulk
        {"recipients": ["a@x.com", "b@x.com"], "subject": "Hi", "body": "Hello"}
    """
    data = request.get_json()

    def generate():
        try:
            client = get_client()
            for progress in client.stream(
                EMAIL_WORKER,
                "send_bulk_emails",
                recipients=data["recipients"],
                subject=data["subject"],
                body=data["body"],
            ):
                yield f"data: {json.dumps(progress)}\n\n"
        except DirtyError as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )


@app.route("/api/email/stats")
def email_stats():
    """Get email worker statistics."""
    try:
        client = get_client()
        result = client.execute(EMAIL_WORKER, "stats")
        return jsonify(result)
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


# ============================================================================
# Image Tasks - Like Celery image processing tasks
# ============================================================================

@app.route("/api/image/resize", methods=["POST"])
def resize_image():
    """
    Resize an image.

    Celery equivalent:
        resize_image.delay(image_data, width, height)

    Request:
        POST /api/image/resize
        {"image_data": "base64...", "width": 800, "height": 600}
    """
    data = request.get_json()

    # Keep image_data as string (base64 encoded) for JSON serialization
    image_data = data.get("image_data", "")

    try:
        client = get_client()
        result = client.execute(
            IMAGE_WORKER,
            "resize",
            image_data=image_data,
            width=data.get("width", 800),
            height=data.get("height", 600),
        )
        return jsonify(result)
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/image/thumbnail", methods=["POST"])
def generate_thumbnail():
    """Generate a thumbnail."""
    data = request.get_json()
    image_data = data.get("image_data", "")

    try:
        client = get_client()
        result = client.execute(
            IMAGE_WORKER,
            "generate_thumbnail",
            image_data=image_data,
            size=data.get("size", 150),
        )
        return jsonify(result)
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/image/process-batch", methods=["POST"])
def process_image_batch():
    """
    Process multiple images with progress streaming.

    Request:
        POST /api/image/process-batch
        {
            "images": [{"id": "img1", "data": "..."}, ...],
            "operation": "resize",
            "width": 800,
            "height": 600
        }
    """
    data = request.get_json()

    def generate():
        try:
            client = get_client()
            for progress in client.stream(
                IMAGE_WORKER,
                "process_batch",
                images=data["images"],
                operation=data.get("operation", "resize"),
                width=data.get("width", 800),
                height=data.get("height", 600),
                size=data.get("size", 150),
            ):
                yield f"data: {json.dumps(progress)}\n\n"
        except DirtyError as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype="text/event-stream",
    )


@app.route("/api/image/stats")
def image_stats():
    """Get image worker statistics."""
    try:
        client = get_client()
        result = client.execute(IMAGE_WORKER, "stats")
        return jsonify(result)
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


# ============================================================================
# Data Tasks - Like Celery data processing tasks
# ============================================================================

@app.route("/api/data/aggregate", methods=["POST"])
def aggregate_data():
    """
    Aggregate data.

    Celery equivalent:
        aggregate_data.delay(data, group_by, agg_field, agg_func)

    Request:
        POST /api/data/aggregate
        {
            "data": [{"category": "A", "value": 10}, ...],
            "group_by": "category",
            "agg_field": "value",
            "agg_func": "sum"
        }
    """
    data = request.get_json()

    try:
        client = get_client()
        result = client.execute(
            DATA_WORKER,
            "aggregate",
            data=data["data"],
            group_by=data["group_by"],
            agg_field=data["agg_field"],
            agg_func=data.get("agg_func", "sum"),
        )
        return jsonify(result)
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/data/etl", methods=["POST"])
def run_etl():
    """
    Run ETL pipeline with streaming progress.

    Celery equivalent:
        chain(extract.s(), transform.s(), load.s()).apply_async()

    Request:
        POST /api/data/etl
        {
            "source_data": [...],
            "transformations": [
                {"name": "filter_active", "type": "filter", "field": "status", "value": "active"},
                {"name": "uppercase_name", "type": "map", "field": "name", "func": "upper"}
            ]
        }
    """
    data = request.get_json()

    def generate():
        try:
            client = get_client()
            for progress in client.stream(
                DATA_WORKER,
                "etl_pipeline",
                source_data=data["source_data"],
                transformations=data.get("transformations", []),
            ):
                yield f"data: {json.dumps(progress)}\n\n"
        except DirtyError as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype="text/event-stream",
    )


@app.route("/api/data/query", methods=["POST"])
def cached_query():
    """
    Execute a cached query.

    Request:
        POST /api/data/query
        {"query_key": "sales_2024", "ttl": 300}
    """
    data = request.get_json()

    try:
        client = get_client()
        result = client.execute(
            DATA_WORKER,
            "cached_query",
            query_key=data["query_key"],
            ttl=data.get("ttl", 300),
        )
        return jsonify(result)
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/data/stats")
def data_stats():
    """Get data worker statistics."""
    try:
        client = get_client()
        result = client.execute(DATA_WORKER, "stats")
        return jsonify(result)
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


# ============================================================================
# Scheduled Tasks - Like Celery Beat tasks
# ============================================================================

@app.route("/api/scheduled/cleanup", methods=["POST"])
def run_cleanup():
    """
    Run cleanup task (normally triggered by cron).

    Request:
        POST /api/scheduled/cleanup
        {"directory": "/tmp/uploads", "max_age_days": 7}
    """
    data = request.get_json() or {}

    try:
        client = get_client()
        result = client.execute(
            SCHEDULED_WORKER,
            "cleanup_old_files",
            directory=data.get("directory", "/tmp"),
            max_age_days=data.get("max_age_days", 7),
        )
        return jsonify(result)
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/scheduled/daily-report", methods=["POST"])
def run_daily_report():
    """Generate daily report."""
    try:
        client = get_client()
        result = client.execute(SCHEDULED_WORKER, "generate_daily_report")
        return jsonify(result)
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/scheduled/sync", methods=["POST"])
def run_sync():
    """
    Sync external data.

    Request:
        POST /api/scheduled/sync
        {"source": "external_api"}
    """
    data = request.get_json() or {}

    try:
        client = get_client()
        result = client.execute(
            SCHEDULED_WORKER,
            "sync_external_data",
            source=data.get("source", "default"),
        )
        return jsonify(result)
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/scheduled/stats")
def scheduled_stats():
    """Get scheduled worker statistics."""
    try:
        client = get_client()
        result = client.execute(SCHEDULED_WORKER, "stats")
        return jsonify(result)
    except DirtyError as e:
        return jsonify({"error": str(e)}), 500


# ============================================================================
# Health & Info Endpoints
# ============================================================================

@app.route("/")
def index():
    """API documentation."""
    return jsonify({
        "name": "Celery Replacement Demo",
        "description": "Demonstrating Gunicorn dirty arbiters as Celery replacement",
        "endpoints": {
            "email": {
                "POST /api/email/send": "Send single email",
                "POST /api/email/send-bulk": "Send bulk emails (streaming)",
                "GET /api/email/stats": "Email worker stats",
            },
            "image": {
                "POST /api/image/resize": "Resize image",
                "POST /api/image/thumbnail": "Generate thumbnail",
                "POST /api/image/process-batch": "Batch process (streaming)",
                "GET /api/image/stats": "Image worker stats",
            },
            "data": {
                "POST /api/data/aggregate": "Aggregate data",
                "POST /api/data/etl": "Run ETL pipeline (streaming)",
                "POST /api/data/query": "Cached query",
                "GET /api/data/stats": "Data worker stats",
            },
            "scheduled": {
                "POST /api/scheduled/cleanup": "Run cleanup",
                "POST /api/scheduled/daily-report": "Generate report",
                "POST /api/scheduled/sync": "Sync external data",
                "GET /api/scheduled/stats": "Scheduled worker stats",
            },
        },
    })


@app.route("/health")
def health():
    """Health check endpoint."""
    try:
        client = get_client()
        # Quick ping to verify workers are running
        client.execute(EMAIL_WORKER, "stats")
        return jsonify({"status": "healthy", "workers": "connected"})
    except DirtyError:
        return jsonify({"status": "degraded", "workers": "unavailable"}), 503


if __name__ == "__main__":
    app.run(debug=True, port=8000)


@ -0,0 +1,79 @@
# Docker Compose for Celery Replacement Example
#
# Notice: Only ONE service needed!
# Compare with typical Celery deployment which requires:
# - web (gunicorn/uvicorn)
# - celery_worker
# - celery_beat (for scheduled tasks)
# - redis or rabbitmq
#
# With dirty arbiters, everything runs in a single container.
services:
  app:
    build:
      context: ../..  # Gunicorn repo root
      dockerfile: examples/celery_alternative/Dockerfile
    ports:
      - "8000:8000"
    environment:
      - GUNICORN_WORKERS=4
      - GUNICORN_THREADS=4
      - DIRTY_WORKERS=9
      - DIRTY_TIMEOUT=300
      - LOG_LEVEL=info
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 10s
    # Resource limits (optional)
    deploy:
      resources:
        limits:
          memory: 1G
        reservations:
          memory: 256M

  # Test runner service
  tests:
    build:
      context: ../..
      dockerfile: examples/celery_alternative/Dockerfile
    depends_on:
      app:
        condition: service_healthy
    environment:
      - APP_URL=http://app:8000
    command: ["python", "-m", "pytest", "tests/", "-v", "--tb=short"]
    profiles:
      - test

# For comparison, here's what a Celery deployment would look like:
#
# services:
#   web:
#     build: .
#     command: gunicorn app:app -b 0.0.0.0:8000
#     ports:
#       - "8000:8000"
#     depends_on:
#       - redis
#
#   celery_worker:
#     build: .
#     command: celery -A tasks worker -l info
#     depends_on:
#       - redis
#
#   celery_beat:
#     build: .
#     command: celery -A tasks beat -l info
#     depends_on:
#       - redis
#
#   redis:
#     image: redis:alpine
#     ports:
#       - "6379:6379"


@ -0,0 +1,141 @@
"""
Gunicorn Configuration - Celery Replacement Example
This configuration sets up:
1. HTTP workers (gthread) to handle web requests
2. Dirty workers to handle background tasks (replacing Celery workers)
Comparison with Celery deployment:
- Celery: gunicorn app:app + celery -A tasks worker
- Dirty: gunicorn -c gunicorn_conf.py app:app (single command!)
"""
import multiprocessing
import os
# =============================================================================
# Basic Settings
# =============================================================================
# Bind to all interfaces on port 8000
bind = os.environ.get("GUNICORN_BIND", "0.0.0.0:8000")
# HTTP workers - handle incoming web requests
# Rule of thumb: 2-4 x CPU cores for I/O bound apps
workers = int(os.environ.get("GUNICORN_WORKERS", multiprocessing.cpu_count() * 2 + 1))
# Use gthread worker for better concurrency
worker_class = "gthread"
# Threads per worker - good for I/O bound operations
threads = int(os.environ.get("GUNICORN_THREADS", 4))
# =============================================================================
# Dirty Arbiter Settings (Celery Worker Replacement)
# =============================================================================
# Task workers - these replace Celery workers
# Each dirty app can specify its own worker count via the `workers` class attribute
dirty_apps = [
# Email tasks - 2 workers (I/O bound)
"examples.celery_alternative.tasks:EmailWorker",
# Image processing - 2 workers (CPU/memory intensive)
"examples.celery_alternative.tasks:ImageWorker",
# Data processing - 4 workers (parallelizable)
"examples.celery_alternative.tasks:DataWorker",
# Scheduled tasks - 1 worker
"examples.celery_alternative.tasks:ScheduledWorker",
]
# Total dirty workers (distributed among apps based on their `workers` attribute)
# If not set, uses sum of all app worker counts
dirty_workers = int(os.environ.get("DIRTY_WORKERS", 9)) # 2+2+4+1 = 9
# Task timeout in seconds (like Celery's task_time_limit)
dirty_timeout = int(os.environ.get("DIRTY_TIMEOUT", 300))
# Threads per dirty worker (for concurrent task execution)
dirty_threads = int(os.environ.get("DIRTY_THREADS", 1))
# Graceful shutdown timeout
dirty_graceful_timeout = int(os.environ.get("DIRTY_GRACEFUL_TIMEOUT", 30))
# =============================================================================
# Timeouts & Limits
# =============================================================================
# Worker timeout (seconds)
timeout = 120
# Keep-alive connections
keepalive = 5
# Maximum requests per worker before recycling
max_requests = 1000
max_requests_jitter = 50
# =============================================================================
# Logging
# =============================================================================
# Log level
loglevel = os.environ.get("LOG_LEVEL", "info")
# Access log format
accesslog = "-"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'
# Error log
errorlog = "-"
# =============================================================================
# Lifecycle Hooks
# =============================================================================
def on_starting(server):
    """Called just before the master process is initialized."""
    print("=" * 60)
    print("Starting Gunicorn with Dirty Arbiters (Celery Replacement)")
    print("=" * 60)


def on_dirty_starting(arbiter):
    """Called when the dirty arbiter is starting."""
    print("[Dirty] Starting dirty arbiter")
    print(f"[Dirty] Registered apps: {list(arbiter.cfg.dirty_apps)}")


def dirty_post_fork(arbiter, worker):
    """Called after a dirty worker is forked."""
    print(f"[Dirty] Worker {worker.pid} started")


def dirty_worker_init(worker):
    """Called when a dirty worker initializes its apps."""
    print(f"[Dirty] Worker {worker.pid} initialized apps: {list(worker.apps.keys())}")


def dirty_worker_exit(arbiter, worker):
    """Called when a dirty worker exits."""
    print(f"[Dirty] Worker {worker.pid} exiting")


def worker_int(worker):
    """Called when a worker receives SIGINT."""
    print(f"[HTTP] Worker {worker.pid} interrupted")


def worker_exit(server, worker):
    """Called when a worker exits."""
    print(f"[HTTP] Worker {worker.pid} exited")

# =============================================================================
# Development vs Production
# =============================================================================
# Reload on code changes (development only)
reload = os.environ.get("GUNICORN_RELOAD", "false").lower() == "true"
# Preload app for faster worker startup (production)
preload_app = os.environ.get("GUNICORN_PRELOAD", "false").lower() == "true"


@ -0,0 +1,4 @@
# Celery Replacement Example Dependencies
flask>=3.0.0
requests>=2.31.0
pytest>=8.0.0


@ -0,0 +1,35 @@
#!/bin/bash
# Run tests for Celery Replacement example
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
GUNICORN_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
# Add gunicorn to Python path
export PYTHONPATH="$GUNICORN_ROOT:$PYTHONPATH"
cd "$SCRIPT_DIR"
echo "=========================================="
echo "Running Unit Tests"
echo "=========================================="
python -m pytest tests/test_tasks.py -v --tb=short
echo ""
echo "=========================================="
echo "Unit tests passed!"
echo "=========================================="
# Check if integration tests should run
if [ "$1" == "--integration" ] || [ "$1" == "-i" ]; then
    # Export so the pytest child process sees the default URL as well
    export APP_URL="${APP_URL:-http://localhost:8000}"
    echo ""
    echo "=========================================="
    echo "Running Integration Tests against $APP_URL"
    echo "=========================================="
    python -m pytest tests/test_integration.py -v --tb=short
fi
echo ""
echo "All tests completed successfully!"


@ -0,0 +1,563 @@
"""
Task Workers - Celery Replacement using Gunicorn Dirty Arbiters
This module demonstrates how to replace Celery with Gunicorn's dirty arbiter
feature for background task processing. Key benefits:
1. No external broker (Redis/RabbitMQ) needed - uses Unix sockets
2. Stateful workers - maintain connections, models, caches across requests
3. Integrated with your WSGI/ASGI app - no separate process management
4. Streaming support for progress reporting
5. Per-task-type worker allocation for memory optimization
Comparison with Celery:
- Celery: @app.task decorator -> Dirty: DirtyApp class with methods
- Celery: task.delay() -> Dirty: client.execute()
- Celery: task.apply_async() -> Dirty: client.execute() with timeout
- Celery: task progress -> Dirty: client.stream() with generators
"""
import hashlib
import json
import os
import random
import smtplib
import time
from datetime import datetime
from email.mime.text import MIMEText
from typing import Any, Generator
from gunicorn.dirty.app import DirtyApp


class EmailWorker(DirtyApp):
    """
    Email task worker - like Celery's @app.task for email sending.

    Maintains SMTP connection pool across requests for efficiency.
    In Celery, you'd create a new connection per task or manage it manually.
    """

    # Limit to 2 workers since email sending is I/O bound
    workers = 2

    def __init__(self):
        self.smtp_connection = None
        self.emails_sent = 0
        self.last_connected = None

    def init(self):
        """Called once when worker starts - establish SMTP connection."""
        self._connect_smtp()

    def _connect_smtp(self):
        """Establish SMTP connection (simulated for demo)."""
        # In production, connect to real SMTP server:
        # self.smtp_connection = smtplib.SMTP('smtp.example.com', 587)
        # self.smtp_connection.starttls()
        # self.smtp_connection.login(user, password)
        self.last_connected = datetime.now().isoformat()
        self.smtp_connection = "connected"  # Simulated

    def __call__(self, action: str, *args, **kwargs) -> Any:
        """Dispatch to action methods."""
        method = getattr(self, action, None)
        if method is None or action.startswith('_'):
            raise ValueError(f"Unknown action: {action}")
        return method(*args, **kwargs)

    def send_email(self, to: str, subject: str, body: str,
                   html: bool = False) -> dict:
        """
        Send a single email.

        Equivalent to Celery:
            @app.task
            def send_email(to, subject, body):
                ...
        """
        # Simulate email sending delay
        time.sleep(random.uniform(0.1, 0.3))

        self.emails_sent += 1
        return {
            "status": "sent",
            "to": to,
            "subject": subject,
            "message_id": f"msg-{self.emails_sent}-{int(time.time())}",
            "timestamp": datetime.now().isoformat(),
        }

    def send_bulk_emails(self, recipients: list, subject: str,
                         body: str) -> Generator[dict, None, None]:
        """
        Send bulk emails with progress streaming.

        This is where dirty arbiters shine over Celery - real-time
        progress without polling or WebSockets.

        Equivalent to Celery:
            @app.task(bind=True)
            def send_bulk(self, recipients, subject, body):
                for i, to in enumerate(recipients):
                    send_email(to, subject, body)
                    self.update_state(state='PROGRESS',
                                      meta={'current': i, 'total': len(recipients)})
        """
        total = len(recipients)
        sent = 0
        failed = 0

        for i, to in enumerate(recipients):
            try:
                result = self.send_email(to, subject, body)
                sent += 1
                yield {
                    "type": "progress",
                    "current": i + 1,
                    "total": total,
                    "percent": int((i + 1) / total * 100),
                    "last_sent": to,
                    "status": "sent",
                }
            except Exception as e:
                failed += 1
                yield {
                    "type": "progress",
                    "current": i + 1,
                    "total": total,
                    "percent": int((i + 1) / total * 100),
                    "last_sent": to,
                    "status": "failed",
                    "error": str(e),
                }

        # Final summary
        yield {
            "type": "complete",
            "total": total,
            "sent": sent,
            "failed": failed,
        }

    def stats(self) -> dict:
        """Get worker statistics."""
        return {
            "emails_sent": self.emails_sent,
            "smtp_connected": self.smtp_connection is not None,
            "last_connected": self.last_connected,
            "worker_pid": os.getpid(),
        }

    def close(self):
        """Cleanup on shutdown."""
        if self.smtp_connection and self.smtp_connection != "connected":
            self.smtp_connection.quit()
class ImageWorker(DirtyApp):
"""
Image processing worker - demonstrates CPU-intensive tasks.
Like Celery tasks for image resizing, thumbnails, watermarks.
Keeps image processing libraries loaded in memory.
"""
# Limit to 2 workers - image processing is memory intensive
workers = 2
def __init__(self):
self.pil_available = False
self.images_processed = 0
def init(self):
"""Load image processing libraries once at startup."""
try:
# Try to import PIL - optional dependency
from PIL import Image
self.pil_available = True
except ImportError:
self.pil_available = False
def __call__(self, action: str, *args, **kwargs) -> Any:
method = getattr(self, action, None)
if method is None or action.startswith('_'):
raise ValueError(f"Unknown action: {action}")
return method(*args, **kwargs)
def resize(self, image_data: str, width: int, height: int) -> dict:
"""
Resize an image.
Equivalent to Celery:
@app.task
def resize_image(image_path, width, height):
img = Image.open(image_path)
img.thumbnail((width, height))
img.save(output_path)
"""
# Simulate image processing
time.sleep(random.uniform(0.2, 0.5))
self.images_processed += 1
# Create a fake "processed" result
# In production, image_data would be base64 decoded
data_size = len(image_data) if isinstance(image_data, str) else len(image_data)
result_hash = hashlib.md5(
f"{data_size}{width}{height}".encode()
).hexdigest()[:16]
return {
"status": "resized",
"original_size": data_size,
"target_dimensions": f"{width}x{height}",
"result_id": f"img-{result_hash}",
"pil_used": self.pil_available,
}
def generate_thumbnail(self, image_data: str, size: int = 150) -> dict:
"""Generate a thumbnail."""
return self.resize(image_data, size, size)
def process_batch(self, images: list, operation: str,
**params) -> Generator[dict, None, None]:
"""
Process multiple images with progress streaming.
"""
total = len(images)
for i, img_info in enumerate(images):
try:
# Simulate fetching image data
image_data = img_info.get("data", b"fake_image_data")
if operation == "resize":
result = self.resize(
image_data,
params.get("width", 800),
params.get("height", 600)
)
elif operation == "thumbnail":
result = self.generate_thumbnail(
image_data,
params.get("size", 150)
)
else:
result = {"error": f"Unknown operation: {operation}"}
yield {
"type": "progress",
"current": i + 1,
"total": total,
"percent": int((i + 1) / total * 100),
"image_id": img_info.get("id", f"img-{i}"),
"result": result,
}
except Exception as e:
yield {
"type": "error",
"current": i + 1,
"total": total,
"image_id": img_info.get("id", f"img-{i}"),
"error": str(e),
}
yield {
"type": "complete",
"total": total,
# Lifetime count for this worker, not just this batch
"processed": self.images_processed,
}
def stats(self) -> dict:
return {
"images_processed": self.images_processed,
"pil_available": self.pil_available,
"worker_pid": os.getpid(),
}
class DataWorker(DirtyApp):
"""
Data processing worker - demonstrates stateful data operations.
Maintains database connections, caches, and processing state.
Perfect for ETL tasks, report generation, data aggregation.
"""
# More workers for data tasks - they're often parallelizable
workers = 4
def __init__(self):
self.cache = {}
self.db_connection = None
self.tasks_completed = 0
def init(self):
"""Initialize database connection and cache."""
# In production: self.db_connection = create_engine(DATABASE_URL)
self.db_connection = "connected"
self.cache = {}
def __call__(self, action: str, *args, **kwargs) -> Any:
method = getattr(self, action, None)
if method is None or action.startswith('_'):
raise ValueError(f"Unknown action: {action}")
return method(*args, **kwargs)
def aggregate(self, data: list, group_by: str,
agg_field: str, agg_func: str = "sum") -> dict:
"""
Aggregate data - like a Celery task for report generation.
Equivalent to Celery:
@app.task
def aggregate_sales(data, group_by, agg_field):
df = pd.DataFrame(data)
return df.groupby(group_by)[agg_field].sum().to_dict()
"""
# Simulate aggregation
time.sleep(random.uniform(0.1, 0.3))
result = {}
for item in data:
key = item.get(group_by, "unknown")
value = item.get(agg_field, 0)
if key not in result:
if agg_func in ("sum", "count"):
result[key] = 0
else:
result[key] = []
if agg_func == "sum":
result[key] += value
elif agg_func == "count":
result[key] += 1
elif agg_func == "list":
result[key].append(value)
self.tasks_completed += 1
return {
"status": "completed",
"group_by": group_by,
"agg_func": agg_func,
"result": result,
"record_count": len(data),
}
def etl_pipeline(self, source_data: list,
transformations: list) -> Generator[dict, None, None]:
"""
Run an ETL pipeline with progress streaming.
This replaces Celery chains for sequential multi-step processing:
chain(extract.s(), transform.s(), load.s())
"""
total_steps = len(transformations) + 2 # +2 for extract and load
current_step = 0
data = source_data
# Extract phase
yield {
"type": "progress",
"phase": "extract",
"step": current_step + 1,
"total_steps": total_steps,
"message": f"Extracting {len(data)} records",
}
time.sleep(0.2) # Simulate extraction
current_step += 1
# Transform phases
for i, transform in enumerate(transformations):
transform_name = transform.get("name", f"transform_{i}")
transform_type = transform.get("type", "passthrough")
yield {
"type": "progress",
"phase": "transform",
"step": current_step + 1,
"total_steps": total_steps,
"message": f"Applying {transform_name}",
}
# Apply transformation
if transform_type == "filter":
field = transform.get("field")
value = transform.get("value")
data = [d for d in data if d.get(field) == value]
elif transform_type == "map":
field = transform.get("field")
func = transform.get("func", "upper")
for d in data:
if field in d and isinstance(d[field], str):
if func == "upper":
d[field] = d[field].upper()
elif func == "lower":
d[field] = d[field].lower()
time.sleep(0.2) # Simulate transformation
current_step += 1
# Load phase
yield {
"type": "progress",
"phase": "load",
"step": current_step + 1,
"total_steps": total_steps,
"message": f"Loading {len(data)} records",
}
time.sleep(0.2) # Simulate loading
self.tasks_completed += 1
# Final result
yield {
"type": "complete",
"records_processed": len(source_data),
"records_output": len(data),
"transformations_applied": len(transformations),
}
def cached_query(self, query_key: str, ttl: int = 300) -> dict:
"""
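Execute a cached query - demonstrates stateful caching.
Unlike Celery, where you'd typically use Redis for caching,
the dirty worker maintains its own in-memory cache.
The cache lives inside this worker's process, so with several
DataWorker processes a repeated query only hits the cache
when it is handled by the same worker.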
"""
now = time.time()
if query_key in self.cache:
cached = self.cache[query_key]
if now - cached["timestamp"] < ttl:
return {
"status": "cache_hit",
"data": cached["data"],
"cached_at": cached["timestamp"],
"age_seconds": int(now - cached["timestamp"]),
}
# Simulate query execution
time.sleep(random.uniform(0.2, 0.4))
# Generate fake result
result_data = {
"query": query_key,
"rows": random.randint(10, 100),
"computed_at": now,
}
self.cache[query_key] = {
"data": result_data,
"timestamp": now,
}
return {
"status": "cache_miss",
"data": result_data,
"cached_at": now,
}
def stats(self) -> dict:
return {
"tasks_completed": self.tasks_completed,
"cache_size": len(self.cache),
"db_connected": self.db_connection is not None,
"worker_pid": os.getpid(),
}
def close(self):
"""Cleanup on shutdown."""
self.cache.clear()
if self.db_connection and self.db_connection != "connected":
self.db_connection.close()
class ScheduledWorker(DirtyApp):
"""
Scheduled task worker - for periodic/scheduled tasks.
While dirty arbiters don't have built-in scheduling like Celery Beat,
you can call these from a simple cron job or scheduler.
"""
workers = 1 # Single worker for scheduled tasks
def __init__(self):
self.last_runs = {}
self.run_counts = {}
def __call__(self, action: str, *args, **kwargs) -> Any:
method = getattr(self, action, None)
if method is None or action.startswith('_'):
raise ValueError(f"Unknown action: {action}")
# Track runs (recorded before execution, so failed runs and stats calls count too)
self.last_runs[action] = datetime.now().isoformat()
self.run_counts[action] = self.run_counts.get(action, 0) + 1
return method(*args, **kwargs)
def cleanup_old_files(self, directory: str, max_age_days: int = 7) -> dict:
"""
Cleanup old files - like a Celery periodic task.
Equivalent to Celery Beat:
@app.task
def cleanup():
...
app.conf.beat_schedule = {
'cleanup-every-hour': {
'task': 'tasks.cleanup',
'schedule': 3600.0,
},
}
"""
# Simulate cleanup
time.sleep(0.3)
files_deleted = random.randint(0, 10)
return {
"status": "completed",
"directory": directory,
"files_deleted": files_deleted,
"space_freed_mb": files_deleted * random.uniform(0.1, 5.0),
}
def generate_daily_report(self) -> dict:
"""Generate daily report."""
time.sleep(0.5)
return {
"status": "completed",
"report_date": datetime.now().strftime("%Y-%m-%d"),
"metrics": {
"active_users": random.randint(100, 1000),
"new_signups": random.randint(10, 50),
"revenue": random.uniform(1000, 10000),
},
}
def sync_external_data(self, source: str) -> dict:
"""Sync data from external source."""
time.sleep(0.4)
return {
"status": "completed",
"source": source,
"records_synced": random.randint(50, 500),
"sync_time": datetime.now().isoformat(),
}
def stats(self) -> dict:
return {
"last_runs": self.last_runs,
"run_counts": self.run_counts,
"worker_pid": os.getpid(),
}
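
ScheduledWorker has no scheduler of its own, so something external has to trigger its tasks. Below is a minimal sketch of a script that a cron entry could run. It assumes the example's Flask API is reachable at `http://localhost:8000` and exposes `POST /api/scheduled/cleanup` with the JSON body the integration tests in this commit send. The names `build_cleanup_request` and `run_cleanup`, and the default directory, are hypothetical and not part of the commit.

```python
"""Cron-friendly trigger for the scheduled cleanup task (illustrative sketch)."""
import json
import os
import urllib.request

# Assumption: same default URL the integration tests use.
APP_URL = os.environ.get("APP_URL", "http://localhost:8000")


def build_cleanup_request(base_url, directory, max_age_days=7):
    """Build (but do not send) the POST request for the cleanup endpoint."""
    payload = json.dumps(
        {"directory": directory, "max_age_days": max_age_days}
    ).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/scheduled/cleanup",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_cleanup(directory="/tmp/uploads"):
    """Send the request and return the decoded JSON response."""
    req = build_cleanup_request(APP_URL, directory)
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.load(resp)
```

Calling `run_cleanup()` from a script scheduled hourly by cron would roughly mirror the `'schedule': 3600.0` Beat entry shown in the `cleanup_old_files` docstring.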

@ -0,0 +1 @@
# Tests package

@ -0,0 +1,10 @@
"""
Pytest configuration for Celery Replacement tests.
"""
import sys
from pathlib import Path
# Add gunicorn source to path for imports
gunicorn_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(gunicorn_root))

@ -0,0 +1,422 @@
"""
Integration Tests for Celery Replacement Example
These tests run against the full application with Gunicorn and dirty arbiters.
They can be run locally or in Docker.
Usage:
# Local (with gunicorn running):
APP_URL=http://localhost:8000 pytest tests/test_integration.py -v
# Docker:
docker compose --profile test up --build --abort-on-container-exit
"""
import json
import os
import time
import pytest
import requests
# Get app URL from environment or use default
APP_URL = os.environ.get("APP_URL", "http://localhost:8000")
def wait_for_app(timeout=30):
    """Wait for the application to be ready."""
    start = time.time()
    while time.time() - start < timeout:
        try:
            resp = requests.get(f"{APP_URL}/health", timeout=5)
            if resp.status_code == 200:
                return True
        except requests.exceptions.RequestException:
            # Connection refused or request timed out: the app is still starting
            pass
        time.sleep(1)
    return False


@pytest.fixture(scope="module", autouse=True)
def ensure_app_running():
    """Ensure the application is running before tests."""
    if not wait_for_app():
        pytest.skip("Application not available")
class TestHealthEndpoint:
"""Test health check endpoint."""
def test_health_check(self):
"""Test that health endpoint returns healthy status."""
resp = requests.get(f"{APP_URL}/health")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "healthy"
assert data["workers"] == "connected"
class TestEmailTasks:
"""Integration tests for email tasks."""
def test_send_single_email(self):
"""Test sending a single email via API."""
resp = requests.post(
f"{APP_URL}/api/email/send",
json={
"to": "test@example.com",
"subject": "Integration Test",
"body": "Hello from integration test",
},
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "sent"
assert data["to"] == "test@example.com"
assert "message_id" in data
def test_send_bulk_emails_streaming(self):
"""Test bulk email sending with SSE streaming."""
recipients = ["a@test.com", "b@test.com", "c@test.com"]
resp = requests.post(
f"{APP_URL}/api/email/send-bulk",
json={
"recipients": recipients,
"subject": "Bulk Test",
"body": "Hello all",
},
stream=True,
)
assert resp.status_code == 200
events = []
for line in resp.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
data = json.loads(line[6:])
events.append(data)
# Should have progress for each recipient + complete
assert len(events) == len(recipients) + 1
# Check progress events
for i, event in enumerate(events[:-1]):
assert event["type"] == "progress"
assert event["current"] == i + 1
# Check complete event
assert events[-1]["type"] == "complete"
assert events[-1]["sent"] == len(recipients)
def test_email_stats(self):
"""Test email worker statistics endpoint."""
# Send an email first
requests.post(
f"{APP_URL}/api/email/send",
json={"to": "x@x.com", "subject": "S", "body": "B"},
)
resp = requests.get(f"{APP_URL}/api/email/stats")
assert resp.status_code == 200
data = resp.json()
assert data["emails_sent"] >= 1
assert data["smtp_connected"] is True
assert "worker_pid" in data
class TestImageTasks:
"""Integration tests for image tasks."""
def test_resize_image(self):
"""Test image resizing via API."""
resp = requests.post(
f"{APP_URL}/api/image/resize",
json={
"image_data": "base64_encoded_image_data",
"width": 800,
"height": 600,
},
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "resized"
assert data["target_dimensions"] == "800x600"
def test_generate_thumbnail(self):
"""Test thumbnail generation via API."""
resp = requests.post(
f"{APP_URL}/api/image/thumbnail",
json={
"image_data": "base64_image",
"size": 150,
},
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "resized"
assert data["target_dimensions"] == "150x150"
def test_batch_processing_streaming(self):
"""Test batch image processing with streaming."""
images = [
{"id": "img1", "data": "data1"},
{"id": "img2", "data": "data2"},
]
resp = requests.post(
f"{APP_URL}/api/image/process-batch",
json={
"images": images,
"operation": "resize",
"width": 400,
"height": 300,
},
stream=True,
)
assert resp.status_code == 200
events = []
for line in resp.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
events.append(json.loads(line[6:]))
assert len(events) == len(images) + 1
assert events[-1]["type"] == "complete"
def test_image_stats(self):
"""Test image worker statistics."""
resp = requests.get(f"{APP_URL}/api/image/stats")
assert resp.status_code == 200
data = resp.json()
assert "images_processed" in data
assert "worker_pid" in data
class TestDataTasks:
"""Integration tests for data processing tasks."""
def test_aggregate_data(self):
"""Test data aggregation via API."""
resp = requests.post(
f"{APP_URL}/api/data/aggregate",
json={
"data": [
{"category": "A", "value": 10},
{"category": "B", "value": 20},
{"category": "A", "value": 30},
],
"group_by": "category",
"agg_field": "value",
"agg_func": "sum",
},
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "completed"
assert data["result"]["A"] == 40
assert data["result"]["B"] == 20
def test_etl_pipeline_streaming(self):
"""Test ETL pipeline with streaming progress."""
resp = requests.post(
f"{APP_URL}/api/data/etl",
json={
"source_data": [
{"name": "alice", "status": "active"},
{"name": "bob", "status": "inactive"},
{"name": "charlie", "status": "active"},
],
"transformations": [
{"name": "filter", "type": "filter",
"field": "status", "value": "active"},
],
},
stream=True,
)
assert resp.status_code == 200
events = []
for line in resp.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
events.append(json.loads(line[6:]))
# extract + transform + load + complete
assert len(events) == 4
# Check phases
phases = [e.get("phase") for e in events[:-1]]
assert "extract" in phases
assert "transform" in phases
assert "load" in phases
# Final result
assert events[-1]["type"] == "complete"
assert events[-1]["records_output"] == 2
def test_cached_query(self):
"""Test cached query functionality."""
query_key = f"test_query_{time.time()}"
# First call - cache miss
resp1 = requests.post(
f"{APP_URL}/api/data/query",
json={"query_key": query_key, "ttl": 300},
)
assert resp1.status_code == 200
assert resp1.json()["status"] == "cache_miss"
# Second call - cache hit
resp2 = requests.post(
f"{APP_URL}/api/data/query",
json={"query_key": query_key, "ttl": 300},
)
assert resp2.status_code == 200
assert resp2.json()["status"] == "cache_hit"
def test_data_stats(self):
"""Test data worker statistics."""
resp = requests.get(f"{APP_URL}/api/data/stats")
assert resp.status_code == 200
data = resp.json()
assert "tasks_completed" in data
assert "cache_size" in data
class TestScheduledTasks:
"""Integration tests for scheduled tasks."""
def test_cleanup_task(self):
"""Test cleanup task execution."""
resp = requests.post(
f"{APP_URL}/api/scheduled/cleanup",
json={"directory": "/tmp/test", "max_age_days": 7},
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "completed"
assert "files_deleted" in data
def test_daily_report(self):
"""Test daily report generation."""
resp = requests.post(f"{APP_URL}/api/scheduled/daily-report")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "completed"
assert "metrics" in data
def test_sync_task(self):
"""Test data sync task."""
resp = requests.post(
f"{APP_URL}/api/scheduled/sync",
json={"source": "test_source"},
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "completed"
assert data["source"] == "test_source"
def test_scheduled_stats(self):
"""Test scheduled worker statistics."""
# Run a task first
requests.post(f"{APP_URL}/api/scheduled/daily-report")
resp = requests.get(f"{APP_URL}/api/scheduled/stats")
assert resp.status_code == 200
data = resp.json()
assert "run_counts" in data
assert "generate_daily_report" in data["run_counts"]
class TestConcurrency:
"""Test concurrent task execution."""
def test_concurrent_requests(self):
"""Test that multiple concurrent requests are handled."""
import concurrent.futures
def send_email():
return requests.post(
f"{APP_URL}/api/email/send",
json={"to": "x@x.com", "subject": "Concurrent", "body": "Test"},
)
# Send 10 concurrent requests
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(send_email) for _ in range(10)]
results = [f.result() for f in futures]
# All should succeed
assert all(r.status_code == 200 for r in results)
assert all(r.json()["status"] == "sent" for r in results)
def test_mixed_task_types(self):
"""Test different task types running concurrently."""
import concurrent.futures
def email_task():
return requests.post(
f"{APP_URL}/api/email/send",
json={"to": "x@x.com", "subject": "S", "body": "B"},
)
def image_task():
return requests.post(
f"{APP_URL}/api/image/resize",
json={"image_data": "x", "width": 100, "height": 100},
)
def data_task():
return requests.post(
f"{APP_URL}/api/data/aggregate",
json={
"data": [{"a": 1}],
"group_by": "a",
"agg_field": "a",
"agg_func": "sum",
},
)
with concurrent.futures.ThreadPoolExecutor(max_workers=9) as executor:
futures = []
for _ in range(3):
futures.append(executor.submit(email_task))
futures.append(executor.submit(image_task))
futures.append(executor.submit(data_task))
results = [f.result() for f in futures]
# All should succeed
assert all(r.status_code == 200 for r in results)
class TestErrorHandling:
    """Test error handling scenarios."""

    def test_invalid_action(self):
        """Test that a malformed request is rejected with an error status."""
        # Exercising an unknown worker action would require the API to expose
        # raw execute, so this sends a request with missing fields instead.
        resp = requests.post(
            f"{APP_URL}/api/email/send",
            json={},  # Missing required fields
        )
        # Expect a validation error (400) or an unhandled server error (500)
        assert resp.status_code in [400, 500]
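
The three streaming tests in this file repeat the same loop over `resp.iter_lines()` to collect `data: ` payloads. One way to factor that out is sketched below. `parse_sse_events` is a hypothetical helper, not part of the commit, and like the tests it assumes each event's JSON payload sits on a single `data: ` line.

```python
import json


def parse_sse_events(lines):
    """Decode the JSON payloads of `data: ` lines from an SSE line stream."""
    events = []
    for raw in lines:
        if not raw:
            continue  # blank lines separate SSE events
        line = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        if line.startswith("data: "):
            events.append(json.loads(line[len("data: "):]))
    return events


# Simulated output of resp.iter_lines() for a two-event stream
sample = [
    b'data: {"type": "progress", "current": 1}',
    b"",
    b'data: {"type": "complete", "sent": 1}',
]
events = parse_sse_events(sample)
print([e["type"] for e in events])
```

In a test, `events = parse_sse_events(resp.iter_lines())` would stand in for the inline loop, leaving only the assertions in the test body.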

@ -0,0 +1,304 @@
"""
Unit Tests for Task Workers
These tests verify the task worker logic without running Gunicorn.
They test the DirtyApp classes directly.
"""
import pytest
from examples.celery_alternative.tasks import (
EmailWorker,
ImageWorker,
DataWorker,
ScheduledWorker,
)
class TestEmailWorker:
"""Tests for EmailWorker task class."""
def setup_method(self):
"""Set up test fixtures."""
self.worker = EmailWorker()
self.worker.init()
def test_send_email(self):
"""Test sending a single email."""
result = self.worker("send_email",
to="test@example.com",
subject="Test",
body="Hello")
assert result["status"] == "sent"
assert result["to"] == "test@example.com"
assert result["subject"] == "Test"
assert "message_id" in result
assert "timestamp" in result
def test_send_email_increments_counter(self):
"""Test that email counter increments."""
initial_count = self.worker.emails_sent
self.worker("send_email", to="a@x.com", subject="S", body="B")
self.worker("send_email", to="b@x.com", subject="S", body="B")
assert self.worker.emails_sent == initial_count + 2
def test_send_bulk_emails_streaming(self):
"""Test bulk email sending with progress streaming."""
recipients = ["a@x.com", "b@x.com", "c@x.com"]
results = list(self.worker("send_bulk_emails",
recipients=recipients,
subject="Bulk",
body="Hello all"))
# Should have progress updates + final complete
assert len(results) == len(recipients) + 1
# Check progress updates
for i, r in enumerate(results[:-1]):
assert r["type"] == "progress"
assert r["current"] == i + 1
assert r["total"] == len(recipients)
# Check final result
final = results[-1]
assert final["type"] == "complete"
assert final["total"] == len(recipients)
assert final["sent"] == len(recipients)
def test_stats(self):
"""Test worker statistics."""
self.worker("send_email", to="x@x.com", subject="S", body="B")
stats = self.worker("stats")
assert stats["emails_sent"] >= 1
assert stats["smtp_connected"] is True
assert "worker_pid" in stats
def test_unknown_action_raises(self):
"""Test that unknown actions raise ValueError."""
with pytest.raises(ValueError, match="Unknown action"):
self.worker("nonexistent_action")
def test_private_method_raises(self):
"""Test that private methods cannot be called."""
with pytest.raises(ValueError, match="Unknown action"):
self.worker("_connect_smtp")
class TestImageWorker:
"""Tests for ImageWorker task class."""
def setup_method(self):
"""Set up test fixtures."""
self.worker = ImageWorker()
self.worker.init()
def test_resize_image(self):
"""Test image resizing."""
result = self.worker("resize",
image_data="fake_image_data",
width=800,
height=600)
assert result["status"] == "resized"
assert result["target_dimensions"] == "800x600"
assert "result_id" in result
def test_generate_thumbnail(self):
"""Test thumbnail generation."""
result = self.worker("generate_thumbnail",
image_data="fake_image_data",
size=150)
assert result["status"] == "resized"
assert result["target_dimensions"] == "150x150"
def test_process_batch_streaming(self):
"""Test batch processing with progress streaming."""
images = [
{"id": "img1", "data": b"data1"},
{"id": "img2", "data": b"data2"},
{"id": "img3", "data": b"data3"},
]
results = list(self.worker("process_batch",
images=images,
operation="resize",
width=800,
height=600))
# Progress for each image + complete
assert len(results) == len(images) + 1
# Check progress updates
for i, r in enumerate(results[:-1]):
assert r["type"] == "progress"
assert r["image_id"] == f"img{i+1}"
assert "result" in r
# Check final result
final = results[-1]
assert final["type"] == "complete"
def test_stats(self):
"""Test worker statistics."""
self.worker("resize", image_data=b"x", width=100, height=100)
stats = self.worker("stats")
assert stats["images_processed"] >= 1
assert "pil_available" in stats
assert "worker_pid" in stats
class TestDataWorker:
"""Tests for DataWorker task class."""
def setup_method(self):
"""Set up test fixtures."""
self.worker = DataWorker()
self.worker.init()
def test_aggregate_sum(self):
"""Test data aggregation with sum."""
data = [
{"category": "A", "value": 10},
{"category": "B", "value": 20},
{"category": "A", "value": 30},
]
result = self.worker("aggregate",
data=data,
group_by="category",
agg_field="value",
agg_func="sum")
assert result["status"] == "completed"
assert result["result"]["A"] == 40
assert result["result"]["B"] == 20
def test_aggregate_count(self):
"""Test data aggregation with count."""
data = [
{"category": "A", "value": 10},
{"category": "B", "value": 20},
{"category": "A", "value": 30},
]
result = self.worker("aggregate",
data=data,
group_by="category",
agg_field="value",
agg_func="count")
assert result["result"]["A"] == 2
assert result["result"]["B"] == 1
def test_etl_pipeline_streaming(self):
"""Test ETL pipeline with progress streaming."""
source_data = [
{"name": "alice", "status": "active"},
{"name": "bob", "status": "inactive"},
{"name": "charlie", "status": "active"},
]
transformations = [
{"name": "filter_active", "type": "filter",
"field": "status", "value": "active"},
{"name": "uppercase", "type": "map",
"field": "name", "func": "upper"},
]
results = list(self.worker("etl_pipeline",
source_data=source_data,
transformations=transformations))
# extract + transforms + load + complete
expected_steps = 1 + len(transformations) + 1 + 1
assert len(results) == expected_steps
# Check phases
assert results[0]["phase"] == "extract"
assert results[1]["phase"] == "transform"
assert results[2]["phase"] == "transform"
assert results[3]["phase"] == "load"
assert results[4]["type"] == "complete"
# Final should have 2 records (filtered)
assert results[4]["records_output"] == 2
def test_cached_query_miss_then_hit(self):
"""Test query caching - miss then hit."""
# First call - cache miss
result1 = self.worker("cached_query", query_key="test_query", ttl=300)
assert result1["status"] == "cache_miss"
# Second call - cache hit
result2 = self.worker("cached_query", query_key="test_query", ttl=300)
assert result2["status"] == "cache_hit"
def test_stats(self):
"""Test worker statistics."""
self.worker("aggregate",
data=[{"a": 1}],
group_by="a",
agg_field="a")
stats = self.worker("stats")
assert stats["tasks_completed"] >= 1
assert "cache_size" in stats
assert stats["db_connected"] is True
class TestScheduledWorker:
"""Tests for ScheduledWorker task class."""
def setup_method(self):
"""Set up test fixtures."""
self.worker = ScheduledWorker()
def test_cleanup_old_files(self):
"""Test file cleanup task."""
result = self.worker("cleanup_old_files",
directory="/tmp/test",
max_age_days=7)
assert result["status"] == "completed"
assert result["directory"] == "/tmp/test"
assert "files_deleted" in result
assert "space_freed_mb" in result
def test_generate_daily_report(self):
"""Test daily report generation."""
result = self.worker("generate_daily_report")
assert result["status"] == "completed"
assert "report_date" in result
assert "metrics" in result
assert "active_users" in result["metrics"]
assert "new_signups" in result["metrics"]
assert "revenue" in result["metrics"]
def test_sync_external_data(self):
"""Test external data sync."""
result = self.worker("sync_external_data", source="test_api")
assert result["status"] == "completed"
assert result["source"] == "test_api"
assert "records_synced" in result
def test_stats_tracks_runs(self):
"""Test that stats tracks task runs."""
self.worker("cleanup_old_files", directory="/tmp", max_age_days=1)
self.worker("cleanup_old_files", directory="/tmp", max_age_days=1)
self.worker("generate_daily_report")
stats = self.worker("stats")
assert stats["run_counts"]["cleanup_old_files"] == 2
assert stats["run_counts"]["generate_daily_report"] == 1
assert "cleanup_old_files" in stats["last_runs"]
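
The cache test above covers miss-then-hit but not expiry, which is hard to test against `time.time()` without sleeping. The sketch below shows the same timestamp comparison that `cached_query` uses, with an injectable clock so expiry can be checked deterministically. `TTLCache`, `get_or_compute`, and the `clock` parameter are hypothetical names used for illustration; this is a standalone stand-in, not the example's `DataWorker`.

```python
import time


class TTLCache:
    """Standalone stand-in for the timestamp check in cached_query."""

    def __init__(self, clock=time.time):
        self._clock = clock  # injectable so tests can control "now"
        self._entries = {}

    def get_or_compute(self, key, compute, ttl=300):
        now = self._clock()
        entry = self._entries.get(key)
        # Same rule as cached_query: an entry is fresh while its age < ttl
        if entry is not None and now - entry["timestamp"] < ttl:
            return "cache_hit", entry["data"]
        data = compute()
        self._entries[key] = {"data": data, "timestamp": now}
        return "cache_miss", data


fake_now = [1000.0]
cache = TTLCache(clock=lambda: fake_now[0])

first = cache.get_or_compute("q", lambda: "rows", ttl=300)
fake_now[0] += 299  # still inside the TTL window
second = cache.get_or_compute("q", lambda: "rows", ttl=300)
fake_now[0] += 2  # 301 seconds after the first call
third = cache.get_or_compute("q", lambda: "rows", ttl=300)
print(first[0], second[0], third[0])
```

Passing the clock in rather than calling `time.time()` directly is a design choice for testability; the worker in this commit reads the wall clock itself.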