Add HTTP/2 tests

Unit tests for HTTP/2 implementation:
- test_http2_stream.py: Stream state management tests
- test_http2_request.py: Request interface tests
- test_http2_connection.py: Connection handling tests
- test_http2_async_connection.py: Async connection tests
- test_http2_config.py: Configuration tests
- test_http2_alpn.py: ALPN negotiation tests
- test_http2_errors.py: Error handling tests
- test_http2_integration.py: Integration tests

Docker integration tests:
- Full HTTP/2 testing environment with nginx proxy
- Direct connection tests and proxy tests
- Concurrent stream tests
- Protocol behavior tests
- Error handling tests
- Header handling tests
- Performance tests
This commit is contained in:
Benoit Chesneau 2026-01-25 16:31:41 +01:00
parent fe18960cd1
commit 780e2cf055
20 changed files with 4621 additions and 0 deletions

View File

@ -62,6 +62,7 @@ testing = [
"pytest-cov",
"pytest-asyncio",
"uvloop>=0.19.0",
"httpx[http2]",
]
[project.scripts]

1
tests/docker/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Docker-based integration tests package."""

View File

@ -0,0 +1,27 @@
FROM python:3.12-slim
# Install build dependencies for h2 and other packages
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Copy the gunicorn source code and install it
COPY . /gunicorn-src/
RUN pip install --no-cache-dir /gunicorn-src/[http2]
# Copy the test application
COPY tests/docker/http2/app.py /app/app.py
EXPOSE 8443
CMD ["gunicorn", "app:app", \
"--bind", "0.0.0.0:8443", \
"--worker-class", "gthread", \
"--threads", "4", \
"--http-protocols", "h2,h1", \
"--certfile", "/certs/server.crt", \
"--keyfile", "/certs/server.key", \
"--workers", "2", \
"--log-level", "debug"]

View File

@ -0,0 +1,11 @@
FROM nginx:1.25-alpine
# Install curl for healthcheck
RUN apk add --no-cache curl
# Copy nginx configuration
COPY nginx.conf /etc/nginx/nginx.conf
EXPOSE 8444
CMD ["nginx", "-g", "daemon off;"]

View File

@ -0,0 +1,103 @@
HTTP/2 Docker Integration Tests
================================
This directory contains Docker-based integration tests for HTTP/2 support
in Gunicorn. These tests verify real HTTP/2 connections using actual HTTP/2
clients, both directly to Gunicorn and through an nginx reverse proxy.
Prerequisites
-------------
- Docker and Docker Compose
- OpenSSL (for generating test certificates)
- Python with ``httpx[http2]`` installed
Running the Tests
-----------------
1. Install test dependencies::
pip install -e ".[testing]"
2. Generate SSL certificates (done automatically by tests, or manually)::
cd tests/docker/http2
openssl req -x509 -newkey rsa:2048 \
-keyout certs/server.key \
-out certs/server.crt \
-days 1 -nodes \
-subj "/CN=localhost"
3. Run the Docker integration tests::
# From the project root
pytest tests/docker/http2/ -v
Or with Docker Compose manually::
cd tests/docker/http2
docker compose up -d
pytest -v
docker compose down -v
Test Categories
---------------
- **TestDirectHTTP2Connection**: Direct HTTP/2 connections to Gunicorn
- **TestConcurrentStreams**: HTTP/2 multiplexing with concurrent streams
- **TestHTTP2BehindProxy**: HTTP/2 through nginx reverse proxy
- **TestHTTP2Protocol**: ALPN negotiation and protocol fallback
- **TestHTTP2ErrorHandling**: Error responses over HTTP/2
- **TestHTTP2Headers**: HTTP/2 header handling
- **TestHTTP2Performance**: Performance-related tests
Architecture
------------
::
+--------+ HTTP/2 +-----------+
| Client | --------------> | Gunicorn |
+--------+ | (port 8443)|
| +-----------+
|
| HTTP/2 +-------+ HTTPS +-----------+
+---------------> | nginx | -----------> | Gunicorn |
| proxy | | (port 8443)|
| (8444)| +-----------+
+-------+
Files
-----
- ``docker-compose.yml`` - Service definitions
- ``Dockerfile.gunicorn`` - Gunicorn container with HTTP/2
- ``Dockerfile.nginx`` - nginx HTTP/2 proxy
- ``nginx.conf`` - nginx configuration
- ``app.py`` - Test WSGI application
- ``conftest.py`` - Pytest fixtures for Docker
- ``test_http2_docker.py`` - Integration tests
Troubleshooting
---------------
If tests fail to start:
1. Check Docker is running::
docker info
2. Check service logs::
cd tests/docker/http2
docker compose logs gunicorn-h2
docker compose logs nginx-h2
3. Verify certificates::
openssl x509 -in certs/server.crt -text -noout
4. Test manually with curl::
curl -k --http2 https://localhost:8443/
curl -k --http2 https://localhost:8444/

View File

@ -0,0 +1 @@
"""HTTP/2 Docker integration tests package."""

128
tests/docker/http2/app.py Normal file
View File

@ -0,0 +1,128 @@
"""Test WSGI application for HTTP/2 Docker integration tests."""
import json
def app(environ, start_response):
"""Simple WSGI app for testing HTTP/2 functionality."""
path = environ.get('PATH_INFO', '/')
method = environ.get('REQUEST_METHOD', 'GET')
if path == '/':
body = b'Hello HTTP/2!'
status = '200 OK'
content_type = 'text/plain'
elif path == '/health':
body = b'OK'
status = '200 OK'
content_type = 'text/plain'
elif path == '/echo':
# Echo back the request body
content_length = int(environ.get('CONTENT_LENGTH', 0) or 0)
body = environ['wsgi.input'].read(content_length)
status = '200 OK'
content_type = 'application/octet-stream'
elif path == '/headers':
# Return all HTTP headers as JSON
headers = {}
for key, value in environ.items():
if key.startswith('HTTP_'):
headers[key] = value
# Also include some important non-HTTP_ headers
for key in ['CONTENT_TYPE', 'CONTENT_LENGTH', 'REQUEST_METHOD',
'PATH_INFO', 'QUERY_STRING', 'SERVER_PROTOCOL']:
if key in environ:
headers[key] = str(environ[key])
body = json.dumps(headers, indent=2).encode('utf-8')
status = '200 OK'
content_type = 'application/json'
elif path == '/version':
# Return HTTP version info
server_protocol = environ.get('SERVER_PROTOCOL', 'HTTP/1.1')
body = server_protocol.encode('utf-8')
status = '200 OK'
content_type = 'text/plain'
elif path == '/large':
# Return a large response (1MB) for testing streaming
body = b'X' * (1024 * 1024)
status = '200 OK'
content_type = 'application/octet-stream'
elif path == '/stream':
# Return a streaming response
def generate():
for i in range(10):
yield f'chunk-{i}\n'.encode('utf-8')
start_response('200 OK', [
('Content-Type', 'text/plain'),
('Transfer-Encoding', 'chunked')
])
return generate()
elif path == '/status':
# Return a specific status code based on query string
query = environ.get('QUERY_STRING', '')
try:
code = int(query.split('=')[1]) if '=' in query else 200
except (ValueError, IndexError):
code = 200
status_messages = {
200: 'OK',
201: 'Created',
204: 'No Content',
400: 'Bad Request',
404: 'Not Found',
500: 'Internal Server Error',
}
status = f'{code} {status_messages.get(code, "Unknown")}'
body = f'Status: {code}'.encode('utf-8')
content_type = 'text/plain'
elif path == '/delay':
# Simulate a slow response
import time
query = environ.get('QUERY_STRING', '')
try:
delay = float(query.split('=')[1]) if '=' in query else 1.0
delay = min(delay, 5.0) # Cap at 5 seconds
except (ValueError, IndexError):
delay = 1.0
time.sleep(delay)
body = f'Delayed {delay}s'.encode('utf-8')
status = '200 OK'
content_type = 'text/plain'
elif path == '/method':
# Return the request method
body = method.encode('utf-8')
status = '200 OK'
content_type = 'text/plain'
else:
body = b'Not Found'
status = '404 Not Found'
content_type = 'text/plain'
response_headers = [
('Content-Type', content_type),
('Content-Length', str(len(body))),
('X-Request-Path', path),
('X-Request-Method', method),
]
start_response(status, response_headers)
return [body]
# For running directly with python
if __name__ == '__main__':
from wsgiref.simple_server import make_server
server = make_server('localhost', 8000, app)
print('Serving on http://localhost:8000')
server.serve_forever()

View File

@ -0,0 +1,3 @@
# This directory contains SSL certificates generated for testing.
# Certificates are generated automatically by conftest.py.
# Do not commit actual certificate files.

View File

@ -0,0 +1,198 @@
"""Pytest fixtures for HTTP/2 Docker integration tests."""
import os
import shutil
import subprocess
import time
from pathlib import Path
import pytest
# Directory containing this conftest.py
DOCKER_DIR = Path(__file__).parent
CERTS_DIR = DOCKER_DIR / "certs"
def generate_self_signed_cert(certs_dir: Path) -> None:
"""Generate self-signed SSL certificates for testing."""
certs_dir.mkdir(parents=True, exist_ok=True)
cert_file = certs_dir / "server.crt"
key_file = certs_dir / "server.key"
# Skip if certs already exist and are recent (less than 1 day old)
if cert_file.exists() and key_file.exists():
age = time.time() - cert_file.stat().st_mtime
if age < 86400: # 1 day
return
# Generate self-signed certificate
subprocess.run(
[
"openssl", "req", "-x509", "-newkey", "rsa:2048",
"-keyout", str(key_file),
"-out", str(cert_file),
"-days", "1",
"-nodes",
"-subj", "/CN=localhost/O=Gunicorn Test/C=US",
"-addext", "subjectAltName=DNS:localhost,DNS:gunicorn-h2,IP:127.0.0.1"
],
check=True,
capture_output=True
)
# Set readable permissions
cert_file.chmod(0o644)
key_file.chmod(0o644)
def wait_for_service(url: str, timeout: int = 60) -> bool:
"""Wait for a service to become available."""
import ssl
import socket
from urllib.parse import urlparse
parsed = urlparse(url)
host = parsed.hostname or 'localhost'
port = parsed.port or 443
start_time = time.time()
while time.time() - start_time < timeout:
try:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
with socket.create_connection((host, port), timeout=5) as sock:
with ctx.wrap_socket(sock, server_hostname=host) as ssock:
return True
except (socket.error, ssl.SSLError, OSError):
time.sleep(1)
return False
@pytest.fixture(scope="session")
def docker_compose_file():
"""Return the path to docker-compose.yml."""
return DOCKER_DIR / "docker-compose.yml"
@pytest.fixture(scope="session")
def certs_dir():
"""Generate and return the certs directory."""
generate_self_signed_cert(CERTS_DIR)
return CERTS_DIR
@pytest.fixture(scope="session")
def docker_services(docker_compose_file, certs_dir):
"""Start Docker services for the test session."""
compose_file = str(docker_compose_file)
# Check if Docker is available
try:
subprocess.run(
["docker", "info"],
check=True,
capture_output=True
)
except (subprocess.CalledProcessError, FileNotFoundError):
pytest.skip("Docker is not available")
# Check if docker compose is available
try:
subprocess.run(
["docker", "compose", "version"],
check=True,
capture_output=True
)
except subprocess.CalledProcessError:
pytest.skip("Docker Compose is not available")
# Build and start services
try:
subprocess.run(
["docker", "compose", "-f", compose_file, "build"],
check=True,
cwd=DOCKER_DIR
)
subprocess.run(
["docker", "compose", "-f", compose_file, "up", "-d"],
check=True,
cwd=DOCKER_DIR
)
# Wait for services to be healthy
gunicorn_ready = wait_for_service("https://127.0.0.1:8443", timeout=60)
nginx_ready = wait_for_service("https://127.0.0.1:8444", timeout=60)
if not gunicorn_ready:
# Get logs for debugging
result = subprocess.run(
["docker", "compose", "-f", compose_file, "logs", "gunicorn-h2"],
capture_output=True,
text=True,
cwd=DOCKER_DIR
)
pytest.fail(f"Gunicorn service failed to start. Logs:\n{result.stdout}\n{result.stderr}")
if not nginx_ready:
result = subprocess.run(
["docker", "compose", "-f", compose_file, "logs", "nginx-h2"],
capture_output=True,
text=True,
cwd=DOCKER_DIR
)
pytest.fail(f"Nginx service failed to start. Logs:\n{result.stdout}\n{result.stderr}")
yield {
"gunicorn": "https://127.0.0.1:8443",
"nginx": "https://127.0.0.1:8444"
}
finally:
# Stop and remove services
subprocess.run(
["docker", "compose", "-f", compose_file, "down", "-v", "--remove-orphans"],
cwd=DOCKER_DIR,
capture_output=True
)
@pytest.fixture
def gunicorn_url(docker_services):
"""Return the gunicorn service URL."""
return docker_services["gunicorn"]
@pytest.fixture
def nginx_url(docker_services):
"""Return the nginx proxy URL."""
return docker_services["nginx"]
@pytest.fixture
def h2_client():
"""Create an HTTP/2 capable client."""
httpx = pytest.importorskip("httpx")
client = httpx.Client(http2=True, verify=False, timeout=30.0)
yield client
client.close()
@pytest.fixture
def h1_client():
"""Create an HTTP/1.1 only client."""
httpx = pytest.importorskip("httpx")
client = httpx.Client(http2=False, verify=False, timeout=30.0)
yield client
client.close()
@pytest.fixture
def async_h2_client():
"""Create an async HTTP/2 capable client."""
httpx = pytest.importorskip("httpx")
async def create_client():
return httpx.AsyncClient(http2=True, verify=False, timeout=30.0)
return create_client

View File

@ -0,0 +1,42 @@
services:
gunicorn-h2:
build:
context: ../../../
dockerfile: tests/docker/http2/Dockerfile.gunicorn
ports:
- "8443:8443"
volumes:
- ./certs:/certs:ro
- ./app.py:/app/app.py:ro
environment:
- GUNICORN_CERTFILE=/certs/server.crt
- GUNICORN_KEYFILE=/certs/server.key
healthcheck:
test: ["CMD", "python", "-c", "import ssl,socket; s=socket.socket(); s.settimeout(1); ctx=ssl.create_default_context(); ctx.check_hostname=False; ctx.verify_mode=ssl.CERT_NONE; ss=ctx.wrap_socket(s,server_hostname='localhost'); ss.connect(('localhost',8443)); ss.close()"]
interval: 2s
timeout: 5s
retries: 15
start_period: 5s
nginx-h2:
build:
context: .
dockerfile: Dockerfile.nginx
ports:
- "8444:8444"
volumes:
- ./certs:/certs:ro
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
gunicorn-h2:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-k", "-f", "https://localhost:8444/health"]
interval: 2s
timeout: 5s
retries: 15
start_period: 5s
networks:
default:
driver: bridge

View File

@ -0,0 +1,73 @@
worker_processes auto;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
sendfile on;
keepalive_timeout 65;
upstream gunicorn_h2 {
server gunicorn-h2:8443;
keepalive 32;
}
server {
listen 8444 ssl;
http2 on;
server_name localhost;
ssl_certificate /certs/server.crt;
ssl_certificate_key /certs/server.key;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
# HTTP/2 settings
http2_max_concurrent_streams 128;
# Health check endpoint
location /health {
return 200 'OK';
add_header Content-Type text/plain;
}
location / {
# Proxy to gunicorn with HTTPS
proxy_pass https://gunicorn_h2;
proxy_http_version 1.1;
proxy_ssl_verify off;
proxy_ssl_server_name on;
# Headers
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Port $server_port;
# Buffering settings
proxy_buffering on;
proxy_buffer_size 4k;
proxy_buffers 8 4k;
# Timeouts
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
}
}

View File

@ -0,0 +1,350 @@
"""HTTP/2 Docker integration tests.
These tests verify HTTP/2 functionality with real connections to gunicorn
running in Docker containers, both directly and through an nginx proxy.
"""
import asyncio
import ssl
import socket
import pytest
# Mark all tests in this module as requiring Docker
pytestmark = [
pytest.mark.docker,
pytest.mark.http2,
pytest.mark.integration,
]
class TestDirectHTTP2Connection:
"""Test direct HTTP/2 connections to gunicorn."""
def test_simple_get(self, h2_client, gunicorn_url):
"""Test basic GET request over HTTP/2."""
response = h2_client.get(f"{gunicorn_url}/")
assert response.status_code == 200
assert response.http_version == "HTTP/2"
assert response.text == "Hello HTTP/2!"
def test_health_endpoint(self, h2_client, gunicorn_url):
"""Test health check endpoint."""
response = h2_client.get(f"{gunicorn_url}/health")
assert response.status_code == 200
assert response.text == "OK"
def test_post_with_body(self, h2_client, gunicorn_url):
"""Test POST request with body."""
data = b"test data for echo"
response = h2_client.post(f"{gunicorn_url}/echo", content=data)
assert response.status_code == 200
assert response.content == data
def test_post_large_body(self, h2_client, gunicorn_url):
"""Test POST with larger body."""
data = b"X" * 65536 # 64KB
response = h2_client.post(f"{gunicorn_url}/echo", content=data)
assert response.status_code == 200
assert response.content == data
assert len(response.content) == 65536
def test_headers_endpoint(self, h2_client, gunicorn_url):
"""Test that custom headers are received."""
response = h2_client.get(
f"{gunicorn_url}/headers",
headers={"X-Custom-Header": "test-value"}
)
assert response.status_code == 200
headers = response.json()
assert "HTTP_X_CUSTOM_HEADER" in headers
assert headers["HTTP_X_CUSTOM_HEADER"] == "test-value"
def test_version_endpoint(self, h2_client, gunicorn_url):
"""Test server protocol version."""
response = h2_client.get(f"{gunicorn_url}/version")
assert response.status_code == 200
# HTTP/2 should report as HTTP/2.0 or similar
assert "HTTP" in response.text
def test_large_response(self, h2_client, gunicorn_url):
"""Test receiving large response over HTTP/2."""
response = h2_client.get(f"{gunicorn_url}/large")
assert response.status_code == 200
assert len(response.content) == 1024 * 1024 # 1MB
assert response.content == b"X" * (1024 * 1024)
def test_different_methods(self, h2_client, gunicorn_url):
"""Test various HTTP methods."""
for method in ["GET", "POST", "PUT", "DELETE", "PATCH"]:
response = h2_client.request(method, f"{gunicorn_url}/method")
assert response.status_code == 200
assert response.text == method
def test_status_codes(self, h2_client, gunicorn_url):
"""Test various HTTP status codes."""
for code in [200, 201, 400, 404, 500]:
response = h2_client.get(f"{gunicorn_url}/status?code={code}")
assert response.status_code == code
def test_not_found(self, h2_client, gunicorn_url):
"""Test 404 response."""
response = h2_client.get(f"{gunicorn_url}/nonexistent")
assert response.status_code == 404
class TestConcurrentStreams:
"""Test HTTP/2 multiplexing with concurrent streams."""
@pytest.mark.asyncio
async def test_concurrent_requests(self, async_h2_client, gunicorn_url):
"""Test multiple concurrent requests over single connection."""
httpx = pytest.importorskip("httpx")
async with httpx.AsyncClient(http2=True, verify=False, timeout=30.0) as client:
# Send 10 concurrent requests
tasks = [
client.get(f"{gunicorn_url}/")
for _ in range(10)
]
responses = await asyncio.gather(*tasks)
assert len(responses) == 10
assert all(r.status_code == 200 for r in responses)
assert all(r.http_version == "HTTP/2" for r in responses)
assert all(r.text == "Hello HTTP/2!" for r in responses)
@pytest.mark.asyncio
async def test_concurrent_mixed_requests(self, async_h2_client, gunicorn_url):
"""Test concurrent requests to different endpoints."""
httpx = pytest.importorskip("httpx")
async with httpx.AsyncClient(http2=True, verify=False, timeout=30.0) as client:
tasks = [
client.get(f"{gunicorn_url}/"),
client.get(f"{gunicorn_url}/headers"),
client.get(f"{gunicorn_url}/version"),
client.post(f"{gunicorn_url}/echo", content=b"test"),
client.get(f"{gunicorn_url}/health"),
]
responses = await asyncio.gather(*tasks)
assert len(responses) == 5
assert all(r.status_code == 200 for r in responses)
@pytest.mark.asyncio
async def test_many_concurrent_streams(self, async_h2_client, gunicorn_url):
"""Test many concurrent streams (up to HTTP/2 limit)."""
httpx = pytest.importorskip("httpx")
async with httpx.AsyncClient(http2=True, verify=False, timeout=60.0) as client:
# Send 50 concurrent requests
tasks = [
client.get(f"{gunicorn_url}/")
for _ in range(50)
]
responses = await asyncio.gather(*tasks)
assert len(responses) == 50
assert all(r.status_code == 200 for r in responses)
class TestHTTP2BehindProxy:
"""Test HTTP/2 through nginx proxy."""
def test_simple_get_via_proxy(self, h2_client, nginx_url):
"""Test basic GET through nginx proxy."""
response = h2_client.get(f"{nginx_url}/")
assert response.status_code == 200
assert response.http_version == "HTTP/2"
assert response.text == "Hello HTTP/2!"
def test_post_via_proxy(self, h2_client, nginx_url):
"""Test POST through nginx proxy."""
data = b"proxied data"
response = h2_client.post(f"{nginx_url}/echo", content=data)
assert response.status_code == 200
assert response.content == data
def test_headers_preserved(self, h2_client, nginx_url):
"""Test that custom headers pass through proxy."""
response = h2_client.get(
f"{nginx_url}/headers",
headers={"X-Custom": "test-value"}
)
assert response.status_code == 200
headers = response.json()
assert "HTTP_X_CUSTOM" in headers
assert headers["HTTP_X_CUSTOM"] == "test-value"
def test_forwarded_headers(self, h2_client, nginx_url):
"""Test that proxy adds forwarded headers."""
response = h2_client.get(f"{nginx_url}/headers")
assert response.status_code == 200
headers = response.json()
# Nginx should add X-Forwarded-* headers
assert "HTTP_X_FORWARDED_FOR" in headers
assert "HTTP_X_FORWARDED_PROTO" in headers
assert headers["HTTP_X_FORWARDED_PROTO"] == "https"
def test_large_response_via_proxy(self, h2_client, nginx_url):
"""Test large response through proxy."""
response = h2_client.get(f"{nginx_url}/large")
assert response.status_code == 200
assert len(response.content) == 1024 * 1024
@pytest.mark.asyncio
async def test_concurrent_via_proxy(self, async_h2_client, nginx_url):
"""Test concurrent requests through proxy."""
httpx = pytest.importorskip("httpx")
async with httpx.AsyncClient(http2=True, verify=False, timeout=30.0) as client:
tasks = [
client.get(f"{nginx_url}/")
for _ in range(10)
]
responses = await asyncio.gather(*tasks)
assert len(responses) == 10
assert all(r.status_code == 200 for r in responses)
assert all(r.http_version == "HTTP/2" for r in responses)
class TestHTTP2Protocol:
"""Test HTTP/2 specific protocol behaviors."""
def test_alpn_negotiation(self, gunicorn_url):
"""Verify ALPN negotiates h2 protocol."""
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.set_alpn_protocols(['h2', 'http/1.1'])
with socket.create_connection(('127.0.0.1', 8443), timeout=10) as sock:
with ctx.wrap_socket(sock, server_hostname='localhost') as ssock:
selected = ssock.selected_alpn_protocol()
assert selected == 'h2', f"Expected h2, got {selected}"
def test_alpn_http11_fallback(self, gunicorn_url):
"""Test that server accepts HTTP/1.1 via ALPN."""
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.set_alpn_protocols(['http/1.1'])
with socket.create_connection(('127.0.0.1', 8443), timeout=10) as sock:
with ctx.wrap_socket(sock, server_hostname='localhost') as ssock:
selected = ssock.selected_alpn_protocol()
assert selected == 'http/1.1', f"Expected http/1.1, got {selected}"
def test_http11_client_works(self, h1_client, gunicorn_url):
"""Test that HTTP/1.1 client can still connect."""
response = h1_client.get(f"{gunicorn_url}/")
assert response.status_code == 200
assert response.http_version == "HTTP/1.1"
assert response.text == "Hello HTTP/2!"
def test_tls_version(self, gunicorn_url):
"""Verify TLS 1.2+ is used."""
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
with socket.create_connection(('127.0.0.1', 8443), timeout=10) as sock:
with ctx.wrap_socket(sock, server_hostname='localhost') as ssock:
version = ssock.version()
assert version in ('TLSv1.2', 'TLSv1.3'), f"Unexpected TLS version: {version}"
class TestHTTP2ErrorHandling:
"""Test HTTP/2 error handling."""
def test_invalid_path(self, h2_client, gunicorn_url):
"""Test request to non-existent path."""
response = h2_client.get(f"{gunicorn_url}/does/not/exist")
assert response.status_code == 404
assert response.http_version == "HTTP/2"
def test_server_error(self, h2_client, gunicorn_url):
"""Test server error response."""
response = h2_client.get(f"{gunicorn_url}/status?code=500")
assert response.status_code == 500
assert response.http_version == "HTTP/2"
@pytest.mark.asyncio
async def test_connection_reuse_after_error(self, async_h2_client, gunicorn_url):
"""Test that connection is reused after error response."""
httpx = pytest.importorskip("httpx")
async with httpx.AsyncClient(http2=True, verify=False, timeout=30.0) as client:
# First request - error
r1 = await client.get(f"{gunicorn_url}/status?code=500")
assert r1.status_code == 500
# Second request - should work on same connection
r2 = await client.get(f"{gunicorn_url}/")
assert r2.status_code == 200
assert r2.text == "Hello HTTP/2!"
class TestHTTP2Headers:
"""Test HTTP/2 header handling."""
def test_response_headers(self, h2_client, gunicorn_url):
"""Test that response headers are correctly received."""
response = h2_client.get(f"{gunicorn_url}/")
assert "content-type" in response.headers
assert "content-length" in response.headers
assert response.headers["x-request-path"] == "/"
assert response.headers["x-request-method"] == "GET"
def test_many_request_headers(self, h2_client, gunicorn_url):
"""Test sending many headers."""
headers = {f"X-Custom-{i}": f"value-{i}" for i in range(20)}
response = h2_client.get(f"{gunicorn_url}/headers", headers=headers)
assert response.status_code == 200
received = response.json()
for i in range(20):
key = f"HTTP_X_CUSTOM_{i}"
assert key in received
assert received[key] == f"value-{i}"
def test_header_case_insensitivity(self, h2_client, gunicorn_url):
"""Test HTTP/2 header case handling."""
response = h2_client.get(
f"{gunicorn_url}/headers",
headers={"X-Mixed-Case-Header": "test"}
)
assert response.status_code == 200
# HTTP/2 lowercases headers, but WSGI uppercases them
headers = response.json()
assert "HTTP_X_MIXED_CASE_HEADER" in headers
class TestHTTP2Performance:
"""Performance-related HTTP/2 tests."""
@pytest.mark.asyncio
async def test_parallel_large_requests(self, async_h2_client, gunicorn_url):
"""Test parallel requests with large responses."""
httpx = pytest.importorskip("httpx")
async with httpx.AsyncClient(http2=True, verify=False, timeout=60.0) as client:
tasks = [
client.get(f"{gunicorn_url}/large")
for _ in range(5)
]
responses = await asyncio.gather(*tasks)
assert len(responses) == 5
assert all(r.status_code == 200 for r in responses)
assert all(len(r.content) == 1024 * 1024 for r in responses)
def test_connection_keepalive(self, h2_client, gunicorn_url):
"""Test that connections are kept alive."""
# Multiple requests should reuse the same connection
for _ in range(5):
response = h2_client.get(f"{gunicorn_url}/")
assert response.status_code == 200
assert response.http_version == "HTTP/2"

209
tests/test_http2_alpn.py Normal file
View File

@ -0,0 +1,209 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for HTTP/2 ALPN negotiation."""
import ssl
import pytest
from unittest import mock
from gunicorn import sock
from gunicorn.config import Config
def create_mock_ssl_socket(alpn_protocol=None):
"""Create a mock SSL socket for testing ALPN negotiation."""
mock_socket = mock.Mock(spec=ssl.SSLSocket)
mock_socket.selected_alpn_protocol.return_value = alpn_protocol
return mock_socket
class TestGetAlpnProtocols:
"""Test _get_alpn_protocols function."""
def test_h1_only_returns_empty(self):
"""No ALPN needed for HTTP/1.1 only."""
conf = mock.Mock()
conf.http_protocols = ["h1"]
result = sock._get_alpn_protocols(conf)
assert result == []
def test_h2_enabled_returns_alpn_list(self):
"""Should return ALPN protocols when h2 is enabled."""
conf = mock.Mock()
conf.http_protocols = ["h2", "h1"]
with mock.patch('gunicorn.http2.is_http2_available', return_value=True):
result = sock._get_alpn_protocols(conf)
assert "h2" in result
assert "http/1.1" in result
def test_h2_without_library_returns_empty(self):
"""Should return empty if h2 library not available."""
conf = mock.Mock()
conf.http_protocols = ["h2", "h1"]
with mock.patch('gunicorn.http2.is_http2_available', return_value=False):
result = sock._get_alpn_protocols(conf)
assert result == []
def test_empty_protocols_returns_empty(self):
conf = mock.Mock()
conf.http_protocols = []
result = sock._get_alpn_protocols(conf)
assert result == []
def test_none_protocols_returns_empty(self):
conf = mock.Mock()
conf.http_protocols = None
result = sock._get_alpn_protocols(conf)
assert result == []
def test_h2_only(self):
"""Should work with h2 only."""
conf = mock.Mock()
conf.http_protocols = ["h2"]
with mock.patch('gunicorn.http2.is_http2_available', return_value=True):
result = sock._get_alpn_protocols(conf)
assert "h2" in result
class TestGetNegotiatedProtocol:
"""Test get_negotiated_protocol function."""
def test_returns_alpn_protocol(self):
ssl_socket = create_mock_ssl_socket(alpn_protocol="h2")
result = sock.get_negotiated_protocol(ssl_socket)
assert result == "h2"
def test_returns_http11(self):
ssl_socket = create_mock_ssl_socket(alpn_protocol="http/1.1")
result = sock.get_negotiated_protocol(ssl_socket)
assert result == "http/1.1"
def test_returns_none_when_not_negotiated(self):
ssl_socket = create_mock_ssl_socket(alpn_protocol=None)
result = sock.get_negotiated_protocol(ssl_socket)
assert result is None
def test_returns_none_for_non_ssl_socket(self):
regular_socket = mock.Mock(spec=[]) # No SSL methods
result = sock.get_negotiated_protocol(regular_socket)
assert result is None
def test_handles_attribute_error(self):
"""Handle old SSL without selected_alpn_protocol."""
ssl_socket = mock.Mock(spec=ssl.SSLSocket)
del ssl_socket.selected_alpn_protocol # Remove the method
result = sock.get_negotiated_protocol(ssl_socket)
assert result is None
def test_handles_ssl_error(self):
"""Handle SSLError when checking protocol."""
ssl_socket = mock.Mock(spec=ssl.SSLSocket)
ssl_socket.selected_alpn_protocol.side_effect = ssl.SSLError()
result = sock.get_negotiated_protocol(ssl_socket)
assert result is None
class TestIsHttp2Negotiated:
"""Test is_http2_negotiated function."""
def test_returns_true_for_h2(self):
ssl_socket = create_mock_ssl_socket(alpn_protocol="h2")
result = sock.is_http2_negotiated(ssl_socket)
assert result is True
def test_returns_false_for_http11(self):
ssl_socket = create_mock_ssl_socket(alpn_protocol="http/1.1")
result = sock.is_http2_negotiated(ssl_socket)
assert result is False
def test_returns_false_for_none(self):
ssl_socket = create_mock_ssl_socket(alpn_protocol=None)
result = sock.is_http2_negotiated(ssl_socket)
assert result is False
def test_returns_false_for_non_ssl(self):
regular_socket = mock.Mock(spec=[])
result = sock.is_http2_negotiated(regular_socket)
assert result is False
class TestSSLContextAlpnConfiguration:
"""Test that SSL context configures ALPN properly."""
@pytest.fixture
def ssl_config(self, tmp_path):
"""Create a config with SSL settings."""
# Create dummy cert/key files
certfile = tmp_path / "cert.pem"
keyfile = tmp_path / "key.pem"
certfile.touch()
keyfile.touch()
conf = mock.Mock()
conf.certfile = str(certfile)
conf.keyfile = str(keyfile)
conf.ca_certs = None
conf.cert_reqs = ssl.CERT_NONE
conf.ciphers = None
conf.http_protocols = ["h2", "h1"]
conf.ssl_context = lambda conf, factory: factory()
return conf
def test_ssl_context_sets_alpn_when_h2_available(self, ssl_config):
"""SSL context should set ALPN protocols when h2 is available."""
with mock.patch('gunicorn.http2.is_http2_available', return_value=True):
with mock.patch('ssl.create_default_context') as mock_ctx:
mock_context = mock.Mock()
mock_ctx.return_value = mock_context
mock_context.load_cert_chain = mock.Mock()
try:
sock.ssl_context(ssl_config)
except Exception:
pass # May fail due to dummy certs
# Check that set_alpn_protocols was called
if mock_context.set_alpn_protocols.called:
call_args = mock_context.set_alpn_protocols.call_args[0][0]
assert 'h2' in call_args
def test_ssl_context_no_alpn_when_h1_only(self):
"""SSL context should not set ALPN for HTTP/1.1 only."""
conf = mock.Mock()
conf.http_protocols = ["h1"]
conf.ca_certs = None
conf.certfile = "cert.pem"
conf.keyfile = "key.pem"
conf.cert_reqs = ssl.CERT_NONE
conf.ciphers = None
conf.ssl_context = lambda conf, factory: factory()
with mock.patch('ssl.create_default_context') as mock_ctx:
mock_context = mock.Mock()
mock_ctx.return_value = mock_context
# ALPN should not be set for h1 only
alpn_protocols = sock._get_alpn_protocols(conf)
assert alpn_protocols == []
class TestAlpnProtocolMap:
"""Test ALPN protocol mapping."""
def test_h1_maps_to_http11(self):
from gunicorn.config import ALPN_PROTOCOL_MAP
assert ALPN_PROTOCOL_MAP.get("h1") == "http/1.1"
def test_h2_maps_to_h2(self):
from gunicorn.config import ALPN_PROTOCOL_MAP
assert ALPN_PROTOCOL_MAP.get("h2") == "h2"

