mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-01 18:21:30 +08:00
- Bump version to 25.0.2 - Update copyright year to 2026 in LICENSE and NOTICE - Add license headers to all Python source files - Add changelog entry for 25.0.2
436 lines
13 KiB
Python
436 lines
13 KiB
Python
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
|
|
|
|
"""
Integration Tests for Celery Replacement Example

These tests run against the full application with Gunicorn and dirty arbiters.
They can be run locally or in Docker.

Usage:
    # Local (with gunicorn running):
    APP_URL=http://localhost:8000 pytest tests/test_integration.py -v

    # Docker:
    docker compose --profile test up --build --abort-on-container-exit
"""
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
|
|
import pytest
|
|
import requests
|
|
|
|
# Get app URL from environment or use default.
# Any trailing slash is dropped so that f"{APP_URL}/path" never produces
# a double slash ("http://host//path").
APP_URL = os.environ.get("APP_URL", "http://localhost:8000").rstrip("/")
|
|
|
|
|
|
def read_sse_events(response, max_events=100):
    """
    Read SSE events from a streaming response.

    Each line beginning with ``data:`` is parsed as JSON and collected.
    All other lines (blank separators, other SSE fields) are ignored, as
    are ``data:`` lines with an empty payload.

    Stops when receiving a 'complete' or 'error' event, or max_events reached.

    Args:
        response: Object exposing ``iter_lines(decode_unicode=True)``,
            e.g. a ``requests`` response opened with ``stream=True``.
        max_events: Upper bound on the number of events collected.

    Returns:
        List of decoded JSON payloads, in arrival order.

    Raises:
        json.JSONDecodeError: If a non-empty ``data:`` payload is not
            valid JSON.
    """
    events = []
    for line in response.iter_lines(decode_unicode=True):
        # iter_lines() still yields bytes when the response carries no
        # encoding, even with decode_unicode=True; normalise to str so the
        # prefix check below cannot raise TypeError.
        if isinstance(line, bytes):
            line = line.decode("utf-8")

        # The space after "data:" is optional in the SSE wire format.
        if not line.startswith("data:"):
            continue

        payload = line[len("data:"):].strip()
        if not payload:
            continue

        data = json.loads(payload)
        events.append(data)

        if data.get("type") in ("complete", "error"):
            break
        if len(events) >= max_events:
            break
    return events
|
|
|
|
|
|
def wait_for_app(timeout=30):
    """Wait for the application to be ready.

    Polls ``{APP_URL}/health`` with a one-second pause between attempts
    until it answers with HTTP 200 or ``timeout`` seconds have elapsed.

    Args:
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True as soon as the health endpoint returns 200, False if the
        deadline passes first.
    """
    # monotonic() is unaffected by wall-clock adjustments, which could
    # otherwise lengthen or cut short the wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            resp = requests.get(f"{APP_URL}/health", timeout=5)
        except requests.exceptions.RequestException:
            # Covers refused connections and read timeouts from a server
            # that accepts the socket but is not answering yet. Either
            # way the app is not ready, so keep polling.
            pass
        else:
            if resp.status_code == 200:
                return True
        time.sleep(1)
    return False
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
def ensure_app_running():
    """Skip the tests in this module when the application never becomes ready."""
    app_is_up = wait_for_app()
    if app_is_up:
        return
    pytest.skip("Application not available")
|
|
|
|
|
|
class TestHealthEndpoint:
    """Test health check endpoint."""

    def test_health_check(self):
        """Test that health endpoint returns healthy status."""
        # Bound the request so an unresponsive server fails the test
        # instead of hanging the whole run.
        resp = requests.get(f"{APP_URL}/health", timeout=5)
        assert resp.status_code == 200

        data = resp.json()
        assert data["status"] == "healthy"
        assert data["workers"] == "connected"
