mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-01 10:11:30 +08:00
New test files covering areas identified as gaps compared to Daphne and Uvicorn test coverage: - test_asgi_header_security.py: Header validation, normalization, injection prevention - test_asgi_error_handling.py: Application errors, body receiver errors, graceful shutdown - test_asgi_protocol_http.py: HTTP connection management, chunked encoding, methods, scope building - test_asgi_websocket_enhanced.py: WebSocket message limits, connection rejection, subprotocols - test_asgi_lifespan.py: Lifespan message formats and behavior - test_asgi_forwarded_headers.py: X-Forwarded-* and proxy header handling
374 lines
11 KiB
Python
374 lines
11 KiB
Python
#
|
|
# This file is part of gunicorn released under the MIT license.
|
|
# See the NOTICE for more information.
|
|
|
|
"""
|
|
ASGI header security tests.
|
|
|
|
Tests for header validation, normalization, and injection prevention
|
|
to ensure secure HTTP header handling per ASGI 3.0 and RFC 9110/9112.
|
|
"""
|
|
|
|
import pytest
|
|
|
|
from gunicorn.asgi.parser import (
|
|
PythonProtocol,
|
|
InvalidHeader,
|
|
ParseError,
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# Header Name Validation Tests
|
|
# ============================================================================
|
|
|
|
class TestHeaderNameValidation:
|
|
"""Test validation of HTTP header names."""
|
|
|
|
def test_valid_header_name_accepted(self):
|
|
"""Valid header names should be accepted."""
|
|
parser = PythonProtocol()
|
|
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b"X-Custom-Header: value\r\n"
|
|
b"Accept-Language: en-US\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
assert parser.is_complete
|
|
|
|
def test_header_name_with_null_rejected(self):
|
|
"""Header name containing null byte must be rejected."""
|
|
parser = PythonProtocol()
|
|
|
|
with pytest.raises((InvalidHeader, ParseError)):
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b"X-Bad\x00Header: value\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
def test_header_name_with_cr_rejected(self):
|
|
"""Header name containing CR must be rejected."""
|
|
parser = PythonProtocol()
|
|
|
|
with pytest.raises((InvalidHeader, ParseError)):
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b"X-Bad\rHeader: value\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
def test_header_name_with_lf_rejected(self):
|
|
"""Header name containing LF must be rejected."""
|
|
parser = PythonProtocol()
|
|
|
|
with pytest.raises((InvalidHeader, ParseError)):
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b"X-Bad\nHeader: value\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
def test_empty_header_name_rejected(self):
|
|
"""Empty header name must be rejected."""
|
|
parser = PythonProtocol()
|
|
|
|
with pytest.raises((InvalidHeader, ParseError)):
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b": value\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# Header Value Validation Tests
|
|
# ============================================================================
|
|
|
|
class TestHeaderValueValidation:
|
|
"""Test validation of HTTP header values."""
|
|
|
|
def test_header_value_with_bare_cr_rejected(self):
|
|
"""Header value containing bare CR must be rejected."""
|
|
parser = PythonProtocol()
|
|
|
|
# Bare CR (not followed by LF) in header value should be rejected
|
|
with pytest.raises((InvalidHeader, ParseError)):
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b"X-Bad: value\rmore\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
def test_header_value_with_bare_lf_rejected(self):
|
|
"""Header value containing bare LF must be rejected."""
|
|
parser = PythonProtocol()
|
|
|
|
# Bare LF (not preceded by CR) in header value should be rejected
|
|
with pytest.raises((InvalidHeader, ParseError)):
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b"X-Bad: value\nmore\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
def test_header_value_special_characters_allowed(self):
|
|
"""Header values may contain special printable characters."""
|
|
parser = PythonProtocol()
|
|
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b"Authorization: Bearer abc123!@#$%^&*()_+\r\n"
|
|
b"Cookie: session=abc; path=/; domain=.example.com\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
assert parser.is_complete
|
|
|
|
def test_header_value_with_tab_allowed(self):
|
|
"""Horizontal tab in header value is allowed (OWS)."""
|
|
parser = PythonProtocol()
|
|
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b"X-Tabs: value1\tvalue2\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
assert parser.is_complete
|
|
|
|
|
|
# ============================================================================
|
|
# Header Normalization Tests
|
|
# ============================================================================
|
|
|
|
class TestHeaderNormalization:
|
|
"""Test HTTP header normalization per ASGI spec."""
|
|
|
|
def _create_protocol(self):
|
|
"""Create an ASGIProtocol instance for testing."""
|
|
from gunicorn.asgi.protocol import ASGIProtocol
|
|
from gunicorn.config import Config
|
|
from unittest import mock
|
|
|
|
worker = mock.Mock()
|
|
worker.cfg = Config()
|
|
worker.log = mock.Mock()
|
|
worker.asgi = mock.Mock()
|
|
|
|
return ASGIProtocol(worker)
|
|
|
|
def _create_mock_request(self, headers=None):
|
|
"""Create a mock HTTP request with headers."""
|
|
from unittest import mock
|
|
|
|
request = mock.Mock()
|
|
request.method = "GET"
|
|
request.path = "/"
|
|
request.raw_path = b"/"
|
|
request.query = ""
|
|
request.version = (1, 1)
|
|
request.scheme = "http"
|
|
request.headers = headers or []
|
|
return request
|
|
|
|
def test_headers_lowercased_in_scope(self):
|
|
"""Header names must be lowercased in ASGI scope."""
|
|
protocol = self._create_protocol()
|
|
request = self._create_mock_request(
|
|
headers=[
|
|
("CONTENT-TYPE", "application/json"),
|
|
("X-CUSTOM-HEADER", "value"),
|
|
]
|
|
)
|
|
|
|
scope = protocol._build_http_scope(request, None, None)
|
|
|
|
for name, _ in scope["headers"]:
|
|
assert name == name.lower(), f"Header name should be lowercase: {name}"
|
|
|
|
def test_header_names_are_bytes(self):
|
|
"""Header names in scope must be bytes."""
|
|
protocol = self._create_protocol()
|
|
request = self._create_mock_request(
|
|
headers=[
|
|
("Content-Type", "text/plain"),
|
|
]
|
|
)
|
|
|
|
scope = protocol._build_http_scope(request, None, None)
|
|
|
|
for name, _ in scope["headers"]:
|
|
assert isinstance(name, bytes), f"Header name should be bytes: {type(name)}"
|
|
|
|
def test_header_values_are_bytes(self):
|
|
"""Header values in scope must be bytes."""
|
|
protocol = self._create_protocol()
|
|
request = self._create_mock_request(
|
|
headers=[
|
|
("Content-Type", "text/plain"),
|
|
]
|
|
)
|
|
|
|
scope = protocol._build_http_scope(request, None, None)
|
|
|
|
for _, value in scope["headers"]:
|
|
assert isinstance(value, bytes), f"Header value should be bytes: {type(value)}"
|
|
|
|
def test_header_order_preserved(self):
|
|
"""Order of headers should be preserved."""
|
|
protocol = self._create_protocol()
|
|
request = self._create_mock_request(
|
|
headers=[
|
|
("First", "1"),
|
|
("Second", "2"),
|
|
("Third", "3"),
|
|
]
|
|
)
|
|
|
|
scope = protocol._build_http_scope(request, None, None)
|
|
|
|
header_names = [name for name, _ in scope["headers"]]
|
|
assert header_names == [b"first", b"second", b"third"]
|
|
|
|
|
|
# ============================================================================
|
|
# Oversized Header Tests
|
|
# ============================================================================
|
|
|
|
class TestOversizedHeaders:
|
|
"""Test rejection of oversized headers."""
|
|
|
|
def test_oversized_header_value_handled(self):
|
|
"""Very large header values should be handled safely."""
|
|
parser = PythonProtocol()
|
|
|
|
# Parser should handle large headers without crashing
|
|
# The limit is configurable - test the parser doesn't crash
|
|
large_value = b"x" * 8192
|
|
|
|
try:
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b"X-Large: " + large_value + b"\r\n"
|
|
b"\r\n"
|
|
)
|
|
# Either succeeds or raises appropriate error
|
|
except (InvalidHeader, ParseError):
|
|
# Rejection is acceptable for very large headers
|
|
pass
|
|
|
|
def test_many_headers_handled(self):
|
|
"""Request with many headers should be handled safely."""
|
|
parser = PythonProtocol()
|
|
|
|
# Build request with many headers
|
|
headers = b"".join(
|
|
f"X-Header-{i}: value{i}\r\n".encode()
|
|
for i in range(100)
|
|
)
|
|
|
|
try:
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n" +
|
|
headers +
|
|
b"\r\n"
|
|
)
|
|
# May succeed if within limits
|
|
except (InvalidHeader, ParseError):
|
|
# Rejection is acceptable for many headers
|
|
pass
|
|
|
|
|
|
# ============================================================================
|
|
# Host Header Validation Tests
|
|
# ============================================================================
|
|
|
|
class TestHostHeaderValidation:
|
|
"""Test Host header validation."""
|
|
|
|
def test_valid_host_header_accepted(self):
|
|
"""Valid Host header should be accepted."""
|
|
parser = PythonProtocol()
|
|
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: example.com\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
assert parser.is_complete
|
|
|
|
def test_host_header_with_port_accepted(self):
|
|
"""Host header with port should be accepted."""
|
|
parser = PythonProtocol()
|
|
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: example.com:8080\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
assert parser.is_complete
|
|
|
|
def test_ipv6_host_header_accepted(self):
|
|
"""IPv6 Host header should be accepted."""
|
|
parser = PythonProtocol()
|
|
|
|
parser.feed(
|
|
b"GET /test HTTP/1.1\r\n"
|
|
b"Host: [::1]:8080\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
assert parser.is_complete
|
|
|
|
|
|
# ============================================================================
|
|
# Content-Type Header Tests
|
|
# ============================================================================
|
|
|
|
class TestContentTypeHeader:
|
|
"""Test Content-Type header handling."""
|
|
|
|
def test_content_type_with_charset(self):
|
|
"""Content-Type with charset parameter should work."""
|
|
parser = PythonProtocol()
|
|
|
|
parser.feed(
|
|
b"POST /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b"Content-Type: text/html; charset=utf-8\r\n"
|
|
b"Content-Length: 0\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
assert parser.is_complete
|
|
|
|
def test_content_type_multipart(self):
|
|
"""Multipart Content-Type should work."""
|
|
parser = PythonProtocol()
|
|
|
|
parser.feed(
|
|
b"POST /test HTTP/1.1\r\n"
|
|
b"Host: localhost\r\n"
|
|
b"Content-Type: multipart/form-data; boundary=----WebKitFormBoundary\r\n"
|
|
b"Content-Length: 0\r\n"
|
|
b"\r\n"
|
|
)
|
|
|
|
assert parser.is_complete
|