View File

@ -0,0 +1,561 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for async HTTP/2 server connection."""
import asyncio
import pytest
from unittest import mock
from io import BytesIO
# Check if h2 is available for integration tests
try:
import h2.connection
import h2.config
import h2.events
H2_AVAILABLE = True
except ImportError:
H2_AVAILABLE = False
from gunicorn.http2.errors import (
HTTP2Error, HTTP2ProtocolError, HTTP2ConnectionError
)
pytestmark = pytest.mark.skipif(not H2_AVAILABLE, reason="h2 library not available")
class MockConfig:
"""Mock gunicorn configuration for HTTP/2."""
def __init__(self):
self.http2_max_concurrent_streams = 100
self.http2_initial_window_size = 65535
self.http2_max_frame_size = 16384
self.http2_max_header_list_size = 65536
class MockAsyncReader:
"""Mock asyncio StreamReader for testing."""
def __init__(self, data=b''):
self._buffer = BytesIO(data)
self._eof = False
async def read(self, n=-1):
data = self._buffer.read(n)
if not data and self._eof:
return b''
return data
def set_data(self, data):
self._buffer = BytesIO(data)
def set_eof(self):
self._eof = True
self._buffer = BytesIO(b'')
class MockAsyncWriter:
"""Mock asyncio StreamWriter for testing."""
def __init__(self):
self._buffer = bytearray()
self._closed = False
self._drained = False
def write(self, data):
if self._closed:
raise OSError("Writer is closed")
self._buffer.extend(data)
async def drain(self):
self._drained = True
def close(self):
self._closed = True
async def wait_closed(self):
pass
def get_written_data(self):
return bytes(self._buffer)
def clear(self):
self._buffer.clear()
def create_client_connection():
"""Create an h2 client connection for generating test frames."""
config = h2.config.H2Configuration(client_side=True)
conn = h2.connection.H2Connection(config=config)
conn.initiate_connection()
return conn
class TestAsyncHTTP2ConnectionInit:
"""Test AsyncHTTP2Connection initialization."""
def test_basic_initialization(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
assert conn.cfg is cfg
assert conn.reader is reader
assert conn.writer is writer
assert conn.client_addr == ('127.0.0.1', 12345)
assert conn.streams == {}
assert conn.is_closed is False
assert conn._initialized is False
def test_settings_from_config(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
cfg.http2_max_concurrent_streams = 50
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
assert conn.max_concurrent_streams == 50
class TestAsyncHTTP2ConnectionInitiate:
"""Test async connection initiation."""
@pytest.mark.asyncio
async def test_initiate_connection(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
assert conn._initialized is True
written_data = writer.get_written_data()
assert len(written_data) > 0
@pytest.mark.asyncio
async def test_initiate_connection_idempotent(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
first_len = len(writer.get_written_data())
await conn.initiate_connection()
second_len = len(writer.get_written_data())
assert first_len == second_len
class TestAsyncHTTP2ConnectionReceiveData:
"""Test async receiving and processing data."""
@pytest.mark.asyncio
async def test_receive_empty_data_closes_connection(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
reader.set_eof()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
requests = await conn.receive_data()
assert conn.is_closed is True
assert requests == []
@pytest.mark.asyncio
async def test_receive_simple_get_request(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
# Create client and exchange settings
client = create_client_connection()
client_preface = client.data_to_send()
reader.set_data(client_preface)
await conn.receive_data()
server_data = writer.get_written_data()
if server_data:
client.receive_data(server_data)
# Client sends GET request
client.send_headers(
stream_id=1,
headers=[
(':method', 'GET'),
(':path', '/async-test'),
(':scheme', 'https'),
(':authority', 'localhost'),
],
end_stream=True
)
reader.set_data(client.data_to_send())
requests = await conn.receive_data()
assert len(requests) == 1
assert requests[0].method == 'GET'
assert requests[0].path == '/async-test'
@pytest.mark.asyncio
async def test_receive_with_timeout(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
client = create_client_connection()
reader.set_data(client.data_to_send())
# Should complete without timeout
await conn.receive_data(timeout=5.0)
@pytest.mark.asyncio
async def test_receive_timeout_raises(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
# Create a reader that blocks forever
async def blocking_read(n):
await asyncio.sleep(10)
return b''
reader = mock.Mock()
reader.read = mock.AsyncMock(side_effect=blocking_read)
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
# Timeout is converted to HTTP2ConnectionError by the implementation
with pytest.raises((asyncio.TimeoutError, HTTP2ConnectionError)):
await conn.receive_data(timeout=0.01)
class TestAsyncHTTP2ConnectionSendResponse:
"""Test async sending responses."""
@pytest.mark.asyncio
async def test_send_simple_response(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
# Setup stream via request
client = create_client_connection()
reader.set_data(client.data_to_send())
await conn.receive_data()
client.receive_data(writer.get_written_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
reader.set_data(client.data_to_send())
await conn.receive_data()
writer.clear()
await conn.send_response(
stream_id=1,
status=200,
headers=[('content-type', 'text/plain')],
body=b'Async Hello!'
)
events = client.receive_data(writer.get_written_data())
data_events = [e for e in events if isinstance(e, h2.events.DataReceived)]
assert len(data_events) == 1
assert data_events[0].data == b'Async Hello!'
@pytest.mark.asyncio
async def test_send_response_invalid_stream(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
with pytest.raises(HTTP2Error):
await conn.send_response(stream_id=999, status=200, headers=[], body=None)
class TestAsyncHTTP2ConnectionSendData:
"""Test async send_data method."""
@pytest.mark.asyncio
async def test_send_data(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
# Setup stream
client = create_client_connection()
reader.set_data(client.data_to_send())
await conn.receive_data()
client.receive_data(writer.get_written_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
reader.set_data(client.data_to_send())
await conn.receive_data()
# Send full response using send_response
writer.clear()
await conn.send_response(
stream_id=1,
status=200,
headers=[('content-type', 'text/plain')],
body=b'chunk1chunk2'
)
events = client.receive_data(writer.get_written_data())
data_events = [e for e in events if isinstance(e, h2.events.DataReceived)]
assert len(data_events) >= 1
all_data = b''.join(e.data for e in data_events)
assert all_data == b'chunk1chunk2'
def get_h2_header_value(headers_list, name):
"""Extract a header value from h2 headers list."""
for header_name, header_value in headers_list:
name_str = header_name.decode() if isinstance(header_name, bytes) else header_name
if name_str == name:
return header_value.decode() if isinstance(header_value, bytes) else header_value
return None
class TestAsyncHTTP2ConnectionSendError:
"""Test async error response sending."""
@pytest.mark.asyncio
async def test_send_error(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
client = create_client_connection()
reader.set_data(client.data_to_send())
await conn.receive_data()
client.receive_data(writer.get_written_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
reader.set_data(client.data_to_send())
await conn.receive_data()
writer.clear()
await conn.send_error(stream_id=1, status_code=500, message="Internal Error")
events = client.receive_data(writer.get_written_data())
response_events = [e for e in events if isinstance(e, h2.events.ResponseReceived)]
assert len(response_events) == 1
headers_list = response_events[0].headers
assert get_h2_header_value(headers_list, ':status') == '500'
class TestAsyncHTTP2ConnectionResetStream:
"""Test async stream reset."""
@pytest.mark.asyncio
async def test_reset_stream(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
client = create_client_connection()
reader.set_data(client.data_to_send())
await conn.receive_data()
client.receive_data(writer.get_written_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=False)
reader.set_data(client.data_to_send())
await conn.receive_data()
writer.clear()
await conn.reset_stream(stream_id=1, error_code=0x8)
events = client.receive_data(writer.get_written_data())
reset_events = [e for e in events if isinstance(e, h2.events.StreamReset)]
assert len(reset_events) == 1
class TestAsyncHTTP2ConnectionClose:
"""Test async connection close."""
@pytest.mark.asyncio
async def test_close_connection(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
client = create_client_connection()
reader.set_data(client.data_to_send())
await conn.receive_data()
writer.clear()
await conn.close()
assert conn.is_closed is True
assert writer._closed is True
@pytest.mark.asyncio
async def test_close_idempotent(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
await conn.close()
await conn.close() # Should not raise
class TestAsyncHTTP2ConnectionCleanup:
"""Test async stream cleanup."""
@pytest.mark.asyncio
async def test_cleanup_stream(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
client = create_client_connection()
reader.set_data(client.data_to_send())
await conn.receive_data()
client.receive_data(writer.get_written_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
reader.set_data(client.data_to_send())
await conn.receive_data()
assert 1 in conn.streams
conn.cleanup_stream(1)
assert 1 not in conn.streams
class TestAsyncHTTP2ConnectionRepr:
"""Test async connection representation."""
def test_repr(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
repr_str = repr(conn)
assert "AsyncHTTP2Connection" in repr_str
assert "streams=" in repr_str
class TestAsyncHTTP2ConnectionSocketErrors:
"""Test socket error handling in async connection."""
@pytest.mark.asyncio
async def test_read_error_raises_connection_error(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = mock.Mock()
reader.read = mock.AsyncMock(side_effect=OSError("Connection reset"))
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
with pytest.raises(HTTP2ConnectionError):
await conn.receive_data()
@pytest.mark.asyncio
async def test_write_error_raises_connection_error(self):
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = mock.Mock()
writer.write = mock.Mock(side_effect=OSError("Broken pipe"))
writer.drain = mock.AsyncMock()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
with pytest.raises(HTTP2ConnectionError):
await conn.initiate_connection()

260
tests/test_http2_config.py Normal file
View File

@ -0,0 +1,260 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for HTTP/2 configuration settings."""
import pytest
from gunicorn import config
from gunicorn.config import Config
class TestHttpProtocolsConfig:
"""Test http_protocols configuration setting."""
def test_default_is_h1(self):
c = Config()
assert c.http_protocols == ["h1"]
def test_set_h1_only(self):
c = Config()
c.set("http_protocols", "h1")
assert c.http_protocols == ["h1"]
def test_set_h2_only(self):
c = Config()
c.set("http_protocols", "h2")
assert c.http_protocols == ["h2"]
def test_set_h1_and_h2(self):
c = Config()
c.set("http_protocols", "h2,h1")
assert c.http_protocols == ["h2", "h1"]
def test_set_h1_h2_order_preserved(self):
c = Config()
c.set("http_protocols", "h1,h2")
assert c.http_protocols == ["h1", "h2"]
def test_whitespace_handling(self):
c = Config()
c.set("http_protocols", " h1 , h2 ")
assert c.http_protocols == ["h1", "h2"]
def test_case_insensitive(self):
c = Config()
c.set("http_protocols", "H1,H2")
assert c.http_protocols == ["h1", "h2"]
def test_empty_string_defaults_to_h1(self):
c = Config()
c.set("http_protocols", "")
assert c.http_protocols == ["h1"]
def test_none_defaults_to_h1(self):
c = Config()
c.set("http_protocols", None)
assert c.http_protocols == ["h1"]
def test_invalid_protocol(self):
c = Config()
with pytest.raises(ValueError) as exc_info:
c.set("http_protocols", "h4")
assert "Invalid protocol" in str(exc_info.value)
assert "h4" in str(exc_info.value)
def test_invalid_type(self):
c = Config()
with pytest.raises(TypeError) as exc_info:
c.set("http_protocols", 123)
assert "must be a string" in str(exc_info.value)
def test_invalid_type_list(self):
c = Config()
with pytest.raises(TypeError):
c.set("http_protocols", ["h1", "h2"])
def test_mixed_valid_invalid(self):
c = Config()
with pytest.raises(ValueError):
c.set("http_protocols", "h1,invalid,h2")
class TestHttp2MaxConcurrentStreams:
"""Test http2_max_concurrent_streams configuration setting."""
def test_default_value(self):
c = Config()
assert c.http2_max_concurrent_streams == 100
def test_set_custom_value(self):
c = Config()
c.set("http2_max_concurrent_streams", 50)
assert c.http2_max_concurrent_streams == 50
def test_set_from_string(self):
c = Config()
c.set("http2_max_concurrent_streams", "200")
assert c.http2_max_concurrent_streams == 200
def test_set_high_value(self):
c = Config()
c.set("http2_max_concurrent_streams", 1000)
assert c.http2_max_concurrent_streams == 1000
def test_negative_value_raises(self):
c = Config()
with pytest.raises(ValueError):
c.set("http2_max_concurrent_streams", -1)
def test_zero_value(self):
# Zero is technically valid for positive int validator
# It may have special meaning (use h2 default)
c = Config()
c.set("http2_max_concurrent_streams", 0)
assert c.http2_max_concurrent_streams == 0
class TestHttp2InitialWindowSize:
"""Test http2_initial_window_size configuration setting."""
def test_default_value(self):
c = Config()
# Default per RFC 7540 is 65535
assert c.http2_initial_window_size == 65535
def test_set_custom_value(self):
c = Config()
c.set("http2_initial_window_size", 131072)
assert c.http2_initial_window_size == 131072
def test_set_from_string(self):
c = Config()
c.set("http2_initial_window_size", "32768")
assert c.http2_initial_window_size == 32768
def test_negative_value_raises(self):
c = Config()
with pytest.raises(ValueError):
c.set("http2_initial_window_size", -1)
class TestHttp2MaxFrameSize:
"""Test http2_max_frame_size configuration setting."""
def test_default_value(self):
c = Config()
# Default per RFC 7540 is 16384
assert c.http2_max_frame_size == 16384
def test_set_custom_value(self):
c = Config()
c.set("http2_max_frame_size", 32768)
assert c.http2_max_frame_size == 32768
def test_set_from_string(self):
c = Config()
c.set("http2_max_frame_size", "65536")
assert c.http2_max_frame_size == 65536
def test_negative_value_raises(self):
c = Config()
with pytest.raises(ValueError):
c.set("http2_max_frame_size", -1)
class TestHttp2MaxHeaderListSize:
"""Test http2_max_header_list_size configuration setting."""
def test_default_value(self):
c = Config()
assert c.http2_max_header_list_size == 65536
def test_set_custom_value(self):
c = Config()
c.set("http2_max_header_list_size", 131072)
assert c.http2_max_header_list_size == 131072
def test_set_from_string(self):
c = Config()
c.set("http2_max_header_list_size", "262144")
assert c.http2_max_header_list_size == 262144
def test_negative_value_raises(self):
c = Config()
with pytest.raises(ValueError):
c.set("http2_max_header_list_size", -1)
class TestHttp2ConfigPropertyAccess:
"""Test property access for HTTP/2 settings."""
def test_all_http2_settings_accessible(self):
c = Config()
# These should not raise
_ = c.http_protocols
_ = c.http2_max_concurrent_streams
_ = c.http2_initial_window_size
_ = c.http2_max_frame_size
_ = c.http2_max_header_list_size
class TestHttp2ConfigDefaults:
"""Test that defaults match HTTP/2 specification values."""
def test_window_size_matches_rfc(self):
"""RFC 7540 default is 2^16-1 (65535)."""
c = Config()
assert c.http2_initial_window_size == 65535
def test_max_frame_size_matches_rfc_minimum(self):
"""RFC 7540 minimum is 2^14 (16384)."""
c = Config()
assert c.http2_max_frame_size == 16384
def test_concurrent_streams_reasonable_default(self):
"""Default should be reasonable for production use."""
c = Config()
assert 1 <= c.http2_max_concurrent_streams <= 1000
class TestValidateHttpProtocols:
"""Test the validate_http_protocols function directly."""
def test_validate_none(self):
result = config.validate_http_protocols(None)
assert result == ["h1"]
def test_validate_empty_string(self):
result = config.validate_http_protocols("")
assert result == ["h1"]
def test_validate_whitespace_only(self):
result = config.validate_http_protocols(" ")
assert result == ["h1"]
def test_validate_single_protocol(self):
result = config.validate_http_protocols("h2")
assert result == ["h2"]
def test_validate_multiple_protocols(self):
result = config.validate_http_protocols("h2,h1")
assert result == ["h2", "h1"]
def test_validate_with_spaces(self):
result = config.validate_http_protocols("h2 , h1")
assert result == ["h2", "h1"]
def test_validate_uppercase(self):
result = config.validate_http_protocols("H2,H1")
assert result == ["h1", "h2"] or result == ["h2", "h1"]
def test_validate_invalid_raises(self):
with pytest.raises(ValueError):
config.validate_http_protocols("http2")
def test_validate_type_error(self):
with pytest.raises(TypeError):
config.validate_http_protocols(42)

