diff --git a/examples/celery_alternative/Dockerfile b/examples/celery_alternative/Dockerfile new file mode 100644 index 00000000..ed160486 --- /dev/null +++ b/examples/celery_alternative/Dockerfile @@ -0,0 +1,47 @@ +# Dockerfile for Celery Replacement Example +# +# This demonstrates running a production-ready application with +# Gunicorn dirty arbiters replacing Celery for background tasks. +# +# Key difference from Celery deployment: +# - Celery: Needs separate web + worker containers + Redis/RabbitMQ +# - Dirty: Single container handles both HTTP and background tasks + +FROM python:3.12-slim + +# Set working directory +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy gunicorn source and install (from build context root) +COPY . /gunicorn-src +RUN pip install --no-cache-dir /gunicorn-src + +# Copy example application +COPY examples/celery_alternative /app +RUN pip install --no-cache-dir flask + +# Environment variables +ENV PYTHONUNBUFFERED=1 +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONPATH=/gunicorn-src +ENV GUNICORN_BIND=0.0.0.0:8000 +ENV GUNICORN_WORKERS=4 +ENV GUNICORN_THREADS=4 +ENV DIRTY_WORKERS=9 +ENV DIRTY_TIMEOUT=300 +ENV LOG_LEVEL=info + +# Expose port +EXPOSE 8000 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 + +# Run gunicorn with dirty arbiters +CMD ["gunicorn", "-c", "gunicorn_conf.py", "app:app"] diff --git a/examples/celery_alternative/README.md b/examples/celery_alternative/README.md new file mode 100644 index 00000000..d6b4daa0 --- /dev/null +++ b/examples/celery_alternative/README.md @@ -0,0 +1,240 @@ +# Celery Replacement Example + +This example demonstrates how to replace Celery with Gunicorn's **dirty arbiters** for background task processing. + +## Why Replace Celery? + +| Aspect | Celery | Dirty Arbiters | +|--------|--------|----------------| +| Dependencies | Redis/RabbitMQ + Celery | None (built into Gunicorn) | +| Deployment | Multiple processes/containers | Single process | +| State | Stateless workers | Stateful workers (keep models loaded) | +| Progress | Polling or WebSocket | Native streaming | +| Configuration | Separate config | Same gunicorn.conf.py | + +## Quick Start + +### Local Development + +```bash +# Install dependencies +pip install flask requests pytest +pip install -e ../.. # Install gunicorn from source + +# Run the application +gunicorn -c gunicorn_conf.py app:app + +# In another terminal, test it +curl http://localhost:8000/health +curl -X POST http://localhost:8000/api/email/send \ + -H "Content-Type: application/json" \ + -d '{"to": "test@example.com", "subject": "Hello", "body": "World"}' +``` + +### Docker + +```bash +# Build and run +docker compose up --build + +# Run with tests +docker compose --profile test up --build --abort-on-container-exit +``` + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Gunicorn Master │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ HTTP Worker │ │ HTTP Worker │ │ HTTP Worker │ ... │ +│ │ (gthread) │ │ (gthread) │ │ (gthread) │ │ +│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ +│ │ │ │ │ +│ └────────────────┼────────────────┘ │ +│ │ │ +│ Unix Socket IPC │ +│ │ │ +│ ┌────────────────┼────────────────┐ │ +│ │ │ │ │ +│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │ +│ │ EmailWorker │ │ ImageWorker │ │ DataWorker │ ... │ +│ │ (2 procs) │ │ (2 procs) │ │ (4 procs) │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ │ +│ │ +│ Dirty Arbiter │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Task Workers + +### EmailWorker +- `send_email(to, subject, body)` - Send single email +- `send_bulk_emails(recipients, subject, body)` - Bulk send with progress streaming +- `stats()` - Worker statistics + +### ImageWorker +- `resize(image_data, width, height)` - Resize image +- `generate_thumbnail(image_data, size)` - Generate thumbnail +- `process_batch(images, operation, **params)` - Batch process with streaming + +### DataWorker +- `aggregate(data, group_by, agg_field, agg_func)` - Aggregate data +- `etl_pipeline(source_data, transformations)` - ETL with progress streaming +- `cached_query(query_key, ttl)` - Cached query execution + +### ScheduledWorker +- `cleanup_old_files(directory, max_age_days)` - File cleanup +- `generate_daily_report()` - Daily report generation +- `sync_external_data(source)` - External data sync + +## API Endpoints + +### Email +- `POST /api/email/send` - Send single email +- `POST /api/email/send-bulk` - Bulk send (SSE streaming) +- `GET /api/email/stats` - Worker stats + +### Image +- `POST /api/image/resize` - Resize image +- `POST /api/image/thumbnail` - Generate thumbnail +- `POST /api/image/process-batch` - Batch process (SSE streaming) +- `GET /api/image/stats` - Worker stats + +### Data +- `POST /api/data/aggregate` - Aggregate data +- `POST /api/data/etl` - ETL pipeline (SSE streaming) +- `POST /api/data/query` - Cached query +- `GET /api/data/stats` - Worker stats + +### Scheduled +- `POST /api/scheduled/cleanup` - Run cleanup +- `POST /api/scheduled/daily-report` - Generate report +- `POST /api/scheduled/sync` - Sync data +- `GET /api/scheduled/stats` - Worker stats + +## Streaming Progress Example + +```python +import requests +import json + +# Start bulk email with streaming progress +resp = requests.post( + "http://localhost:8000/api/email/send-bulk", + json={ + "recipients": ["a@x.com", "b@x.com", "c@x.com"], + "subject": "Newsletter", + "body": "Hello!", + }, + stream=True, +) + +for line in resp.iter_lines(): + if line and line.startswith(b"data: "): + progress = json.loads(line[6:]) + if progress["type"] == "progress": + print(f"Progress: {progress['percent']}%") + elif progress["type"] == "complete": + print(f"Done! Sent: {progress['sent']}, Failed: {progress['failed']}") +``` + +## Celery Migration Guide + +### Before (Celery) + +```python +# tasks.py +from celery import Celery + +app = Celery('tasks', broker='redis://localhost') + +@app.task +def send_email(to, subject, body): + # Send email + return {"status": "sent"} + +@app.task(bind=True) +def send_bulk(self, recipients, subject, body): + for i, to in enumerate(recipients): + send_email(to, subject, body) + self.update_state(state='PROGRESS', meta={'current': i}) +``` + +```python +# views.py +from tasks import send_email, send_bulk + +def send_view(request): + send_email.delay(to, subject, body) # Async + return {"status": "queued"} +``` + +### After (Dirty Arbiters) + +```python +# tasks.py +from gunicorn.dirty.app import DirtyApp + +class EmailWorker(DirtyApp): + workers = 2 # Limit workers + + def init(self): + self.smtp = connect_smtp() # Stateful! + + def __call__(self, action, *args, **kwargs): + return getattr(self, action)(*args, **kwargs) + + def send_email(self, to, subject, body): + return {"status": "sent"} + + def send_bulk(self, recipients, subject, body): + for i, to in enumerate(recipients): + self.send_email(to, subject, body) + yield {"type": "progress", "current": i} # Native streaming! +``` + +```python +# views.py +from gunicorn.dirty import get_dirty_client + +def send_view(request): + client = get_dirty_client() + result = client.execute("tasks:EmailWorker", "send_email", to, subject, body) + return result # Sync result, no polling! +``` + +## Configuration + +```python +# gunicorn_conf.py + +# HTTP workers +workers = 4 +worker_class = "gthread" +threads = 4 + +# Task workers (replace Celery) +dirty_apps = [ + "tasks:EmailWorker", + "tasks:ImageWorker", + "tasks:DataWorker", +] +dirty_workers = 9 +dirty_timeout = 300 +``` + +## Running Tests + +```bash +# Unit tests (no server needed) +pytest tests/test_tasks.py -v + +# Integration tests (server must be running) +APP_URL=http://localhost:8000 pytest tests/test_integration.py -v + +# All tests via Docker +docker compose --profile test up --build --abort-on-container-exit +``` diff --git a/examples/celery_alternative/app.py b/examples/celery_alternative/app.py new file mode 100644 index 00000000..c190c88f --- /dev/null +++ b/examples/celery_alternative/app.py @@ -0,0 +1,460 @@ +""" +Web Application - Flask app demonstrating Celery replacement. + +This shows how to call dirty arbiter tasks from your web application, +replacing Celery's task.delay() and task.apply_async() patterns. +""" + +import json +import os +from flask import Flask, request, jsonify, Response, stream_with_context + +from gunicorn.dirty import get_dirty_client +from gunicorn.dirty.errors import ( + DirtyError, + DirtyTimeoutError, + DirtyAppNotFoundError, +) + +app = Flask(__name__) + +# Task worker import paths (like Celery task names) +EMAIL_WORKER = "examples.celery_alternative.tasks:EmailWorker" +IMAGE_WORKER = "examples.celery_alternative.tasks:ImageWorker" +DATA_WORKER = "examples.celery_alternative.tasks:DataWorker" +SCHEDULED_WORKER = "examples.celery_alternative.tasks:ScheduledWorker" + + +def get_client(): + """Get the dirty client for calling task workers.""" + return get_dirty_client() + + +# ============================================================================ +# Email Tasks - Like Celery email tasks +# ============================================================================ + +@app.route("/api/email/send", methods=["POST"]) +def send_email(): + """ + Send a single email. + + Celery equivalent: + send_email.delay(to, subject, body) + + Request: + POST /api/email/send + {"to": "user@example.com", "subject": "Hello", "body": "World"} + """ + data = request.get_json() + + try: + client = get_client() + result = client.execute( + EMAIL_WORKER, + "send_email", + to=data["to"], + subject=data["subject"], + body=data["body"], + html=data.get("html", False), + ) + return jsonify(result) + except DirtyTimeoutError: + return jsonify({"error": "Task timed out"}), 504 + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/email/send-bulk", methods=["POST"]) +def send_bulk_emails(): + """ + Send bulk emails with streaming progress. + + Celery equivalent: + result = send_bulk.apply_async([recipients, subject, body]) + while not result.ready(): + print(result.info) # Progress polling + + With dirty arbiters, progress is streamed in real-time! + + Request: + POST /api/email/send-bulk + {"recipients": ["a@x.com", "b@x.com"], "subject": "Hi", "body": "Hello"} + """ + data = request.get_json() + + def generate(): + try: + client = get_client() + for progress in client.stream( + EMAIL_WORKER, + "send_bulk_emails", + recipients=data["recipients"], + subject=data["subject"], + body=data["body"], + ): + yield f"data: {json.dumps(progress)}\n\n" + except DirtyError as e: + yield f"data: {json.dumps({'error': str(e)})}\n\n" + + return Response( + stream_with_context(generate()), + mimetype="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + + +@app.route("/api/email/stats") +def email_stats(): + """Get email worker statistics.""" + try: + client = get_client() + result = client.execute(EMAIL_WORKER, "stats") + return jsonify(result) + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +# ============================================================================ +# Image Tasks - Like Celery image processing tasks +# ============================================================================ + +@app.route("/api/image/resize", methods=["POST"]) +def resize_image(): + """ + Resize an image. + + Celery equivalent: + resize_image.delay(image_data, width, height) + + Request: + POST /api/image/resize + {"image_data": "base64...", "width": 800, "height": 600} + """ + data = request.get_json() + + # Keep image_data as string (base64 encoded) for JSON serialization + image_data = data.get("image_data", "") + + try: + client = get_client() + result = client.execute( + IMAGE_WORKER, + "resize", + image_data=image_data, + width=data.get("width", 800), + height=data.get("height", 600), + ) + return jsonify(result) + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/image/thumbnail", methods=["POST"]) +def generate_thumbnail(): + """Generate a thumbnail.""" + data = request.get_json() + image_data = data.get("image_data", "") + + try: + client = get_client() + result = client.execute( + IMAGE_WORKER, + "generate_thumbnail", + image_data=image_data, + size=data.get("size", 150), + ) + return jsonify(result) + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/image/process-batch", methods=["POST"]) +def process_image_batch(): + """ + Process multiple images with progress streaming. + + Request: + POST /api/image/process-batch + { + "images": [{"id": "img1", "data": "..."}, ...], + "operation": "resize", + "width": 800, + "height": 600 + } + """ + data = request.get_json() + + def generate(): + try: + client = get_client() + for progress in client.stream( + IMAGE_WORKER, + "process_batch", + images=data["images"], + operation=data.get("operation", "resize"), + width=data.get("width", 800), + height=data.get("height", 600), + size=data.get("size", 150), + ): + yield f"data: {json.dumps(progress)}\n\n" + except DirtyError as e: + yield f"data: {json.dumps({'error': str(e)})}\n\n" + + return Response( + stream_with_context(generate()), + mimetype="text/event-stream", + ) + + +@app.route("/api/image/stats") +def image_stats(): + """Get image worker statistics.""" + try: + client = get_client() + result = client.execute(IMAGE_WORKER, "stats") + return jsonify(result) + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +# ============================================================================ +# Data Tasks - Like Celery data processing tasks +# ============================================================================ + +@app.route("/api/data/aggregate", methods=["POST"]) +def aggregate_data(): + """ + Aggregate data. + + Celery equivalent: + aggregate_data.delay(data, group_by, agg_field, agg_func) + + Request: + POST /api/data/aggregate + { + "data": [{"category": "A", "value": 10}, ...], + "group_by": "category", + "agg_field": "value", + "agg_func": "sum" + } + """ + data = request.get_json() + + try: + client = get_client() + result = client.execute( + DATA_WORKER, + "aggregate", + data=data["data"], + group_by=data["group_by"], + agg_field=data["agg_field"], + agg_func=data.get("agg_func", "sum"), + ) + return jsonify(result) + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/data/etl", methods=["POST"]) +def run_etl(): + """ + Run ETL pipeline with streaming progress. + + Celery equivalent: + chain(extract.s(), transform.s(), load.s()).apply_async() + + Request: + POST /api/data/etl + { + "source_data": [...], + "transformations": [ + {"name": "filter_active", "type": "filter", "field": "status", "value": "active"}, + {"name": "uppercase_name", "type": "map", "field": "name", "func": "upper"} + ] + } + """ + data = request.get_json() + + def generate(): + try: + client = get_client() + for progress in client.stream( + DATA_WORKER, + "etl_pipeline", + source_data=data["source_data"], + transformations=data.get("transformations", []), + ): + yield f"data: {json.dumps(progress)}\n\n" + except DirtyError as e: + yield f"data: {json.dumps({'error': str(e)})}\n\n" + + return Response( + stream_with_context(generate()), + mimetype="text/event-stream", + ) + + +@app.route("/api/data/query", methods=["POST"]) +def cached_query(): + """ + Execute a cached query. + + Request: + POST /api/data/query + {"query_key": "sales_2024", "ttl": 300} + """ + data = request.get_json() + + try: + client = get_client() + result = client.execute( + DATA_WORKER, + "cached_query", + query_key=data["query_key"], + ttl=data.get("ttl", 300), + ) + return jsonify(result) + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/data/stats") +def data_stats(): + """Get data worker statistics.""" + try: + client = get_client() + result = client.execute(DATA_WORKER, "stats") + return jsonify(result) + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +# ============================================================================ +# Scheduled Tasks - Like Celery Beat tasks +# ============================================================================ + +@app.route("/api/scheduled/cleanup", methods=["POST"]) +def run_cleanup(): + """ + Run cleanup task (normally triggered by cron). + + Request: + POST /api/scheduled/cleanup + {"directory": "/tmp/uploads", "max_age_days": 7} + """ + data = request.get_json() or {} + + try: + client = get_client() + result = client.execute( + SCHEDULED_WORKER, + "cleanup_old_files", + directory=data.get("directory", "/tmp"), + max_age_days=data.get("max_age_days", 7), + ) + return jsonify(result) + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/scheduled/daily-report", methods=["POST"]) +def run_daily_report(): + """Generate daily report.""" + try: + client = get_client() + result = client.execute(SCHEDULED_WORKER, "generate_daily_report") + return jsonify(result) + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/scheduled/sync", methods=["POST"]) +def run_sync(): + """ + Sync external data. + + Request: + POST /api/scheduled/sync + {"source": "external_api"} + """ + data = request.get_json() or {} + + try: + client = get_client() + result = client.execute( + SCHEDULED_WORKER, + "sync_external_data", + source=data.get("source", "default"), + ) + return jsonify(result) + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/scheduled/stats") +def scheduled_stats(): + """Get scheduled worker statistics.""" + try: + client = get_client() + result = client.execute(SCHEDULED_WORKER, "stats") + return jsonify(result) + except DirtyError as e: + return jsonify({"error": str(e)}), 500 + + +# ============================================================================ +# Health & Info Endpoints +# ============================================================================ + +@app.route("/") +def index(): + """API documentation.""" + return jsonify({ + "name": "Celery Replacement Demo", + "description": "Demonstrating Gunicorn dirty arbiters as Celery replacement", + "endpoints": { + "email": { + "POST /api/email/send": "Send single email", + "POST /api/email/send-bulk": "Send bulk emails (streaming)", + "GET /api/email/stats": "Email worker stats", + }, + "image": { + "POST /api/image/resize": "Resize image", + "POST /api/image/thumbnail": "Generate thumbnail", + "POST /api/image/process-batch": "Batch process (streaming)", + "GET /api/image/stats": "Image worker stats", + }, + "data": { + "POST /api/data/aggregate": "Aggregate data", + "POST /api/data/etl": "Run ETL pipeline (streaming)", + "POST /api/data/query": "Cached query", + "GET /api/data/stats": "Data worker stats", + }, + "scheduled": { + "POST /api/scheduled/cleanup": "Run cleanup", + "POST /api/scheduled/daily-report": "Generate report", + "POST /api/scheduled/sync": "Sync external data", + "GET /api/scheduled/stats": "Scheduled worker stats", + }, + }, + }) + + +@app.route("/health") +def health(): + """Health check endpoint.""" + try: + client = get_client() + # Quick ping to verify workers are running + client.execute(EMAIL_WORKER, "stats") + return jsonify({"status": "healthy", "workers": "connected"}) + except DirtyError: + return jsonify({"status": "degraded", "workers": "unavailable"}), 503 + + +if __name__ == "__main__": + app.run(debug=True, port=8000) diff --git a/examples/celery_alternative/docker-compose.yml b/examples/celery_alternative/docker-compose.yml new file mode 100644 index 00000000..fda0a7a9 --- /dev/null +++ b/examples/celery_alternative/docker-compose.yml @@ -0,0 +1,79 @@ +# Docker Compose for Celery Replacement Example +# +# Notice: Only ONE service needed! +# Compare with typical Celery deployment which requires: +# - web (gunicorn/uvicorn) +# - celery_worker +# - celery_beat (for scheduled tasks) +# - redis or rabbitmq +# +# With dirty arbiters, everything runs in a single container. + +services: + app: + build: + context: ../.. # Gunicorn repo root + dockerfile: examples/celery_alternative/Dockerfile + ports: + - "8000:8000" + environment: + - GUNICORN_WORKERS=4 + - GUNICORN_THREADS=4 + - DIRTY_WORKERS=9 + - DIRTY_TIMEOUT=300 + - LOG_LEVEL=info + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 10s + # Resource limits (optional) + deploy: + resources: + limits: + memory: 1G + reservations: + memory: 256M + + # Test runner service + tests: + build: + context: ../.. + dockerfile: examples/celery_alternative/Dockerfile + depends_on: + app: + condition: service_healthy + environment: + - APP_URL=http://app:8000 + command: ["python", "-m", "pytest", "tests/", "-v", "--tb=short"] + profiles: + - test + +# For comparison, here's what a Celery deployment would look like: +# +# services: +# web: +# build: . +# command: gunicorn app:app -b 0.0.0.0:8000 +# ports: +# - "8000:8000" +# depends_on: +# - redis +# +# celery_worker: +# build: . +# command: celery -A tasks worker -l info +# depends_on: +# - redis +# +# celery_beat: +# build: . +# command: celery -A tasks beat -l info +# depends_on: +# - redis +# +# redis: +# image: redis:alpine +# ports: +# - "6379:6379" diff --git a/examples/celery_alternative/gunicorn_conf.py b/examples/celery_alternative/gunicorn_conf.py new file mode 100644 index 00000000..26bea423 --- /dev/null +++ b/examples/celery_alternative/gunicorn_conf.py @@ -0,0 +1,141 @@ +""" +Gunicorn Configuration - Celery Replacement Example + +This configuration sets up: +1. HTTP workers (gthread) to handle web requests +2. Dirty workers to handle background tasks (replacing Celery workers) + +Comparison with Celery deployment: +- Celery: gunicorn app:app + celery -A tasks worker +- Dirty: gunicorn -c gunicorn_conf.py app:app (single command!) +""" + +import multiprocessing +import os + +# ============================================================================= +# Basic Settings +# ============================================================================= + +# Bind to all interfaces on port 8000 +bind = os.environ.get("GUNICORN_BIND", "0.0.0.0:8000") + +# HTTP workers - handle incoming web requests +# Rule of thumb: 2-4 x CPU cores for I/O bound apps +workers = int(os.environ.get("GUNICORN_WORKERS", multiprocessing.cpu_count() * 2 + 1)) + +# Use gthread worker for better concurrency +worker_class = "gthread" + +# Threads per worker - good for I/O bound operations +threads = int(os.environ.get("GUNICORN_THREADS", 4)) + +# ============================================================================= +# Dirty Arbiter Settings (Celery Worker Replacement) +# ============================================================================= + +# Task workers - these replace Celery workers +# Each dirty app can specify its own worker count via the `workers` class attribute +dirty_apps = [ + # Email tasks - 2 workers (I/O bound) + "examples.celery_alternative.tasks:EmailWorker", + # Image processing - 2 workers (CPU/memory intensive) + "examples.celery_alternative.tasks:ImageWorker", + # Data processing - 4 workers (parallelizable) + "examples.celery_alternative.tasks:DataWorker", + # Scheduled tasks - 1 worker + "examples.celery_alternative.tasks:ScheduledWorker", +] + +# Total dirty workers (distributed among apps based on their `workers` attribute) +# If not set, uses sum of all app worker counts +dirty_workers = int(os.environ.get("DIRTY_WORKERS", 9)) # 2+2+4+1 = 9 + +# Task timeout in seconds (like Celery's task_time_limit) +dirty_timeout = int(os.environ.get("DIRTY_TIMEOUT", 300)) + +# Threads per dirty worker (for concurrent task execution) +dirty_threads = int(os.environ.get("DIRTY_THREADS", 1)) + +# Graceful shutdown timeout +dirty_graceful_timeout = int(os.environ.get("DIRTY_GRACEFUL_TIMEOUT", 30)) + +# ============================================================================= +# Timeouts & Limits +# ============================================================================= + +# Worker timeout (seconds) +timeout = 120 + +# Keep-alive connections +keepalive = 5 + +# Maximum requests per worker before recycling +max_requests = 1000 +max_requests_jitter = 50 + +# ============================================================================= +# Logging +# ============================================================================= + +# Log level +loglevel = os.environ.get("LOG_LEVEL", "info") + +# Access log format +accesslog = "-" +access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s' + +# Error log +errorlog = "-" + +# ============================================================================= +# Lifecycle Hooks +# ============================================================================= + +def on_starting(server): + """Called just before the master process is initialized.""" + print("=" * 60) + print("Starting Gunicorn with Dirty Arbiters (Celery Replacement)") + print("=" * 60) + + +def on_dirty_starting(arbiter): + """Called when the dirty arbiter is starting.""" + print(f"[Dirty] Starting dirty arbiter") + print(f"[Dirty] Registered apps: {list(arbiter.cfg.dirty_apps)}") + + +def dirty_post_fork(arbiter, worker): + """Called after a dirty worker is forked.""" + print(f"[Dirty] Worker {worker.pid} started") + + +def dirty_worker_init(worker): + """Called when a dirty worker initializes its apps.""" + print(f"[Dirty] Worker {worker.pid} initialized apps: {list(worker.apps.keys())}") + + +def dirty_worker_exit(arbiter, worker): + """Called when a dirty worker exits.""" + print(f"[Dirty] Worker {worker.pid} exiting") + + +def worker_int(worker): + """Called when a worker receives SIGINT.""" + print(f"[HTTP] Worker {worker.pid} interrupted") + + +def worker_exit(server, worker): + """Called when a worker exits.""" + print(f"[HTTP] Worker {worker.pid} exited") + + +# ============================================================================= +# Development vs Production +# ============================================================================= + +# Reload on code changes (development only) +reload = os.environ.get("GUNICORN_RELOAD", "false").lower() == "true" + +# Preload app for faster worker startup (production) +preload_app = os.environ.get("GUNICORN_PRELOAD", "false").lower() == "true" diff --git a/examples/celery_alternative/requirements.txt b/examples/celery_alternative/requirements.txt new file mode 100644 index 00000000..f49876bb --- /dev/null +++ b/examples/celery_alternative/requirements.txt @@ -0,0 +1,4 @@ +# Celery Replacement Example Dependencies +flask>=3.0.0 +requests>=2.31.0 +pytest>=8.0.0 diff --git a/examples/celery_alternative/run_tests.sh b/examples/celery_alternative/run_tests.sh new file mode 100755 index 00000000..ba7b7100 --- /dev/null +++ b/examples/celery_alternative/run_tests.sh @@ -0,0 +1,35 @@ +#!/bin/bash +# Run tests for Celery Replacement example + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +GUNICORN_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" + +# Add gunicorn to Python path +export PYTHONPATH="$GUNICORN_ROOT:$PYTHONPATH" + +cd "$SCRIPT_DIR" + +echo "==========================================" +echo "Running Unit Tests" +echo "==========================================" +python -m pytest tests/test_tasks.py -v --tb=short + +echo "" +echo "==========================================" +echo "Unit tests passed!" +echo "==========================================" + +# Check if integration tests should run +if [ "$1" == "--integration" ] || [ "$1" == "-i" ]; then + APP_URL="${APP_URL:-http://localhost:8000}" + echo "" + echo "==========================================" + echo "Running Integration Tests against $APP_URL" + echo "==========================================" + python -m pytest tests/test_integration.py -v --tb=short +fi + +echo "" +echo "All tests completed successfully!" diff --git a/examples/celery_alternative/tasks.py b/examples/celery_alternative/tasks.py new file mode 100644 index 00000000..6f8c8e9c --- /dev/null +++ b/examples/celery_alternative/tasks.py @@ -0,0 +1,563 @@ +""" +Task Workers - Celery Replacement using Gunicorn Dirty Arbiters + +This module demonstrates how to replace Celery with Gunicorn's dirty arbiter +feature for background task processing. Key benefits: + +1. No external broker (Redis/RabbitMQ) needed - uses Unix sockets +2. Stateful workers - maintain connections, models, caches across requests +3. Integrated with your WSGI/ASGI app - no separate process management +4. Streaming support for progress reporting +5. Per-task-type worker allocation for memory optimization + +Comparison with Celery: +- Celery: @app.task decorator -> Dirty: DirtyApp class with methods +- Celery: task.delay() -> Dirty: client.execute() +- Celery: task.apply_async() -> Dirty: client.execute() with timeout +- Celery: task progress -> Dirty: client.stream() with generators +""" + +import hashlib +import json +import os +import random +import smtplib +import time +from datetime import datetime +from email.mime.text import MIMEText +from typing import Any, Generator + +from gunicorn.dirty.app import DirtyApp + + +class EmailWorker(DirtyApp): + """ + Email task worker - like Celery's @app.task for email sending. + + Maintains SMTP connection pool across requests for efficiency. + In Celery, you'd create a new connection per task or manage it manually. + """ + + # Limit to 2 workers since email sending is I/O bound + workers = 2 + + def __init__(self): + self.smtp_connection = None + self.emails_sent = 0 + self.last_connected = None + + def init(self): + """Called once when worker starts - establish SMTP connection.""" + self._connect_smtp() + + def _connect_smtp(self): + """Establish SMTP connection (simulated for demo).""" + # In production, connect to real SMTP server: + # self.smtp_connection = smtplib.SMTP('smtp.example.com', 587) + # self.smtp_connection.starttls() + # self.smtp_connection.login(user, password) + self.last_connected = datetime.now().isoformat() + self.smtp_connection = "connected" # Simulated + + def __call__(self, action: str, *args, **kwargs) -> Any: + """Dispatch to action methods.""" + method = getattr(self, action, None) + if method is None or action.startswith('_'): + raise ValueError(f"Unknown action: {action}") + return method(*args, **kwargs) + + def send_email(self, to: str, subject: str, body: str, + html: bool = False) -> dict: + """ + Send a single email. + + Equivalent to Celery: + @app.task + def send_email(to, subject, body): + ... + """ + # Simulate email sending delay + time.sleep(random.uniform(0.1, 0.3)) + + self.emails_sent += 1 + + return { + "status": "sent", + "to": to, + "subject": subject, + "message_id": f"msg-{self.emails_sent}-{int(time.time())}", + "timestamp": datetime.now().isoformat(), + } + + def send_bulk_emails(self, recipients: list, subject: str, + body: str) -> Generator[dict, None, None]: + """ + Send bulk emails with progress streaming. + + This is where dirty arbiters shine over Celery - real-time + progress without polling or WebSockets. + + Equivalent to Celery: + @app.task(bind=True) + def send_bulk(self, recipients, subject, body): + for i, to in enumerate(recipients): + send_email(to, subject, body) + self.update_state(state='PROGRESS', + meta={'current': i, 'total': len(recipients)}) + """ + total = len(recipients) + sent = 0 + failed = 0 + + for i, to in enumerate(recipients): + try: + result = self.send_email(to, subject, body) + sent += 1 + yield { + "type": "progress", + "current": i + 1, + "total": total, + "percent": int((i + 1) / total * 100), + "last_sent": to, + "status": "sent", + } + except Exception as e: + failed += 1 + yield { + "type": "progress", + "current": i + 1, + "total": total, + "percent": int((i + 1) / total * 100), + "last_sent": to, + "status": "failed", + "error": str(e), + } + + # Final summary + yield { + "type": "complete", + "total": total, + "sent": sent, + "failed": failed, + } + + def stats(self) -> dict: + """Get worker statistics.""" + return { + "emails_sent": self.emails_sent, + "smtp_connected": self.smtp_connection is not None, + "last_connected": self.last_connected, + "worker_pid": os.getpid(), + } + + def close(self): + """Cleanup on shutdown.""" + if self.smtp_connection and self.smtp_connection != "connected": + self.smtp_connection.quit() + + +class ImageWorker(DirtyApp): + """ + Image processing worker - demonstrates CPU-intensive tasks. + + Like Celery tasks for image resizing, thumbnails, watermarks. + Keeps image processing libraries loaded in memory. + """ + + # Limit to 2 workers - image processing is memory intensive + workers = 2 + + def __init__(self): + self.pil_available = False + self.images_processed = 0 + + def init(self): + """Load image processing libraries once at startup.""" + try: + # Try to import PIL - optional dependency + from PIL import Image + self.pil_available = True + except ImportError: + self.pil_available = False + + def __call__(self, action: str, *args, **kwargs) -> Any: + method = getattr(self, action, None) + if method is None or action.startswith('_'): + raise ValueError(f"Unknown action: {action}") + return method(*args, **kwargs) + + def resize(self, image_data: str, width: int, height: int) -> dict: + """ + Resize an image. + + Equivalent to Celery: + @app.task + def resize_image(image_path, width, height): + img = Image.open(image_path) + img.thumbnail((width, height)) + img.save(output_path) + """ + # Simulate image processing + time.sleep(random.uniform(0.2, 0.5)) + + self.images_processed += 1 + + # Create a fake "processed" result + # In production, image_data would be base64 decoded + data_size = len(image_data) if isinstance(image_data, str) else len(image_data) + result_hash = hashlib.md5( + f"{data_size}{width}{height}".encode() + ).hexdigest()[:16] + + return { + "status": "resized", + "original_size": data_size, + "target_dimensions": f"{width}x{height}", + "result_id": f"img-{result_hash}", + "pil_used": self.pil_available, + } + + def generate_thumbnail(self, image_data: str, size: int = 150) -> dict: + """Generate a thumbnail.""" + return self.resize(image_data, size, size) + + def process_batch(self, images: list, operation: str, + **params) -> Generator[dict, None, None]: + """ + Process multiple images with progress streaming. + """ + total = len(images) + + for i, img_info in enumerate(images): + try: + # Simulate fetching image data + image_data = img_info.get("data", b"fake_image_data") + + if operation == "resize": + result = self.resize( + image_data, + params.get("width", 800), + params.get("height", 600) + ) + elif operation == "thumbnail": + result = self.generate_thumbnail( + image_data, + params.get("size", 150) + ) + else: + result = {"error": f"Unknown operation: {operation}"} + + yield { + "type": "progress", + "current": i + 1, + "total": total, + "percent": int((i + 1) / total * 100), + "image_id": img_info.get("id", f"img-{i}"), + "result": result, + } + except Exception as e: + yield { + "type": "error", + "current": i + 1, + "total": total, + "image_id": img_info.get("id", f"img-{i}"), + "error": str(e), + } + + yield { + "type": "complete", + "total": total, + "processed": self.images_processed, + } + + def stats(self) -> dict: + return { + "images_processed": self.images_processed, + "pil_available": self.pil_available, + "worker_pid": os.getpid(), + } + + +class DataWorker(DirtyApp): + """ + Data processing worker - demonstrates stateful data operations. + + Maintains database connections, caches, and processing state. + Perfect for ETL tasks, report generation, data aggregation. + """ + + # More workers for data tasks - they're often parallelizable + workers = 4 + + def __init__(self): + self.cache = {} + self.db_connection = None + self.tasks_completed = 0 + + def init(self): + """Initialize database connection and cache.""" + # In production: self.db_connection = create_engine(DATABASE_URL) + self.db_connection = "connected" + self.cache = {} + + def __call__(self, action: str, *args, **kwargs) -> Any: + method = getattr(self, action, None) + if method is None or action.startswith('_'): + raise ValueError(f"Unknown action: {action}") + return method(*args, **kwargs) + + def aggregate(self, data: list, group_by: str, + agg_field: str, agg_func: str = "sum") -> dict: + """ + Aggregate data - like a Celery task for report generation. + + Equivalent to Celery: + @app.task + def aggregate_sales(data, group_by, agg_field): + df = pd.DataFrame(data) + return df.groupby(group_by)[agg_field].sum().to_dict() + """ + # Simulate aggregation + time.sleep(random.uniform(0.1, 0.3)) + + result = {} + for item in data: + key = item.get(group_by, "unknown") + value = item.get(agg_field, 0) + + if key not in result: + if agg_func in ("sum", "count"): + result[key] = 0 + else: + result[key] = [] + + if agg_func == "sum": + result[key] += value + elif agg_func == "count": + result[key] += 1 + elif agg_func == "list": + result[key].append(value) + + self.tasks_completed += 1 + + return { + "status": "completed", + "group_by": group_by, + "agg_func": agg_func, + "result": result, + "record_count": len(data), + } + + def etl_pipeline(self, source_data: list, + transformations: list) -> Generator[dict, None, None]: + """ + Run an ETL pipeline with progress streaming. + + This replaces Celery chains/chords for multi-step processing: + chain(extract.s(), transform.s(), load.s()) + """ + total_steps = len(transformations) + 2 # +2 for extract and load + current_step = 0 + data = source_data + + # Extract phase + yield { + "type": "progress", + "phase": "extract", + "step": current_step + 1, + "total_steps": total_steps, + "message": f"Extracting {len(data)} records", + } + time.sleep(0.2) # Simulate extraction + current_step += 1 + + # Transform phases + for i, transform in enumerate(transformations): + transform_name = transform.get("name", f"transform_{i}") + transform_type = transform.get("type", "passthrough") + + yield { + "type": "progress", + "phase": "transform", + "step": current_step + 1, + "total_steps": total_steps, + "message": f"Applying {transform_name}", + } + + # Apply transformation + if transform_type == "filter": + field = transform.get("field") + value = transform.get("value") + data = [d for d in data if d.get(field) == value] + elif transform_type == "map": + field = transform.get("field") + func = transform.get("func", "upper") + for d in data: + if field in d and isinstance(d[field], str): + if func == "upper": + d[field] = d[field].upper() + elif func == "lower": + d[field] = d[field].lower() + + time.sleep(0.2) # Simulate transformation + current_step += 1 + + # Load phase + yield { + "type": "progress", + "phase": "load", + "step": current_step + 1, + "total_steps": total_steps, + "message": f"Loading {len(data)} records", + } + time.sleep(0.2) # Simulate loading + + self.tasks_completed += 1 + + # Final result + yield { + "type": "complete", + "records_processed": len(source_data), + "records_output": len(data), + "transformations_applied": len(transformations), + } + + def cached_query(self, query_key: str, ttl: int = 300) -> dict: + """ + Execute a cached query - demonstrates stateful caching. + + Unlike Celery where you'd use Redis for caching, + the dirty worker maintains its own in-memory cache. + """ + now = time.time() + + if query_key in self.cache: + cached = self.cache[query_key] + if now - cached["timestamp"] < ttl: + return { + "status": "cache_hit", + "data": cached["data"], + "cached_at": cached["timestamp"], + "age_seconds": int(now - cached["timestamp"]), + } + + # Simulate query execution + time.sleep(random.uniform(0.2, 0.4)) + + # Generate fake result + result_data = { + "query": query_key, + "rows": random.randint(10, 100), + "computed_at": now, + } + + self.cache[query_key] = { + "data": result_data, + "timestamp": now, + } + + return { + "status": "cache_miss", + "data": result_data, + "cached_at": now, + } + + def stats(self) -> dict: + return { + "tasks_completed": self.tasks_completed, + "cache_size": len(self.cache), + "db_connected": self.db_connection is not None, + "worker_pid": os.getpid(), + } + + def close(self): + """Cleanup on shutdown.""" + self.cache.clear() + if self.db_connection and self.db_connection != "connected": + self.db_connection.close() + + +class ScheduledWorker(DirtyApp): + """ + Scheduled task worker - for periodic/scheduled tasks. + + While dirty arbiters don't have built-in scheduling like Celery Beat, + you can call these from a simple cron job or scheduler. + """ + + workers = 1 # Single worker for scheduled tasks + + def __init__(self): + self.last_runs = {} + self.run_counts = {} + + def __call__(self, action: str, *args, **kwargs) -> Any: + method = getattr(self, action, None) + if method is None or action.startswith('_'): + raise ValueError(f"Unknown action: {action}") + + # Track runs + self.last_runs[action] = datetime.now().isoformat() + self.run_counts[action] = self.run_counts.get(action, 0) + 1 + + return method(*args, **kwargs) + + def cleanup_old_files(self, directory: str, max_age_days: int = 7) -> dict: + """ + Cleanup old files - like a Celery periodic task. + + Equivalent to Celery Beat: + @app.task + def cleanup(): + ... + + app.conf.beat_schedule = { + 'cleanup-every-hour': { + 'task': 'tasks.cleanup', + 'schedule': 3600.0, + }, + } + """ + # Simulate cleanup + time.sleep(0.3) + + files_deleted = random.randint(0, 10) + + return { + "status": "completed", + "directory": directory, + "files_deleted": files_deleted, + "space_freed_mb": files_deleted * random.uniform(0.1, 5.0), + } + + def generate_daily_report(self) -> dict: + """Generate daily report.""" + time.sleep(0.5) + + return { + "status": "completed", + "report_date": datetime.now().strftime("%Y-%m-%d"), + "metrics": { + "active_users": random.randint(100, 1000), + "new_signups": random.randint(10, 50), + "revenue": random.uniform(1000, 10000), + }, + } + + def sync_external_data(self, source: str) -> dict: + """Sync data from external source.""" + time.sleep(0.4) + + return { + "status": "completed", + "source": source, + "records_synced": random.randint(50, 500), + "sync_time": datetime.now().isoformat(), + } + + def stats(self) -> dict: + return { + "last_runs": self.last_runs, + "run_counts": self.run_counts, + "worker_pid": os.getpid(), + } diff --git a/examples/celery_alternative/tests/__init__.py b/examples/celery_alternative/tests/__init__.py new file mode 100644 index 00000000..d4839a6b --- /dev/null +++ b/examples/celery_alternative/tests/__init__.py @@ -0,0 +1 @@ +# Tests package diff --git a/examples/celery_alternative/tests/conftest.py b/examples/celery_alternative/tests/conftest.py new file mode 100644 index 00000000..e7dbdc1f --- /dev/null +++ b/examples/celery_alternative/tests/conftest.py @@ -0,0 +1,10 @@ +""" +Pytest configuration for Celery Replacement tests. +""" + +import sys +from pathlib import Path + +# Add gunicorn source to path for imports +gunicorn_root = Path(__file__).parent.parent.parent.parent +sys.path.insert(0, str(gunicorn_root)) diff --git a/examples/celery_alternative/tests/test_integration.py b/examples/celery_alternative/tests/test_integration.py new file mode 100644 index 00000000..bc1ed3ce --- /dev/null +++ b/examples/celery_alternative/tests/test_integration.py @@ -0,0 +1,422 @@ +""" +Integration Tests for Celery Replacement Example + +These tests run against the full application with Gunicorn and dirty arbiters. +They can be run locally or in Docker. + +Usage: + # Local (with gunicorn running): + APP_URL=http://localhost:8000 pytest tests/test_integration.py -v + + # Docker: + docker compose --profile test up --build --abort-on-container-exit +""" + +import json +import os +import time + +import pytest +import requests + +# Get app URL from environment or use default +APP_URL = os.environ.get("APP_URL", "http://localhost:8000") + + +def wait_for_app(timeout=30): + """Wait for the application to be ready.""" + start = time.time() + while time.time() - start < timeout: + try: + resp = requests.get(f"{APP_URL}/health", timeout=5) + if resp.status_code == 200: + return True + except requests.exceptions.ConnectionError: + pass + time.sleep(1) + return False + + +@pytest.fixture(scope="module", autouse=True) +def ensure_app_running(): + """Ensure the application is running before tests.""" + if not wait_for_app(): + pytest.skip("Application not available") + + +class TestHealthEndpoint: + """Test health check endpoint.""" + + def test_health_check(self): + """Test that health endpoint returns healthy status.""" + resp = requests.get(f"{APP_URL}/health") + assert resp.status_code == 200 + + data = resp.json() + assert data["status"] == "healthy" + assert data["workers"] == "connected" + + +class TestEmailTasks: + """Integration tests for email tasks.""" + + def test_send_single_email(self): + """Test sending a single email via API.""" + resp = requests.post( + f"{APP_URL}/api/email/send", + json={ + "to": "test@example.com", + "subject": "Integration Test", + "body": "Hello from integration test", + }, + ) + assert resp.status_code == 200 + + data = resp.json() + assert data["status"] == "sent" + assert data["to"] == "test@example.com" + assert "message_id" in data + + def test_send_bulk_emails_streaming(self): + """Test bulk email sending with SSE streaming.""" + recipients = ["a@test.com", "b@test.com", "c@test.com"] + + resp = requests.post( + f"{APP_URL}/api/email/send-bulk", + json={ + "recipients": recipients, + "subject": "Bulk Test", + "body": "Hello all", + }, + stream=True, + ) + assert resp.status_code == 200 + + events = [] + for line in resp.iter_lines(): + if line: + line = line.decode("utf-8") + if line.startswith("data: "): + data = json.loads(line[6:]) + events.append(data) + + # Should have progress for each recipient + complete + assert len(events) == len(recipients) + 1 + + # Check progress events + for i, event in enumerate(events[:-1]): + assert event["type"] == "progress" + assert event["current"] == i + 1 + + # Check complete event + assert events[-1]["type"] == "complete" + assert events[-1]["sent"] == len(recipients) + + def test_email_stats(self): + """Test email worker statistics endpoint.""" + # Send an email first + requests.post( + f"{APP_URL}/api/email/send", + json={"to": "x@x.com", "subject": "S", "body": "B"}, + ) + + resp = requests.get(f"{APP_URL}/api/email/stats") + assert resp.status_code == 200 + + data = resp.json() + assert data["emails_sent"] >= 1 + assert data["smtp_connected"] is True + assert "worker_pid" in data + + +class TestImageTasks: + """Integration tests for image tasks.""" + + def test_resize_image(self): + """Test image resizing via API.""" + resp = requests.post( + f"{APP_URL}/api/image/resize", + json={ + "image_data": "base64_encoded_image_data", + "width": 800, + "height": 600, + }, + ) + assert resp.status_code == 200 + + data = resp.json() + assert data["status"] == "resized" + assert data["target_dimensions"] == "800x600" + + def test_generate_thumbnail(self): + """Test thumbnail generation via API.""" + resp = requests.post( + f"{APP_URL}/api/image/thumbnail", + json={ + "image_data": "base64_image", + "size": 150, + }, + ) + assert resp.status_code == 200 + + data = resp.json() + assert data["status"] == "resized" + assert data["target_dimensions"] == "150x150" + + def test_batch_processing_streaming(self): + """Test batch image processing with streaming.""" + images = [ + {"id": "img1", "data": "data1"}, + {"id": "img2", "data": "data2"}, + ] + + resp = requests.post( + f"{APP_URL}/api/image/process-batch", + json={ + "images": images, + "operation": "resize", + "width": 400, + "height": 300, + }, + stream=True, + ) + assert resp.status_code == 200 + + events = [] + for line in resp.iter_lines(): + if line: + line = line.decode("utf-8") + if line.startswith("data: "): + events.append(json.loads(line[6:])) + + assert len(events) == len(images) + 1 + assert events[-1]["type"] == "complete" + + def test_image_stats(self): + """Test image worker statistics.""" + resp = requests.get(f"{APP_URL}/api/image/stats") + assert resp.status_code == 200 + + data = resp.json() + assert "images_processed" in data + assert "worker_pid" in data + + +class TestDataTasks: + """Integration tests for data processing tasks.""" + + def test_aggregate_data(self): + """Test data aggregation via API.""" + resp = requests.post( + f"{APP_URL}/api/data/aggregate", + json={ + "data": [ + {"category": "A", "value": 10}, + {"category": "B", "value": 20}, + {"category": "A", "value": 30}, + ], + "group_by": "category", + "agg_field": "value", + "agg_func": "sum", + }, + ) + assert resp.status_code == 200 + + data = resp.json() + assert data["status"] == "completed" + assert data["result"]["A"] == 40 + assert data["result"]["B"] == 20 + + def test_etl_pipeline_streaming(self): + """Test ETL pipeline with streaming progress.""" + resp = requests.post( + f"{APP_URL}/api/data/etl", + json={ + "source_data": [ + {"name": "alice", "status": "active"}, + {"name": "bob", "status": "inactive"}, + {"name": "charlie", "status": "active"}, + ], + "transformations": [ + {"name": "filter", "type": "filter", + "field": "status", "value": "active"}, + ], + }, + stream=True, + ) + assert resp.status_code == 200 + + events = [] + for line in resp.iter_lines(): + if line: + line = line.decode("utf-8") + if line.startswith("data: "): + events.append(json.loads(line[6:])) + + # extract + transform + load + complete + assert len(events) == 4 + + # Check phases + phases = [e.get("phase") for e in events[:-1]] + assert "extract" in phases + assert "transform" in phases + assert "load" in phases + + # Final result + assert events[-1]["type"] == "complete" + assert events[-1]["records_output"] == 2 + + def test_cached_query(self): + """Test cached query functionality.""" + query_key = f"test_query_{time.time()}" + + # First call - cache miss + resp1 = requests.post( + f"{APP_URL}/api/data/query", + json={"query_key": query_key, "ttl": 300}, + ) + assert resp1.status_code == 200 + assert resp1.json()["status"] == "cache_miss" + + # Second call - cache hit + resp2 = requests.post( + f"{APP_URL}/api/data/query", + json={"query_key": query_key, "ttl": 300}, + ) + assert resp2.status_code == 200 + assert resp2.json()["status"] == "cache_hit" + + def test_data_stats(self): + """Test data worker statistics.""" + resp = requests.get(f"{APP_URL}/api/data/stats") + assert resp.status_code == 200 + + data = resp.json() + assert "tasks_completed" in data + assert "cache_size" in data + + +class TestScheduledTasks: + """Integration tests for scheduled tasks.""" + + def test_cleanup_task(self): + """Test cleanup task execution.""" + resp = requests.post( + f"{APP_URL}/api/scheduled/cleanup", + json={"directory": "/tmp/test", "max_age_days": 7}, + ) + assert resp.status_code == 200 + + data = resp.json() + assert data["status"] == "completed" + assert "files_deleted" in data + + def test_daily_report(self): + """Test daily report generation.""" + resp = requests.post(f"{APP_URL}/api/scheduled/daily-report") + assert resp.status_code == 200 + + data = resp.json() + assert data["status"] == "completed" + assert "metrics" in data + + def test_sync_task(self): + """Test data sync task.""" + resp = requests.post( + f"{APP_URL}/api/scheduled/sync", + json={"source": "test_source"}, + ) + assert resp.status_code == 200 + + data = resp.json() + assert data["status"] == "completed" + assert data["source"] == "test_source" + + def test_scheduled_stats(self): + """Test scheduled worker statistics.""" + # Run a task first + requests.post(f"{APP_URL}/api/scheduled/daily-report") + + resp = requests.get(f"{APP_URL}/api/scheduled/stats") + assert resp.status_code == 200 + + data = resp.json() + assert "run_counts" in data + assert "generate_daily_report" in data["run_counts"] + + +class TestConcurrency: + """Test concurrent task execution.""" + + def test_concurrent_requests(self): + """Test that multiple concurrent requests are handled.""" + import concurrent.futures + + def send_email(): + return requests.post( + f"{APP_URL}/api/email/send", + json={"to": "x@x.com", "subject": "Concurrent", "body": "Test"}, + ) + + # Send 10 concurrent requests + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + futures = [executor.submit(send_email) for _ in range(10)] + results = [f.result() for f in futures] + + # All should succeed + assert all(r.status_code == 200 for r in results) + assert all(r.json()["status"] == "sent" for r in results) + + def test_mixed_task_types(self): + """Test different task types running concurrently.""" + import concurrent.futures + + def email_task(): + return requests.post( + f"{APP_URL}/api/email/send", + json={"to": "x@x.com", "subject": "S", "body": "B"}, + ) + + def image_task(): + return requests.post( + f"{APP_URL}/api/image/resize", + json={"image_data": "x", "width": 100, "height": 100}, + ) + + def data_task(): + return requests.post( + f"{APP_URL}/api/data/aggregate", + json={ + "data": [{"a": 1}], + "group_by": "a", + "agg_field": "a", + "agg_func": "sum", + }, + ) + + with concurrent.futures.ThreadPoolExecutor(max_workers=9) as executor: + futures = [] + for _ in range(3): + futures.append(executor.submit(email_task)) + futures.append(executor.submit(image_task)) + futures.append(executor.submit(data_task)) + + results = [f.result() for f in futures] + + # All should succeed + assert all(r.status_code == 200 for r in results) + + +class TestErrorHandling: + """Test error handling scenarios.""" + + def test_invalid_action(self): + """Test that invalid actions return appropriate errors.""" + # This would require modifying the API to expose raw execute + # For now, we test via a malformed request + resp = requests.post( + f"{APP_URL}/api/email/send", + json={}, # Missing required fields + ) + # Should get a 500 or validation error + assert resp.status_code in [400, 500] diff --git a/examples/celery_alternative/tests/test_tasks.py b/examples/celery_alternative/tests/test_tasks.py new file mode 100644 index 00000000..adccc7a0 --- /dev/null +++ b/examples/celery_alternative/tests/test_tasks.py @@ -0,0 +1,304 @@ +""" +Unit Tests for Task Workers + +These tests verify the task worker logic without running Gunicorn. +They test the DirtyApp classes directly. +""" + +import pytest +from examples.celery_alternative.tasks import ( + EmailWorker, + ImageWorker, + DataWorker, + ScheduledWorker, +) + + +class TestEmailWorker: + """Tests for EmailWorker task class.""" + + def setup_method(self): + """Set up test fixtures.""" + self.worker = EmailWorker() + self.worker.init() + + def test_send_email(self): + """Test sending a single email.""" + result = self.worker("send_email", + to="test@example.com", + subject="Test", + body="Hello") + + assert result["status"] == "sent" + assert result["to"] == "test@example.com" + assert result["subject"] == "Test" + assert "message_id" in result + assert "timestamp" in result + + def test_send_email_increments_counter(self): + """Test that email counter increments.""" + initial_count = self.worker.emails_sent + + self.worker("send_email", to="a@x.com", subject="S", body="B") + self.worker("send_email", to="b@x.com", subject="S", body="B") + + assert self.worker.emails_sent == initial_count + 2 + + def test_send_bulk_emails_streaming(self): + """Test bulk email sending with progress streaming.""" + recipients = ["a@x.com", "b@x.com", "c@x.com"] + + results = list(self.worker("send_bulk_emails", + recipients=recipients, + subject="Bulk", + body="Hello all")) + + # Should have progress updates + final complete + assert len(results) == len(recipients) + 1 + + # Check progress updates + for i, r in enumerate(results[:-1]): + assert r["type"] == "progress" + assert r["current"] == i + 1 + assert r["total"] == len(recipients) + + # Check final result + final = results[-1] + assert final["type"] == "complete" + assert final["total"] == len(recipients) + assert final["sent"] == len(recipients) + + def test_stats(self): + """Test worker statistics.""" + self.worker("send_email", to="x@x.com", subject="S", body="B") + + stats = self.worker("stats") + + assert stats["emails_sent"] >= 1 + assert stats["smtp_connected"] is True + assert "worker_pid" in stats + + def test_unknown_action_raises(self): + """Test that unknown actions raise ValueError.""" + with pytest.raises(ValueError, match="Unknown action"): + self.worker("nonexistent_action") + + def test_private_method_raises(self): + """Test that private methods cannot be called.""" + with pytest.raises(ValueError, match="Unknown action"): + self.worker("_connect_smtp") + + +class TestImageWorker: + """Tests for ImageWorker task class.""" + + def setup_method(self): + """Set up test fixtures.""" + self.worker = ImageWorker() + self.worker.init() + + def test_resize_image(self): + """Test image resizing.""" + result = self.worker("resize", + image_data="fake_image_data", + width=800, + height=600) + + assert result["status"] == "resized" + assert result["target_dimensions"] == "800x600" + assert "result_id" in result + + def test_generate_thumbnail(self): + """Test thumbnail generation.""" + result = self.worker("generate_thumbnail", + image_data="fake_image_data", + size=150) + + assert result["status"] == "resized" + assert result["target_dimensions"] == "150x150" + + def test_process_batch_streaming(self): + """Test batch processing with progress streaming.""" + images = [ + {"id": "img1", "data": b"data1"}, + {"id": "img2", "data": b"data2"}, + {"id": "img3", "data": b"data3"}, + ] + + results = list(self.worker("process_batch", + images=images, + operation="resize", + width=800, + height=600)) + + # Progress for each image + complete + assert len(results) == len(images) + 1 + + # Check progress updates + for i, r in enumerate(results[:-1]): + assert r["type"] == "progress" + assert r["image_id"] == f"img{i+1}" + assert "result" in r + + # Check final result + final = results[-1] + assert final["type"] == "complete" + + def test_stats(self): + """Test worker statistics.""" + self.worker("resize", image_data=b"x", width=100, height=100) + + stats = self.worker("stats") + + assert stats["images_processed"] >= 1 + assert "pil_available" in stats + assert "worker_pid" in stats + + +class TestDataWorker: + """Tests for DataWorker task class.""" + + def setup_method(self): + """Set up test fixtures.""" + self.worker = DataWorker() + self.worker.init() + + def test_aggregate_sum(self): + """Test data aggregation with sum.""" + data = [ + {"category": "A", "value": 10}, + {"category": "B", "value": 20}, + {"category": "A", "value": 30}, + ] + + result = self.worker("aggregate", + data=data, + group_by="category", + agg_field="value", + agg_func="sum") + + assert result["status"] == "completed" + assert result["result"]["A"] == 40 + assert result["result"]["B"] == 20 + + def test_aggregate_count(self): + """Test data aggregation with count.""" + data = [ + {"category": "A", "value": 10}, + {"category": "B", "value": 20}, + {"category": "A", "value": 30}, + ] + + result = self.worker("aggregate", + data=data, + group_by="category", + agg_field="value", + agg_func="count") + + assert result["result"]["A"] == 2 + assert result["result"]["B"] == 1 + + def test_etl_pipeline_streaming(self): + """Test ETL pipeline with progress streaming.""" + source_data = [ + {"name": "alice", "status": "active"}, + {"name": "bob", "status": "inactive"}, + {"name": "charlie", "status": "active"}, + ] + transformations = [ + {"name": "filter_active", "type": "filter", + "field": "status", "value": "active"}, + {"name": "uppercase", "type": "map", + "field": "name", "func": "upper"}, + ] + + results = list(self.worker("etl_pipeline", + source_data=source_data, + transformations=transformations)) + + # extract + transforms + load + complete + expected_steps = 1 + len(transformations) + 1 + 1 + assert len(results) == expected_steps + + # Check phases + assert results[0]["phase"] == "extract" + assert results[1]["phase"] == "transform" + assert results[2]["phase"] == "transform" + assert results[3]["phase"] == "load" + assert results[4]["type"] == "complete" + + # Final should have 2 records (filtered) + assert results[4]["records_output"] == 2 + + def test_cached_query_miss_then_hit(self): + """Test query caching - miss then hit.""" + # First call - cache miss + result1 = self.worker("cached_query", query_key="test_query", ttl=300) + assert result1["status"] == "cache_miss" + + # Second call - cache hit + result2 = self.worker("cached_query", query_key="test_query", ttl=300) + assert result2["status"] == "cache_hit" + + def test_stats(self): + """Test worker statistics.""" + self.worker("aggregate", + data=[{"a": 1}], + group_by="a", + agg_field="a") + + stats = self.worker("stats") + + assert stats["tasks_completed"] >= 1 + assert "cache_size" in stats + assert stats["db_connected"] is True + + +class TestScheduledWorker: + """Tests for ScheduledWorker task class.""" + + def setup_method(self): + """Set up test fixtures.""" + self.worker = ScheduledWorker() + + def test_cleanup_old_files(self): + """Test file cleanup task.""" + result = self.worker("cleanup_old_files", + directory="/tmp/test", + max_age_days=7) + + assert result["status"] == "completed" + assert result["directory"] == "/tmp/test" + assert "files_deleted" in result + assert "space_freed_mb" in result + + def test_generate_daily_report(self): + """Test daily report generation.""" + result = self.worker("generate_daily_report") + + assert result["status"] == "completed" + assert "report_date" in result + assert "metrics" in result + assert "active_users" in result["metrics"] + assert "new_signups" in result["metrics"] + assert "revenue" in result["metrics"] + + def test_sync_external_data(self): + """Test external data sync.""" + result = self.worker("sync_external_data", source="test_api") + + assert result["status"] == "completed" + assert result["source"] == "test_api" + assert "records_synced" in result + + def test_stats_tracks_runs(self): + """Test that stats tracks task runs.""" + self.worker("cleanup_old_files", directory="/tmp", max_age_days=1) + self.worker("cleanup_old_files", directory="/tmp", max_age_days=1) + self.worker("generate_daily_report") + + stats = self.worker("stats") + + assert stats["run_counts"]["cleanup_old_files"] == 2 + assert stats["run_counts"]["generate_daily_report"] == 1 + assert "cleanup_old_files" in stats["last_runs"]