|
|
|
|
|
|
class TestEmailTasks:
    """Integration tests for email tasks."""

    # Seconds allowed for connecting and for each socket read. Without a
    # timeout, requests waits indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 30

    def test_send_single_email(self):
        """Test sending a single email via API."""
        resp = requests.post(
            f"{APP_URL}/api/email/send",
            json={
                "to": "test@example.com",
                "subject": "Integration Test",
                "body": "Hello from integration test",
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp.status_code == 200

        data = resp.json()
        assert data["status"] == "sent"
        assert data["to"] == "test@example.com"
        assert "message_id" in data

    def test_send_bulk_emails_streaming(self):
        """Test bulk email sending with SSE streaming."""
        recipients = ["a@test.com", "b@test.com", "c@test.com"]

        # The context manager closes the streamed response even when the
        # reader stops before the body is exhausted.
        with requests.post(
            f"{APP_URL}/api/email/send-bulk",
            json={
                "recipients": recipients,
                "subject": "Bulk Test",
                "body": "Hello all",
            },
            stream=True,
            timeout=self.REQUEST_TIMEOUT,
        ) as resp:
            assert resp.status_code == 200
            events = read_sse_events(resp)

        # Should have progress for each recipient + complete
        assert len(events) == len(recipients) + 1

        # Check progress events
        for i, event in enumerate(events[:-1]):
            assert event["type"] == "progress"
            assert event["current"] == i + 1

        # Check complete event
        assert events[-1]["type"] == "complete"
        assert events[-1]["sent"] == len(recipients)

    def test_email_stats(self):
        """Test email worker statistics endpoint."""
        # Send an email first, and make sure it was accepted so a failure
        # here is not misreported as a statistics problem.
        send_resp = requests.post(
            f"{APP_URL}/api/email/send",
            json={"to": "x@x.com", "subject": "S", "body": "B"},
            timeout=self.REQUEST_TIMEOUT,
        )
        assert send_resp.status_code == 200

        resp = requests.get(
            f"{APP_URL}/api/email/stats",
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp.status_code == 200

        data = resp.json()
        assert data["emails_sent"] >= 1
        assert data["smtp_connected"] is True
        assert "worker_pid" in data
|
|
|
|
|
|
class TestImageTasks:
    """Integration tests for image tasks."""

    # Seconds allowed for connecting and for each socket read. Without a
    # timeout, requests waits indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 30

    def test_resize_image(self):
        """Test image resizing via API."""
        resp = requests.post(
            f"{APP_URL}/api/image/resize",
            json={
                "image_data": "base64_encoded_image_data",
                "width": 800,
                "height": 600,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp.status_code == 200

        data = resp.json()
        assert data["status"] == "resized"
        assert data["target_dimensions"] == "800x600"

    def test_generate_thumbnail(self):
        """Test thumbnail generation via API."""
        resp = requests.post(
            f"{APP_URL}/api/image/thumbnail",
            json={
                "image_data": "base64_image",
                "size": 150,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp.status_code == 200

        data = resp.json()
        assert data["status"] == "resized"
        assert data["target_dimensions"] == "150x150"

    def test_batch_processing_streaming(self):
        """Test batch image processing with streaming."""
        images = [
            {"id": "img1", "data": "data1"},
            {"id": "img2", "data": "data2"},
        ]

        # The context manager closes the streamed response even when the
        # reader stops before the body is exhausted.
        with requests.post(
            f"{APP_URL}/api/image/process-batch",
            json={
                "images": images,
                "operation": "resize",
                "width": 400,
                "height": 300,
            },
            stream=True,
            timeout=self.REQUEST_TIMEOUT,
        ) as resp:
            assert resp.status_code == 200
            events = read_sse_events(resp)

        # One event per image plus the final "complete" event.
        assert len(events) == len(images) + 1
        assert events[-1]["type"] == "complete"

    def test_image_stats(self):
        """Test image worker statistics."""
        resp = requests.get(
            f"{APP_URL}/api/image/stats",
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp.status_code == 200

        data = resp.json()
        assert "images_processed" in data
        assert "worker_pid" in data
|
|
|
|
|
|
class TestDataTasks:
    """Integration tests for data processing tasks."""

    # Seconds allowed for connecting and for each socket read. Without a
    # timeout, requests waits indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 30

    def test_aggregate_data(self):
        """Test data aggregation via API."""
        resp = requests.post(
            f"{APP_URL}/api/data/aggregate",
            json={
                "data": [
                    {"category": "A", "value": 10},
                    {"category": "B", "value": 20},
                    {"category": "A", "value": 30},
                ],
                "group_by": "category",
                "agg_field": "value",
                "agg_func": "sum",
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp.status_code == 200

        data = resp.json()
        assert data["status"] == "completed"
        assert data["result"]["A"] == 40
        assert data["result"]["B"] == 20

    def test_etl_pipeline_streaming(self):
        """Test ETL pipeline with streaming progress."""
        # The context manager closes the streamed response even when the
        # reader stops before the body is exhausted.
        with requests.post(
            f"{APP_URL}/api/data/etl",
            json={
                "source_data": [
                    {"name": "alice", "status": "active"},
                    {"name": "bob", "status": "inactive"},
                    {"name": "charlie", "status": "active"},
                ],
                "transformations": [
                    {"name": "filter", "type": "filter",
                     "field": "status", "value": "active"},
                ],
            },
            stream=True,
            timeout=self.REQUEST_TIMEOUT,
        ) as resp:
            assert resp.status_code == 200
            events = read_sse_events(resp)

        # extract + transform + load + complete
        assert len(events) == 4

        # Check phases
        phases = [e.get("phase") for e in events[:-1]]
        assert "extract" in phases
        assert "transform" in phases
        assert "load" in phases

        # Final result
        assert events[-1]["type"] == "complete"
        assert events[-1]["records_output"] == 2

    def test_cached_query(self):
        """Test cached query functionality."""
        # A timestamp in the key keeps it distinct from keys used by
        # earlier runs, so the first call is expected to miss.
        query_key = f"test_query_{time.time()}"

        # First call - cache miss
        resp1 = requests.post(
            f"{APP_URL}/api/data/query",
            json={"query_key": query_key, "ttl": 300},
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp1.status_code == 200
        assert resp1.json()["status"] == "cache_miss"

        # Second call - may be cache hit or miss depending on which worker handles it
        # (cache is per-worker, not shared)
        # Retry a few times to likely hit the same worker
        cache_hit = False
        for _ in range(5):
            resp2 = requests.post(
                f"{APP_URL}/api/data/query",
                json={"query_key": query_key, "ttl": 300},
                timeout=self.REQUEST_TIMEOUT,
            )
            assert resp2.status_code == 200
            if resp2.json()["status"] == "cache_hit":
                cache_hit = True
                break
        assert cache_hit, "Expected cache_hit after multiple requests to same key"

    def test_data_stats(self):
        """Test data worker statistics."""
        resp = requests.get(
            f"{APP_URL}/api/data/stats",
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp.status_code == 200

        data = resp.json()
        assert "tasks_completed" in data
        assert "cache_size" in data
|
|
|
|
|
|
class TestScheduledTasks:
    """Integration tests for scheduled tasks."""

    # Seconds allowed for connecting and for each socket read. Without a
    # timeout, requests waits indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 30

    def test_cleanup_task(self):
        """Test cleanup task execution."""
        resp = requests.post(
            f"{APP_URL}/api/scheduled/cleanup",
            json={"directory": "/tmp/test", "max_age_days": 7},
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp.status_code == 200

        data = resp.json()
        assert data["status"] == "completed"
        assert "files_deleted" in data

    def test_daily_report(self):
        """Test daily report generation."""
        resp = requests.post(
            f"{APP_URL}/api/scheduled/daily-report",
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp.status_code == 200

        data = resp.json()
        assert data["status"] == "completed"
        assert "metrics" in data

    def test_sync_task(self):
        """Test data sync task."""
        resp = requests.post(
            f"{APP_URL}/api/scheduled/sync",
            json={"source": "test_source"},
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp.status_code == 200

        data = resp.json()
        assert data["status"] == "completed"
        assert data["source"] == "test_source"

    def test_scheduled_stats(self):
        """Test scheduled worker statistics."""
        # Run a task first, and make sure it was accepted so a failure
        # here is not misreported as a statistics problem.
        run_resp = requests.post(
            f"{APP_URL}/api/scheduled/daily-report",
            timeout=self.REQUEST_TIMEOUT,
        )
        assert run_resp.status_code == 200

        resp = requests.get(
            f"{APP_URL}/api/scheduled/stats",
            timeout=self.REQUEST_TIMEOUT,
        )
        assert resp.status_code == 200

        data = resp.json()
        assert "run_counts" in data
        assert "generate_daily_report" in data["run_counts"]
|
|
|
|
|
|
class TestConcurrency:
    """Test concurrent task execution."""

    # Seconds allowed for connecting and for each socket read. Without a
    # timeout, a single stuck request would block f.result() forever.
    REQUEST_TIMEOUT = 30

    def test_concurrent_requests(self):
        """Test that multiple concurrent requests are handled."""
        import concurrent.futures

        timeout = self.REQUEST_TIMEOUT

        def send_email():
            return requests.post(
                f"{APP_URL}/api/email/send",
                json={"to": "x@x.com", "subject": "Concurrent", "body": "Test"},
                timeout=timeout,
            )

        # Send 10 concurrent requests
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(send_email) for _ in range(10)]
            results = [f.result() for f in futures]

        # All should succeed
        assert all(r.status_code == 200 for r in results)
        assert all(r.json()["status"] == "sent" for r in results)

    def test_mixed_task_types(self):
        """Test different task types running concurrently."""
        import concurrent.futures

        timeout = self.REQUEST_TIMEOUT

        def email_task():
            return requests.post(
                f"{APP_URL}/api/email/send",
                json={"to": "x@x.com", "subject": "S", "body": "B"},
                timeout=timeout,
            )

        def image_task():
            return requests.post(
                f"{APP_URL}/api/image/resize",
                json={"image_data": "x", "width": 100, "height": 100},
                timeout=timeout,
            )

        def data_task():
            return requests.post(
                f"{APP_URL}/api/data/aggregate",
                json={
                    "data": [{"a": 1}],
                    "group_by": "a",
                    "agg_field": "a",
                    "agg_func": "sum",
                },
                timeout=timeout,
            )

        # Three rounds of one request per task type: nine requests in
        # flight, matching the pool size.
        with concurrent.futures.ThreadPoolExecutor(max_workers=9) as executor:
            futures = []
            for _ in range(3):
                futures.append(executor.submit(email_task))
                futures.append(executor.submit(image_task))
                futures.append(executor.submit(data_task))

            results = [f.result() for f in futures]

        # All should succeed
        assert all(r.status_code == 200 for r in results)
|
|
|
|
|
|
class TestErrorHandling:
    """Test error handling scenarios."""

    def test_invalid_action(self):
        """Test that invalid actions return appropriate errors."""
        # This would require modifying the API to expose raw execute
        # For now, we test via a malformed request
        resp = requests.post(
            f"{APP_URL}/api/email/send",
            json={},  # Missing required fields
            # Bound the request so an unresponsive server fails the test
            # instead of hanging the whole run.
            timeout=30,
        )
        # Should get a validation error (FastAPI returns 422)
        assert resp.status_code == 422
|