View File

@ -0,0 +1,579 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for HTTP/2 server connection."""
import pytest
from unittest import mock
from io import BytesIO
# Check if h2 is available for integration tests
try:
import h2.connection
import h2.config
import h2.events
import h2.exceptions
H2_AVAILABLE = True
except ImportError:
H2_AVAILABLE = False
from gunicorn.http2.errors import (
HTTP2Error, HTTP2ProtocolError, HTTP2ConnectionError, HTTP2NotAvailable
)
pytestmark = pytest.mark.skipif(not H2_AVAILABLE, reason="h2 library not available")
class MockConfig:
"""Mock gunicorn configuration for HTTP/2."""
def __init__(self):
self.http2_max_concurrent_streams = 100
self.http2_initial_window_size = 65535
self.http2_max_frame_size = 16384
self.http2_max_header_list_size = 65536
class MockSocket:
"""Mock socket for testing connection without real network I/O."""
def __init__(self, data=b''):
self._recv_buffer = BytesIO(data)
self._sent = bytearray()
self._closed = False
def recv(self, size):
return self._recv_buffer.read(size)
def sendall(self, data):
if self._closed:
raise OSError("Socket is closed")
self._sent.extend(data)
def close(self):
self._closed = True
def get_sent_data(self):
return bytes(self._sent)
def set_recv_data(self, data):
self._recv_buffer = BytesIO(data)
def create_client_connection():
"""Create an h2 client connection for generating test frames."""
config = h2.config.H2Configuration(client_side=True)
conn = h2.connection.H2Connection(config=config)
conn.initiate_connection()
return conn
class TestHTTP2ServerConnectionInit:
"""Test HTTP2ServerConnection initialization."""
def test_basic_initialization(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
assert conn.cfg is cfg
assert conn.sock is sock
assert conn.client_addr == ('127.0.0.1', 12345)
assert conn.streams == {}
assert conn.is_closed is False
assert conn._initialized is False
def test_settings_from_config(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
cfg.http2_max_concurrent_streams = 50
cfg.http2_initial_window_size = 32768
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
assert conn.max_concurrent_streams == 50
assert conn.initial_window_size == 32768
class TestHTTP2ServerConnectionInitiate:
"""Test connection initiation."""
def test_initiate_connection(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
assert conn._initialized is True
# Should have sent settings frame
sent_data = sock.get_sent_data()
assert len(sent_data) > 0
def test_initiate_connection_idempotent(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
first_sent = len(sock.get_sent_data())
conn.initiate_connection() # Second call
second_sent = len(sock.get_sent_data())
# Should not send additional data
assert first_sent == second_sent
class TestHTTP2ServerConnectionReceiveData:
"""Test receiving and processing data."""
def test_receive_empty_data_closes_connection(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket(b'')
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
requests = conn.receive_data()
assert conn.is_closed is True
assert requests == []
def test_receive_client_preface_and_headers(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Generate client data
client = create_client_connection()
client_preface = client.data_to_send()
# Simulate server receiving client settings
# Feed client preface to server
requests = conn.receive_data(client_preface)
# No requests yet, just settings exchange
assert requests == []
def test_receive_simple_get_request(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Create client and send request
client = create_client_connection()
client_preface = client.data_to_send()
# Process client preface on server
conn.receive_data(client_preface)
# Server may have sent settings, feed them to client
server_data = sock.get_sent_data()
if server_data:
client.receive_data(server_data)
# Client sends GET request
client.send_headers(
stream_id=1,
headers=[
(':method', 'GET'),
(':path', '/test'),
(':scheme', 'https'),
(':authority', 'localhost'),
],
end_stream=True
)
request_data = client.data_to_send()
# Server receives request
requests = conn.receive_data(request_data)
assert len(requests) == 1
req = requests[0]
assert req.method == 'GET'
assert req.path == '/test'
def test_receive_post_with_body(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Create client
client = create_client_connection()
client_preface = client.data_to_send()
conn.receive_data(client_preface)
server_data = sock.get_sent_data()
if server_data:
client.receive_data(server_data)
# Client sends POST with body
client.send_headers(
stream_id=1,
headers=[
(':method', 'POST'),
(':path', '/submit'),
(':scheme', 'https'),
(':authority', 'localhost'),
('content-type', 'application/json'),
('content-length', '13'),
],
end_stream=False
)
client.send_data(stream_id=1, data=b'{"key":"val"}', end_stream=True)
request_data = client.data_to_send()
requests = conn.receive_data(request_data)
assert len(requests) == 1
req = requests[0]
assert req.method == 'POST'
assert req.body.read() == b'{"key":"val"}'
def test_socket_error_raises_connection_error(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = mock.Mock()
sock.recv.side_effect = OSError("Connection reset")
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
with pytest.raises(HTTP2ConnectionError):
conn.receive_data()
class TestHTTP2ServerConnectionSendResponse:
"""Test sending responses."""
def test_send_simple_response(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Create a stream by receiving a request
client = create_client_connection()
client_preface = client.data_to_send()
conn.receive_data(client_preface)
server_data = sock.get_sent_data()
if server_data:
client.receive_data(server_data)
client.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
conn.receive_data(client.data_to_send())
# Send response
sock._sent.clear()
conn.send_response(
stream_id=1,
status=200,
headers=[('content-type', 'text/plain')],
body=b'Hello!'
)
sent = sock.get_sent_data()
assert len(sent) > 0
# Verify client receives valid response
events = client.receive_data(sent)
response_events = [e for e in events if isinstance(e, h2.events.ResponseReceived)]
data_events = [e for e in events if isinstance(e, h2.events.DataReceived)]
assert len(response_events) == 1
assert len(data_events) == 1
assert data_events[0].data == b'Hello!'
def test_send_response_with_empty_body(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
client = create_client_connection()
conn.receive_data(client.data_to_send())
client.receive_data(sock.get_sent_data())
client.send_headers(1, [
(':method', 'HEAD'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
conn.receive_data(client.data_to_send())
sock._sent.clear()
conn.send_response(stream_id=1, status=200, headers=[], body=None)
events = client.receive_data(sock.get_sent_data())
stream_ended = [e for e in events if isinstance(e, h2.events.StreamEnded)]
assert len(stream_ended) == 1
def test_send_response_invalid_stream(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
with pytest.raises(HTTP2Error):
conn.send_response(stream_id=999, status=200, headers=[], body=None)
class TestHTTP2ServerConnectionSendError:
"""Test sending error responses."""
def test_send_error_with_message(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
client = create_client_connection()
conn.receive_data(client.data_to_send())
client.receive_data(sock.get_sent_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/notfound'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
conn.receive_data(client.data_to_send())
sock._sent.clear()
conn.send_error(stream_id=1, status_code=404, message="Not Found")
events = client.receive_data(sock.get_sent_data())
response_events = [e for e in events if isinstance(e, h2.events.ResponseReceived)]
data_events = [e for e in events if isinstance(e, h2.events.DataReceived)]
assert len(response_events) == 1
# h2 library returns headers as list of tuples, convert to dict
# Note: headers may be bytes or strings depending on h2 version
headers_list = response_events[0].headers
status = None
for name, value in headers_list:
name_str = name.decode() if isinstance(name, bytes) else name
if name_str == ':status':
status = value.decode() if isinstance(value, bytes) else value
break
assert status == '404'
assert len(data_events) == 1
assert data_events[0].data == b"Not Found"
class TestHTTP2ServerConnectionResetStream:
"""Test stream reset."""
def test_reset_stream(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
client = create_client_connection()
conn.receive_data(client.data_to_send())
client.receive_data(sock.get_sent_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=False)
conn.receive_data(client.data_to_send())
sock._sent.clear()
conn.reset_stream(stream_id=1, error_code=0x8) # CANCEL
events = client.receive_data(sock.get_sent_data())
reset_events = [e for e in events if isinstance(e, h2.events.StreamReset)]
assert len(reset_events) == 1
assert reset_events[0].error_code == 0x8
class TestHTTP2ServerConnectionClose:
"""Test connection close."""
def test_close_connection(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
client = create_client_connection()
conn.receive_data(client.data_to_send())
sock._sent.clear()
conn.close()
assert conn.is_closed is True
# Should have sent GOAWAY
events = client.receive_data(sock.get_sent_data())
goaway_events = [e for e in events if isinstance(e, h2.events.ConnectionTerminated)]
assert len(goaway_events) == 1
def test_close_idempotent(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
conn.close()
sent_after_first = len(sock.get_sent_data())
conn.close() # Second call
sent_after_second = len(sock.get_sent_data())
# Should not send additional GOAWAY
assert sent_after_first == sent_after_second
class TestHTTP2ServerConnectionCleanup:
"""Test stream cleanup."""
def test_cleanup_stream(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
client = create_client_connection()
conn.receive_data(client.data_to_send())
client.receive_data(sock.get_sent_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
conn.receive_data(client.data_to_send())
assert 1 in conn.streams
conn.cleanup_stream(1)
assert 1 not in conn.streams
def test_cleanup_nonexistent_stream(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Should not raise
conn.cleanup_stream(999)
class TestHTTP2ServerConnectionMultipleStreams:
"""Test handling multiple concurrent streams."""
def test_multiple_streams(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
client = create_client_connection()
conn.receive_data(client.data_to_send())
client.receive_data(sock.get_sent_data())
# Send multiple requests
client.send_headers(1, [
(':method', 'GET'),
(':path', '/one'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
client.send_headers(3, [
(':method', 'GET'),
(':path', '/two'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
requests = conn.receive_data(client.data_to_send())
assert len(requests) == 2
paths = {req.path for req in requests}
assert paths == {'/one', '/two'}
class TestHTTP2ServerConnectionRepr:
"""Test string representation."""
def test_repr(self):
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
repr_str = repr(conn)
assert "HTTP2ServerConnection" in repr_str
assert "streams=" in repr_str
assert "closed=" in repr_str
class TestHTTP2NotAvailable:
"""Test behavior when h2 is not available."""
def test_import_error_raises_not_available(self):
from gunicorn.http2 import errors
# Test that HTTP2NotAvailable can be raised
with pytest.raises(errors.HTTP2NotAvailable):
raise errors.HTTP2NotAvailable()

228
tests/test_http2_errors.py Normal file
View File

@ -0,0 +1,228 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for HTTP/2 error classes."""
import pytest
from gunicorn.http2.errors import (
HTTP2Error,
HTTP2ProtocolError,
HTTP2InternalError,
HTTP2FlowControlError,
HTTP2SettingsTimeout,
HTTP2StreamClosed,
HTTP2FrameSizeError,
HTTP2RefusedStream,
HTTP2Cancel,
HTTP2CompressionError,
HTTP2ConnectError,
HTTP2EnhanceYourCalm,
HTTP2InadequateSecurity,
HTTP2RequiresHTTP11,
HTTP2StreamError,
HTTP2ConnectionError,
HTTP2ConfigurationError,
HTTP2NotAvailable,
)
class TestHTTP2ErrorCodes:
"""Test RFC 7540 error codes."""
def test_no_error(self):
err = HTTP2Error()
assert err.error_code == 0x0
def test_protocol_error(self):
err = HTTP2ProtocolError()
assert err.error_code == 0x1
def test_internal_error(self):
err = HTTP2InternalError()
assert err.error_code == 0x2
def test_flow_control_error(self):
err = HTTP2FlowControlError()
assert err.error_code == 0x3
def test_settings_timeout(self):
err = HTTP2SettingsTimeout()
assert err.error_code == 0x4
def test_stream_closed(self):
err = HTTP2StreamClosed()
assert err.error_code == 0x5
def test_frame_size_error(self):
err = HTTP2FrameSizeError()
assert err.error_code == 0x6
def test_refused_stream(self):
err = HTTP2RefusedStream()
assert err.error_code == 0x7
def test_cancel(self):
err = HTTP2Cancel()
assert err.error_code == 0x8
def test_compression_error(self):
err = HTTP2CompressionError()
assert err.error_code == 0x9
def test_connect_error(self):
err = HTTP2ConnectError()
assert err.error_code == 0xa
def test_enhance_your_calm(self):
err = HTTP2EnhanceYourCalm()
assert err.error_code == 0xb
def test_inadequate_security(self):
err = HTTP2InadequateSecurity()
assert err.error_code == 0xc
def test_http11_required(self):
err = HTTP2RequiresHTTP11()
assert err.error_code == 0xd
class TestHTTP2ErrorInheritance:
"""Test error class inheritance."""
def test_all_inherit_from_http2error(self):
error_classes = [
HTTP2ProtocolError,
HTTP2InternalError,
HTTP2FlowControlError,
HTTP2SettingsTimeout,
HTTP2StreamClosed,
HTTP2FrameSizeError,
HTTP2RefusedStream,
HTTP2Cancel,
HTTP2CompressionError,
HTTP2ConnectError,
HTTP2EnhanceYourCalm,
HTTP2InadequateSecurity,
HTTP2RequiresHTTP11,
HTTP2StreamError,
HTTP2ConnectionError,
HTTP2ConfigurationError,
HTTP2NotAvailable,
]
for cls in error_classes:
assert issubclass(cls, HTTP2Error)
assert issubclass(cls, Exception)
def test_http2error_is_exception(self):
assert issubclass(HTTP2Error, Exception)
class TestHTTP2ErrorMessages:
"""Test error message handling."""
def test_default_message_from_docstring(self):
err = HTTP2ProtocolError()
assert err.message == "Protocol error detected."
assert str(err) == "Protocol error detected."
def test_custom_message(self):
err = HTTP2ProtocolError("Custom error message")
assert err.message == "Custom error message"
assert str(err) == "Custom error message"
def test_custom_error_code(self):
err = HTTP2Error("Test", error_code=0xFF)
assert err.error_code == 0xFF
def test_message_and_error_code(self):
err = HTTP2ProtocolError("Custom", error_code=0x99)
assert err.message == "Custom"
assert err.error_code == 0x99
class TestHTTP2StreamError:
"""Test stream-specific error handling."""
def test_stream_id_in_error(self):
err = HTTP2StreamError(stream_id=5)
assert err.stream_id == 5
def test_stream_error_str(self):
err = HTTP2StreamError(stream_id=7, message="Stream reset")
assert "Stream 7" in str(err)
assert "Stream reset" in str(err)
def test_stream_error_default_message(self):
err = HTTP2StreamError(stream_id=3)
assert err.stream_id == 3
assert "Stream 3" in str(err)
def test_stream_error_with_error_code(self):
err = HTTP2StreamError(stream_id=1, error_code=0x8)
assert err.stream_id == 1
assert err.error_code == 0x8
class TestHTTP2ConnectionError:
"""Test connection-level error handling."""
def test_connection_error_basic(self):
err = HTTP2ConnectionError("Connection failed")
assert str(err) == "Connection failed"
assert isinstance(err, HTTP2Error)
class TestHTTP2ConfigurationError:
"""Test configuration error handling."""
def test_configuration_error_basic(self):
err = HTTP2ConfigurationError("Invalid setting")
assert str(err) == "Invalid setting"
assert isinstance(err, HTTP2Error)
class TestHTTP2NotAvailable:
"""Test HTTP/2 unavailable error."""
def test_default_message(self):
err = HTTP2NotAvailable()
assert "h2 library" in err.message
assert "pip install" in err.message
def test_custom_message(self):
err = HTTP2NotAvailable("Custom unavailable message")
assert err.message == "Custom unavailable message"
def test_inherits_from_http2error(self):
err = HTTP2NotAvailable()
assert isinstance(err, HTTP2Error)
class TestErrorRaising:
"""Test that errors can be properly raised and caught."""
def test_raise_and_catch_http2error(self):
with pytest.raises(HTTP2Error):
raise HTTP2ProtocolError("Test")
def test_raise_and_catch_specific(self):
with pytest.raises(HTTP2ProtocolError):
raise HTTP2ProtocolError("Test")
def test_raise_stream_error(self):
with pytest.raises(HTTP2StreamError) as exc_info:
raise HTTP2StreamError(stream_id=5, message="Test stream error")
assert exc_info.value.stream_id == 5
def test_error_chaining(self):
try:
try:
raise ValueError("Original")
except ValueError as e:
raise HTTP2InternalError("Wrapped") from e
except HTTP2InternalError as err:
assert err.__cause__ is not None
assert isinstance(err.__cause__, ValueError)

