gunicorn/tests/test_asgi_streaming.py
Benoit Chesneau 41ec7527db fix: keep Content-Length on HEAD and 304 responses
RFC 9110 §6.4.2 forbids Content-Length only on 1xx and 204 responses.
HEAD MAY include the Content-Length the same GET would return, and 304
MAY include the Content-Length the unconditional response would carry.
WSGI preserves app-supplied Content-Length on those statuses; ASGI was
stripping it indiscriminately for any no-body response.

Split the predicate: _response_forbids_content_length() returns True
only for 1xx/204; _strip_body_framing_headers(headers, status) always
strips Transfer-Encoding (no body, no chunked terminator) and strips
Content-Length only when forbidden.
2026-05-03 22:32:28 +02:00

582 lines
19 KiB
Python

#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
ASGI streaming response tests.
Tests for chunked transfer encoding, Server-Sent Events (SSE),
and streaming response handling.
"""
from unittest import mock
import pytest
from gunicorn.config import Config
# ============================================================================
# Chunked Transfer Encoding Tests
# ============================================================================
class TestChunkedTransferEncoding:
"""Tests for HTTP/1.1 chunked transfer encoding."""
def test_chunked_encoding_format(self):
"""Test chunked encoding format: size in hex + CRLF + data + CRLF."""
body = b"Hello"
chunk = f"{len(body):x}\r\n".encode("latin-1") + body + b"\r\n"
assert chunk == b"5\r\nHello\r\n"
def test_chunked_encoding_large_chunk(self):
"""Test chunked encoding with larger data."""
body = b"x" * 1000
chunk = f"{len(body):x}\r\n".encode("latin-1") + body + b"\r\n"
# 1000 in hex is 3e8
assert chunk.startswith(b"3e8\r\n")
assert chunk.endswith(b"\r\n")
def test_chunked_encoding_terminal_chunk(self):
"""Test terminal chunk (zero-length)."""
terminal = b"0\r\n\r\n"
# Parse it
assert terminal == b"0\r\n\r\n"
def test_chunked_encoding_empty_chunk(self):
"""Test encoding empty body chunk."""
body = b""
chunk = f"{len(body):x}\r\n".encode("latin-1") + body + b"\r\n"
assert chunk == b"0\r\n\r\n"
def test_chunked_encoding_multiple_chunks(self):
"""Test multiple chunks in sequence."""
chunks = []
# First chunk
body1 = b"Hello, "
chunks.append(f"{len(body1):x}\r\n".encode() + body1 + b"\r\n")
# Second chunk
body2 = b"World!"
chunks.append(f"{len(body2):x}\r\n".encode() + body2 + b"\r\n")
# Terminal chunk
chunks.append(b"0\r\n\r\n")
full_response = b"".join(chunks)
assert b"7\r\nHello, \r\n" in full_response
assert b"6\r\nWorld!\r\n" in full_response
assert full_response.endswith(b"0\r\n\r\n")
# ============================================================================
# ASGI Streaming Response Tests
# ============================================================================
class TestASGIStreamingResponse:
"""Tests for ASGI streaming response handling."""
def test_streaming_response_more_body_true(self):
"""Test streaming response with more_body=True."""
messages = [
{
"type": "http.response.body",
"body": b"chunk1",
"more_body": True,
},
{
"type": "http.response.body",
"body": b"chunk2",
"more_body": True,
},
{
"type": "http.response.body",
"body": b"chunk3",
"more_body": False,
},
]
assert messages[0]["more_body"] is True
assert messages[1]["more_body"] is True
assert messages[2]["more_body"] is False
def test_streaming_response_empty_final_chunk(self):
"""Test streaming response with empty final chunk."""
final_message = {
"type": "http.response.body",
"body": b"",
"more_body": False,
}
assert final_message["body"] == b""
assert final_message["more_body"] is False
def test_response_start_without_content_length(self):
"""Test response start without Content-Length triggers chunked encoding."""
# When Content-Length is missing, HTTP/1.1 should use chunked encoding
message = {
"type": "http.response.start",
"status": 200,
"headers": [
(b"content-type", b"text/plain"),
# No content-length header
],
}
# Check no content-length in headers
header_names = [name.lower() for name, _ in message["headers"]]
assert b"content-length" not in header_names
# ============================================================================
# Server-Sent Events (SSE) Format Tests
# ============================================================================
class TestSSEFormat:
"""Tests for Server-Sent Events format."""
def test_sse_data_event(self):
"""Test SSE data event format."""
data = "Hello, SSE!"
event = f"data: {data}\n\n"
assert event == "data: Hello, SSE!\n\n"
def test_sse_named_event(self):
"""Test SSE named event format."""
event_name = "message"
data = "Hello"
event = f"event: {event_name}\ndata: {data}\n\n"
assert "event: message\n" in event
assert "data: Hello\n" in event
assert event.endswith("\n\n")
def test_sse_event_with_id(self):
"""Test SSE event with ID."""
event_id = "12345"
data = "Some data"
event = f"id: {event_id}\ndata: {data}\n\n"
assert "id: 12345\n" in event
def test_sse_multiline_data(self):
"""Test SSE multiline data."""
lines = ["line1", "line2", "line3"]
data_lines = "\n".join(f"data: {line}" for line in lines)
event = f"{data_lines}\n\n"
assert event == "data: line1\ndata: line2\ndata: line3\n\n"
def test_sse_retry_directive(self):
"""Test SSE retry directive."""
retry_ms = 3000
directive = f"retry: {retry_ms}\n\n"
assert directive == "retry: 3000\n\n"
def test_sse_comment(self):
"""Test SSE comment (keep-alive)."""
comment = ": keep-alive\n\n"
assert comment.startswith(":")
def test_sse_content_type(self):
"""Test SSE Content-Type header."""
headers = [
(b"content-type", b"text/event-stream"),
(b"cache-control", b"no-cache"),
(b"connection", b"keep-alive"),
]
content_type = dict(headers).get(b"content-type")
assert content_type == b"text/event-stream"
# ============================================================================
# Protocol Send Body Tests
# ============================================================================
class TestProtocolSendBody:
"""Tests for ASGIProtocol._send_body method."""
def _create_protocol(self):
"""Create an ASGIProtocol instance for testing."""
from gunicorn.asgi.protocol import ASGIProtocol
worker = mock.Mock()
worker.cfg = Config()
worker.log = mock.Mock()
worker.asgi = mock.Mock()
protocol = ASGIProtocol(worker)
protocol.transport = mock.Mock()
return protocol
def test_send_body_without_chunking(self):
"""Test sending body without chunked encoding."""
protocol = self._create_protocol()
protocol._send_body(b"Hello, World!", chunked=False)
protocol.transport.write.assert_called_once_with(b"Hello, World!")
def test_send_body_with_chunking(self):
"""Test sending body with chunked encoding."""
protocol = self._create_protocol()
protocol._send_body(b"Hello", chunked=True)
# Should write: "5\r\nHello\r\n"
protocol.transport.write.assert_called_once()
call_arg = protocol.transport.write.call_args[0][0]
assert call_arg == b"5\r\nHello\r\n"
def test_send_body_empty_without_chunking(self):
"""Test sending empty body without chunked encoding."""
protocol = self._create_protocol()
protocol._send_body(b"", chunked=False)
# Empty body should not write anything
protocol.transport.write.assert_not_called()
def test_send_body_empty_with_chunking(self):
"""Test sending empty body with chunked encoding."""
protocol = self._create_protocol()
protocol._send_body(b"", chunked=True)
# Empty body should not write (terminal chunk handled separately)
protocol.transport.write.assert_not_called()
# ============================================================================
# Content-Length Detection Tests
# ============================================================================
class TestContentLengthDetection:
"""Tests for Content-Length header detection."""
def test_has_content_length_bytes(self):
"""Test detecting Content-Length header (bytes)."""
headers = [
(b"content-type", b"text/plain"),
(b"content-length", b"100"),
]
has_cl = any(
name.lower() == b"content-length"
for name, _ in headers
)
assert has_cl is True
def test_has_content_length_string(self):
"""Test detecting Content-Length header (string)."""
headers = [
("content-type", "text/plain"),
("content-length", "100"),
]
has_cl = any(
name.lower() == "content-length"
for name, _ in headers
)
assert has_cl is True
def test_no_content_length(self):
"""Test when Content-Length is missing."""
headers = [
(b"content-type", b"text/plain"),
]
has_cl = any(
name.lower() == b"content-length"
for name, _ in headers
)
assert has_cl is False
def test_content_length_case_insensitive(self):
"""Test Content-Length detection is case-insensitive."""
headers = [
(b"Content-Length", b"100"),
]
has_cl = any(
name.lower() == b"content-length"
for name, _ in headers
)
assert has_cl is True
# ============================================================================
# HTTP Version Check for Chunked Encoding
# ============================================================================
class TestHTTPVersionForChunked:
"""Tests for HTTP version requirements for chunked encoding."""
def test_http11_supports_chunked(self):
"""Test HTTP/1.1 supports chunked encoding."""
version = (1, 1)
supports_chunked = version >= (1, 1)
assert supports_chunked is True
def test_http10_no_chunked(self):
"""Test HTTP/1.0 does not support chunked encoding."""
version = (1, 0)
supports_chunked = version >= (1, 1)
assert supports_chunked is False
def test_http2_no_chunked(self):
"""Test HTTP/2 doesn't use chunked encoding (uses framing)."""
# HTTP/2 has its own framing mechanism
version = (2, 0)
# Chunked encoding is not used in HTTP/2
uses_http1_chunked = version[0] == 1 and version >= (1, 1)
assert uses_http1_chunked is False
# ============================================================================
# No-Body Response Tests (RFC 9110)
# ============================================================================
class TestResponseOmitsBody:
"""Verify HEAD/1xx/204/304 are flagged as bodyless responses."""
def _omits(self, method, status):
from gunicorn.asgi.protocol import ASGIProtocol
return ASGIProtocol._response_omits_body(method, status)
def test_head_omits_body(self):
assert self._omits("HEAD", 200) is True
assert self._omits("HEAD", 500) is True
def test_204_omits_body(self):
assert self._omits("GET", 204) is True
assert self._omits("POST", 204) is True
def test_304_omits_body(self):
assert self._omits("GET", 304) is True
def test_informational_omits_body(self):
assert self._omits("GET", 100) is True
assert self._omits("GET", 103) is True
assert self._omits("GET", 199) is True
def test_get_200_has_body(self):
assert self._omits("GET", 200) is False
def test_post_200_has_body(self):
assert self._omits("POST", 200) is False
def test_404_has_body(self):
assert self._omits("GET", 404) is False
class TestStripBodyFramingHeaders:
"""Verify the framing-header strip honours RFC 9110 §6.4.2:
Transfer-Encoding is always stripped on no-body responses; Content-Length
is stripped only when the status forbids it (1xx, 204), not for HEAD or 304.
"""
def _strip(self, headers, status):
from gunicorn.asgi.protocol import ASGIProtocol
return ASGIProtocol._strip_body_framing_headers(headers, status)
def test_204_strips_both_lowercase_bytes(self):
result = self._strip([
(b"content-type", b"text/plain"),
(b"content-length", b"5"),
(b"transfer-encoding", b"chunked"),
], 204)
assert result == [(b"content-type", b"text/plain")]
def test_103_strips_both_mixed_case_str(self):
result = self._strip([
("Content-Type", "text/plain"),
("Content-Length", "5"),
("Transfer-Encoding", "chunked"),
], 103)
assert result == [("Content-Type", "text/plain")]
def test_304_keeps_content_length_strips_te(self):
result = self._strip([
(b"etag", b"\"abc\""),
(b"content-length", b"42"),
(b"transfer-encoding", b"chunked"),
], 304)
assert result == [(b"etag", b"\"abc\""), (b"content-length", b"42")]
def test_head_response_keeps_content_length_strips_te(self):
# The caller passes the response status; HEAD responses are detected
# via request.method, but the strip itself only sees status. Verify
# a 200 status preserves Content-Length even though the strip is
# invoked for a HEAD request.
result = self._strip([
(b"content-length", b"1024"),
(b"transfer-encoding", b"chunked"),
], 200)
assert result == [(b"content-length", b"1024")]
def test_preserves_unrelated_headers(self):
headers = [(b"x-custom", b"value"), (b"server", b"gunicorn")]
assert self._strip(headers, 204) == headers
class TestResponseForbidsContentLength:
"""Verify the 1xx/204 forbid-rule (RFC 9110 §6.4.2) is encoded correctly."""
def _forbids(self, status):
from gunicorn.asgi.protocol import ASGIProtocol
return ASGIProtocol._response_forbids_content_length(status)
def test_204(self):
assert self._forbids(204) is True
def test_1xx(self):
assert self._forbids(100) is True
assert self._forbids(103) is True
assert self._forbids(199) is True
def test_304_allowed(self):
assert self._forbids(304) is False
def test_200_allowed(self):
assert self._forbids(200) is False
# ============================================================================
# Streaming Response Message Sequence Tests
# ============================================================================
class TestStreamingMessageSequence:
"""Tests for valid streaming response message sequences."""
def test_valid_sequence_single_body(self):
"""Test valid sequence: start -> body (more_body=False)."""
messages = [
{"type": "http.response.start", "status": 200, "headers": []},
{"type": "http.response.body", "body": b"Hello", "more_body": False},
]
# First message should be start
assert messages[0]["type"] == "http.response.start"
# Last body message should have more_body=False
assert messages[-1]["type"] == "http.response.body"
assert messages[-1]["more_body"] is False
def test_valid_sequence_multiple_bodies(self):
"""Test valid sequence: start -> body (more=True) -> body (more=False)."""
messages = [
{"type": "http.response.start", "status": 200, "headers": []},
{"type": "http.response.body", "body": b"chunk1", "more_body": True},
{"type": "http.response.body", "body": b"chunk2", "more_body": True},
{"type": "http.response.body", "body": b"", "more_body": False},
]
# Verify sequence
assert messages[0]["type"] == "http.response.start"
assert all(m["more_body"] for m in messages[1:-1])
assert messages[-1]["more_body"] is False
def test_valid_sequence_with_informational(self):
"""Test valid sequence with informational response."""
messages = [
{
"type": "http.response.informational",
"status": 103,
"headers": [(b"link", b"</style.css>; rel=preload")],
},
{"type": "http.response.start", "status": 200, "headers": []},
{"type": "http.response.body", "body": b"Hello", "more_body": False},
]
# Informational before start is valid
assert messages[0]["type"] == "http.response.informational"
assert messages[1]["type"] == "http.response.start"
# ============================================================================
# Large Response Tests
# ============================================================================
class TestLargeResponses:
"""Tests for handling large responses."""
def test_chunk_size_encoding(self):
"""Test chunk size encoding for various sizes."""
test_cases = [
(1, b"1\r\n"),
(10, b"a\r\n"),
(15, b"f\r\n"),
(16, b"10\r\n"),
(255, b"ff\r\n"),
(256, b"100\r\n"),
(1024, b"400\r\n"),
(65535, b"ffff\r\n"),
(1048576, b"100000\r\n"), # 1MB
]
for size, expected in test_cases:
chunk_header = f"{size:x}\r\n".encode("latin-1")
assert chunk_header == expected, f"Failed for size {size}"
def test_megabyte_chunk(self):
"""Test encoding 1MB chunk."""
size = 1024 * 1024 # 1MB
body = b"x" * size
chunk = f"{len(body):x}\r\n".encode("latin-1") + body + b"\r\n"
# Verify structure
assert chunk.startswith(b"100000\r\n") # 1MB in hex
assert chunk.endswith(b"\r\n")
# Total size: header (8) + body (1048576) + trailer (2)
assert len(chunk) == 8 + 1048576 + 2
# ============================================================================
# Transfer-Encoding Header Tests
# ============================================================================
class TestTransferEncodingHeader:
"""Tests for Transfer-Encoding header handling."""
def test_transfer_encoding_chunked(self):
"""Test Transfer-Encoding: chunked header."""
headers = [(b"transfer-encoding", b"chunked")]
te_header = dict(headers).get(b"transfer-encoding")
assert te_header == b"chunked"
def test_add_transfer_encoding_to_headers(self):
"""Test adding Transfer-Encoding header to response."""
headers = [
(b"content-type", b"text/plain"),
]
# Add chunked encoding
headers = list(headers) + [(b"transfer-encoding", b"chunked")]
header_names = [name for name, _ in headers]
assert b"transfer-encoding" in header_names
def test_no_content_length_with_transfer_encoding(self):
"""Test Content-Length should not be present with Transfer-Encoding."""
# Per HTTP spec, Content-Length must be ignored if Transfer-Encoding present
headers = [
(b"content-type", b"text/plain"),
(b"transfer-encoding", b"chunked"),
]
header_names = [name for name, _ in headers]
assert b"content-length" not in header_names