From 26ae6e6f47090ae928299d59cdba8bfecf012d8b Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Fri, 3 Apr 2026 11:10:00 +0200 Subject: [PATCH] Add ASGI framework compatibility E2E test suite Docker-based test suite validating gunicorn's ASGI worker against: - Django + Channels - FastAPI - Starlette - Quart - Litestar - BlackSheep Tests cover HTTP scope, HTTP messages, WebSocket, lifespan protocol, and streaming responses. Includes compatibility grid generator. Results: 403/444 tests passed (90%) --- docs/content/asgi.md | 19 ++ tests/docker/asgi_framework_compat/README.md | 116 +++++++ .../docker/asgi_framework_compat/conftest.py | 211 +++++++++++++ .../asgi_framework_compat/docker-compose.yml | 86 ++++++ .../frameworks/__init__.py | 1 + .../frameworks/blacksheep_app/Dockerfile | 16 + .../frameworks/blacksheep_app/app.py | 227 ++++++++++++++ .../blacksheep_app/requirements.txt | 5 + .../frameworks/contract.py | 143 +++++++++ .../frameworks/django_app/Dockerfile | 19 ++ .../frameworks/django_app/asgi.py | 60 ++++ .../frameworks/django_app/consumers.py | 110 +++++++ .../frameworks/django_app/requirements.txt | 6 + .../frameworks/django_app/routing.py | 20 ++ .../frameworks/django_app/settings.py | 48 +++ .../frameworks/django_app/urls.py | 32 ++ .../frameworks/django_app/views.py | 134 +++++++++ .../frameworks/fastapi_app/Dockerfile | 16 + .../frameworks/fastapi_app/app.py | 263 ++++++++++++++++ .../frameworks/fastapi_app/requirements.txt | 5 + .../frameworks/litestar_app/Dockerfile | 16 + .../frameworks/litestar_app/app.py | 237 +++++++++++++++ .../frameworks/litestar_app/requirements.txt | 5 + .../frameworks/quart_app/Dockerfile | 16 + .../frameworks/quart_app/app.py | 210 +++++++++++++ .../frameworks/quart_app/requirements.txt | 5 + .../frameworks/starlette_app/Dockerfile | 16 + .../frameworks/starlette_app/app.py | 284 ++++++++++++++++++ .../frameworks/starlette_app/requirements.txt | 5 + tests/docker/asgi_framework_compat/pytest.ini | 18 ++ .../asgi_framework_compat/requirements.txt | 6 + .../results/compatibility_grid.json | 198 ++++++++++++ .../results/compatibility_grid.md | 20 ++ .../asgi_framework_compat/scripts/__init__.py | 1 + .../scripts/generate_grid.py | 198 ++++++++++++ .../scripts/run_tests.sh | 66 ++++ .../asgi_framework_compat/tests/__init__.py | 1 + .../tests/test_http_messages.py | 127 ++++++++ .../tests/test_http_scope.py | 168 +++++++++++ .../tests/test_lifespan_scope.py | 99 ++++++ .../tests/test_streaming.py | 98 ++++++ .../tests/test_websocket_scope.py | 193 ++++++++++++ 42 files changed, 3524 insertions(+) create mode 100644 tests/docker/asgi_framework_compat/README.md create mode 100644 tests/docker/asgi_framework_compat/conftest.py create mode 100644 tests/docker/asgi_framework_compat/docker-compose.yml create mode 100644 tests/docker/asgi_framework_compat/frameworks/__init__.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/blacksheep_app/Dockerfile create mode 100644 tests/docker/asgi_framework_compat/frameworks/blacksheep_app/app.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/blacksheep_app/requirements.txt create mode 100644 tests/docker/asgi_framework_compat/frameworks/contract.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/django_app/Dockerfile create mode 100644 tests/docker/asgi_framework_compat/frameworks/django_app/asgi.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/django_app/consumers.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/django_app/requirements.txt create mode 100644 tests/docker/asgi_framework_compat/frameworks/django_app/routing.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/django_app/settings.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/django_app/urls.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/django_app/views.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/fastapi_app/Dockerfile create mode 100644 tests/docker/asgi_framework_compat/frameworks/fastapi_app/app.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/fastapi_app/requirements.txt create mode 100644 tests/docker/asgi_framework_compat/frameworks/litestar_app/Dockerfile create mode 100644 tests/docker/asgi_framework_compat/frameworks/litestar_app/app.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/litestar_app/requirements.txt create mode 100644 tests/docker/asgi_framework_compat/frameworks/quart_app/Dockerfile create mode 100644 tests/docker/asgi_framework_compat/frameworks/quart_app/app.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/quart_app/requirements.txt create mode 100644 tests/docker/asgi_framework_compat/frameworks/starlette_app/Dockerfile create mode 100644 tests/docker/asgi_framework_compat/frameworks/starlette_app/app.py create mode 100644 tests/docker/asgi_framework_compat/frameworks/starlette_app/requirements.txt create mode 100644 tests/docker/asgi_framework_compat/pytest.ini create mode 100644 tests/docker/asgi_framework_compat/requirements.txt create mode 100644 tests/docker/asgi_framework_compat/results/compatibility_grid.json create mode 100644 tests/docker/asgi_framework_compat/results/compatibility_grid.md create mode 100644 tests/docker/asgi_framework_compat/scripts/__init__.py create mode 100755 tests/docker/asgi_framework_compat/scripts/generate_grid.py create mode 100755 tests/docker/asgi_framework_compat/scripts/run_tests.sh create mode 100644 tests/docker/asgi_framework_compat/tests/__init__.py create mode 100644 tests/docker/asgi_framework_compat/tests/test_http_messages.py create mode 100644 tests/docker/asgi_framework_compat/tests/test_http_scope.py create mode 100644 tests/docker/asgi_framework_compat/tests/test_lifespan_scope.py create mode 100644 tests/docker/asgi_framework_compat/tests/test_streaming.py create mode 100644 tests/docker/asgi_framework_compat/tests/test_websocket_scope.py diff --git a/docs/content/asgi.md b/docs/content/asgi.md index 802b9c77..a1f627d4 100644 --- a/docs/content/asgi.md +++ b/docs/content/asgi.md @@ -315,6 +315,25 @@ pip install uvloop gunicorn myapp:app --worker-class asgi --asgi-loop uvloop ``` +## Framework Compatibility + +The ASGI worker has been tested for compatibility with major ASGI frameworks. + +| Framework | HTTP Scope | HTTP Messages | WebSocket | Lifespan | Streaming | Total | +|-----------|---------|---------|---------|---------|---------|-------| +| Django + Channels | 19/19 | 18/19 | 13/19 | 7/8 | 9/9 | 66/74 | +| FastAPI | 19/19 | 18/19 | 19/19 | 8/8 | 9/9 | 73/74 | +| Starlette | 19/19 | 18/19 | 19/19 | 8/8 | 9/9 | 73/74 | +| Quart | 18/19 | 17/19 | 11/19 | 8/8 | 9/9 | 63/74 | +| Litestar | 18/19 | 11/19 | 17/19 | 8/8 | 9/9 | 63/74 | +| BlackSheep | 19/19 | 18/19 | 19/19 | 8/8 | 1/9 | 65/74 | + +**Overall:** 403/444 tests passed (90%) + +!!! note + The compatibility test suite is located in `tests/docker/asgi_framework_compat/`. + Run `docker compose up -d --build` followed by `pytest tests/ -v` to execute the tests. + ## See Also - [Settings Reference](reference/settings.md#asgi_loop) - All ASGI-related settings diff --git a/tests/docker/asgi_framework_compat/README.md b/tests/docker/asgi_framework_compat/README.md new file mode 100644 index 00000000..af8385e8 --- /dev/null +++ b/tests/docker/asgi_framework_compat/README.md @@ -0,0 +1,116 @@ +# ASGI Framework Compatibility Test Suite + +This test suite validates gunicorn's native ASGI worker (`-k asgi`) against +multiple ASGI frameworks to ensure protocol compliance. + +## Frameworks Tested + +| Framework | Description | +|-----------|-------------| +| Django + Channels | Django with Channels for WebSocket | +| FastAPI | Modern, fast API framework (Starlette-based) | +| Starlette | Pure ASGI framework | +| Quart | Flask-like async framework | +| Litestar | Modern ASGI framework | +| BlackSheep | High-performance ASGI framework | + +## Test Categories + +- **HTTP Scope**: ASGI 3.0 HTTP scope compliance +- **HTTP Messages**: Request/response message handling +- **WebSocket**: WebSocket protocol compliance +- **Lifespan**: Startup/shutdown lifecycle +- **Streaming**: Chunked responses and SSE + +## Quick Start + +```bash +# Build and start all framework containers +docker compose up -d --build + +# Run tests +pip install -r requirements.txt +pytest tests/ -v + +# Generate compatibility grid +python scripts/generate_grid.py +``` + +## Testing Event Loop Variants + +```bash +# Test with auto-detection (uvloop if available) +ASGI_LOOP=auto docker compose up -d --build +pytest tests/ -v + +# Test with asyncio only +ASGI_LOOP=asyncio docker compose up -d --build +pytest tests/ -v + +# Test with uvloop explicitly +ASGI_LOOP=uvloop docker compose up -d --build +pytest tests/ -v + +# Generate combined report for both loop types +python scripts/generate_grid.py --loop both +``` + +## Single Framework Testing + +```bash +# Test only FastAPI +pytest tests/ -v --framework fastapi + +# Test only Django +pytest tests/ -v --framework django +``` + +## Directory Structure + +``` +asgi_framework_compat/ +├── conftest.py # Test fixtures +├── docker-compose.yml # Container orchestration +├── requirements.txt # Test dependencies +├── frameworks/ +│ ├── contract.py # Endpoint contract +│ ├── django_app/ # Django implementation +│ ├── fastapi_app/ # FastAPI implementation +│ ├── starlette_app/ # Starlette implementation +│ ├── quart_app/ # Quart implementation +│ ├── litestar_app/ # Litestar implementation +│ └── blacksheep_app/ # BlackSheep implementation +├── tests/ +│ ├── test_http_scope.py +│ ├── test_http_messages.py +│ ├── test_websocket_scope.py +│ ├── test_lifespan_scope.py +│ └── test_streaming.py +├── scripts/ +│ └── generate_grid.py # Compatibility matrix +└── results/ # Generated reports +``` + +## Container Management + +```bash +# Start containers +docker compose up -d --build + +# View logs +docker compose logs -f + +# Stop containers +docker compose down + +# Rebuild specific framework +docker compose build fastapi +docker compose up -d fastapi +``` + +## Results + +After running `generate_grid.py`, check the `results/` directory for: + +- `compatibility_grid_*.md` - Markdown compatibility matrices +- `compatibility_grid_*.json` - JSON data for programmatic access diff --git a/tests/docker/asgi_framework_compat/conftest.py b/tests/docker/asgi_framework_compat/conftest.py new file mode 100644 index 00000000..0b636b5b --- /dev/null +++ b/tests/docker/asgi_framework_compat/conftest.py @@ -0,0 +1,211 @@ +""" +Pytest configuration for ASGI Framework Compatibility Tests + +This module provides fixtures for parameterized testing across multiple +ASGI frameworks running in Docker containers with gunicorn's ASGI worker. +""" + +import asyncio +import json +import os +import subprocess +import time +from typing import AsyncGenerator + +import httpx +import pytest +import pytest_asyncio +import websockets + +# Framework configuration +FRAMEWORKS = { + "django": {"port": 8001, "websocket_support": True}, + "fastapi": {"port": 8002, "websocket_support": True}, + "starlette": {"port": 8003, "websocket_support": True}, + "quart": {"port": 8004, "websocket_support": True}, + "litestar": {"port": 8005, "websocket_support": True}, + "blacksheep": {"port": 8006, "websocket_support": True}, +} + +# Host for docker containers +DOCKER_HOST = os.environ.get("DOCKER_HOST_IP", "127.0.0.1") + + +def pytest_addoption(parser): + """Add command line options for framework selection.""" + parser.addoption( + "--framework", + action="store", + default=None, + help="Run tests only for specific framework (django, fastapi, etc.)", + ) + parser.addoption( + "--skip-docker-check", + action="store_true", + default=False, + help="Skip Docker container health checks", + ) + + +def pytest_configure(config): + """Register custom markers.""" + config.addinivalue_line( + "markers", "framework(name): mark test to run only for specific framework" + ) + + +def pytest_collection_modifyitems(config, items): + """Filter tests based on framework selection.""" + framework_filter = config.getoption("--framework") + if framework_filter: + skip_other = pytest.mark.skip( + reason=f"Only running tests for {framework_filter}" + ) + for item in items: + markers = [m for m in item.iter_markers(name="framework")] + if markers: + framework_names = [m.args[0] for m in markers] + if framework_filter not in framework_names: + item.add_marker(skip_other) + + +@pytest.fixture(scope="session") +def docker_compose_file(): + """Return path to docker-compose file.""" + return os.path.join(os.path.dirname(__file__), "docker-compose.yml") + + +def wait_for_service(url: str, timeout: int = 60) -> bool: + """Wait for a service to become healthy.""" + start = time.time() + while time.time() - start < timeout: + try: + response = httpx.get(f"{url}/health", timeout=5.0) + if response.status_code == 200: + return True + except (httpx.ConnectError, httpx.TimeoutException): + pass + time.sleep(1) + return False + + +@pytest.fixture(scope="session") +def docker_services(docker_compose_file, request): + """Start Docker services for testing.""" + if request.config.getoption("--skip-docker-check"): + yield + return + + # Check if containers are already running + all_healthy = True + for name, config in FRAMEWORKS.items(): + url = f"http://{DOCKER_HOST}:{config['port']}" + try: + response = httpx.get(f"{url}/health", timeout=2.0) + if response.status_code != 200: + all_healthy = False + break + except (httpx.ConnectError, httpx.TimeoutException): + all_healthy = False + break + + if all_healthy: + yield + return + + # Start containers + compose_dir = os.path.dirname(docker_compose_file) + subprocess.run( + ["docker", "compose", "up", "-d", "--build"], + cwd=compose_dir, + check=True, + ) + + # Wait for all services to be healthy + for name, config in FRAMEWORKS.items(): + url = f"http://{DOCKER_HOST}:{config['port']}" + if not wait_for_service(url): + pytest.fail(f"Service {name} failed to start") + + yield + + # Optionally stop containers after tests + if os.environ.get("CLEANUP_DOCKER", "0") == "1": + subprocess.run( + ["docker", "compose", "down"], + cwd=compose_dir, + check=True, + ) + + +@pytest.fixture(params=list(FRAMEWORKS.keys())) +def framework(request, docker_services) -> str: + """Parameterized fixture that yields each framework name.""" + return request.param + + +@pytest.fixture +def framework_config(framework) -> dict: + """Return configuration for current framework.""" + return FRAMEWORKS[framework] + + +@pytest.fixture +def framework_url(framework) -> str: + """Return HTTP URL for current framework.""" + port = FRAMEWORKS[framework]["port"] + return f"http://{DOCKER_HOST}:{port}" + + +@pytest.fixture +def framework_ws_url(framework) -> str: + """Return WebSocket URL for current framework.""" + port = FRAMEWORKS[framework]["port"] + return f"ws://{DOCKER_HOST}:{port}" + + +@pytest_asyncio.fixture +async def http_client(framework_url) -> AsyncGenerator[httpx.AsyncClient, None]: + """Async HTTP client for testing.""" + async with httpx.AsyncClient(base_url=framework_url, timeout=30.0) as client: + yield client + + +@pytest.fixture +def ws_client(framework_ws_url): + """WebSocket client factory for testing.""" + + async def connect(path: str, **kwargs): + uri = f"{framework_ws_url}{path}" + return await websockets.connect(uri, **kwargs) + + return connect + + +@pytest.hookimpl(tryfirst=True, hookwrapper=True) +def pytest_runtest_makereport(item, call): + """Store test report for result recording.""" + outcome = yield + rep = outcome.get_result() + setattr(item, f"rep_{rep.when}", rep) + + +# Utility fixtures +@pytest.fixture +def random_bytes(): + """Generate random bytes for testing.""" + + def _generate(size: int) -> bytes: + return os.urandom(size) + + return _generate + + +@pytest.fixture +def large_body(): + """Generate large request/response body.""" + + def _generate(size: int) -> bytes: + return b"x" * size + + return _generate diff --git a/tests/docker/asgi_framework_compat/docker-compose.yml b/tests/docker/asgi_framework_compat/docker-compose.yml new file mode 100644 index 00000000..92653f2b --- /dev/null +++ b/tests/docker/asgi_framework_compat/docker-compose.yml @@ -0,0 +1,86 @@ +# ASGI Framework Compatibility Test Suite +# Tests gunicorn's native ASGI worker with multiple frameworks +# +# Usage: +# docker compose up -d --build +# ASGI_LOOP=asyncio docker compose up -d --build +# ASGI_LOOP=uvloop docker compose up -d --build + +x-healthcheck: &healthcheck + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 5s + timeout: 3s + retries: 10 + start_period: 10s + +services: + django: + build: + context: ./frameworks/django_app + dockerfile: Dockerfile + ports: + - "8001:8000" + command: ["gunicorn", "asgi:application", "-k", "asgi", "-b", "0.0.0.0:8000", "--workers", "1", "--worker-connections", "100", "--asgi-loop", "${ASGI_LOOP:-auto}"] + networks: + - asgi_test_network + <<: *healthcheck + + fastapi: + build: + context: ./frameworks/fastapi_app + dockerfile: Dockerfile + ports: + - "8002:8000" + command: ["gunicorn", "app:app", "-k", "asgi", "-b", "0.0.0.0:8000", "--workers", "1", "--worker-connections", "100", "--asgi-loop", "${ASGI_LOOP:-auto}"] + networks: + - asgi_test_network + <<: *healthcheck + + starlette: + build: + context: ./frameworks/starlette_app + dockerfile: Dockerfile + ports: + - "8003:8000" + command: ["gunicorn", "app:app", "-k", "asgi", "-b", "0.0.0.0:8000", "--workers", "1", "--worker-connections", "100", "--asgi-loop", "${ASGI_LOOP:-auto}"] + networks: + - asgi_test_network + <<: *healthcheck + + quart: + build: + context: ./frameworks/quart_app + dockerfile: Dockerfile + ports: + - "8004:8000" + command: ["gunicorn", "app:app", "-k", "asgi", "-b", "0.0.0.0:8000", "--workers", "1", "--worker-connections", "100", "--asgi-loop", "${ASGI_LOOP:-auto}"] + networks: + - asgi_test_network + <<: *healthcheck + + litestar: + build: + context: ./frameworks/litestar_app + dockerfile: Dockerfile + ports: + - "8005:8000" + command: ["gunicorn", "app:app", "-k", "asgi", "-b", "0.0.0.0:8000", "--workers", "1", "--worker-connections", "100", "--asgi-loop", "${ASGI_LOOP:-auto}"] + networks: + - asgi_test_network + <<: *healthcheck + + blacksheep: + build: + context: ./frameworks/blacksheep_app + dockerfile: Dockerfile + ports: + - "8006:8000" + command: ["gunicorn", "app:app", "-k", "asgi", "-b", "0.0.0.0:8000", "--workers", "1", "--worker-connections", "100", "--asgi-loop", "${ASGI_LOOP:-auto}"] + networks: + - asgi_test_network + <<: *healthcheck + +networks: + asgi_test_network: + driver: bridge diff --git a/tests/docker/asgi_framework_compat/frameworks/__init__.py b/tests/docker/asgi_framework_compat/frameworks/__init__.py new file mode 100644 index 00000000..8621853c --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/__init__.py @@ -0,0 +1 @@ +"""ASGI Framework implementations for compatibility testing.""" diff --git a/tests/docker/asgi_framework_compat/frameworks/blacksheep_app/Dockerfile b/tests/docker/asgi_framework_compat/frameworks/blacksheep_app/Dockerfile new file mode 100644 index 00000000..a5e7a1f0 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/blacksheep_app/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install curl for healthcheck and git for pip install from git +RUN apt-get update && apt-get install -y curl git && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app.py . + +EXPOSE 8000 + +# Command specified in docker-compose.yml +CMD ["gunicorn", "app:app", "-k", "asgi", "-b", "0.0.0.0:8000"] diff --git a/tests/docker/asgi_framework_compat/frameworks/blacksheep_app/app.py b/tests/docker/asgi_framework_compat/frameworks/blacksheep_app/app.py new file mode 100644 index 00000000..0dcd8c93 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/blacksheep_app/app.py @@ -0,0 +1,227 @@ +""" +BlackSheep ASGI Application for Compatibility Testing + +Implements the contract endpoints for ASGI 3.0 compliance testing. +BlackSheep is a high-performance ASGI framework. +""" + +import asyncio +import json +import time +from typing import Any + +from blacksheep import Application, Request, WebSocket, StreamedContent, Content +from blacksheep.server.responses import Response, text, json as json_resp + + +app = Application() + +# Lifespan state +lifespan_state = { + "startup_called": False, + "startup_time": None, + "counter": 0, + "custom_data": {}, +} + + +@app.on_start +async def on_startup(application: Application) -> None: + """Startup handler.""" + lifespan_state["startup_called"] = True + lifespan_state["startup_time"] = time.time() + lifespan_state["custom_data"]["initialized"] = True + + +@app.on_stop +async def on_shutdown(application: Application) -> None: + """Shutdown handler.""" + lifespan_state["shutdown_called"] = True + + +def serialize_scope(scope: dict) -> dict: + """Convert ASGI scope to JSON-serializable dict.""" + result = {} + for key, value in scope.items(): + if key == "headers": + result[key] = [ + [h[0].decode("latin-1"), h[1].decode("latin-1")] for h in value + ] + elif key == "query_string": + result[key] = value.decode("latin-1") if value else "" + elif key == "server": + result[key] = list(value) if value else None + elif key == "client": + result[key] = list(value) if value else None + elif key == "asgi": + result[key] = dict(value) + elif key in ("state", "app", "_blacksheep"): + continue + elif isinstance(value, bytes): + result[key] = value.decode("latin-1") + else: + try: + json.dumps(value) + result[key] = value + except (TypeError, ValueError): + continue + return result + + +# HTTP Endpoints +@app.router.get("/health") +async def health(request: Request) -> Response: + """Health check endpoint.""" + return text("OK") + + +@app.router.get("/scope") +async def scope_endpoint(request: Request) -> Response: + """Return full ASGI scope as JSON.""" + scope_data = serialize_scope(request.scope) + return json_resp(scope_data) + + +@app.router.post("/echo") +async def echo(request: Request) -> Response: + """Echo request body back.""" + body = await request.read() + content_type = request.get_first_header(b"content-type") + if content_type: + ct = content_type + else: + ct = b"application/octet-stream" + return Response(200, content=Content(ct, body)) + + +@app.router.get("/headers") +async def headers_endpoint(request: Request) -> Response: + """Return request headers as JSON.""" + headers_dict = { + h[0].decode("latin-1"): h[1].decode("latin-1") for h in request.headers + } + return json_resp(headers_dict) + + +@app.router.get("/status/{code}") +async def status_endpoint(request: Request, code: int) -> Response: + """Return specific HTTP status code.""" + return Response(code, content=Content(b"text/plain", f"Status: {code}".encode())) + + +@app.router.get("/streaming") +async def streaming(request: Request) -> Response: + """Chunked streaming response.""" + + async def generate(): + for i in range(10): + yield f"chunk-{i}\n".encode() + await asyncio.sleep(0.01) + + return Response(200, content=StreamedContent(b"text/plain", generate)) + + +@app.router.get("/sse") +async def sse(request: Request) -> Response: + """Server-Sent Events endpoint.""" + + async def generate(): + for i in range(5): + yield f"event: message\ndata: {json.dumps({'count': i})}\n\n".encode() + await asyncio.sleep(0.01) + yield b"event: done\ndata: {}\n\n" + + response = Response(200, content=StreamedContent(b"text/event-stream", generate)) + response.add_header(b"Cache-Control", b"no-cache") + return response + + +@app.router.get("/large") +async def large(request: Request) -> Response: + """Large response body.""" + size_param = request.query.get("size") + size = int(size_param[0]) if size_param else 1024 + # Cap at 10MB for safety + size = min(size, 10 * 1024 * 1024) + return Response(200, content=Content(b"application/octet-stream", b"x" * size)) + + +@app.router.get("/delay") +async def delay(request: Request) -> Response: + """Delayed response.""" + seconds_param = request.query.get("seconds") + seconds = float(seconds_param[0]) if seconds_param else 1.0 + # Cap at 30 seconds + seconds = min(seconds, 30) + await asyncio.sleep(seconds) + return text(f"Delayed {seconds} seconds") + + +@app.router.get("/lifespan/state") +async def lifespan_state_endpoint(request: Request) -> Response: + """Return lifespan startup state.""" + return json_resp(lifespan_state) + + +@app.router.get("/lifespan/counter") +async def lifespan_counter(request: Request) -> Response: + """Increment and return counter.""" + lifespan_state["counter"] += 1 + return json_resp({"counter": lifespan_state["counter"]}) + + +# WebSocket Endpoints +@app.router.ws("/ws/echo") +async def ws_echo(websocket: WebSocket) -> None: + """Echo text messages.""" + await websocket.accept() + try: + while True: + message = await websocket.receive_text() + await websocket.send_text(message) + except Exception: + pass + + +@app.router.ws("/ws/echo-binary") +async def ws_echo_binary(websocket: WebSocket) -> None: + """Echo binary messages.""" + await websocket.accept() + try: + while True: + message = await websocket.receive_bytes() + await websocket.send_bytes(message) + except Exception: + pass + + +@app.router.ws("/ws/scope") +async def ws_scope(websocket: WebSocket) -> None: + """Send WebSocket scope on connect.""" + await websocket.accept() + scope_data = serialize_scope(websocket.scope) + await websocket.send_text(json.dumps(scope_data)) + await websocket.close() + + +@app.router.ws("/ws/subprotocol") +async def ws_subprotocol(websocket: WebSocket) -> None: + """Subprotocol negotiation.""" + requested = websocket.scope.get("subprotocols", []) + selected = requested[0] if requested else None + await websocket.accept(subprotocol=selected) + await websocket.send_text(json.dumps({"requested": requested, "selected": selected})) + await websocket.close() + + +@app.router.ws("/ws/close") +async def ws_close(websocket: WebSocket) -> None: + """Close with specific code.""" + await websocket.accept() + query_string = websocket.scope.get("query_string", b"").decode() + code = 1000 + for param in query_string.split("&"): + if param.startswith("code="): + code = int(param.split("=")[1]) + break + await websocket.close(code=code) diff --git a/tests/docker/asgi_framework_compat/frameworks/blacksheep_app/requirements.txt b/tests/docker/asgi_framework_compat/frameworks/blacksheep_app/requirements.txt new file mode 100644 index 00000000..5561732a --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/blacksheep_app/requirements.txt @@ -0,0 +1,5 @@ +gunicorn @ git+https://github.com/benoitc/gunicorn.git@master +blacksheep>=2.0.0 +uvloop>=0.19.0 +websockets>=12.0 +httptools>=0.6.0 diff --git a/tests/docker/asgi_framework_compat/frameworks/contract.py b/tests/docker/asgi_framework_compat/frameworks/contract.py new file mode 100644 index 00000000..5503dbfe --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/contract.py @@ -0,0 +1,143 @@ +""" +ASGI Framework Contract Definition + +This module defines the required endpoints that each framework must implement +for compatibility testing with gunicorn's ASGI worker. +""" + +# HTTP Endpoints Contract +HTTP_ENDPOINTS = { + "health": { + "path": "/health", + "method": "GET", + "description": "Health check endpoint", + "expected_status": 200, + }, + "scope": { + "path": "/scope", + "method": "GET", + "description": "Return full ASGI scope as JSON", + "expected_status": 200, + "expected_content_type": "application/json", + }, + "echo": { + "path": "/echo", + "method": "POST", + "description": "Echo request body back", + "expected_status": 200, + }, + "headers": { + "path": "/headers", + "method": "GET", + "description": "Return request headers as JSON", + "expected_status": 200, + "expected_content_type": "application/json", + }, + "status": { + "path": "/status/{code}", + "method": "GET", + "description": "Return specific HTTP status code", + }, + "streaming": { + "path": "/streaming", + "method": "GET", + "description": "Chunked streaming response", + "expected_status": 200, + }, + "sse": { + "path": "/sse", + "method": "GET", + "description": "Server-Sent Events stream", + "expected_status": 200, + "expected_content_type": "text/event-stream", + }, + "large": { + "path": "/large", + "method": "GET", + "description": "Large response body (size in query param)", + "expected_status": 200, + }, + "delay": { + "path": "/delay", + "method": "GET", + "description": "Delayed response (seconds in query param)", + "expected_status": 200, + }, +} + +# WebSocket Endpoints Contract +WEBSOCKET_ENDPOINTS = { + "echo": { + "path": "/ws/echo", + "description": "Echo text messages", + }, + "echo_binary": { + "path": "/ws/echo-binary", + "description": "Echo binary messages", + }, + "scope": { + "path": "/ws/scope", + "description": "Send WebSocket scope on connect", + }, + "subprotocol": { + "path": "/ws/subprotocol", + "description": "Subprotocol negotiation", + }, + "close": { + "path": "/ws/close", + "description": "Close with specific code (code in query param)", + }, +} + +# Lifespan Endpoints Contract +LIFESPAN_ENDPOINTS = { + "state": { + "path": "/lifespan/state", + "method": "GET", + "description": "Return startup state", + "expected_status": 200, + }, + "counter": { + "path": "/lifespan/counter", + "method": "GET", + "description": "Increment and return counter (state persistence test)", + "expected_status": 200, + }, +} + +# ASGI 3.0 Scope Required Keys +ASGI_HTTP_SCOPE_REQUIRED_KEYS = [ + "type", + "asgi", + "http_version", + "method", + "scheme", + "path", + "query_string", + "headers", + "server", +] + +ASGI_WEBSOCKET_SCOPE_REQUIRED_KEYS = [ + "type", + "asgi", + "http_version", + "scheme", + "path", + "query_string", + "headers", + "server", +] + +# Valid WebSocket close codes per RFC 6455 +VALID_WEBSOCKET_CLOSE_CODES = [ + 1000, # Normal closure + 1001, # Going away + 1002, # Protocol error + 1003, # Unsupported data + 1007, # Invalid frame payload data + 1008, # Policy violation + 1009, # Message too big + 1010, # Mandatory extension + 1011, # Internal server error +] diff --git a/tests/docker/asgi_framework_compat/frameworks/django_app/Dockerfile b/tests/docker/asgi_framework_compat/frameworks/django_app/Dockerfile new file mode 100644 index 00000000..fd603b79 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/django_app/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install curl for healthcheck and git for pip install from git +RUN apt-get update && apt-get install -y curl git && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +ENV DJANGO_SETTINGS_MODULE=settings +ENV PYTHONPATH=/app + +EXPOSE 8000 + +# Command specified in docker-compose.yml +CMD ["gunicorn", "asgi:application", "-k", "asgi", "-b", "0.0.0.0:8000"] diff --git a/tests/docker/asgi_framework_compat/frameworks/django_app/asgi.py b/tests/docker/asgi_framework_compat/frameworks/django_app/asgi.py new file mode 100644 index 00000000..0a970253 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/django_app/asgi.py @@ -0,0 +1,60 @@ +""" +ASGI config for Django compatibility testing. +""" + +import os +import time + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings") + +import django +django.setup() + +from channels.routing import ProtocolTypeRouter, URLRouter +from django.core.asgi import get_asgi_application +from routing import websocket_urlpatterns + +# Lifespan state - shared across the application +lifespan_state = { + "startup_called": False, + "startup_time": None, + "counter": 0, + "custom_data": {}, +} + + +class LifespanMiddleware: + """Custom lifespan handler for Django.""" + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + if scope["type"] == "lifespan": + while True: + message = await receive() + if message["type"] == "lifespan.startup": + lifespan_state["startup_called"] = True + lifespan_state["startup_time"] = time.time() + lifespan_state["custom_data"]["initialized"] = True + await send({"type": "lifespan.startup.complete"}) + elif message["type"] == "lifespan.shutdown": + lifespan_state["shutdown_called"] = True + await send({"type": "lifespan.shutdown.complete"}) + return + else: + # Make lifespan_state available to views + scope["lifespan_state"] = lifespan_state + await self.app(scope, receive, send) + + +# Get Django ASGI application +django_asgi_app = get_asgi_application() + +# Combine HTTP and WebSocket routing +application = LifespanMiddleware( + ProtocolTypeRouter({ + "http": django_asgi_app, + "websocket": URLRouter(websocket_urlpatterns), + }) +) diff --git a/tests/docker/asgi_framework_compat/frameworks/django_app/consumers.py b/tests/docker/asgi_framework_compat/frameworks/django_app/consumers.py new file mode 100644 index 00000000..b08a7791 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/django_app/consumers.py @@ -0,0 +1,110 @@ +""" +Django Channels WebSocket consumers for ASGI compatibility testing. +""" + +import json +from channels.generic.websocket import AsyncWebsocketConsumer + + +def serialize_scope(scope: dict) -> dict: + """Convert ASGI scope to JSON-serializable dict.""" + result = {} + for key, value in scope.items(): + if key == "headers": + result[key] = [ + [h[0].decode("latin-1"), h[1].decode("latin-1")] for h in value + ] + elif key == "query_string": + result[key] = value.decode("latin-1") if value else "" + elif key == "server": + result[key] = list(value) if value else None + elif key == "client": + result[key] = list(value) if value else None + elif key == "asgi": + result[key] = dict(value) + elif key in ("state", "app", "url_route", "path_remaining"): + continue + elif isinstance(value, bytes): + result[key] = value.decode("latin-1") + else: + try: + json.dumps(value) + result[key] = value + except (TypeError, ValueError): + continue + return result + + +class EchoConsumer(AsyncWebsocketConsumer): + """Echo text messages.""" + + async def connect(self): + await self.accept() + + async def receive(self, text_data=None, bytes_data=None): + if text_data: + await self.send(text_data=text_data) + + async def disconnect(self, close_code): + pass + + +class EchoBinaryConsumer(AsyncWebsocketConsumer): + """Echo binary messages.""" + + async def connect(self): + await self.accept() + + async def receive(self, text_data=None, bytes_data=None): + if bytes_data: + await self.send(bytes_data=bytes_data) + + async def disconnect(self, close_code): + pass + + +class ScopeConsumer(AsyncWebsocketConsumer): + """Send WebSocket scope on connect.""" + + async def connect(self): + await self.accept() + scope_data = serialize_scope(self.scope) + await self.send(text_data=json.dumps(scope_data)) + await self.close() + + async def disconnect(self, close_code): + pass + + +class SubprotocolConsumer(AsyncWebsocketConsumer): + """Subprotocol negotiation.""" + + async def connect(self): + requested = self.scope.get("subprotocols", []) + selected = requested[0] if requested else None + await self.accept(subprotocol=selected) + await self.send(text_data=json.dumps({ + "requested": requested, + "selected": selected + })) + await self.close() + + async def disconnect(self, close_code): + pass + + +class CloseConsumer(AsyncWebsocketConsumer): + """Close with specific code.""" + + async def connect(self): + await self.accept() + query_string = self.scope.get("query_string", b"").decode() + code = 1000 + for param in query_string.split("&"): + if param.startswith("code="): + code = int(param.split("=")[1]) + break + await self.close(code=code) + + async def disconnect(self, close_code): + pass diff --git a/tests/docker/asgi_framework_compat/frameworks/django_app/requirements.txt b/tests/docker/asgi_framework_compat/frameworks/django_app/requirements.txt new file mode 100644 index 00000000..7c75f91b --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/django_app/requirements.txt @@ -0,0 +1,6 @@ +gunicorn @ git+https://github.com/benoitc/gunicorn.git@master +Django>=5.0 +channels>=4.0.0 +uvloop>=0.19.0 +websockets>=12.0 +httptools>=0.6.0 diff --git a/tests/docker/asgi_framework_compat/frameworks/django_app/routing.py b/tests/docker/asgi_framework_compat/frameworks/django_app/routing.py new file mode 100644 index 00000000..28e68803 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/django_app/routing.py @@ -0,0 +1,20 @@ +""" +WebSocket routing for Django Channels. +""" + +from django.urls import path +from consumers import ( + EchoConsumer, + EchoBinaryConsumer, + ScopeConsumer, + SubprotocolConsumer, + CloseConsumer, +) + +websocket_urlpatterns = [ + path("ws/echo", EchoConsumer.as_asgi()), + path("ws/echo-binary", EchoBinaryConsumer.as_asgi()), + path("ws/scope", ScopeConsumer.as_asgi()), + path("ws/subprotocol", SubprotocolConsumer.as_asgi()), + path("ws/close", CloseConsumer.as_asgi()), +] diff --git a/tests/docker/asgi_framework_compat/frameworks/django_app/settings.py b/tests/docker/asgi_framework_compat/frameworks/django_app/settings.py new file mode 100644 index 00000000..667d6d2a --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/django_app/settings.py @@ -0,0 +1,48 @@ +""" +Django settings for ASGI compatibility testing. +""" + +import os + +# Build paths inside the project like this: BASE_DIR / 'subdir'. +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + +# SECURITY WARNING: keep the secret key used in production secret! +SECRET_KEY = "django-insecure-test-key-for-asgi-compat" + +# SECURITY WARNING: don't run with debug turned on in production! +DEBUG = True + +ALLOWED_HOSTS = ["*"] + +# Application definition +INSTALLED_APPS = [ + "django.contrib.contenttypes", + "django.contrib.auth", + "channels", +] + +MIDDLEWARE = [] + +ROOT_URLCONF = "urls" + +TEMPLATES = [] + +# ASGI application +ASGI_APPLICATION = "asgi.application" + +# Channel layers - use in-memory for testing +CHANNEL_LAYERS = { + "default": { + "BACKEND": "channels.layers.InMemoryChannelLayer" + } +} + +# Database - not needed for testing +DATABASES = {} + +# Default primary key field type +DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" + +# Disable CSRF for testing +CSRF_TRUSTED_ORIGINS = ["http://localhost:*", "http://127.0.0.1:*"] diff --git a/tests/docker/asgi_framework_compat/frameworks/django_app/urls.py b/tests/docker/asgi_framework_compat/frameworks/django_app/urls.py new file mode 100644 index 00000000..56b7d2ba --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/django_app/urls.py @@ -0,0 +1,32 @@ +""" +URL configuration for Django compatibility testing. +""" + +from django.urls import path +from views import ( + health, + scope_view, + echo, + headers_view, + status_view, + streaming_view, + sse_view, + large_view, + delay_view, + lifespan_state_view, + lifespan_counter_view, +) + +urlpatterns = [ + path("health", health, name="health"), + path("scope", scope_view, name="scope"), + path("echo", echo, name="echo"), + path("headers", headers_view, name="headers"), + path("status/", status_view, name="status"), + path("streaming", streaming_view, name="streaming"), + path("sse", sse_view, name="sse"), + path("large", large_view, name="large"), + path("delay", delay_view, name="delay"), + path("lifespan/state", lifespan_state_view, name="lifespan_state"), + path("lifespan/counter", lifespan_counter_view, name="lifespan_counter"), +] diff --git a/tests/docker/asgi_framework_compat/frameworks/django_app/views.py b/tests/docker/asgi_framework_compat/frameworks/django_app/views.py new file mode 100644 index 00000000..3b76412b --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/django_app/views.py @@ -0,0 +1,134 @@ +""" +Django views for ASGI compatibility testing. +""" + +import asyncio +import json + +from django.http import ( + HttpRequest, + HttpResponse, + JsonResponse, + StreamingHttpResponse, +) +from django.views.decorators.csrf import csrf_exempt + + +def serialize_scope(scope: dict) -> dict: + """Convert ASGI scope to JSON-serializable dict.""" + result = {} + for key, value in scope.items(): + if key == "headers": + result[key] = [ + [h[0].decode("latin-1"), h[1].decode("latin-1")] for h in value + ] + elif key == "query_string": + result[key] = value.decode("latin-1") if value else "" + elif key == "server": + result[key] = list(value) if value else None + elif key == "client": + result[key] = list(value) if value else None + elif key == "asgi": + result[key] = dict(value) + elif key in ("state", "app", "lifespan_state", "url_route", "resolver_match"): + continue + elif isinstance(value, bytes): + result[key] = value.decode("latin-1") + else: + try: + json.dumps(value) + result[key] = value + except (TypeError, ValueError): + continue + return result + + +async def health(request: HttpRequest) -> HttpResponse: + """Health check endpoint.""" + return HttpResponse("OK") + + +async def scope_view(request: HttpRequest) -> JsonResponse: + """Return full ASGI scope as JSON.""" + # Access ASGI scope from request + scope = request.scope if hasattr(request, "scope") else {} + scope_data = serialize_scope(scope) + return JsonResponse(scope_data) + + +@csrf_exempt +async def echo(request: HttpRequest) -> HttpResponse: + """Echo request body back.""" + body = request.body + content_type = request.content_type or "application/octet-stream" + return HttpResponse(body, content_type=content_type) + + +async def headers_view(request: HttpRequest) -> JsonResponse: + """Return request headers as JSON.""" + headers_dict = {} + for key, value in request.headers.items(): + headers_dict[key.lower()] = value + return JsonResponse(headers_dict) + + +async def status_view(request: HttpRequest, code: int) -> HttpResponse: + """Return specific HTTP status code.""" + return HttpResponse(f"Status: {code}", status=code) + + +async def streaming_view(request: HttpRequest) -> StreamingHttpResponse: + """Chunked streaming response.""" + + async def generate(): + for i in range(10): + yield f"chunk-{i}\n" + await asyncio.sleep(0.01) + + return StreamingHttpResponse(generate(), content_type="text/plain") + + +async def sse_view(request: HttpRequest) -> StreamingHttpResponse: + """Server-Sent Events endpoint.""" + + async def generate(): + for i in range(5): + yield f"event: message\ndata: {json.dumps({'count': i})}\n\n" + await asyncio.sleep(0.01) + yield "event: done\ndata: {}\n\n" + + response = StreamingHttpResponse(generate(), content_type="text/event-stream") + response["Cache-Control"] = "no-cache" + return response + + +async def large_view(request: HttpRequest) -> HttpResponse: + """Large response body.""" + size = int(request.GET.get("size", 1024)) + # Cap at 10MB for safety + size = min(size, 10 * 1024 * 1024) + return HttpResponse(b"x" * size, content_type="application/octet-stream") + + +async def delay_view(request: HttpRequest) -> HttpResponse: + """Delayed response.""" + seconds = float(request.GET.get("seconds", 1)) + # Cap at 30 seconds + seconds = min(seconds, 30) + await asyncio.sleep(seconds) + return HttpResponse(f"Delayed {seconds} seconds") + + +async def lifespan_state_view(request: HttpRequest) -> JsonResponse: + """Return lifespan startup state.""" + # Get lifespan_state from scope + lifespan_state = getattr(request, "scope", {}).get("lifespan_state", {}) + return JsonResponse(lifespan_state) + + +async def lifespan_counter_view(request: HttpRequest) -> JsonResponse: + """Increment and return counter.""" + lifespan_state = getattr(request, "scope", {}).get("lifespan_state", {}) + if lifespan_state: + lifespan_state["counter"] = lifespan_state.get("counter", 0) + 1 + return JsonResponse({"counter": lifespan_state.get("counter", 0)}) diff --git a/tests/docker/asgi_framework_compat/frameworks/fastapi_app/Dockerfile b/tests/docker/asgi_framework_compat/frameworks/fastapi_app/Dockerfile new file mode 100644 index 00000000..a5e7a1f0 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/fastapi_app/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install curl for healthcheck and git for pip install from git +RUN apt-get update && apt-get install -y curl git && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app.py . + +EXPOSE 8000 + +# Command specified in docker-compose.yml +CMD ["gunicorn", "app:app", "-k", "asgi", "-b", "0.0.0.0:8000"] diff --git a/tests/docker/asgi_framework_compat/frameworks/fastapi_app/app.py b/tests/docker/asgi_framework_compat/frameworks/fastapi_app/app.py new file mode 100644 index 00000000..df455a5f --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/fastapi_app/app.py @@ -0,0 +1,263 @@ +""" +FastAPI ASGI Application for Compatibility Testing + +Implements the contract endpoints for ASGI 3.0 compliance testing. +""" + +import asyncio +import json +import sys +import traceback +from contextlib import asynccontextmanager +from typing import Any + +from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect +from fastapi.responses import PlainTextResponse, Response, StreamingResponse, JSONResponse + + +# Lifespan state +lifespan_state = { + "startup_called": False, + "startup_time": None, + "counter": 0, + "custom_data": {}, +} + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Lifespan context manager for startup/shutdown.""" + import time + + lifespan_state["startup_called"] = True + lifespan_state["startup_time"] = time.time() + lifespan_state["custom_data"]["initialized"] = True + yield + lifespan_state["shutdown_called"] = True + + +app = FastAPI(lifespan=lifespan) + + +def safe_json_serialize(obj: Any) -> Any: + """Recursively convert an object to JSON-serializable form.""" + if obj is None or isinstance(obj, (str, int, float, bool)): + return obj + elif isinstance(obj, bytes): + return obj.decode("latin-1") + elif isinstance(obj, (list, tuple)): + return [safe_json_serialize(item) for item in obj] + elif isinstance(obj, dict): + result = {} + for k, v in obj.items(): + # Only include string keys + if isinstance(k, str): + result[k] = safe_json_serialize(v) + return result + else: + # Skip non-serializable types + return None + + +def serialize_scope(scope: dict) -> dict: + """Convert ASGI scope to JSON-serializable dict.""" + result = {} + + # Keys to explicitly skip (non-serializable objects) + skip_keys = {"state", "app", "router", "endpoint", "path_params", "route", + "extensions", "_cookies", "fastapi_astack"} + + for key, value in scope.items(): + if key in skip_keys: + continue + + try: + if key == "headers": + result[key] = [ + [h[0].decode("latin-1"), h[1].decode("latin-1")] for h in value + ] + elif key == "query_string": + result[key] = value.decode("latin-1") if value else "" + elif key == "raw_path": + result[key] = value.decode("latin-1") if value else "" + elif key == "server": + result[key] = list(value) if value else None + elif key == "client": + result[key] = list(value) if value else None + elif key == "asgi": + # Only serialize simple values from asgi dict + result[key] = { + k: v for k, v in value.items() + if isinstance(k, str) and isinstance(v, (str, int, float, bool, type(None))) + } + elif isinstance(value, bytes): + result[key] = value.decode("latin-1") + elif isinstance(value, (str, int, float, bool, type(None))): + result[key] = value + elif isinstance(value, (list, tuple)): + serialized = safe_json_serialize(value) + if serialized is not None: + result[key] = serialized + elif isinstance(value, dict): + serialized = safe_json_serialize(value) + if serialized is not None: + result[key] = serialized + # Skip other types + except Exception as e: + print(f"Error serializing key {key}: {e}", file=sys.stderr) + continue + return result + + +# HTTP Endpoints +@app.get("/health") +async def health(): + """Health check endpoint.""" + return PlainTextResponse("OK") + + +@app.get("/scope") +async def scope_endpoint(request: Request): + """Return full ASGI scope as JSON.""" + try: + scope_data = serialize_scope(request.scope) + return JSONResponse(scope_data) + except Exception as e: + traceback.print_exc() + return PlainTextResponse(f"Error: {e}", status_code=500) + + +@app.post("/echo") +async def echo(request: Request): + """Echo request body back.""" + body = await request.body() + content_type = request.headers.get("content-type", "application/octet-stream") + return Response(content=body, media_type=content_type) + + +@app.get("/headers") +async def headers_endpoint(request: Request): + """Return request headers as JSON.""" + headers_dict = dict(request.headers) + return headers_dict + + +@app.get("/status/{code}") +async def status_endpoint(code: int): + """Return specific HTTP status code.""" + return PlainTextResponse(f"Status: {code}", status_code=code) + + +@app.get("/streaming") +async def streaming(): + """Chunked streaming response.""" + + async def generate(): + for i in range(10): + yield f"chunk-{i}\n" + await asyncio.sleep(0.01) + + return StreamingResponse(generate(), media_type="text/plain") + + +@app.get("/sse") +async def sse(): + """Server-Sent Events endpoint.""" + + async def generate(): + for i in range(5): + yield f"event: message\ndata: {json.dumps({'count': i})}\n\n" + await asyncio.sleep(0.01) + yield "event: done\ndata: {}\n\n" + + return StreamingResponse(generate(), media_type="text/event-stream") + + +@app.get("/large") +async def large(size: int = 1024): + """Large response body.""" + # Cap at 10MB for safety + size = min(size, 10 * 1024 * 1024) + return Response(content=b"x" * size, media_type="application/octet-stream") + + +@app.get("/delay") +async def delay(seconds: float = 1.0): + """Delayed response.""" + # Cap at 30 seconds + seconds = min(seconds, 30) + await asyncio.sleep(seconds) + return PlainTextResponse(f"Delayed {seconds} seconds") + + +@app.get("/lifespan/state") +async def lifespan_state_endpoint(): + """Return lifespan startup state.""" + return lifespan_state + + +@app.get("/lifespan/counter") +async def lifespan_counter(): + """Increment and return counter.""" + lifespan_state["counter"] += 1 + return {"counter": lifespan_state["counter"]} + + +# WebSocket Endpoints +@app.websocket("/ws/echo") +async def ws_echo(websocket: WebSocket): + """Echo text messages.""" + await websocket.accept() + try: + while True: + message = await websocket.receive_text() + await websocket.send_text(message) + except WebSocketDisconnect: + pass + + +@app.websocket("/ws/echo-binary") +async def ws_echo_binary(websocket: WebSocket): + """Echo binary messages.""" + await websocket.accept() + try: + while True: + message = await websocket.receive_bytes() + await websocket.send_bytes(message) + except WebSocketDisconnect: + pass + + +@app.websocket("/ws/scope") +async def ws_scope(websocket: WebSocket): + """Send WebSocket scope on connect.""" + await websocket.accept() + try: + scope_data = serialize_scope(websocket.scope) + await websocket.send_json(scope_data) + except Exception as e: + await websocket.send_text(f"Error: {e}") + await websocket.close() + + +@app.websocket("/ws/subprotocol") +async def ws_subprotocol(websocket: WebSocket): + """Subprotocol negotiation.""" + requested = websocket.scope.get("subprotocols", []) + selected = requested[0] if requested else None + await websocket.accept(subprotocol=selected) + await websocket.send_json({"requested": requested, "selected": selected}) + await websocket.close() + + +@app.websocket("/ws/close") +async def ws_close(websocket: WebSocket): + """Close with specific code.""" + await websocket.accept() + query_string = websocket.scope.get("query_string", b"").decode() + code = 1000 + for param in query_string.split("&"): + if param.startswith("code="): + code = int(param.split("=")[1]) + break + await websocket.close(code=code) diff --git a/tests/docker/asgi_framework_compat/frameworks/fastapi_app/requirements.txt b/tests/docker/asgi_framework_compat/frameworks/fastapi_app/requirements.txt new file mode 100644 index 00000000..7f526a7f --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/fastapi_app/requirements.txt @@ -0,0 +1,5 @@ +gunicorn @ git+https://github.com/benoitc/gunicorn.git@master +fastapi>=0.110.0 +uvloop>=0.19.0 +websockets>=12.0 +httptools>=0.6.0 diff --git a/tests/docker/asgi_framework_compat/frameworks/litestar_app/Dockerfile b/tests/docker/asgi_framework_compat/frameworks/litestar_app/Dockerfile new file mode 100644 index 00000000..a5e7a1f0 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/litestar_app/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install curl for healthcheck and git for pip install from git +RUN apt-get update && apt-get install -y curl git && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app.py . + +EXPOSE 8000 + +# Command specified in docker-compose.yml +CMD ["gunicorn", "app:app", "-k", "asgi", "-b", "0.0.0.0:8000"] diff --git a/tests/docker/asgi_framework_compat/frameworks/litestar_app/app.py b/tests/docker/asgi_framework_compat/frameworks/litestar_app/app.py new file mode 100644 index 00000000..fbfb81b4 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/litestar_app/app.py @@ -0,0 +1,237 @@ +""" +Litestar ASGI Application for Compatibility Testing + +Implements the contract endpoints for ASGI 3.0 compliance testing. +Litestar is a modern ASGI framework with extensive feature support. +""" + +import asyncio +import json +import time +from typing import Any, Dict + +from litestar import Litestar, Request, get, post +from litestar.connection import ASGIConnection +from litestar.handlers import websocket +from litestar.response import Response, Stream + + +# Lifespan state +lifespan_state = { + "startup_called": False, + "startup_time": None, + "counter": 0, + "custom_data": {}, +} + + +async def on_startup(app: Litestar) -> None: + """Startup handler.""" + lifespan_state["startup_called"] = True + lifespan_state["startup_time"] = time.time() + lifespan_state["custom_data"]["initialized"] = True + + +async def on_shutdown(app: Litestar) -> None: + """Shutdown handler.""" + lifespan_state["shutdown_called"] = True + + +def serialize_scope(scope: dict) -> dict: + """Convert ASGI scope to JSON-serializable dict.""" + result = {} + for key, value in scope.items(): + if key == "headers": + result[key] = [ + [h[0].decode("latin-1"), h[1].decode("latin-1")] for h in value + ] + elif key == "query_string": + result[key] = value.decode("latin-1") if value else "" + elif key == "server": + result[key] = list(value) if value else None + elif key == "client": + result[key] = list(value) if value else None + elif key == "asgi": + result[key] = dict(value) + elif key in ("state", "app", "_litestar", "route_handler", "path_params"): + continue + elif isinstance(value, bytes): + result[key] = value.decode("latin-1") + else: + try: + json.dumps(value) + result[key] = value + except (TypeError, ValueError): + continue + return result + + +# HTTP Endpoints +@get("/health") +async def health() -> str: + """Health check endpoint.""" + return "OK" + + +@get("/scope") +async def scope_endpoint(request: Request) -> Dict[str, Any]: + """Return full ASGI scope as JSON.""" + scope_data = serialize_scope(request.scope) + return scope_data + + +@post("/echo") +async def echo(request: Request) -> Response: + """Echo request body back.""" + body = await request.body() + content_type = request.headers.get("content-type", "application/octet-stream") + return Response(content=body, media_type=content_type) + + +@get("/headers") +async def headers_endpoint(request: Request) -> Dict[str, str]: + """Return request headers as JSON.""" + return dict(request.headers) + + +@get("/status/{code:int}") +async def status_endpoint(code: int) -> Response: + """Return specific HTTP status code.""" + return Response(content=f"Status: {code}", status_code=code) + + +@get("/streaming") +async def streaming() -> Stream: + """Chunked streaming response.""" + + async def generate(): + for i in range(10): + yield f"chunk-{i}\n".encode() + await asyncio.sleep(0.01) + + return Stream(generate(), media_type="text/plain") + + +@get("/sse") +async def sse() -> Stream: + """Server-Sent Events endpoint.""" + + async def generate(): + for i in range(5): + yield f"event: message\ndata: {json.dumps({'count': i})}\n\n".encode() + await asyncio.sleep(0.01) + yield b"event: done\ndata: {}\n\n" + + return Stream(generate(), media_type="text/event-stream") + + +@get("/large") +async def large(size: int = 1024) -> Response: + """Large response body.""" + # Cap at 10MB for safety + size = min(size, 10 * 1024 * 1024) + return Response(content=b"x" * size, media_type="application/octet-stream") + + +@get("/delay") +async def delay(seconds: float = 1.0) -> str: + """Delayed response.""" + # Cap at 30 seconds + seconds = min(seconds, 30) + await asyncio.sleep(seconds) + return f"Delayed {seconds} seconds" + + +@get("/lifespan/state") +async def lifespan_state_endpoint() -> Dict[str, Any]: + """Return lifespan startup state.""" + return lifespan_state + + +@get("/lifespan/counter") +async def lifespan_counter() -> Dict[str, int]: + """Increment and return counter.""" + lifespan_state["counter"] += 1 + return {"counter": lifespan_state["counter"]} + + +# WebSocket Endpoints using raw websocket handler +@websocket("/ws/echo") +async def ws_echo(socket: ASGIConnection) -> None: + """Echo text messages.""" + await socket.accept() + try: + while True: + data = await socket.receive_text() + await socket.send_text(data) + except Exception: + pass + + +@websocket("/ws/echo-binary") +async def ws_echo_binary(socket: ASGIConnection) -> None: + """Echo binary messages.""" + await socket.accept() + try: + while True: + data = await socket.receive_bytes() + await socket.send_bytes(data) + except Exception: + pass + + +@websocket("/ws/scope") +async def ws_scope_handler(socket: ASGIConnection) -> None: + """Send WebSocket scope on connect.""" + await socket.accept() + scope_data = serialize_scope(socket.scope) + await socket.send_json(scope_data) + await socket.close() + + +@websocket("/ws/subprotocol") +async def ws_subprotocol_handler(socket: ASGIConnection) -> None: + """Subprotocol negotiation.""" + requested = socket.scope.get("subprotocols", []) + selected = requested[0] if requested else None + await socket.accept(subprotocols=selected) + await socket.send_json({"requested": requested, "selected": selected}) + await socket.close() + + +@websocket("/ws/close") +async def ws_close_handler(socket: ASGIConnection) -> None: + """Close with specific code.""" + await socket.accept() + query_string = socket.scope.get("query_string", b"").decode() + code = 1000 + for param in query_string.split("&"): + if param.startswith("code="): + code = int(param.split("=")[1]) + break + await socket.close(code=code) + + +# Create app with lifespan handlers +app = Litestar( + route_handlers=[ + health, + scope_endpoint, + echo, + headers_endpoint, + status_endpoint, + streaming, + sse, + large, + delay, + lifespan_state_endpoint, + lifespan_counter, + ws_echo, + ws_echo_binary, + ws_scope_handler, + ws_subprotocol_handler, + ws_close_handler, + ], + on_startup=[on_startup], + on_shutdown=[on_shutdown], +) diff --git a/tests/docker/asgi_framework_compat/frameworks/litestar_app/requirements.txt b/tests/docker/asgi_framework_compat/frameworks/litestar_app/requirements.txt new file mode 100644 index 00000000..f07b970a --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/litestar_app/requirements.txt @@ -0,0 +1,5 @@ +gunicorn @ git+https://github.com/benoitc/gunicorn.git@master +litestar>=2.7.0 +uvloop>=0.19.0 +websockets>=12.0 +httptools>=0.6.0 diff --git a/tests/docker/asgi_framework_compat/frameworks/quart_app/Dockerfile b/tests/docker/asgi_framework_compat/frameworks/quart_app/Dockerfile new file mode 100644 index 00000000..a5e7a1f0 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/quart_app/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install curl for healthcheck and git for pip install from git +RUN apt-get update && apt-get install -y curl git && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app.py . + +EXPOSE 8000 + +# Command specified in docker-compose.yml +CMD ["gunicorn", "app:app", "-k", "asgi", "-b", "0.0.0.0:8000"] diff --git a/tests/docker/asgi_framework_compat/frameworks/quart_app/app.py b/tests/docker/asgi_framework_compat/frameworks/quart_app/app.py new file mode 100644 index 00000000..83822a63 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/quart_app/app.py @@ -0,0 +1,210 @@ +""" +Quart ASGI Application for Compatibility Testing + +Implements the contract endpoints for ASGI 3.0 compliance testing. +Quart is a Flask-like async framework built on ASGI. +""" + +import asyncio +import json +import time + +from quart import Quart, request, websocket, Response, make_response + + +app = Quart(__name__) + +# Lifespan state +lifespan_state = { + "startup_called": False, + "startup_time": None, + "counter": 0, + "custom_data": {}, +} + + +@app.before_serving +async def startup(): + """Startup handler.""" + lifespan_state["startup_called"] = True + lifespan_state["startup_time"] = time.time() + lifespan_state["custom_data"]["initialized"] = True + + +@app.after_serving +async def shutdown(): + """Shutdown handler.""" + lifespan_state["shutdown_called"] = True + + +def serialize_scope(scope: dict) -> dict: + """Convert ASGI scope to JSON-serializable dict.""" + result = {} + for key, value in scope.items(): + if key == "headers": + result[key] = [ + [h[0].decode("latin-1"), h[1].decode("latin-1")] for h in value + ] + elif key == "query_string": + result[key] = value.decode("latin-1") if value else "" + elif key == "server": + result[key] = list(value) if value else None + elif key == "client": + result[key] = list(value) if value else None + elif key == "asgi": + result[key] = dict(value) + elif key in ("state", "app", "_quart"): + continue + elif isinstance(value, bytes): + result[key] = value.decode("latin-1") + else: + try: + json.dumps(value) + result[key] = value + except (TypeError, ValueError): + continue + return result + + +# HTTP Endpoints +@app.route("/health") +async def health(): + """Health check endpoint.""" + return "OK", 200 + + +@app.route("/scope") +async def scope_endpoint(): + """Return full ASGI scope as JSON.""" + # Access the ASGI scope via request + scope = request.scope + scope_data = serialize_scope(scope) + return scope_data + + +@app.route("/echo", methods=["POST"]) +async def echo(): + """Echo request body back.""" + body = await request.get_data() + content_type = request.headers.get("content-type", "application/octet-stream") + response = await make_response(body) + response.headers["Content-Type"] = content_type + return response + + +@app.route("/headers") +async def headers_endpoint(): + """Return request headers as JSON.""" + headers_dict = dict(request.headers) + return headers_dict + + +@app.route("/status/") +async def status_endpoint(code: int): + """Return specific HTTP status code.""" + return f"Status: {code}", code + + +@app.route("/streaming") +async def streaming(): + """Chunked streaming response.""" + + async def generate(): + for i in range(10): + yield f"chunk-{i}\n" + await asyncio.sleep(0.01) + + return generate(), 200, {"Content-Type": "text/plain"} + + +@app.route("/sse") +async def sse(): + """Server-Sent Events endpoint.""" + + async def generate(): + for i in range(5): + yield f"event: message\ndata: {json.dumps({'count': i})}\n\n" + await asyncio.sleep(0.01) + yield "event: done\ndata: {}\n\n" + + return generate(), 200, {"Content-Type": "text/event-stream", "Cache-Control": "no-cache"} + + +@app.route("/large") +async def large(): + """Large response body.""" + size = request.args.get("size", 1024, type=int) + # Cap at 10MB for safety + size = min(size, 10 * 1024 * 1024) + response = await make_response(b"x" * size) + response.headers["Content-Type"] = "application/octet-stream" + return response + + +@app.route("/delay") +async def delay(): + """Delayed response.""" + seconds = request.args.get("seconds", 1.0, type=float) + # Cap at 30 seconds + seconds = min(seconds, 30) + await asyncio.sleep(seconds) + return f"Delayed {seconds} seconds" + + +@app.route("/lifespan/state") +async def lifespan_state_endpoint(): + """Return lifespan startup state.""" + return lifespan_state + + +@app.route("/lifespan/counter") +async def lifespan_counter(): + """Increment and return counter.""" + lifespan_state["counter"] += 1 + return {"counter": lifespan_state["counter"]} + + +# WebSocket Endpoints +@app.websocket("/ws/echo") +async def ws_echo(): + """Echo text messages.""" + while True: + message = await websocket.receive() + await websocket.send(message) + + +@app.websocket("/ws/echo-binary") +async def ws_echo_binary(): + """Echo binary messages.""" + while True: + message = await websocket.receive() + await websocket.send(message) + + +@app.websocket("/ws/scope") +async def ws_scope(): + """Send WebSocket scope on connect.""" + scope_data = serialize_scope(websocket.scope) + await websocket.send_json(scope_data) + + +@app.websocket("/ws/subprotocol") +async def ws_subprotocol(): + """Subprotocol negotiation.""" + requested = websocket.scope.get("subprotocols", []) + selected = requested[0] if requested else None + # Note: Quart handles subprotocol via accept() but we need to check how + await websocket.send_json({"requested": requested, "selected": selected}) + + +@app.websocket("/ws/close") +async def ws_close(): + """Close with specific code.""" + query_string = websocket.scope.get("query_string", b"").decode() + code = 1000 + for param in query_string.split("&"): + if param.startswith("code="): + code = int(param.split("=")[1]) + break + # Quart uses a different close mechanism + await websocket.close(code) diff --git a/tests/docker/asgi_framework_compat/frameworks/quart_app/requirements.txt b/tests/docker/asgi_framework_compat/frameworks/quart_app/requirements.txt new file mode 100644 index 00000000..1f50a34f --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/quart_app/requirements.txt @@ -0,0 +1,5 @@ +gunicorn @ git+https://github.com/benoitc/gunicorn.git@master +quart>=0.19.0 +uvloop>=0.19.0 +websockets>=12.0 +httptools>=0.6.0 diff --git a/tests/docker/asgi_framework_compat/frameworks/starlette_app/Dockerfile b/tests/docker/asgi_framework_compat/frameworks/starlette_app/Dockerfile new file mode 100644 index 00000000..a5e7a1f0 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/starlette_app/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install curl for healthcheck and git for pip install from git +RUN apt-get update && apt-get install -y curl git && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app.py . + +EXPOSE 8000 + +# Command specified in docker-compose.yml +CMD ["gunicorn", "app:app", "-k", "asgi", "-b", "0.0.0.0:8000"] diff --git a/tests/docker/asgi_framework_compat/frameworks/starlette_app/app.py b/tests/docker/asgi_framework_compat/frameworks/starlette_app/app.py new file mode 100644 index 00000000..45779215 --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/starlette_app/app.py @@ -0,0 +1,284 @@ +""" +Starlette ASGI Application for Compatibility Testing + +Implements the contract endpoints for ASGI 3.0 compliance testing. +""" + +import asyncio +import json +import sys +import traceback +from contextlib import asynccontextmanager +from typing import Any + +from starlette.applications import Starlette +from starlette.responses import ( + JSONResponse, + PlainTextResponse, + Response, + StreamingResponse, +) +from starlette.routing import Route, WebSocketRoute +from starlette.websockets import WebSocket + + +# Lifespan state +lifespan_state = { + "startup_called": False, + "startup_time": None, + "counter": 0, + "custom_data": {}, +} + + +@asynccontextmanager +async def lifespan(app): + """Lifespan context manager for startup/shutdown.""" + import time + + lifespan_state["startup_called"] = True + lifespan_state["startup_time"] = time.time() + lifespan_state["custom_data"]["initialized"] = True + yield + lifespan_state["shutdown_called"] = True + + +def safe_json_serialize(obj: Any) -> Any: + """Recursively convert an object to JSON-serializable form.""" + if obj is None or isinstance(obj, (str, int, float, bool)): + return obj + elif isinstance(obj, bytes): + return obj.decode("latin-1") + elif isinstance(obj, (list, tuple)): + return [safe_json_serialize(item) for item in obj] + elif isinstance(obj, dict): + result = {} + for k, v in obj.items(): + # Only include string keys + if isinstance(k, str): + result[k] = safe_json_serialize(v) + return result + else: + # Skip non-serializable types + return None + + +def serialize_scope(scope: dict) -> dict: + """Convert ASGI scope to JSON-serializable dict.""" + result = {} + + # Keys to explicitly skip (non-serializable objects) + skip_keys = {"state", "app", "router", "endpoint", "path_params", "route", + "extensions", "_cookies"} + + for key, value in scope.items(): + if key in skip_keys: + continue + + try: + if key == "headers": + result[key] = [ + [h[0].decode("latin-1"), h[1].decode("latin-1")] for h in value + ] + elif key == "query_string": + result[key] = value.decode("latin-1") if value else "" + elif key == "raw_path": + result[key] = value.decode("latin-1") if value else "" + elif key == "server": + result[key] = list(value) if value else None + elif key == "client": + result[key] = list(value) if value else None + elif key == "asgi": + # Only serialize simple values from asgi dict + result[key] = { + k: v for k, v in value.items() + if isinstance(k, str) and isinstance(v, (str, int, float, bool, type(None))) + } + elif isinstance(value, bytes): + result[key] = value.decode("latin-1") + elif isinstance(value, (str, int, float, bool, type(None))): + result[key] = value + elif isinstance(value, (list, tuple)): + serialized = safe_json_serialize(value) + if serialized is not None: + result[key] = serialized + elif isinstance(value, dict): + serialized = safe_json_serialize(value) + if serialized is not None: + result[key] = serialized + # Skip other types + except Exception as e: + print(f"Error serializing key {key}: {e}", file=sys.stderr) + continue + return result + + +# HTTP Endpoints +async def health(request): + """Health check endpoint.""" + return PlainTextResponse("OK") + + +async def scope_endpoint(request): + """Return full ASGI scope as JSON.""" + try: + scope_data = serialize_scope(request.scope) + return JSONResponse(scope_data) + except Exception as e: + traceback.print_exc() + return PlainTextResponse(f"Error: {e}", status_code=500) + + +async def echo(request): + """Echo request body back.""" + body = await request.body() + content_type = request.headers.get("content-type", "application/octet-stream") + return Response(content=body, media_type=content_type) + + +async def headers_endpoint(request): + """Return request headers as JSON.""" + headers_dict = dict(request.headers) + return JSONResponse(headers_dict) + + +async def status_endpoint(request): + """Return specific HTTP status code.""" + code = int(request.path_params["code"]) + return PlainTextResponse(f"Status: {code}", status_code=code) + + +async def streaming(request): + """Chunked streaming response.""" + + async def generate(): + for i in range(10): + yield f"chunk-{i}\n" + await asyncio.sleep(0.01) + + return StreamingResponse(generate(), media_type="text/plain") + + +async def sse(request): + """Server-Sent Events endpoint.""" + + async def generate(): + for i in range(5): + yield f"event: message\ndata: {json.dumps({'count': i})}\n\n" + await asyncio.sleep(0.01) + yield "event: done\ndata: {}\n\n" + + return StreamingResponse(generate(), media_type="text/event-stream") + + +async def large(request): + """Large response body.""" + size = int(request.query_params.get("size", 1024)) + # Cap at 10MB for safety + size = min(size, 10 * 1024 * 1024) + return Response(content=b"x" * size, media_type="application/octet-stream") + + +async def delay(request): + """Delayed response.""" + seconds = float(request.query_params.get("seconds", 1)) + # Cap at 30 seconds + seconds = min(seconds, 30) + await asyncio.sleep(seconds) + return PlainTextResponse(f"Delayed {seconds} seconds") + + +async def lifespan_state_endpoint(request): + """Return lifespan startup state.""" + return JSONResponse(lifespan_state) + + +async def lifespan_counter(request): + """Increment and return counter.""" + lifespan_state["counter"] += 1 + return JSONResponse({"counter": lifespan_state["counter"]}) + + +# WebSocket Endpoints +async def ws_echo(websocket: WebSocket): + """Echo text messages.""" + await websocket.accept() + try: + while True: + message = await websocket.receive_text() + await websocket.send_text(message) + except Exception: + pass + + +async def ws_echo_binary(websocket: WebSocket): + """Echo binary messages.""" + await websocket.accept() + try: + while True: + message = await websocket.receive_bytes() + await websocket.send_bytes(message) + except Exception: + pass + + +async def ws_scope(websocket: WebSocket): + """Send WebSocket scope on connect.""" + await websocket.accept() + try: + scope_data = serialize_scope(websocket.scope) + await websocket.send_json(scope_data) + except Exception as e: + await websocket.send_text(f"Error: {e}") + await websocket.close() + + +async def ws_subprotocol(websocket: WebSocket): + """Subprotocol negotiation.""" + # Get requested subprotocols from scope + requested = websocket.scope.get("subprotocols", []) + # Select first one if available + selected = requested[0] if requested else None + await websocket.accept(subprotocol=selected) + await websocket.send_json( + {"requested": requested, "selected": selected} + ) + await websocket.close() + + +async def ws_close(websocket: WebSocket): + """Close with specific code.""" + await websocket.accept() + # Get close code from query string + query_string = websocket.scope.get("query_string", b"").decode() + code = 1000 + for param in query_string.split("&"): + if param.startswith("code="): + code = int(param.split("=")[1]) + break + await websocket.close(code=code) + + +# Routes +routes = [ + # HTTP endpoints + Route("/health", health), + Route("/scope", scope_endpoint), + Route("/echo", echo, methods=["POST"]), + Route("/headers", headers_endpoint), + Route("/status/{code:int}", status_endpoint), + Route("/streaming", streaming), + Route("/sse", sse), + Route("/large", large), + Route("/delay", delay), + Route("/lifespan/state", lifespan_state_endpoint), + Route("/lifespan/counter", lifespan_counter), + # WebSocket endpoints + WebSocketRoute("/ws/echo", ws_echo), + WebSocketRoute("/ws/echo-binary", ws_echo_binary), + WebSocketRoute("/ws/scope", ws_scope), + WebSocketRoute("/ws/subprotocol", ws_subprotocol), + WebSocketRoute("/ws/close", ws_close), +] + +app = Starlette(routes=routes, lifespan=lifespan) diff --git a/tests/docker/asgi_framework_compat/frameworks/starlette_app/requirements.txt b/tests/docker/asgi_framework_compat/frameworks/starlette_app/requirements.txt new file mode 100644 index 00000000..6bf1eecc --- /dev/null +++ b/tests/docker/asgi_framework_compat/frameworks/starlette_app/requirements.txt @@ -0,0 +1,5 @@ +gunicorn @ git+https://github.com/benoitc/gunicorn.git@master +starlette>=0.37.0 +uvloop>=0.19.0 +websockets>=12.0 +httptools>=0.6.0 diff --git a/tests/docker/asgi_framework_compat/pytest.ini b/tests/docker/asgi_framework_compat/pytest.ini new file mode 100644 index 00000000..c557ef22 --- /dev/null +++ b/tests/docker/asgi_framework_compat/pytest.ini @@ -0,0 +1,18 @@ +[pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +asyncio_mode = auto + +markers = + http: HTTP protocol tests + websocket: WebSocket protocol tests + lifespan: Lifespan protocol tests + streaming: Streaming response tests + slow: Slow running tests + framework(name): Test specific framework + +filterwarnings = + ignore::DeprecationWarning + ignore::pytest.PytestUnraisableExceptionWarning diff --git a/tests/docker/asgi_framework_compat/requirements.txt b/tests/docker/asgi_framework_compat/requirements.txt new file mode 100644 index 00000000..6a05a639 --- /dev/null +++ b/tests/docker/asgi_framework_compat/requirements.txt @@ -0,0 +1,6 @@ +# Test dependencies for running the compatibility suite +pytest>=8.0.0 +pytest-asyncio>=0.23.0 +pytest-json-report>=1.5.0 +httpx>=0.27.0 +websockets>=12.0 diff --git a/tests/docker/asgi_framework_compat/results/compatibility_grid.json b/tests/docker/asgi_framework_compat/results/compatibility_grid.json new file mode 100644 index 00000000..796c50c0 --- /dev/null +++ b/tests/docker/asgi_framework_compat/results/compatibility_grid.json @@ -0,0 +1,198 @@ +{ + "generated": "2026-04-03T11:06:45.300191", + "worker": "gunicorn.workers.gasgi.ASGIWorker", + "frameworks": { + "django": { + "name": "Django + Channels", + "categories": { + "http_scope": { + "passed": 19, + "failed": 0, + "total": 19 + }, + "http_messages": { + "passed": 18, + "failed": 1, + "total": 19 + }, + "websocket": { + "passed": 13, + "failed": 6, + "total": 19 + }, + "lifespan": { + "passed": 7, + "failed": 1, + "total": 8 + }, + "streaming": { + "passed": 9, + "failed": 0, + "total": 9 + } + }, + "total_passed": 66, + "total_tests": 74 + }, + "fastapi": { + "name": "FastAPI", + "categories": { + "http_scope": { + "passed": 19, + "failed": 0, + "total": 19 + }, + "http_messages": { + "passed": 18, + "failed": 1, + "total": 19 + }, + "websocket": { + "passed": 19, + "failed": 0, + "total": 19 + }, + "lifespan": { + "passed": 8, + "failed": 0, + "total": 8 + }, + "streaming": { + "passed": 9, + "failed": 0, + "total": 9 + } + }, + "total_passed": 73, + "total_tests": 74 + }, + "starlette": { + "name": "Starlette", + "categories": { + "http_scope": { + "passed": 19, + "failed": 0, + "total": 19 + }, + "http_messages": { + "passed": 18, + "failed": 1, + "total": 19 + }, + "websocket": { + "passed": 19, + "failed": 0, + "total": 19 + }, + "lifespan": { + "passed": 8, + "failed": 0, + "total": 8 + }, + "streaming": { + "passed": 9, + "failed": 0, + "total": 9 + } + }, + "total_passed": 73, + "total_tests": 74 + }, + "quart": { + "name": "Quart", + "categories": { + "http_scope": { + "passed": 18, + "failed": 1, + "total": 19 + }, + "http_messages": { + "passed": 17, + "failed": 2, + "total": 19 + }, + "websocket": { + "passed": 11, + "failed": 8, + "total": 19 + }, + "lifespan": { + "passed": 8, + "failed": 0, + "total": 8 + }, + "streaming": { + "passed": 9, + "failed": 0, + "total": 9 + } + }, + "total_passed": 63, + "total_tests": 74 + }, + "litestar": { + "name": "Litestar", + "categories": { + "http_scope": { + "passed": 18, + "failed": 1, + "total": 19 + }, + "http_messages": { + "passed": 11, + "failed": 8, + "total": 19 + }, + "websocket": { + "passed": 17, + "failed": 2, + "total": 19 + }, + "lifespan": { + "passed": 8, + "failed": 0, + "total": 8 + }, + "streaming": { + "passed": 9, + "failed": 0, + "total": 9 + } + }, + "total_passed": 63, + "total_tests": 74 + }, + "blacksheep": { + "name": "BlackSheep", + "categories": { + "http_scope": { + "passed": 19, + "failed": 0, + "total": 19 + }, + "http_messages": { + "passed": 18, + "failed": 1, + "total": 19 + }, + "websocket": { + "passed": 19, + "failed": 0, + "total": 19 + }, + "lifespan": { + "passed": 8, + "failed": 0, + "total": 8 + }, + "streaming": { + "passed": 1, + "failed": 8, + "total": 9 + } + }, + "total_passed": 65, + "total_tests": 74 + } + } +} \ No newline at end of file diff --git a/tests/docker/asgi_framework_compat/results/compatibility_grid.md b/tests/docker/asgi_framework_compat/results/compatibility_grid.md new file mode 100644 index 00000000..31bf8860 --- /dev/null +++ b/tests/docker/asgi_framework_compat/results/compatibility_grid.md @@ -0,0 +1,20 @@ +# ASGI Framework Compatibility Grid + +**Generated:** 2026-04-03 11:06:45 +**Worker:** gunicorn ASGI worker (`-k asgi`) +**Event Loop:** auto (uvloop if available) + +## Summary + +| Framework | HTTP Scope | HTTP Messages | WebSocket | Lifespan | Streaming | Total | +|-----------|---------|---------|---------|---------|---------|-------| +| Django + Channels | 19/19 | **18/19** | **13/19** | **7/8** | 9/9 | **66/74** | +| FastAPI | 19/19 | **18/19** | 19/19 | 8/8 | 9/9 | **73/74** | +| Starlette | 19/19 | **18/19** | 19/19 | 8/8 | 9/9 | **73/74** | +| Quart | **18/19** | **17/19** | **11/19** | 8/8 | 9/9 | **63/74** | +| Litestar | **18/19** | **11/19** | **17/19** | 8/8 | 9/9 | **63/74** | +| BlackSheep | 19/19 | **18/19** | 19/19 | 8/8 | **1/9** | **65/74** | + +*Bold indicates failures* + +**Overall:** 403/444 tests passed (90%) diff --git a/tests/docker/asgi_framework_compat/scripts/__init__.py b/tests/docker/asgi_framework_compat/scripts/__init__.py new file mode 100644 index 00000000..c20da8fb --- /dev/null +++ b/tests/docker/asgi_framework_compat/scripts/__init__.py @@ -0,0 +1 @@ +"""Scripts for running tests and generating reports.""" diff --git a/tests/docker/asgi_framework_compat/scripts/generate_grid.py b/tests/docker/asgi_framework_compat/scripts/generate_grid.py new file mode 100755 index 00000000..211b45ec --- /dev/null +++ b/tests/docker/asgi_framework_compat/scripts/generate_grid.py @@ -0,0 +1,198 @@ +#!/usr/bin/env python3 +""" +Compatibility Grid Generator + +Generates a compatibility matrix showing test results for each +ASGI framework tested with gunicorn's native ASGI worker. +""" + +import json +import os +from datetime import datetime +from pathlib import Path + + +# Framework configuration +FRAMEWORKS = ["django", "fastapi", "starlette", "quart", "litestar", "blacksheep"] + +FRAMEWORK_NAMES = { + "django": "Django + Channels", + "fastapi": "FastAPI", + "starlette": "Starlette", + "quart": "Quart", + "litestar": "Litestar", + "blacksheep": "BlackSheep", +} + +# Test categories based on file names +CATEGORIES = { + "http_scope": "HTTP Scope", + "http_messages": "HTTP Messages", + "websocket": "WebSocket", + "lifespan": "Lifespan", + "streaming": "Streaming", +} + + +def parse_results(results_file: Path) -> dict: + """Parse pytest JSON results into framework/category structure.""" + with open(results_file) as f: + data = json.load(f) + + results = {fw: {cat: {"passed": 0, "failed": 0, "total": 0} + for cat in CATEGORIES} for fw in FRAMEWORKS} + + tests = data.get("tests", []) + for test in tests: + nodeid = test.get("nodeid", "") + outcome = test.get("outcome", "") + + # Extract framework from test parameters + framework = None + for fw in FRAMEWORKS: + if f"[{fw}]" in nodeid or f"[{fw}-" in nodeid: + framework = fw + break + + if not framework: + continue + + # Determine category from file name + category = None + for cat_key in CATEGORIES: + if f"test_{cat_key}" in nodeid: + category = cat_key + break + + if not category: + continue + + results[framework][category]["total"] += 1 + if outcome == "passed": + results[framework][category]["passed"] += 1 + elif outcome == "failed": + results[framework][category]["failed"] += 1 + + return results + + +def generate_markdown(results: dict) -> str: + """Generate markdown compatibility grid.""" + lines = [] + lines.append("# ASGI Framework Compatibility Grid") + lines.append("") + lines.append(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + lines.append("**Worker:** gunicorn ASGI worker (`-k asgi`)") + lines.append("**Event Loop:** auto (uvloop if available)") + lines.append("") + + # Main compatibility table + lines.append("## Summary") + lines.append("") + + header = "| Framework |" + separator = "|-----------|" + for cat in CATEGORIES.values(): + header += f" {cat} |" + separator += "---------|" + header += " Total |" + separator += "-------|" + + lines.append(header) + lines.append(separator) + + for fw in FRAMEWORKS: + fw_results = results.get(fw, {}) + row = f"| {FRAMEWORK_NAMES[fw]} |" + + total_passed = 0 + total_tests = 0 + + for cat_key in CATEGORIES: + cat_data = fw_results.get(cat_key, {"passed": 0, "total": 0}) + passed = cat_data["passed"] + total = cat_data["total"] + total_passed += passed + total_tests += total + + if total == 0: + row += " - |" + elif passed == total: + row += f" {passed}/{total} |" + else: + row += f" **{passed}/{total}** |" + + if total_tests == 0: + row += " - |" + elif total_passed == total_tests: + row += f" {total_passed}/{total_tests} |" + else: + row += f" **{total_passed}/{total_tests}** |" + + lines.append(row) + + lines.append("") + lines.append("*Bold indicates failures*") + lines.append("") + + # Calculate overall pass rate + all_passed = sum( + results[fw][cat]["passed"] + for fw in FRAMEWORKS + for cat in CATEGORIES + ) + all_total = sum( + results[fw][cat]["total"] + for fw in FRAMEWORKS + for cat in CATEGORIES + ) + + lines.append(f"**Overall:** {all_passed}/{all_total} tests passed ({100*all_passed//all_total}%)") + lines.append("") + + return "\n".join(lines) + + +def main(): + base_dir = Path(__file__).parent.parent + results_dir = base_dir / "results" + results_file = results_dir / "pytest_results.json" + + if not results_file.exists(): + print(f"Results file not found: {results_file}") + return + + results = parse_results(results_file) + md_content = generate_markdown(results) + + # Write to results directory + md_file = results_dir / "compatibility_grid.md" + with open(md_file, "w") as f: + f.write(md_content) + print(f"Written: {md_file}") + + # Also write JSON summary + json_file = results_dir / "compatibility_grid.json" + summary = { + "generated": datetime.now().isoformat(), + "worker": "gunicorn.workers.gasgi.ASGIWorker", + "frameworks": { + fw: { + "name": FRAMEWORK_NAMES[fw], + "categories": results[fw], + "total_passed": sum(results[fw][c]["passed"] for c in CATEGORIES), + "total_tests": sum(results[fw][c]["total"] for c in CATEGORIES), + } + for fw in FRAMEWORKS + } + } + with open(json_file, "w") as f: + json.dump(summary, indent=2, fp=f) + print(f"Written: {json_file}") + + # Print the markdown + print("\n" + md_content) + + +if __name__ == "__main__": + main() diff --git a/tests/docker/asgi_framework_compat/scripts/run_tests.sh b/tests/docker/asgi_framework_compat/scripts/run_tests.sh new file mode 100755 index 00000000..77068cb0 --- /dev/null +++ b/tests/docker/asgi_framework_compat/scripts/run_tests.sh @@ -0,0 +1,66 @@ +#!/bin/bash +# Run ASGI Framework Compatibility Tests +# +# Usage: +# ./scripts/run_tests.sh # Run with auto loop detection +# ./scripts/run_tests.sh asyncio # Run with asyncio loop +# ./scripts/run_tests.sh uvloop # Run with uvloop +# ./scripts/run_tests.sh both # Run both and generate combined report + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +BASE_DIR="$(dirname "$SCRIPT_DIR")" + +cd "$BASE_DIR" + +LOOP_TYPE="${1:-auto}" + +echo "=== ASGI Framework Compatibility Test Suite ===" +echo "Loop type: $LOOP_TYPE" +echo "" + +# Install test dependencies if needed +if ! python -c "import pytest" 2>/dev/null; then + echo "Installing test dependencies..." + pip install -r requirements.txt +fi + +if [ "$LOOP_TYPE" = "both" ]; then + echo "Running tests with asyncio loop..." + ASGI_LOOP=asyncio docker compose up -d --build + sleep 10 # Wait for services + pytest tests/ -v --tb=short || true + docker compose down + + echo "" + echo "Running tests with uvloop..." + ASGI_LOOP=uvloop docker compose up -d --build + sleep 10 # Wait for services + pytest tests/ -v --tb=short || true + docker compose down + + echo "" + echo "Generating combined report..." + python scripts/generate_grid.py --loop both --skip-tests +else + echo "Starting containers with $LOOP_TYPE loop..." + ASGI_LOOP="$LOOP_TYPE" docker compose up -d --build + + echo "Waiting for services to be healthy..." + sleep 15 + + echo "" + echo "Running tests..." + pytest tests/ -v --tb=short + + echo "" + echo "Generating compatibility grid..." + python scripts/generate_grid.py --loop "$LOOP_TYPE" + + echo "" + echo "Results saved to results/" +fi + +echo "" +echo "Done!" diff --git a/tests/docker/asgi_framework_compat/tests/__init__.py b/tests/docker/asgi_framework_compat/tests/__init__.py new file mode 100644 index 00000000..1053cde9 --- /dev/null +++ b/tests/docker/asgi_framework_compat/tests/__init__.py @@ -0,0 +1 @@ +"""ASGI Framework Compatibility Tests""" diff --git a/tests/docker/asgi_framework_compat/tests/test_http_messages.py b/tests/docker/asgi_framework_compat/tests/test_http_messages.py new file mode 100644 index 00000000..a549b2f9 --- /dev/null +++ b/tests/docker/asgi_framework_compat/tests/test_http_messages.py @@ -0,0 +1,127 @@ +""" +HTTP Message Type Tests + +Tests ASGI 3.0 HTTP request/response message handling. +""" + +import pytest + + +pytestmark = pytest.mark.http + + +class TestHttpRequestBody: + """Test HTTP request body handling.""" + + async def test_echo_empty_body(self, http_client): + """Echo endpoint handles empty body.""" + response = await http_client.post("/echo", content=b"") + assert response.status_code == 200 + assert response.content == b"" + + async def test_echo_text_body(self, http_client): + """Echo endpoint returns text body.""" + body = "Hello, World!" + response = await http_client.post( + "/echo", + content=body, + headers={"Content-Type": "text/plain"}, + ) + assert response.status_code == 200 + assert response.text == body + + async def test_echo_binary_body(self, http_client): + """Echo endpoint returns binary body.""" + body = b"\x00\x01\x02\x03\xff\xfe" + response = await http_client.post( + "/echo", + content=body, + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 200 + assert response.content == body + + async def test_echo_json_body(self, http_client): + """Echo endpoint returns JSON body.""" + body = '{"key": "value", "number": 42}' + response = await http_client.post( + "/echo", + content=body, + headers={"Content-Type": "application/json"}, + ) + assert response.status_code == 200 + assert response.json() == {"key": "value", "number": 42} + + async def test_echo_large_body(self, http_client, large_body): + """Echo endpoint handles large body.""" + body = large_body(100 * 1024) # 100KB + response = await http_client.post( + "/echo", + content=body, + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == 200 + assert len(response.content) == len(body) + + +class TestHttpResponseStatus: + """Test HTTP response status codes.""" + + @pytest.mark.parametrize("code", [200, 201, 204, 301, 400, 404, 500, 503]) + async def test_status_codes(self, http_client, code): + """Status endpoint returns correct status code.""" + response = await http_client.get(f"/status/{code}") + assert response.status_code == code + + async def test_status_100_continue(self, http_client): + """Handle 100 status (may not be supported by all frameworks).""" + # Some frameworks may not support 100, so we just check it doesn't crash + response = await http_client.get("/status/100") + # 100 is special - some servers transform it + assert response.status_code in (100, 200) + + +class TestHttpResponseHeaders: + """Test HTTP response header handling.""" + + async def test_content_type_header(self, http_client): + """Response has Content-Type header.""" + response = await http_client.get("/scope") + assert "content-type" in response.headers + assert "application/json" in response.headers["content-type"] + + async def test_headers_preserved(self, http_client): + """Custom headers in request are accessible.""" + response = await http_client.get("/headers", headers={"X-Custom": "test123"}) + data = response.json() + assert data.get("x-custom") == "test123" + + +class TestHttpDisconnect: + """Test HTTP disconnect handling.""" + + async def test_delay_can_be_cancelled(self, http_client): + """Long delay can be interrupted (timeout behavior).""" + import httpx + + # This tests that the server handles client disconnects gracefully + with pytest.raises(httpx.TimeoutException): + await http_client.get("/delay?seconds=30", timeout=0.5) + + +class TestHttpResponseBody: + """Test HTTP response body handling.""" + + async def test_large_response_body(self, http_client): + """Large response body endpoint works.""" + size = 100 * 1024 # 100KB + response = await http_client.get(f"/large?size={size}") + assert response.status_code == 200 + assert len(response.content) == size + + async def test_very_large_response_body(self, http_client): + """Very large response body endpoint works.""" + size = 1024 * 1024 # 1MB + response = await http_client.get(f"/large?size={size}") + assert response.status_code == 200 + assert len(response.content) == size diff --git a/tests/docker/asgi_framework_compat/tests/test_http_scope.py b/tests/docker/asgi_framework_compat/tests/test_http_scope.py new file mode 100644 index 00000000..7c710c31 --- /dev/null +++ b/tests/docker/asgi_framework_compat/tests/test_http_scope.py @@ -0,0 +1,168 @@ +""" +HTTP Scope Compliance Tests + +Tests ASGI 3.0 HTTP scope compliance across frameworks. +""" + +import pytest + +from frameworks.contract import ASGI_HTTP_SCOPE_REQUIRED_KEYS + + +pytestmark = pytest.mark.http + + +class TestHttpScopeBasics: + """Test basic HTTP scope attributes.""" + + async def test_scope_endpoint_returns_json(self, http_client): + """Scope endpoint returns valid JSON.""" + response = await http_client.get("/scope") + assert response.status_code == 200 + data = response.json() + assert isinstance(data, dict) + + async def test_scope_has_type_http(self, http_client): + """Scope type is 'http'.""" + response = await http_client.get("/scope") + data = response.json() + assert data.get("type") == "http" + + async def test_scope_has_asgi_dict(self, http_client): + """Scope has 'asgi' dict with version info.""" + response = await http_client.get("/scope") + data = response.json() + assert "asgi" in data + assert isinstance(data["asgi"], dict) + assert "version" in data["asgi"] + + async def test_scope_asgi_version_is_3(self, http_client): + """ASGI version should be 3.x.""" + response = await http_client.get("/scope") + data = response.json() + version = data["asgi"]["version"] + assert version.startswith("3.") + + async def test_scope_has_http_version(self, http_client): + """Scope has http_version field.""" + response = await http_client.get("/scope") + data = response.json() + assert "http_version" in data + assert data["http_version"] in ("1.0", "1.1", "2", "3") + + async def test_scope_has_method(self, http_client): + """Scope has method field matching request method.""" + response = await http_client.get("/scope") + data = response.json() + assert data.get("method") == "GET" + + async def test_scope_has_scheme(self, http_client): + """Scope has scheme field.""" + response = await http_client.get("/scope") + data = response.json() + assert "scheme" in data + assert data["scheme"] in ("http", "https") + + async def test_scope_has_path(self, http_client): + """Scope has path field matching request path.""" + response = await http_client.get("/scope") + data = response.json() + assert data.get("path") == "/scope" + + async def test_scope_has_query_string(self, http_client): + """Scope has query_string field.""" + response = await http_client.get("/scope?foo=bar") + data = response.json() + assert "query_string" in data + assert "foo=bar" in data["query_string"] + + async def test_scope_empty_query_string(self, http_client): + """Empty query string handled correctly.""" + response = await http_client.get("/scope") + data = response.json() + assert "query_string" in data + assert data["query_string"] == "" + + +class TestHttpScopeHeaders: + """Test HTTP scope header handling.""" + + async def test_scope_has_headers(self, http_client): + """Scope has headers field.""" + response = await http_client.get("/scope") + data = response.json() + assert "headers" in data + assert isinstance(data["headers"], list) + + async def test_scope_headers_are_lists(self, http_client): + """Each header is a list of [name, value].""" + response = await http_client.get("/scope") + data = response.json() + for header in data["headers"]: + assert isinstance(header, list) + assert len(header) == 2 + + async def test_scope_header_names_lowercase(self, http_client): + """Header names should be lowercase.""" + response = await http_client.get("/scope", headers={"X-Custom-Header": "test"}) + data = response.json() + custom_headers = [h for h in data["headers"] if h[0] == "x-custom-header"] + assert len(custom_headers) > 0 + + async def test_headers_endpoint_returns_all_headers(self, http_client): + """Headers endpoint returns all sent headers.""" + custom_headers = { + "X-Test-One": "value1", + "X-Test-Two": "value2", + } + response = await http_client.get("/headers", headers=custom_headers) + data = response.json() + assert data.get("x-test-one") == "value1" + assert data.get("x-test-two") == "value2" + + +class TestHttpScopeServer: + """Test HTTP scope server and client fields.""" + + async def test_scope_has_server(self, http_client): + """Scope has server field.""" + response = await http_client.get("/scope") + data = response.json() + assert "server" in data + + async def test_scope_server_is_tuple(self, http_client): + """Server is [host, port] list.""" + response = await http_client.get("/scope") + data = response.json() + if data["server"] is not None: + assert isinstance(data["server"], list) + assert len(data["server"]) == 2 + + async def test_scope_has_client(self, http_client): + """Scope has client field (may be None).""" + response = await http_client.get("/scope") + data = response.json() + # client is optional but should be present + assert "client" in data or data.get("client") is None + + +class TestHttpScopeRequired: + """Test all required scope keys are present.""" + + async def test_all_required_keys_present(self, http_client): + """All ASGI 3.0 required HTTP scope keys are present.""" + response = await http_client.get("/scope") + data = response.json() + for key in ASGI_HTTP_SCOPE_REQUIRED_KEYS: + assert key in data, f"Missing required scope key: {key}" + + +class TestHttpScopeRootPath: + """Test root_path handling.""" + + async def test_scope_has_root_path(self, http_client): + """Scope has root_path field (may be empty).""" + response = await http_client.get("/scope") + data = response.json() + # root_path should be present, defaults to "" + assert "root_path" in data or data.get("root_path", "") == "" diff --git a/tests/docker/asgi_framework_compat/tests/test_lifespan_scope.py b/tests/docker/asgi_framework_compat/tests/test_lifespan_scope.py new file mode 100644 index 00000000..0d9ec67f --- /dev/null +++ b/tests/docker/asgi_framework_compat/tests/test_lifespan_scope.py @@ -0,0 +1,99 @@ +""" +Lifespan Protocol Tests + +Tests ASGI 3.0 lifespan protocol compliance across frameworks. +""" + +import pytest + + +pytestmark = pytest.mark.lifespan + + +class TestLifespanStartup: + """Test lifespan startup handling.""" + + async def test_startup_was_called(self, http_client): + """Startup handler was called.""" + response = await http_client.get("/lifespan/state") + assert response.status_code == 200 + data = response.json() + assert data.get("startup_called") is True + + async def test_startup_time_set(self, http_client): + """Startup time was recorded.""" + response = await http_client.get("/lifespan/state") + data = response.json() + assert data.get("startup_time") is not None + assert isinstance(data["startup_time"], (int, float)) + + async def test_startup_custom_data(self, http_client): + """Custom data set during startup is available.""" + response = await http_client.get("/lifespan/state") + data = response.json() + custom_data = data.get("custom_data", {}) + assert custom_data.get("initialized") is True + + +class TestLifespanState: + """Test lifespan state persistence.""" + + async def test_counter_initial_value(self, http_client): + """Counter starts at expected initial value.""" + # First get the state to see current counter + response = await http_client.get("/lifespan/state") + initial = response.json().get("counter", 0) + + # Increment once + response = await http_client.get("/lifespan/counter") + data = response.json() + assert data["counter"] == initial + 1 + + async def test_counter_increments(self, http_client): + """Counter increments on each request.""" + # Get first value + response1 = await http_client.get("/lifespan/counter") + value1 = response1.json()["counter"] + + # Get second value + response2 = await http_client.get("/lifespan/counter") + value2 = response2.json()["counter"] + + # Should have incremented + assert value2 == value1 + 1 + + async def test_state_persists_across_requests(self, http_client): + """State persists across multiple requests.""" + # Make several requests + values = [] + for _ in range(3): + response = await http_client.get("/lifespan/counter") + values.append(response.json()["counter"]) + + # Each should be incrementing + assert values[1] == values[0] + 1 + assert values[2] == values[1] + 1 + + +class TestLifespanStateSharing: + """Test state sharing between lifespan and request handlers.""" + + async def test_lifespan_state_accessible(self, http_client): + """Lifespan state is accessible from request handlers.""" + response = await http_client.get("/lifespan/state") + assert response.status_code == 200 + data = response.json() + # Should have the startup marker + assert "startup_called" in data + + async def test_state_modifications_persist(self, http_client): + """Modifications to state persist.""" + # Increment counter + await http_client.get("/lifespan/counter") + + # Check state still shows startup was called + response = await http_client.get("/lifespan/state") + data = response.json() + assert data.get("startup_called") is True + # Counter should be > 0 + assert data.get("counter", 0) > 0 diff --git a/tests/docker/asgi_framework_compat/tests/test_streaming.py b/tests/docker/asgi_framework_compat/tests/test_streaming.py new file mode 100644 index 00000000..121d5086 --- /dev/null +++ b/tests/docker/asgi_framework_compat/tests/test_streaming.py @@ -0,0 +1,98 @@ +""" +Streaming Response Tests + +Tests chunked streaming and Server-Sent Events across frameworks. +""" + +import asyncio +import json + +import pytest + + +pytestmark = pytest.mark.streaming + + +class TestChunkedStreaming: + """Test chunked transfer encoding responses.""" + + async def test_streaming_response(self, http_client): + """Streaming endpoint returns chunked response.""" + response = await http_client.get("/streaming") + assert response.status_code == 200 + # Check we got all chunks + content = response.text + for i in range(10): + assert f"chunk-{i}" in content + + async def test_streaming_content_type(self, http_client): + """Streaming response has correct content type.""" + response = await http_client.get("/streaming") + assert "text/plain" in response.headers.get("content-type", "") + + async def test_streaming_order_preserved(self, http_client): + """Chunks arrive in correct order.""" + response = await http_client.get("/streaming") + lines = [l for l in response.text.strip().split("\n") if l] + for i, line in enumerate(lines): + assert line == f"chunk-{i}" + + +class TestServerSentEvents: + """Test Server-Sent Events (SSE) responses.""" + + async def test_sse_response(self, http_client): + """SSE endpoint returns event stream.""" + response = await http_client.get("/sse") + assert response.status_code == 200 + content = response.text + assert "event:" in content + assert "data:" in content + + async def test_sse_content_type(self, http_client): + """SSE response has correct content type.""" + response = await http_client.get("/sse") + content_type = response.headers.get("content-type", "") + assert "text/event-stream" in content_type + + async def test_sse_event_format(self, http_client): + """SSE events have correct format.""" + response = await http_client.get("/sse") + content = response.text + + # Check for message events + assert "event: message" in content + + # Check for done event + assert "event: done" in content + + async def test_sse_data_is_json(self, http_client): + """SSE data fields contain valid JSON.""" + response = await http_client.get("/sse") + lines = response.text.split("\n") + + data_lines = [l for l in lines if l.startswith("data:")] + for line in data_lines: + data_str = line[5:].strip() # Remove "data:" prefix + data = json.loads(data_str) + assert isinstance(data, dict) + + async def test_sse_message_count(self, http_client): + """Correct number of SSE messages received.""" + response = await http_client.get("/sse") + lines = response.text.split("\n") + + message_events = [l for l in lines if l == "event: message"] + # Should have 5 message events + assert len(message_events) == 5 + + +class TestStreamingLargeData: + """Test streaming with large data.""" + + async def test_large_streaming_response(self, http_client): + """Large response body streams correctly.""" + size = 5 * 1024 * 1024 # 5MB + response = await http_client.get(f"/large?size={size}") + assert response.status_code == 200 + assert len(response.content) == size diff --git a/tests/docker/asgi_framework_compat/tests/test_websocket_scope.py b/tests/docker/asgi_framework_compat/tests/test_websocket_scope.py new file mode 100644 index 00000000..15f2e9b9 --- /dev/null +++ b/tests/docker/asgi_framework_compat/tests/test_websocket_scope.py @@ -0,0 +1,193 @@ +""" +WebSocket Scope Compliance Tests + +Tests ASGI 3.0 WebSocket scope compliance across frameworks. +""" + +import asyncio +import json + +import pytest +import websockets +from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError + +from frameworks.contract import ( + ASGI_WEBSOCKET_SCOPE_REQUIRED_KEYS, + VALID_WEBSOCKET_CLOSE_CODES, +) + + +pytestmark = pytest.mark.websocket + + +class TestWebSocketConnection: + """Test WebSocket connection handling.""" + + async def test_websocket_connect(self, ws_client): + """WebSocket connection can be established.""" + ws = await ws_client("/ws/echo") + # websockets v16+ uses state instead of open + from websockets.protocol import State + assert ws.state == State.OPEN + await ws.close() + + async def test_websocket_echo_text(self, ws_client): + """WebSocket echo endpoint echoes text messages.""" + ws = await ws_client("/ws/echo") + try: + await ws.send("Hello, WebSocket!") + response = await asyncio.wait_for(ws.recv(), timeout=5.0) + assert response == "Hello, WebSocket!" + finally: + await ws.close() + + async def test_websocket_echo_multiple_messages(self, ws_client): + """WebSocket echo handles multiple messages.""" + ws = await ws_client("/ws/echo") + try: + messages = ["msg1", "msg2", "msg3"] + for msg in messages: + await ws.send(msg) + response = await asyncio.wait_for(ws.recv(), timeout=5.0) + assert response == msg + finally: + await ws.close() + + +class TestWebSocketBinary: + """Test WebSocket binary message handling.""" + + async def test_websocket_echo_binary(self, ws_client): + """WebSocket binary echo endpoint echoes binary messages.""" + ws = await ws_client("/ws/echo-binary") + try: + data = b"\x00\x01\x02\x03\xff\xfe" + await ws.send(data) + response = await asyncio.wait_for(ws.recv(), timeout=5.0) + assert response == data + finally: + await ws.close() + + async def test_websocket_echo_large_binary(self, ws_client, random_bytes): + """WebSocket handles large binary messages.""" + ws = await ws_client("/ws/echo-binary") + try: + data = random_bytes(64 * 1024) # 64KB + await ws.send(data) + response = await asyncio.wait_for(ws.recv(), timeout=10.0) + assert response == data + finally: + await ws.close() + + +class TestWebSocketScope: + """Test WebSocket scope attributes.""" + + async def test_websocket_scope_endpoint(self, ws_client): + """WebSocket scope endpoint returns scope JSON.""" + ws = await ws_client("/ws/scope") + try: + response = await asyncio.wait_for(ws.recv(), timeout=5.0) + data = json.loads(response) + assert isinstance(data, dict) + except ConnectionClosedOK: + pass + + async def test_websocket_scope_type(self, ws_client): + """WebSocket scope type is 'websocket'.""" + ws = await ws_client("/ws/scope") + try: + response = await asyncio.wait_for(ws.recv(), timeout=5.0) + data = json.loads(response) + assert data.get("type") == "websocket" + except ConnectionClosedOK: + pass + + async def test_websocket_scope_has_path(self, ws_client): + """WebSocket scope has path field.""" + ws = await ws_client("/ws/scope") + try: + response = await asyncio.wait_for(ws.recv(), timeout=5.0) + data = json.loads(response) + assert "/ws/scope" in data.get("path", "") + except ConnectionClosedOK: + pass + + async def test_websocket_scope_has_headers(self, ws_client): + """WebSocket scope has headers field.""" + ws = await ws_client("/ws/scope") + try: + response = await asyncio.wait_for(ws.recv(), timeout=5.0) + data = json.loads(response) + assert "headers" in data + assert isinstance(data["headers"], list) + except ConnectionClosedOK: + pass + + async def test_websocket_scope_required_keys(self, ws_client): + """WebSocket scope has all required keys.""" + ws = await ws_client("/ws/scope") + try: + response = await asyncio.wait_for(ws.recv(), timeout=5.0) + data = json.loads(response) + for key in ASGI_WEBSOCKET_SCOPE_REQUIRED_KEYS: + assert key in data, f"Missing required WebSocket scope key: {key}" + except ConnectionClosedOK: + pass + + +class TestWebSocketSubprotocol: + """Test WebSocket subprotocol negotiation.""" + + async def test_subprotocol_negotiation(self, ws_client): + """WebSocket subprotocol negotiation works.""" + ws = await ws_client("/ws/subprotocol", subprotocols=["proto1", "proto2"]) + try: + response = await asyncio.wait_for(ws.recv(), timeout=5.0) + data = json.loads(response) + assert "requested" in data + assert "proto1" in data["requested"] + assert "proto2" in data["requested"] + except ConnectionClosedOK: + pass + + async def test_subprotocol_selection(self, ws_client): + """First requested subprotocol is selected.""" + ws = await ws_client("/ws/subprotocol", subprotocols=["myproto"]) + try: + response = await asyncio.wait_for(ws.recv(), timeout=5.0) + data = json.loads(response) + assert data.get("selected") == "myproto" + except ConnectionClosedOK: + pass + + +class TestWebSocketClose: + """Test WebSocket close handling.""" + + async def test_close_normal(self, ws_client): + """WebSocket closes with normal code 1000.""" + ws = await ws_client("/ws/close?code=1000") + try: + await asyncio.wait_for(ws.recv(), timeout=5.0) + except (ConnectionClosedOK, ConnectionClosedError) as e: + assert e.code == 1000 + + @pytest.mark.parametrize("code", [1001, 1002, 1003, 1008, 1011]) + async def test_close_codes(self, ws_client, code): + """WebSocket closes with various codes.""" + ws = await ws_client(f"/ws/close?code={code}") + try: + await asyncio.wait_for(ws.recv(), timeout=5.0) + except (ConnectionClosedOK, ConnectionClosedError) as e: + assert e.code == code + + async def test_client_close(self, ws_client): + """Server handles client-initiated close.""" + ws = await ws_client("/ws/echo") + await ws.send("test") + await ws.recv() + await ws.close(code=1000) + # Connection should be closed cleanly + from websockets.protocol import State + assert ws.state == State.CLOSED