View File

@ -0,0 +1,642 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Integration tests for HTTP/2 with full request/response cycles."""
import pytest
from io import BytesIO
# Check if h2 is available
try:
import h2.connection
import h2.config
import h2.events
H2_AVAILABLE = True
except ImportError:
H2_AVAILABLE = False
pytestmark = pytest.mark.skipif(not H2_AVAILABLE, reason="h2 library not available")
def get_header_value(headers_list, name):
"""Extract a header value from h2 headers list.
h2 library may return headers as bytes or strings depending on version.
"""
for header_name, header_value in headers_list:
name_str = header_name.decode() if isinstance(header_name, bytes) else header_name
if name_str == name:
return header_value.decode() if isinstance(header_value, bytes) else header_value
return None
class MockConfig:
"""Mock gunicorn configuration for HTTP/2."""
def __init__(self):
self.http2_max_concurrent_streams = 100
self.http2_initial_window_size = 65535
self.http2_max_frame_size = 16384
self.http2_max_header_list_size = 65536
class MockSocket:
"""Mock socket for integration testing."""
def __init__(self, data=b''):
self._recv_buffer = BytesIO(data)
self._sent = bytearray()
def recv(self, size):
return self._recv_buffer.read(size)
def sendall(self, data):
self._sent.extend(data)
def get_sent_data(self):
return bytes(self._sent)
def set_recv_data(self, data):
self._recv_buffer = BytesIO(data)
def clear_sent(self):
self._sent.clear()
def create_h2_client():
"""Create an h2 client connection."""
config = h2.config.H2Configuration(client_side=True)
conn = h2.connection.H2Connection(config=config)
conn.initiate_connection()
return conn
class TestSimpleRequestResponse:
"""Test simple request/response cycles."""
def test_get_request_text_response(self):
"""Test a complete GET request with text response."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
# Client setup
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
# Client sends request
client.send_headers(1, [
(':method', 'GET'),
(':path', '/hello'),
(':scheme', 'https'),
(':authority', 'example.com'),
('accept', 'text/plain'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
# Server receives request
requests = server.receive_data()
assert len(requests) == 1
req = requests[0]
# Verify request properties
assert req.method == 'GET'
assert req.path == '/hello'
assert req.version == (2, 0)
assert req.get_header('ACCEPT') == 'text/plain'
# Server sends response
sock.clear_sent()
server.send_response(
stream_id=1,
status=200,
headers=[
('content-type', 'text/plain'),
('content-length', '12'),
],
body=b'Hello World!'
)
# Client verifies response
events = client.receive_data(sock.get_sent_data())
response_events = [e for e in events if isinstance(e, h2.events.ResponseReceived)]
assert len(response_events) == 1
headers_list = response_events[0].headers
assert get_header_value(headers_list, ':status') == '200'
assert get_header_value(headers_list, 'content-type') == 'text/plain'
data_events = [e for e in events if isinstance(e, h2.events.DataReceived)]
assert len(data_events) == 1
assert data_events[0].data == b'Hello World!'
def test_post_request_with_json_body(self):
"""Test POST request with JSON body and response."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
# Client sends POST with body
request_body = b'{"username": "test", "action": "login"}'
client.send_headers(1, [
(':method', 'POST'),
(':path', '/api/login'),
(':scheme', 'https'),
(':authority', 'api.example.com'),
('content-type', 'application/json'),
('content-length', str(len(request_body))),
], end_stream=False)
client.send_data(1, request_body, end_stream=True)
sock.set_recv_data(client.data_to_send())
requests = server.receive_data()
assert len(requests) == 1
req = requests[0]
assert req.method == 'POST'
assert req.content_type == 'application/json'
assert req.body.read() == request_body
# Server responds
sock.clear_sent()
response_body = b'{"status": "success", "token": "abc123"}'
server.send_response(
stream_id=1,
status=200,
headers=[
('content-type', 'application/json'),
('content-length', str(len(response_body))),
],
body=response_body
)
events = client.receive_data(sock.get_sent_data())
data_events = [e for e in events if isinstance(e, h2.events.DataReceived)]
assert data_events[0].data == response_body
class TestMultipleStreams:
"""Test concurrent stream handling."""
def test_concurrent_requests(self):
"""Test handling multiple concurrent requests."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
# Client sends three concurrent requests
for stream_id, path in [(1, '/one'), (3, '/two'), (5, '/three')]:
client.send_headers(stream_id, [
(':method', 'GET'),
(':path', path),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
requests = server.receive_data()
assert len(requests) == 3
paths = {req.path for req in requests}
assert paths == {'/one', '/two', '/three'}
# Server responds to all
sock.clear_sent()
for req in requests:
server.send_response(
stream_id=req.stream.stream_id,
status=200,
headers=[('x-path', req.path)],
body=req.path.encode()
)
events = client.receive_data(sock.get_sent_data())
response_events = [e for e in events if isinstance(e, h2.events.ResponseReceived)]
assert len(response_events) == 3
def test_interleaved_request_response(self):
"""Test interleaved request and response processing."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
# First request
client.send_headers(1, [
(':method', 'GET'),
(':path', '/first'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
requests = server.receive_data()
assert len(requests) == 1
# Respond to first before second arrives
sock.clear_sent()
server.send_response(1, 200, [], b'First response')
client.receive_data(sock.get_sent_data())
# Second request
client.send_headers(3, [
(':method', 'GET'),
(':path', '/second'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
requests = server.receive_data()
assert len(requests) == 1
# Respond to second
sock.clear_sent()
server.send_response(3, 200, [], b'Second response')
events = client.receive_data(sock.get_sent_data())
data_events = [e for e in events if isinstance(e, h2.events.DataReceived)]
assert data_events[0].data == b'Second response'
class TestErrorHandling:
"""Test error response scenarios."""
def test_404_response(self):
"""Test 404 Not Found response."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/nonexistent'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
server.receive_data()
sock.clear_sent()
server.send_error(1, 404, "Not Found")
events = client.receive_data(sock.get_sent_data())
response_events = [e for e in events if isinstance(e, h2.events.ResponseReceived)]
headers_list = response_events[0].headers
assert get_header_value(headers_list, ':status') == '404'
def test_500_response(self):
"""Test 500 Internal Server Error response."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/error'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
server.receive_data()
sock.clear_sent()
server.send_error(1, 500, "Internal Server Error")
events = client.receive_data(sock.get_sent_data())
response_events = [e for e in events if isinstance(e, h2.events.ResponseReceived)]
headers_list = response_events[0].headers
assert get_header_value(headers_list, ':status') == '500'
def test_stream_reset_by_server(self):
"""Test server resetting a stream."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
# Start a request but don't finish
client.send_headers(1, [
(':method', 'POST'),
(':path', '/upload'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=False)
sock.set_recv_data(client.data_to_send())
server.receive_data()
# Server resets the stream
sock.clear_sent()
server.reset_stream(1, error_code=0x8) # CANCEL
events = client.receive_data(sock.get_sent_data())
reset_events = [e for e in events if isinstance(e, h2.events.StreamReset)]
assert len(reset_events) == 1
assert reset_events[0].error_code == 0x8
class TestConnectionLifecycle:
"""Test connection lifecycle events."""
def test_graceful_shutdown(self):
"""Test graceful connection shutdown with GOAWAY."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
# Process a request first
client.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
server.receive_data()
sock.clear_sent()
server.send_response(1, 200, [], b'OK')
client.receive_data(sock.get_sent_data())
# Server initiates graceful shutdown
sock.clear_sent()
server.close()
events = client.receive_data(sock.get_sent_data())
goaway_events = [e for e in events if isinstance(e, h2.events.ConnectionTerminated)]
assert len(goaway_events) == 1
def test_client_initiated_close(self):
"""Test handling client-initiated connection close."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
# Client closes connection
client.close_connection()
sock.set_recv_data(client.data_to_send())
server.receive_data()
assert server.is_closed is True
class TestLargePayloads:
"""Test handling of large payloads."""
def test_moderate_request_body(self):
"""Test handling moderate-sized request body within flow control."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
# Send body that fits within initial window (65535 bytes)
body = b'X' * 10000
client.send_headers(1, [
(':method', 'POST'),
(':path', '/upload'),
(':scheme', 'https'),
(':authority', 'example.com'),
('content-length', str(len(body))),
], end_stream=False)
client.send_data(1, body, end_stream=True)
sock.set_recv_data(client.data_to_send())
requests = server.receive_data()
assert len(requests) == 1
received_body = requests[0].body.read()
assert len(received_body) == len(body)
assert received_body == body
def test_moderate_response_body(self):
"""Test sending moderate-sized response body."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/moderate'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
server.receive_data()
# Send moderate response (within max frame size)
moderate_body = b'Y' * 8000
sock.clear_sent()
server.send_response(1, 200, [('content-length', str(len(moderate_body)))], moderate_body)
# Client receives response
events = client.receive_data(sock.get_sent_data())
data_events = [e for e in events if isinstance(e, h2.events.DataReceived)]
received_data = b''.join(e.data for e in data_events)
assert received_data == moderate_body
class TestSpecialCases:
"""Test special/edge cases."""
def test_head_request(self):
"""Test HEAD request (no body in response)."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
client.send_headers(1, [
(':method', 'HEAD'),
(':path', '/resource'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
requests = server.receive_data()
assert requests[0].method == 'HEAD'
# Send response with content-length but no body
sock.clear_sent()
server.send_response(
1, 200,
[('content-length', '1000'), ('content-type', 'text/html')],
body=None
)
events = client.receive_data(sock.get_sent_data())
stream_ended = [e for e in events if isinstance(e, h2.events.StreamEnded)]
assert len(stream_ended) == 1
def test_options_request(self):
"""Test OPTIONS request."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
client.send_headers(1, [
(':method', 'OPTIONS'),
(':path', '*'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
requests = server.receive_data()
assert requests[0].method == 'OPTIONS'
assert requests[0].uri == '*'
def test_request_with_query_string(self):
"""Test request with query string parameters."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/search?q=test&page=2&sort=desc'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
requests = server.receive_data()
req = requests[0]
assert req.path == '/search'
assert req.query == 'q=test&page=2&sort=desc'
def test_request_with_multiple_headers_same_name(self):
"""Test request with multiple headers of the same name."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
server = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
server.initiate_connection()
client = create_h2_client()
sock.set_recv_data(client.data_to_send())
server.receive_data()
client.receive_data(sock.get_sent_data())
client.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
('accept', 'text/html'),
('accept', 'application/json'),
('accept', '*/*'),
], end_stream=True)
sock.set_recv_data(client.data_to_send())
requests = server.receive_data()
req = requests[0]
accept_headers = [h[1] for h in req.headers if h[0] == 'ACCEPT']
assert len(accept_headers) == 3

575
tests/test_http2_request.py Normal file
View File

@ -0,0 +1,575 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for HTTP/2 request and body classes."""
import pytest
from unittest import mock
from gunicorn.http2.request import HTTP2Request, HTTP2Body
from gunicorn.http2.stream import HTTP2Stream
class MockConnection:
"""Mock HTTP/2 connection for testing."""
def __init__(self, initial_window_size=65535):
self.initial_window_size = initial_window_size
class MockConfig:
"""Mock gunicorn configuration."""
def __init__(self):
pass
class TestHTTP2Body:
"""Test HTTP2Body class."""
def test_init_with_data(self):
body = HTTP2Body(b"Hello, World!")
assert len(body) == 13
def test_init_empty(self):
body = HTTP2Body(b"")
assert len(body) == 0
def test_read_all(self):
body = HTTP2Body(b"Test data")
assert body.read() == b"Test data"
assert body.read() == b"" # Already consumed
def test_read_with_size(self):
body = HTTP2Body(b"Hello, World!")
assert body.read(5) == b"Hello"
assert body.read(2) == b", "
assert body.read(100) == b"World!"
assert body.read(1) == b""
def test_read_none_size(self):
body = HTTP2Body(b"Test")
assert body.read(None) == b"Test"
def test_readline_basic(self):
body = HTTP2Body(b"Line1\nLine2\nLine3")
assert body.readline() == b"Line1\n"
assert body.readline() == b"Line2\n"
assert body.readline() == b"Line3"
def test_readline_with_size(self):
body = HTTP2Body(b"Hello\nWorld")
assert body.readline(3) == b"Hel"
assert body.readline(10) == b"lo\n"
def test_readline_no_newline(self):
body = HTTP2Body(b"No newline here")
assert body.readline() == b"No newline here"
def test_readline_empty(self):
body = HTTP2Body(b"")
assert body.readline() == b""
def test_readline_crlf(self):
body = HTTP2Body(b"Line1\r\nLine2")
# BytesIO readline includes \r\n
assert body.readline() == b"Line1\r\n"
def test_readlines_basic(self):
body = HTTP2Body(b"Line1\nLine2\nLine3")
lines = body.readlines()
assert lines == [b"Line1\n", b"Line2\n", b"Line3"]
def test_readlines_with_hint(self):
body = HTTP2Body(b"Line1\nLine2\nLine3\nLine4")
# Hint affects how many lines are returned
lines = body.readlines(hint=5)
assert len(lines) >= 1
def test_readlines_empty(self):
body = HTTP2Body(b"")
assert body.readlines() == []
def test_iter(self):
body = HTTP2Body(b"Line1\nLine2\nLine3")
lines = list(body)
assert lines == [b"Line1\n", b"Line2\n", b"Line3"]
def test_len(self):
body = HTTP2Body(b"12345")
assert len(body) == 5
def test_close(self):
body = HTTP2Body(b"test")
body.close()
# Should not raise
with pytest.raises(ValueError):
body.read()
class TestHTTP2BodyReadStrategies:
"""Test different reading strategies matching HTTP/1.x patterns."""
def test_read_all_at_once(self):
data = b"A" * 1000
body = HTTP2Body(data)
result = body.read()
assert result == data
def test_read_chunked(self):
data = b"A" * 100
body = HTTP2Body(data)
chunks = []
while True:
chunk = body.read(10)
if not chunk:
break
chunks.append(chunk)
assert b"".join(chunks) == data
assert len(chunks) == 10
def test_read_byte_by_byte(self):
data = b"Hello"
body = HTTP2Body(data)
result = []
for _ in range(len(data)):
result.append(body.read(1))
assert b"".join(result) == data
def test_readline_all_lines(self):
data = b"Line1\nLine2\nLine3\n"
body = HTTP2Body(data)
lines = []
while True:
line = body.readline()
if not line:
break
lines.append(line)
assert lines == [b"Line1\n", b"Line2\n", b"Line3\n"]
class TestHTTP2Request:
"""Test HTTP2Request class."""
def _make_stream(self, headers, body=b""):
"""Helper to create a stream with headers and body."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.receive_headers(headers, end_stream=(len(body) == 0))
if body:
stream.request_body.write(body)
stream.request_complete = True
return stream
def test_basic_get_request(self):
stream = self._make_stream([
(':method', 'GET'),
(':path', '/test'),
(':scheme', 'https'),
(':authority', 'example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.method == 'GET'
assert req.uri == '/test'
assert req.path == '/test'
assert req.scheme == 'https'
assert req.version == (2, 0)
def test_post_request_with_body(self):
stream = self._make_stream(
[
(':method', 'POST'),
(':path', '/submit'),
(':scheme', 'https'),
(':authority', 'api.example.com'),
('content-type', 'application/json'),
('content-length', '13'),
],
body=b'{"key":"val"}'
)
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('192.168.1.1', 54321))
assert req.method == 'POST'
assert req.body.read() == b'{"key":"val"}'
assert req.content_type == 'application/json'
assert req.content_length == 13
def test_path_with_query_string(self):
stream = self._make_stream([
(':method', 'GET'),
(':path', '/search?q=test&page=1'),
(':scheme', 'https'),
(':authority', 'example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.path == '/search'
assert req.query == 'q=test&page=1'
assert req.uri == '/search?q=test&page=1'
def test_path_with_fragment(self):
stream = self._make_stream([
(':method', 'GET'),
(':path', '/page#section'),
(':scheme', 'https'),
(':authority', 'example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.path == '/page'
assert req.fragment == 'section'
def test_headers_uppercase_conversion(self):
"""HTTP/2 headers are lowercase, should be converted to uppercase."""
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
('content-type', 'text/html'),
('accept-language', 'en-US'),
('x-custom-header', 'custom-value'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
header_names = [h[0] for h in req.headers]
assert 'CONTENT-TYPE' in header_names
assert 'ACCEPT-LANGUAGE' in header_names
assert 'X-CUSTOM-HEADER' in header_names
def test_host_header_from_authority(self):
"""Host header should be generated from :authority pseudo-header."""
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'test.example.com:8080'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
host = req.get_header('HOST')
assert host == 'test.example.com:8080'
def test_existing_host_header_not_duplicated(self):
"""If Host header exists, don't add from :authority."""
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'authority.example.com'),
('host', 'explicit.example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
# Count HOST headers
host_headers = [h for h in req.headers if h[0] == 'HOST']
assert len(host_headers) == 1
assert host_headers[0][1] == 'explicit.example.com'
def test_get_header_case_insensitive(self):
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
('x-test-header', 'test-value'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.get_header('X-TEST-HEADER') == 'test-value'
assert req.get_header('x-test-header') == 'test-value'
assert req.get_header('X-Test-Header') == 'test-value'
def test_get_header_not_found(self):
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.get_header('X-Not-Exists') is None
def test_content_length_property(self):
stream = self._make_stream([
(':method', 'POST'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
('content-length', '42'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.content_length == 42
def test_content_length_none_when_missing(self):
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.content_length is None
def test_content_length_invalid_value(self):
stream = self._make_stream([
(':method', 'POST'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
('content-length', 'not-a-number'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.content_length is None
def test_content_type_property(self):
stream = self._make_stream([
(':method', 'POST'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
('content-type', 'application/json; charset=utf-8'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.content_type == 'application/json; charset=utf-8'
def test_content_type_none_when_missing(self):
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.content_type is None
class TestHTTP2RequestConnectionState:
"""Test connection state methods."""
def _make_stream(self, headers):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.receive_headers(headers, end_stream=True)
return stream
def test_should_close_default_false(self):
"""HTTP/2 connections are persistent by default."""
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.should_close() is False
def test_force_close(self):
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
req.force_close()
assert req.should_close() is True
assert req.must_close is True
class TestHTTP2RequestTrailers:
"""Test request trailers handling."""
def test_no_trailers(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.receive_headers([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.trailers == []
def test_with_trailers(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.receive_headers([
(':method', 'POST'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=False)
stream.state = stream.state # Keep state
stream.trailers = [
('grpc-status', '0'),
('grpc-message', 'OK'),
]
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert len(req.trailers) == 2
assert ('GRPC-STATUS', '0') in req.trailers
assert ('GRPC-MESSAGE', 'OK') in req.trailers
class TestHTTP2RequestMetadata:
"""Test request metadata properties."""
def _make_stream(self, headers, stream_id=1):
conn = MockConnection()
stream = HTTP2Stream(stream_id=stream_id, connection=conn)
stream.receive_headers(headers, end_stream=True)
return stream
def test_version_is_http2(self):
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.version == (2, 0)
def test_req_number_is_stream_id(self):
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
], stream_id=5)
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.req_number == 5
def test_peer_addr(self):
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('10.0.0.1', 54321))
assert req.peer_addr == ('10.0.0.1', 54321)
assert req.remote_addr == ('10.0.0.1', 54321)
def test_proxy_protocol_info_none(self):
"""HTTP/2 doesn't use proxy protocol through data stream."""
stream = self._make_stream([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
])
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.proxy_protocol_info is None
class TestHTTP2RequestRepr:
"""Test request string representation."""
def test_repr_format(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=3, connection=conn)
stream.receive_headers([
(':method', 'POST'),
(':path', '/api/users'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
repr_str = repr(req)
assert "HTTP2Request" in repr_str
assert "method=POST" in repr_str
assert "path=/api/users" in repr_str
assert "stream_id=3" in repr_str
class TestHTTP2RequestDefaults:
"""Test default values when pseudo-headers are missing."""
def test_default_method(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.receive_headers([
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.method == 'GET'
def test_default_scheme(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.receive_headers([
(':method', 'GET'),
(':path', '/'),
(':authority', 'example.com'),
], end_stream=True)
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.scheme == 'https'
def test_default_path(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.receive_headers([
(':method', 'GET'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.uri == '/'
assert req.path == '/'

629
tests/test_http2_stream.py Normal file
View File

@ -0,0 +1,629 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for HTTP/2 stream state management."""
import pytest
from unittest import mock
from gunicorn.http2.stream import HTTP2Stream, StreamState
from gunicorn.http2.errors import HTTP2StreamError
class MockConnection:
"""Mock HTTP/2 connection for testing streams."""
def __init__(self, initial_window_size=65535):
self.initial_window_size = initial_window_size
class TestStreamState:
"""Test StreamState enum values."""
def test_state_values_exist(self):
assert StreamState.IDLE is not None
assert StreamState.RESERVED_LOCAL is not None
assert StreamState.RESERVED_REMOTE is not None
assert StreamState.OPEN is not None
assert StreamState.HALF_CLOSED_LOCAL is not None
assert StreamState.HALF_CLOSED_REMOTE is not None
assert StreamState.CLOSED is not None
def test_states_are_unique(self):
states = [
StreamState.IDLE,
StreamState.RESERVED_LOCAL,
StreamState.RESERVED_REMOTE,
StreamState.OPEN,
StreamState.HALF_CLOSED_LOCAL,
StreamState.HALF_CLOSED_REMOTE,
StreamState.CLOSED,
]
assert len(states) == len(set(states))
class TestHTTP2StreamInitialization:
"""Test stream initialization."""
def test_basic_init(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
assert stream.stream_id == 1
assert stream.connection is conn
assert stream.state == StreamState.IDLE
assert stream.request_headers == []
assert stream.request_complete is False
assert stream.response_started is False
assert stream.response_headers_sent is False
assert stream.response_complete is False
assert stream.window_size == 65535
assert stream.trailers is None
def test_custom_window_size(self):
conn = MockConnection(initial_window_size=32768)
stream = HTTP2Stream(stream_id=3, connection=conn)
assert stream.window_size == 32768
class TestStreamIdProperties:
"""Test stream ID classification properties."""
def test_is_client_stream_odd_ids(self):
conn = MockConnection()
for stream_id in [1, 3, 5, 7, 99, 101]:
stream = HTTP2Stream(stream_id=stream_id, connection=conn)
assert stream.is_client_stream is True
assert stream.is_server_stream is False
def test_is_server_stream_even_ids(self):
conn = MockConnection()
for stream_id in [2, 4, 6, 8, 100, 102]:
stream = HTTP2Stream(stream_id=stream_id, connection=conn)
assert stream.is_client_stream is False
assert stream.is_server_stream is True
def test_stream_id_zero(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=0, connection=conn)
assert stream.is_client_stream is False
assert stream.is_server_stream is True
class TestCanReceiveProperty:
"""Test can_receive property."""
def test_can_receive_in_open_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
assert stream.can_receive is True
def test_can_receive_in_half_closed_local(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.HALF_CLOSED_LOCAL
assert stream.can_receive is True
def test_cannot_receive_in_idle(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
assert stream.state == StreamState.IDLE
assert stream.can_receive is False
def test_cannot_receive_in_half_closed_remote(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.HALF_CLOSED_REMOTE
assert stream.can_receive is False
def test_cannot_receive_in_closed(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.CLOSED
assert stream.can_receive is False
class TestCanSendProperty:
"""Test can_send property."""
def test_can_send_in_open_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
assert stream.can_send is True
def test_can_send_in_half_closed_remote(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.HALF_CLOSED_REMOTE
assert stream.can_send is True
def test_cannot_send_in_idle(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
assert stream.state == StreamState.IDLE
assert stream.can_send is False
def test_cannot_send_in_half_closed_local(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.HALF_CLOSED_LOCAL
assert stream.can_send is False
def test_cannot_send_in_closed(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.CLOSED
assert stream.can_send is False
class TestReceiveHeaders:
"""Test receive_headers method."""
def test_receive_headers_from_idle(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
headers = [(':method', 'GET'), (':path', '/')]
stream.receive_headers(headers, end_stream=False)
assert stream.state == StreamState.OPEN
assert stream.request_headers == headers
assert stream.request_complete is False
def test_receive_headers_with_end_stream(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
headers = [(':method', 'GET'), (':path', '/')]
stream.receive_headers(headers, end_stream=True)
assert stream.state == StreamState.HALF_CLOSED_REMOTE
assert stream.request_complete is True
def test_receive_headers_in_open_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
headers = [('content-type', 'text/plain')]
stream.receive_headers(headers, end_stream=False)
assert stream.state == StreamState.OPEN
assert stream.request_headers == headers
def test_receive_headers_extends_existing(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.receive_headers([(':method', 'POST')], end_stream=False)
stream.receive_headers([('content-type', 'text/plain')], end_stream=False)
assert len(stream.request_headers) == 2
def test_receive_headers_in_invalid_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.CLOSED
with pytest.raises(HTTP2StreamError) as exc_info:
stream.receive_headers([], end_stream=False)
assert exc_info.value.stream_id == 1
class TestReceiveData:
"""Test receive_data method."""
def test_receive_data_in_open_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream.receive_data(b"Hello, World!", end_stream=False)
assert stream.request_body.getvalue() == b"Hello, World!"
assert stream.request_complete is False
def test_receive_data_with_end_stream(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream.receive_data(b"Final data", end_stream=True)
assert stream.state == StreamState.HALF_CLOSED_REMOTE
assert stream.request_complete is True
def test_receive_data_accumulates(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream.receive_data(b"Part1")
stream.receive_data(b"Part2")
stream.receive_data(b"Part3", end_stream=True)
assert stream.request_body.getvalue() == b"Part1Part2Part3"
def test_receive_data_in_half_closed_local(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.HALF_CLOSED_LOCAL
stream.receive_data(b"data", end_stream=False)
assert stream.request_body.getvalue() == b"data"
def test_receive_data_in_invalid_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.HALF_CLOSED_REMOTE
with pytest.raises(HTTP2StreamError) as exc_info:
stream.receive_data(b"data", end_stream=False)
assert exc_info.value.stream_id == 1
class TestReceiveTrailers:
"""Test receive_trailers method."""
def test_receive_trailers_in_open_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
trailers = [('grpc-status', '0')]
stream.receive_trailers(trailers)
assert stream.trailers == trailers
assert stream.state == StreamState.HALF_CLOSED_REMOTE
assert stream.request_complete is True
def test_receive_trailers_in_invalid_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.CLOSED
with pytest.raises(HTTP2StreamError):
stream.receive_trailers([])
class TestSendHeaders:
"""Test send_headers method."""
def test_send_headers_in_open_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
headers = [(':status', '200')]
stream.send_headers(headers, end_stream=False)
assert stream.response_started is True
assert stream.response_headers_sent is True
assert stream.response_complete is False
assert stream.state == StreamState.OPEN
def test_send_headers_with_end_stream(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream.send_headers([(':status', '204')], end_stream=True)
assert stream.state == StreamState.HALF_CLOSED_LOCAL
assert stream.response_complete is True
def test_send_headers_in_half_closed_remote(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.HALF_CLOSED_REMOTE
stream.send_headers([(':status', '200')], end_stream=False)
assert stream.response_headers_sent is True
def test_send_headers_in_invalid_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.HALF_CLOSED_LOCAL
with pytest.raises(HTTP2StreamError):
stream.send_headers([], end_stream=False)
class TestSendData:
"""Test send_data method."""
def test_send_data_in_open_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream.send_data(b"Response body", end_stream=False)
assert stream.response_complete is False
def test_send_data_with_end_stream(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream.send_data(b"Final", end_stream=True)
assert stream.state == StreamState.HALF_CLOSED_LOCAL
assert stream.response_complete is True
def test_send_data_in_half_closed_remote(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.HALF_CLOSED_REMOTE
stream.send_data(b"data", end_stream=True)
assert stream.state == StreamState.CLOSED
def test_send_data_in_invalid_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.CLOSED
with pytest.raises(HTTP2StreamError):
stream.send_data(b"data", end_stream=False)
class TestStreamReset:
"""Test stream reset method."""
def test_reset_default_error_code(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream.reset()
assert stream.state == StreamState.CLOSED
assert stream.response_complete is True
assert stream.request_complete is True
def test_reset_custom_error_code(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream.reset(error_code=0x1) # PROTOCOL_ERROR
assert stream.state == StreamState.CLOSED
class TestStreamClose:
"""Test stream close method."""
def test_close_stream(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream.close()
assert stream.state == StreamState.CLOSED
assert stream.response_complete is True
assert stream.request_complete is True
class TestHalfCloseTransitions:
"""Test half-close state transitions."""
def test_half_close_local_from_open(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream._half_close_local()
assert stream.state == StreamState.HALF_CLOSED_LOCAL
def test_half_close_local_from_half_closed_remote(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.HALF_CLOSED_REMOTE
stream._half_close_local()
assert stream.state == StreamState.CLOSED
def test_half_close_local_invalid_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.IDLE
with pytest.raises(HTTP2StreamError):
stream._half_close_local()
def test_half_close_remote_from_open(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream._half_close_remote()
assert stream.state == StreamState.HALF_CLOSED_REMOTE
def test_half_close_remote_from_half_closed_local(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.HALF_CLOSED_LOCAL
stream._half_close_remote()
assert stream.state == StreamState.CLOSED
def test_half_close_remote_invalid_state(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.IDLE
with pytest.raises(HTTP2StreamError):
stream._half_close_remote()
class TestGetRequestBody:
"""Test get_request_body method."""
def test_get_empty_body(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
assert stream.get_request_body() == b""
def test_get_body_after_data(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.state = StreamState.OPEN
stream.receive_data(b"Test body content")
assert stream.get_request_body() == b"Test body content"
class TestGetPseudoHeaders:
"""Test get_pseudo_headers method."""
def test_extract_pseudo_headers(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.request_headers = [
(':method', 'POST'),
(':path', '/api/test'),
(':scheme', 'https'),
(':authority', 'example.com'),
('content-type', 'application/json'),
('accept', '*/*'),
]
pseudo = stream.get_pseudo_headers()
assert pseudo == {
':method': 'POST',
':path': '/api/test',
':scheme': 'https',
':authority': 'example.com',
}
def test_empty_pseudo_headers(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.request_headers = [
('content-type', 'text/plain'),
]
pseudo = stream.get_pseudo_headers()
assert pseudo == {}
class TestGetRegularHeaders:
"""Test get_regular_headers method."""
def test_extract_regular_headers(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.request_headers = [
(':method', 'GET'),
(':path', '/'),
('content-type', 'text/html'),
('accept-language', 'en-US'),
]
regular = stream.get_regular_headers()
assert regular == [
('content-type', 'text/html'),
('accept-language', 'en-US'),
]
def test_no_regular_headers(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.request_headers = [
(':method', 'GET'),
(':path', '/'),
]
regular = stream.get_regular_headers()
assert regular == []
class TestStreamRepr:
"""Test stream string representation."""
def test_repr_format(self):
conn = MockConnection()
stream = HTTP2Stream(stream_id=5, connection=conn)
repr_str = repr(stream)
assert "HTTP2Stream" in repr_str
assert "id=5" in repr_str
assert "state=IDLE" in repr_str
assert "req_complete=False" in repr_str
assert "resp_complete=False" in repr_str
class TestFullStreamLifecycle:
"""Test complete stream lifecycles."""
def test_simple_get_request(self):
"""Test a simple GET request lifecycle."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
# Receive request headers (GET with end_stream)
stream.receive_headers([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
assert stream.state == StreamState.HALF_CLOSED_REMOTE
assert stream.request_complete is True
# Send response headers with body
stream.send_headers([(':status', '200')], end_stream=False)
assert stream.state == StreamState.HALF_CLOSED_REMOTE
# Send response body
stream.send_data(b"Hello!", end_stream=True)
assert stream.state == StreamState.CLOSED
assert stream.response_complete is True
def test_post_request_with_body(self):
"""Test a POST request with body."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
# Receive request headers
stream.receive_headers([
(':method', 'POST'),
(':path', '/submit'),
('content-type', 'application/json'),
], end_stream=False)
assert stream.state == StreamState.OPEN
# Receive body data
stream.receive_data(b'{"key": "value"}', end_stream=True)
assert stream.state == StreamState.HALF_CLOSED_REMOTE
assert stream.get_request_body() == b'{"key": "value"}'
# Send response
stream.send_headers([(':status', '201')], end_stream=False)
stream.send_data(b'Created', end_stream=True)
assert stream.state == StreamState.CLOSED
def test_stream_reset_lifecycle(self):
"""Test a stream that gets reset."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.receive_headers([(':method', 'GET'), (':path', '/')], end_stream=False)
assert stream.state == StreamState.OPEN
# Reset the stream
stream.reset(error_code=0x8) # CANCEL
assert stream.state == StreamState.CLOSED
assert stream.request_complete is True
assert stream.response_complete is True