mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-01 18:21:30 +08:00
- Bump version to 25.0.2 - Update copyright year to 2026 in LICENSE and NOTICE - Add license headers to all Python source files - Add changelog entry for 25.0.2
462 lines
13 KiB
Python
462 lines
13 KiB
Python
#
|
|
# This file is part of gunicorn released under the MIT license.
|
|
# See the NOTICE for more information.
|
|
|
|
"""
|
|
Web Application - FastAPI app demonstrating Celery replacement.
|
|
|
|
This shows how to call dirty arbiter tasks from your web application
|
|
using the async API, which doesn't block the event loop.
|
|
|
|
Key difference from sync (Flask/gthread):
|
|
- `await client.execute_async()` is non-blocking
|
|
- A single worker can handle many concurrent requests
|
|
- True async I/O - other requests proceed while waiting for task results
|
|
"""
|
|
|
|
import json
|
|
from contextlib import asynccontextmanager
|
|
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.responses import StreamingResponse
|
|
from pydantic import BaseModel
|
|
|
|
from gunicorn.dirty import get_dirty_client_async
|
|
from gunicorn.dirty.errors import (
|
|
DirtyError,
|
|
DirtyTimeoutError,
|
|
)
|
|
|
|
|
|
# Task worker import paths (like Celery task names)
|
|
EMAIL_WORKER = "examples.celery_alternative.tasks:EmailWorker"
|
|
IMAGE_WORKER = "examples.celery_alternative.tasks:ImageWorker"
|
|
DATA_WORKER = "examples.celery_alternative.tasks:DataWorker"
|
|
SCHEDULED_WORKER = "examples.celery_alternative.tasks:ScheduledWorker"
|
|
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
"""Application lifespan - startup and shutdown."""
|
|
yield
|
|
|
|
|
|
app = FastAPI(
|
|
title="Celery Replacement Demo",
|
|
description="Demonstrating Gunicorn dirty arbiters as Celery replacement with async ASGI",
|
|
lifespan=lifespan,
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# Request/Response Models
|
|
# ============================================================================
|
|
|
|
class EmailRequest(BaseModel):
|
|
to: str
|
|
subject: str
|
|
body: str
|
|
html: bool = False
|
|
|
|
|
|
class BulkEmailRequest(BaseModel):
|
|
recipients: list[str]
|
|
subject: str
|
|
body: str
|
|
|
|
|
|
class ImageResizeRequest(BaseModel):
|
|
image_data: str = ""
|
|
width: int = 800
|
|
height: int = 600
|
|
|
|
|
|
class ThumbnailRequest(BaseModel):
|
|
image_data: str = ""
|
|
size: int = 150
|
|
|
|
|
|
class ImageBatchRequest(BaseModel):
|
|
images: list[dict]
|
|
operation: str = "resize"
|
|
width: int = 800
|
|
height: int = 600
|
|
size: int = 150
|
|
|
|
|
|
class AggregateRequest(BaseModel):
|
|
data: list[dict]
|
|
group_by: str
|
|
agg_field: str
|
|
agg_func: str = "sum"
|
|
|
|
|
|
class ETLRequest(BaseModel):
|
|
source_data: list[dict]
|
|
transformations: list[dict] = []
|
|
|
|
|
|
class QueryRequest(BaseModel):
|
|
query_key: str
|
|
ttl: int = 300
|
|
|
|
|
|
class CleanupRequest(BaseModel):
|
|
directory: str = "/tmp"
|
|
max_age_days: int = 7
|
|
|
|
|
|
class SyncRequest(BaseModel):
|
|
source: str = "default"
|
|
|
|
|
|
# ============================================================================
|
|
# Email Tasks - Like Celery email tasks
|
|
# ============================================================================
|
|
|
|
@app.post("/api/email/send")
|
|
async def send_email(data: EmailRequest):
|
|
"""
|
|
Send a single email.
|
|
|
|
Celery equivalent:
|
|
send_email.delay(to, subject, body)
|
|
|
|
With async dirty client, this doesn't block the event loop!
|
|
Other requests can be handled while waiting for the task.
|
|
"""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(
|
|
EMAIL_WORKER,
|
|
"send_email",
|
|
to=data.to,
|
|
subject=data.subject,
|
|
body=data.body,
|
|
html=data.html,
|
|
)
|
|
return result
|
|
except DirtyTimeoutError:
|
|
raise HTTPException(status_code=504, detail="Task timed out")
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/api/email/send-bulk")
|
|
async def send_bulk_emails(data: BulkEmailRequest):
|
|
"""
|
|
Send bulk emails with streaming progress.
|
|
|
|
Celery equivalent:
|
|
result = send_bulk.apply_async([recipients, subject, body])
|
|
while not result.ready():
|
|
print(result.info) # Progress polling
|
|
|
|
With dirty arbiters, progress is streamed in real-time!
|
|
"""
|
|
async def generate():
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
async for progress in client.stream_async(
|
|
EMAIL_WORKER,
|
|
"send_bulk_emails",
|
|
recipients=data.recipients,
|
|
subject=data.subject,
|
|
body=data.body,
|
|
):
|
|
yield f"data: {json.dumps(progress)}\n\n"
|
|
except DirtyError as e:
|
|
yield f"data: {json.dumps({'error': str(e)})}\n\n"
|
|
|
|
return StreamingResponse(
|
|
generate(),
|
|
media_type="text/event-stream",
|
|
headers={
|
|
"Cache-Control": "no-cache",
|
|
"X-Accel-Buffering": "no",
|
|
},
|
|
)
|
|
|
|
|
|
@app.get("/api/email/stats")
|
|
async def email_stats():
|
|
"""Get email worker statistics."""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(EMAIL_WORKER, "stats")
|
|
return result
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# ============================================================================
|
|
# Image Tasks - Like Celery image processing tasks
|
|
# ============================================================================
|
|
|
|
@app.post("/api/image/resize")
|
|
async def resize_image(data: ImageResizeRequest):
|
|
"""
|
|
Resize an image.
|
|
|
|
Celery equivalent:
|
|
resize_image.delay(image_data, width, height)
|
|
"""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(
|
|
IMAGE_WORKER,
|
|
"resize",
|
|
image_data=data.image_data,
|
|
width=data.width,
|
|
height=data.height,
|
|
)
|
|
return result
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/api/image/thumbnail")
|
|
async def generate_thumbnail(data: ThumbnailRequest):
|
|
"""Generate a thumbnail."""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(
|
|
IMAGE_WORKER,
|
|
"generate_thumbnail",
|
|
image_data=data.image_data,
|
|
size=data.size,
|
|
)
|
|
return result
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/api/image/process-batch")
|
|
async def process_image_batch(data: ImageBatchRequest):
|
|
"""
|
|
Process multiple images with progress streaming.
|
|
"""
|
|
async def generate():
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
async for progress in client.stream_async(
|
|
IMAGE_WORKER,
|
|
"process_batch",
|
|
images=data.images,
|
|
operation=data.operation,
|
|
width=data.width,
|
|
height=data.height,
|
|
size=data.size,
|
|
):
|
|
yield f"data: {json.dumps(progress)}\n\n"
|
|
except DirtyError as e:
|
|
yield f"data: {json.dumps({'error': str(e)})}\n\n"
|
|
|
|
return StreamingResponse(
|
|
generate(),
|
|
media_type="text/event-stream",
|
|
)
|
|
|
|
|
|
@app.get("/api/image/stats")
|
|
async def image_stats():
|
|
"""Get image worker statistics."""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(IMAGE_WORKER, "stats")
|
|
return result
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# ============================================================================
|
|
# Data Tasks - Like Celery data processing tasks
|
|
# ============================================================================
|
|
|
|
@app.post("/api/data/aggregate")
|
|
async def aggregate_data(data: AggregateRequest):
|
|
"""
|
|
Aggregate data.
|
|
|
|
Celery equivalent:
|
|
aggregate_data.delay(data, group_by, agg_field, agg_func)
|
|
"""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(
|
|
DATA_WORKER,
|
|
"aggregate",
|
|
data=data.data,
|
|
group_by=data.group_by,
|
|
agg_field=data.agg_field,
|
|
agg_func=data.agg_func,
|
|
)
|
|
return result
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/api/data/etl")
|
|
async def run_etl(data: ETLRequest):
|
|
"""
|
|
Run ETL pipeline with streaming progress.
|
|
|
|
Celery equivalent:
|
|
chain(extract.s(), transform.s(), load.s()).apply_async()
|
|
"""
|
|
async def generate():
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
async for progress in client.stream_async(
|
|
DATA_WORKER,
|
|
"etl_pipeline",
|
|
source_data=data.source_data,
|
|
transformations=data.transformations,
|
|
):
|
|
yield f"data: {json.dumps(progress)}\n\n"
|
|
except DirtyError as e:
|
|
yield f"data: {json.dumps({'error': str(e)})}\n\n"
|
|
|
|
return StreamingResponse(
|
|
generate(),
|
|
media_type="text/event-stream",
|
|
)
|
|
|
|
|
|
@app.post("/api/data/query")
|
|
async def cached_query(data: QueryRequest):
|
|
"""Execute a cached query."""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(
|
|
DATA_WORKER,
|
|
"cached_query",
|
|
query_key=data.query_key,
|
|
ttl=data.ttl,
|
|
)
|
|
return result
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.get("/api/data/stats")
|
|
async def data_stats():
|
|
"""Get data worker statistics."""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(DATA_WORKER, "stats")
|
|
return result
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# ============================================================================
|
|
# Scheduled Tasks - Like Celery Beat tasks
|
|
# ============================================================================
|
|
|
|
@app.post("/api/scheduled/cleanup")
|
|
async def run_cleanup(data: CleanupRequest = CleanupRequest()):
|
|
"""Run cleanup task (normally triggered by cron)."""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(
|
|
SCHEDULED_WORKER,
|
|
"cleanup_old_files",
|
|
directory=data.directory,
|
|
max_age_days=data.max_age_days,
|
|
)
|
|
return result
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/api/scheduled/daily-report")
|
|
async def run_daily_report():
|
|
"""Generate daily report."""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(SCHEDULED_WORKER, "generate_daily_report")
|
|
return result
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/api/scheduled/sync")
|
|
async def run_sync(data: SyncRequest = SyncRequest()):
|
|
"""Sync external data."""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(
|
|
SCHEDULED_WORKER,
|
|
"sync_external_data",
|
|
source=data.source,
|
|
)
|
|
return result
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.get("/api/scheduled/stats")
|
|
async def scheduled_stats():
|
|
"""Get scheduled worker statistics."""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
result = await client.execute_async(SCHEDULED_WORKER, "stats")
|
|
return result
|
|
except DirtyError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# ============================================================================
|
|
# Health & Info Endpoints
|
|
# ============================================================================
|
|
|
|
@app.get("/")
|
|
async def index():
|
|
"""API documentation."""
|
|
return {
|
|
"name": "Celery Replacement Demo",
|
|
"description": "Demonstrating Gunicorn dirty arbiters as Celery replacement (async ASGI)",
|
|
"docs": "/docs",
|
|
"endpoints": {
|
|
"email": {
|
|
"POST /api/email/send": "Send single email",
|
|
"POST /api/email/send-bulk": "Send bulk emails (streaming)",
|
|
"GET /api/email/stats": "Email worker stats",
|
|
},
|
|
"image": {
|
|
"POST /api/image/resize": "Resize image",
|
|
"POST /api/image/thumbnail": "Generate thumbnail",
|
|
"POST /api/image/process-batch": "Batch process (streaming)",
|
|
"GET /api/image/stats": "Image worker stats",
|
|
},
|
|
"data": {
|
|
"POST /api/data/aggregate": "Aggregate data",
|
|
"POST /api/data/etl": "Run ETL pipeline (streaming)",
|
|
"POST /api/data/query": "Cached query",
|
|
"GET /api/data/stats": "Data worker stats",
|
|
},
|
|
"scheduled": {
|
|
"POST /api/scheduled/cleanup": "Run cleanup",
|
|
"POST /api/scheduled/daily-report": "Generate report",
|
|
"POST /api/scheduled/sync": "Sync external data",
|
|
"GET /api/scheduled/stats": "Scheduled worker stats",
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
"""Health check endpoint."""
|
|
try:
|
|
client = await get_dirty_client_async()
|
|
# Quick ping to verify workers are running
|
|
await client.execute_async(EMAIL_WORKER, "stats")
|
|
return {"status": "healthy", "workers": "connected"}
|
|
except DirtyError:
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail={"status": "degraded", "workers": "unavailable"}
|
|
)
|