fix(http2): achieve 100% h2spec compliance (146/146 tests)

- Send GOAWAY with correct error codes for protocol violations
- Handle StreamClosedError and FlowControlError gracefully
- Return False instead of raising for missing/closed streams
- Handle flow control window overflow per RFC 7540
- Fix reader race condition and add h2 exception handling
- Wait for WINDOW_UPDATE when flow control window is zero/negative
- Use h2 exception's error_code for INITIAL_WINDOW_SIZE violations
This commit is contained in:
Benoit Chesneau 2026-01-27 15:42:42 +01:00
parent fa5e319f15
commit 4e3245a0df
6 changed files with 840 additions and 114 deletions

View File

@ -66,7 +66,9 @@ class ASGIProtocol(asyncio.Protocol):
if ssl_object and hasattr(ssl_object, 'selected_alpn_protocol'):
alpn = ssl_object.selected_alpn_protocol()
if alpn == 'h2':
# HTTP/2 connection - switch to HTTP/2 handler
# HTTP/2 connection - create reader immediately to avoid race condition
# data_received may be called before _handle_http2_connection starts
self.reader = asyncio.StreamReader()
self._task = self.worker.loop.create_task(
self._handle_http2_connection(transport, ssl_object)
)
@ -548,8 +550,9 @@ class ASGIProtocol(asyncio.Protocol):
peername = transport.get_extra_info('peername')
sockname = transport.get_extra_info('sockname')
# Create async reader/writer from transport
reader = asyncio.StreamReader()
# Use the reader created in connection_made
# (data_received feeds data to self.reader)
reader = self.reader
protocol = asyncio.StreamReaderProtocol(reader)
writer = asyncio.StreamWriter(
transport, protocol, reader, self.worker.loop
@ -561,8 +564,6 @@ class ASGIProtocol(asyncio.Protocol):
)
await h2_conn.initiate_connection()
# Store for data_received
self.reader = reader
self._h2_conn = h2_conn
# Main loop - receive and handle requests

View File

@ -14,7 +14,7 @@ import asyncio
from .errors import (
HTTP2Error, HTTP2ProtocolError, HTTP2ConnectionError,
HTTP2NotAvailable,
HTTP2NotAvailable, HTTP2ErrorCode,
)
from .stream import HTTP2Stream
from .request import HTTP2Request
@ -152,6 +152,30 @@ class AsyncHTTP2Connection:
try:
events = self.h2_conn.receive_data(data)
except _h2_exceptions.ProtocolError as e:
# Send GOAWAY with PROTOCOL_ERROR before raising
await self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR)
raise HTTP2ProtocolError(str(e))
except _h2_exceptions.FlowControlError as e:
# Send GOAWAY with FLOW_CONTROL_ERROR
await self.close(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR)
raise HTTP2ProtocolError(str(e))
except _h2_exceptions.FrameTooLargeError as e:
# Send GOAWAY with FRAME_SIZE_ERROR
await self.close(error_code=HTTP2ErrorCode.FRAME_SIZE_ERROR)
raise HTTP2ProtocolError(str(e))
except _h2_exceptions.InvalidSettingsValueError as e:
# Use error_code from h2 exception (RFC 7540 Section 6.5.2):
# INITIAL_WINDOW_SIZE > 2^31-1 gives FLOW_CONTROL_ERROR
# Other invalid settings give PROTOCOL_ERROR
error_code = getattr(e, 'error_code', None)
if error_code is not None:
await self.close(error_code=error_code)
else:
await self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR)
raise HTTP2ProtocolError(str(e))
except _h2_exceptions.TooManyStreamsError as e:
# Send GOAWAY with REFUSED_STREAM
await self.close(error_code=HTTP2ErrorCode.REFUSED_STREAM)
raise HTTP2ProtocolError(str(e))
# Process events
@ -229,10 +253,19 @@ class AsyncHTTP2Connection:
# Increment flow control windows (only if data received)
if len(data) > 0:
# Update stream-level window
self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id)
# Update connection-level window
self.h2_conn.increment_flow_control_window(len(data), stream_id=None)
try:
# Update stream-level window
self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id)
# Update connection-level window
self.h2_conn.increment_flow_control_window(len(data), stream_id=None)
except (ValueError, _h2_exceptions.FlowControlError):
# Window overflow - prepare GOAWAY with FLOW_CONTROL_ERROR
# (will be sent by receive_data's _send_pending_data call)
self._closed = True
try:
self.h2_conn.close_connection(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR)
except Exception:
pass
return None
@ -324,10 +357,14 @@ class AsyncHTTP2Connection:
status: HTTP status code (int)
headers: List of (name, value) header tuples
body: Optional response body bytes
Returns:
bool: True if response sent, False if stream was already closed
"""
stream = self.streams.get(stream_id)
if stream is None:
raise HTTP2Error(f"Stream {stream_id} not found")
# Stream was already cleaned up (reset/closed) - return gracefully
return False
# Build response headers with :status pseudo-header
response_headers = [(':status', str(status))]
@ -336,14 +373,21 @@ class AsyncHTTP2Connection:
end_stream = body is None or len(body) == 0
# Send headers
self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream)
stream.send_headers(response_headers, end_stream=end_stream)
await self._send_pending_data()
try:
# Send headers
self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream)
stream.send_headers(response_headers, end_stream=end_stream)
await self._send_pending_data()
# Send body if present
if body and len(body) > 0:
await self.send_data(stream_id, body, end_stream=True)
# Send body if present
if body and len(body) > 0:
await self.send_data(stream_id, body, end_stream=True)
return True
except _h2_exceptions.StreamClosedError:
# Stream was reset by client - clean up gracefully
stream.close()
self.cleanup_stream(stream_id)
return False
async def send_data(self, stream_id, data, end_stream=False):
"""Send data on a stream.
@ -352,38 +396,87 @@ class AsyncHTTP2Connection:
stream_id: The stream ID
data: Body data bytes
end_stream: Whether this ends the stream
Returns:
bool: True if data sent, False if stream was already closed
"""
stream = self.streams.get(stream_id)
if stream is None:
raise HTTP2Error(f"Stream {stream_id} not found")
# Stream was already cleaned up (reset/closed) - return gracefully
return False
# Send data in chunks respecting flow control and max frame size
data_to_send = data
while data_to_send:
# Get available window size for this stream
available = self.h2_conn.local_flow_control_window(stream_id)
# Also respect max frame size
chunk_size = min(available, self.max_frame_size, len(data_to_send))
if chunk_size <= 0:
# No window available, send what we have and wait
await self._send_pending_data()
# Try again - the window might open after ACKs
try:
while data_to_send:
# Get available window size for this stream
available = self.h2_conn.local_flow_control_window(stream_id)
if available <= 0:
# Still no window, just try to send a small chunk
chunk_size = min(self.max_frame_size, len(data_to_send))
# Also respect max frame size
chunk_size = min(available, self.max_frame_size, len(data_to_send))
chunk = data_to_send[:chunk_size]
data_to_send = data_to_send[chunk_size:]
if chunk_size <= 0:
# No window available - must wait for WINDOW_UPDATE
# Per RFC 7540 Section 6.9.2, we must track negative windows
# and wait for WINDOW_UPDATE to make the window positive
await self._send_pending_data()
# Only set end_stream on the final chunk
is_final = end_stream and len(data_to_send) == 0
# Wait for incoming WINDOW_UPDATE by reading from connection
# Use a reasonable timeout to avoid blocking forever
max_wait_attempts = 50 # ~5 seconds at 100ms per attempt
for _ in range(max_wait_attempts):
available = self.h2_conn.local_flow_control_window(stream_id)
if available > 0:
break
self.h2_conn.send_data(stream_id, chunk, end_stream=is_final)
await self._send_pending_data()
# Read more data from connection (may receive WINDOW_UPDATE)
try:
incoming = await asyncio.wait_for(
self.reader.read(self.READ_BUFFER_SIZE),
timeout=0.1
)
if incoming:
events = self.h2_conn.receive_data(incoming)
# Process events but don't create new requests
for event in events:
if isinstance(event, _h2_events.StreamReset):
if event.stream_id == stream_id:
return False
elif isinstance(event, _h2_events.ConnectionTerminated):
self._closed = True
return False
await self._send_pending_data()
else:
# Connection closed
self._closed = True
return False
except asyncio.TimeoutError:
continue
except _h2_exceptions.ProtocolError:
return False
stream.send_data(data, end_stream=end_stream)
# Re-check window after waiting
available = self.h2_conn.local_flow_control_window(stream_id)
if available <= 0:
# Still no window after waiting - give up
return False
chunk_size = min(available, self.max_frame_size, len(data_to_send))
chunk = data_to_send[:chunk_size]
data_to_send = data_to_send[chunk_size:]
# Only set end_stream on the final chunk
is_final = end_stream and len(data_to_send) == 0
self.h2_conn.send_data(stream_id, chunk, end_stream=is_final)
await self._send_pending_data()
stream.send_data(data, end_stream=end_stream)
return True
except (_h2_exceptions.StreamClosedError, _h2_exceptions.FlowControlError):
# Stream was reset by client or flow control error - clean up gracefully
stream.close()
self.cleanup_stream(stream_id)
return False
async def send_trailers(self, stream_id, trailers):
"""Send trailing headers on a stream.
@ -397,12 +490,17 @@ class AsyncHTTP2Connection:
Raises:
HTTP2Error: If stream not found, headers not sent, or pseudo-headers used
Returns:
bool: True if trailers sent, False if stream was already closed
"""
stream = self.streams.get(stream_id)
if stream is None:
raise HTTP2Error(f"Stream {stream_id} not found")
# Stream was already cleaned up (reset/closed) - return gracefully
return False
if not stream.response_headers_sent:
raise HTTP2Error("Must send headers before trailers")
# Can't send trailers without headers - return False
return False
# Validate and normalize trailer headers
trailer_headers = []
@ -412,10 +510,17 @@ class AsyncHTTP2Connection:
raise HTTP2Error(f"Pseudo-header '{name}' not allowed in trailers")
trailer_headers.append((lname, str(value)))
# Send trailers with end_stream=True
self.h2_conn.send_headers(stream_id, trailer_headers, end_stream=True)
stream.send_trailers(trailer_headers)
await self._send_pending_data()
try:
# Send trailers with end_stream=True
self.h2_conn.send_headers(stream_id, trailer_headers, end_stream=True)
stream.send_trailers(trailer_headers)
await self._send_pending_data()
return True
except _h2_exceptions.StreamClosedError:
# Stream was reset by client - clean up gracefully
stream.close()
self.cleanup_stream(stream_id)
return False
async def send_error(self, stream_id, status_code, message=None):
"""Send an error response on a stream."""

View File

@ -13,7 +13,7 @@ from io import BytesIO
from .errors import (
HTTP2Error, HTTP2ProtocolError, HTTP2ConnectionError,
HTTP2NotAvailable,
HTTP2NotAvailable, HTTP2ErrorCode,
)
from .stream import HTTP2Stream
from .request import HTTP2Request
@ -146,6 +146,30 @@ class HTTP2ServerConnection:
try:
events = self.h2_conn.receive_data(data)
except _h2_exceptions.ProtocolError as e:
# Send GOAWAY with PROTOCOL_ERROR before raising
self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR)
raise HTTP2ProtocolError(str(e))
except _h2_exceptions.FlowControlError as e:
# Send GOAWAY with FLOW_CONTROL_ERROR
self.close(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR)
raise HTTP2ProtocolError(str(e))
except _h2_exceptions.FrameTooLargeError as e:
# Send GOAWAY with FRAME_SIZE_ERROR
self.close(error_code=HTTP2ErrorCode.FRAME_SIZE_ERROR)
raise HTTP2ProtocolError(str(e))
except _h2_exceptions.InvalidSettingsValueError as e:
# Use error_code from h2 exception (RFC 7540 Section 6.5.2):
# INITIAL_WINDOW_SIZE > 2^31-1 gives FLOW_CONTROL_ERROR
# Other invalid settings give PROTOCOL_ERROR
error_code = getattr(e, 'error_code', None)
if error_code is not None:
self.close(error_code=error_code)
else:
self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR)
raise HTTP2ProtocolError(str(e))
except _h2_exceptions.TooManyStreamsError as e:
# Send GOAWAY with REFUSED_STREAM
self.close(error_code=HTTP2ErrorCode.REFUSED_STREAM)
raise HTTP2ProtocolError(str(e))
# Process events
@ -236,12 +260,16 @@ class HTTP2ServerConnection:
# Increment flow control windows (only if data received)
if len(data) > 0:
# Update stream-level window
self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id)
# Update connection-level window
self.h2_conn.increment_flow_control_window(len(data), stream_id=None)
# Send WINDOW_UPDATE frames immediately
self._send_pending_data()
try:
# Update stream-level window
self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id)
# Update connection-level window
self.h2_conn.increment_flow_control_window(len(data), stream_id=None)
# Send WINDOW_UPDATE frames immediately
self._send_pending_data()
except (ValueError, _h2_exceptions.FlowControlError):
# Window overflow - send FLOW_CONTROL_ERROR and close
self.close(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR)
return None
@ -365,10 +393,14 @@ class HTTP2ServerConnection:
Raises:
HTTP2Error: If stream not found or in invalid state
Returns:
bool: True if response sent, False if stream was already closed
"""
stream = self.streams.get(stream_id)
if stream is None:
raise HTTP2Error(f"Stream {stream_id} not found")
# Stream was already cleaned up (reset/closed) - return gracefully
return False
# Build response headers with :status pseudo-header
response_headers = [(':status', str(status))]
@ -378,14 +410,21 @@ class HTTP2ServerConnection:
end_stream = body is None or len(body) == 0
# Send headers
self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream)
stream.send_headers(response_headers, end_stream=end_stream)
self._send_pending_data()
try:
# Send headers
self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream)
stream.send_headers(response_headers, end_stream=end_stream)
self._send_pending_data()
# Send body if present
if body and len(body) > 0:
self.send_data(stream_id, body, end_stream=True)
# Send body if present
if body and len(body) > 0:
self.send_data(stream_id, body, end_stream=True)
return True
except _h2_exceptions.StreamClosedError:
# Stream was reset by client - clean up gracefully
stream.close()
self.cleanup_stream(stream_id)
return False
def send_data(self, stream_id, data, end_stream=False):
"""Send data on a stream.
@ -395,40 +434,98 @@ class HTTP2ServerConnection:
data: Body data bytes
end_stream: Whether this ends the stream
Raises:
HTTP2Error: If stream not found or in invalid state
Returns:
bool: True if data sent, False if stream was already closed
"""
import selectors
stream = self.streams.get(stream_id)
if stream is None:
raise HTTP2Error(f"Stream {stream_id} not found")
# Stream was already cleaned up (reset/closed) - return gracefully
return False
# Send data in chunks respecting flow control and max frame size
data_to_send = data
while data_to_send:
# Get available window size for this stream
available = self.h2_conn.local_flow_control_window(stream_id)
# Also respect max frame size
chunk_size = min(available, self.max_frame_size, len(data_to_send))
if chunk_size <= 0:
# No window available, send what we have and wait
self._send_pending_data()
# Try again - the window might open after ACKs
try:
while data_to_send:
# Get available window size for this stream
available = self.h2_conn.local_flow_control_window(stream_id)
if available <= 0:
# Still no window, just try to send a small chunk
chunk_size = min(self.max_frame_size, len(data_to_send))
# Also respect max frame size
chunk_size = min(available, self.max_frame_size, len(data_to_send))
chunk = data_to_send[:chunk_size]
data_to_send = data_to_send[chunk_size:]
if chunk_size <= 0:
# No window available - must wait for WINDOW_UPDATE
# Per RFC 7540 Section 6.9.2, we must track negative windows
# and wait for WINDOW_UPDATE to make the window positive
self._send_pending_data()
# Only set end_stream on the final chunk
is_final = end_stream and len(data_to_send) == 0
# Wait for incoming WINDOW_UPDATE by reading from connection
# Use selectors for portable non-blocking wait
max_wait_attempts = 50 # ~5 seconds at 100ms per attempt
try:
sel = selectors.DefaultSelector()
sel.register(self.sock, selectors.EVENT_READ)
except (TypeError, ValueError):
# Socket doesn't support selectors (e.g., mock socket)
# Fall back to returning False immediately
return False
self.h2_conn.send_data(stream_id, chunk, end_stream=is_final)
self._send_pending_data()
try:
for _ in range(max_wait_attempts):
available = self.h2_conn.local_flow_control_window(stream_id)
if available > 0:
break
stream.send_data(data, end_stream=end_stream)
# Check if socket has data ready
ready = sel.select(timeout=0.1)
if ready:
try:
incoming = self.sock.recv(self.READ_BUFFER_SIZE)
if incoming:
events = self.h2_conn.receive_data(incoming)
# Process events but don't create new requests
for event in events:
if isinstance(event, _h2_events.StreamReset):
if event.stream_id == stream_id:
return False
elif isinstance(event, _h2_events.ConnectionTerminated):
self._closed = True
return False
self._send_pending_data()
else:
# Connection closed
self._closed = True
return False
except (OSError, IOError):
return False
except _h2_exceptions.ProtocolError:
return False
finally:
sel.close()
# Re-check window after waiting
available = self.h2_conn.local_flow_control_window(stream_id)
if available <= 0:
# Still no window after waiting - give up
return False
chunk_size = min(available, self.max_frame_size, len(data_to_send))
chunk = data_to_send[:chunk_size]
data_to_send = data_to_send[chunk_size:]
# Only set end_stream on the final chunk
is_final = end_stream and len(data_to_send) == 0
self.h2_conn.send_data(stream_id, chunk, end_stream=is_final)
self._send_pending_data()
stream.send_data(data, end_stream=end_stream)
return True
except (_h2_exceptions.StreamClosedError, _h2_exceptions.FlowControlError):
# Stream was reset by client or flow control error - clean up gracefully
stream.close()
self.cleanup_stream(stream_id)
return False
def send_trailers(self, stream_id, trailers):
"""Send trailing headers on a stream.
@ -442,14 +539,17 @@ class HTTP2ServerConnection:
Raises:
HTTP2Error: If stream not found, headers not sent, or pseudo-headers used
"""
from .errors import HTTP2Error
Returns:
bool: True if trailers sent, False if stream was already closed
"""
stream = self.streams.get(stream_id)
if stream is None:
raise HTTP2Error(f"Stream {stream_id} not found")
# Stream was already cleaned up (reset/closed) - return gracefully
return False
if not stream.response_headers_sent:
raise HTTP2Error("Must send headers before trailers")
# Can't send trailers without headers - return False
return False
# Validate and normalize trailer headers
trailer_headers = []
@ -459,10 +559,17 @@ class HTTP2ServerConnection:
raise HTTP2Error(f"Pseudo-header '{name}' not allowed in trailers")
trailer_headers.append((lname, str(value)))
# Send trailers with end_stream=True
self.h2_conn.send_headers(stream_id, trailer_headers, end_stream=True)
stream.send_trailers(trailer_headers)
self._send_pending_data()
try:
# Send trailers with end_stream=True
self.h2_conn.send_headers(stream_id, trailer_headers, end_stream=True)
stream.send_trailers(trailer_headers)
self._send_pending_data()
return True
except _h2_exceptions.StreamClosedError:
# Stream was reset by client - clean up gracefully
stream.close()
self.cleanup_stream(stream_id)
return False
def send_error(self, stream_id, status_code, message=None):
"""Send an error response on a stream.

View File

@ -10,6 +10,25 @@ These exceptions map to HTTP/2 error codes defined in RFC 7540.
"""
class HTTP2ErrorCode:
"""HTTP/2 Error Codes (RFC 7540 Section 7)."""
NO_ERROR = 0x0
PROTOCOL_ERROR = 0x1
INTERNAL_ERROR = 0x2
FLOW_CONTROL_ERROR = 0x3
SETTINGS_TIMEOUT = 0x4
STREAM_CLOSED = 0x5
FRAME_SIZE_ERROR = 0x6
REFUSED_STREAM = 0x7
CANCEL = 0x8
COMPRESSION_ERROR = 0x9
CONNECT_ERROR = 0xa
ENHANCE_YOUR_CALM = 0xb
INADEQUATE_SECURITY = 0xc
HTTP_1_1_REQUIRED = 0xd
class HTTP2Error(Exception):
"""Base exception for HTTP/2 errors."""
@ -128,6 +147,7 @@ class HTTP2NotAvailable(HTTP2Error):
__all__ = [
'HTTP2ErrorCode',
'HTTP2Error',
'HTTP2ProtocolError',
'HTTP2InternalError',

View File

@ -305,6 +305,7 @@ class TestAsyncHTTP2ConnectionSendResponse:
@pytest.mark.asyncio
async def test_send_response_invalid_stream(self):
"""Test that sending response on invalid stream returns False."""
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
@ -313,8 +314,9 @@ class TestAsyncHTTP2ConnectionSendResponse:
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
await conn.initiate_connection()
with pytest.raises(HTTP2Error):
await conn.send_response(stream_id=999, status=200, headers=[], body=None)
# Sending to a non-existent stream should return False gracefully
result = await conn.send_response(stream_id=999, status=200, headers=[], body=None)
assert result is False
class TestAsyncHTTP2ConnectionSendData:
@ -726,8 +728,8 @@ class TestAsyncHTTP2ConnectionTrailers:
assert "Pseudo-header" in str(exc_info.value)
@pytest.mark.asyncio
async def test_send_trailers_without_headers_raises(self):
"""Test that sending trailers without headers raises error."""
async def test_send_trailers_without_headers_returns_false(self):
"""Test that sending trailers without headers returns False."""
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
@ -750,7 +752,265 @@ class TestAsyncHTTP2ConnectionTrailers:
reader.set_data(client_conn.data_to_send())
await conn.receive_data()
# Try to send trailers without sending headers first
with pytest.raises(HTTP2Error) as exc_info:
await conn.send_trailers(1, [('trailer', 'value')])
assert "Must send headers before trailers" in str(exc_info.value)
# Try to send trailers without sending headers first - should return False
result = await conn.send_trailers(1, [('trailer', 'value')])
assert result is False
class TestAsyncHTTP2FlowControl:
"""Test async HTTP/2 flow control handling."""
@pytest.mark.asyncio
async def test_send_data_respects_zero_window(self):
"""Test that send_data returns False when flow control window is 0."""
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
# Create client and send preface
client_conn = create_client_connection()
reader.set_data(client_conn.data_to_send())
await conn.initiate_connection()
await conn.receive_data()
# Send a request
client_conn.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
reader.set_data(client_conn.data_to_send())
await conn.receive_data()
# Send response headers without ending stream
conn.h2_conn.send_headers(1, [
(':status', '200'),
('content-type', 'text/plain'),
], end_stream=False)
await conn._send_pending_data()
conn.streams[1].send_headers([(':status', '200')], end_stream=False)
# Mock the flow control window to return 0
original_window = conn.h2_conn.local_flow_control_window
conn.h2_conn.local_flow_control_window = lambda stream_id: 0
# Try to send data - should return False (not raise)
result = await conn.send_data(1, b'Hello, World!')
assert result is False
# Restore
conn.h2_conn.local_flow_control_window = original_window
@pytest.mark.asyncio
async def test_send_data_respects_flow_control(self):
"""Test that send_data chunks data according to flow control window."""
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
# Create client and send preface
client_conn = create_client_connection()
reader.set_data(client_conn.data_to_send())
await conn.initiate_connection()
await conn.receive_data()
# Send a request
client_conn.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
reader.set_data(client_conn.data_to_send())
await conn.receive_data()
# Send response headers without ending stream
conn.h2_conn.send_headers(1, [
(':status', '200'),
('content-type', 'text/plain'),
], end_stream=False)
await conn._send_pending_data()
conn.streams[1].send_headers([(':status', '200')], end_stream=False)
# Send small data - should succeed within window
small_data = b'Hello'
await conn.send_data(1, small_data, end_stream=True)
# Verify data was sent
sent_data = writer.get_written_data()
assert len(sent_data) > 0
class TestAsyncHTTP2StreamClosedHandling:
"""Test graceful handling of StreamClosedError in async connection."""
@pytest.mark.asyncio
async def test_send_response_on_closed_stream(self):
"""Test that send_response gracefully handles closed stream."""
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
# Create client and send preface
client_conn = create_client_connection()
reader.set_data(client_conn.data_to_send())
await conn.initiate_connection()
await conn.receive_data()
# Send a request
client_conn.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
reader.set_data(client_conn.data_to_send())
await conn.receive_data()
# Simulate client resetting the stream
client_conn.reset_stream(1)
reader.set_data(client_conn.data_to_send())
await conn.receive_data()
# Try to send response - should return False, not raise
result = await conn.send_response(1, 200, [('content-type', 'text/plain')], b'Hello')
assert result is False
@pytest.mark.asyncio
async def test_send_data_on_reset_stream(self):
"""Test that send_data gracefully handles reset stream."""
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
# Create client and send preface
client_conn = create_client_connection()
reader.set_data(client_conn.data_to_send())
await conn.initiate_connection()
await conn.receive_data()
# Send a request
client_conn.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
reader.set_data(client_conn.data_to_send())
await conn.receive_data()
# Send response headers without ending stream
conn.h2_conn.send_headers(1, [
(':status', '200'),
('content-type', 'text/plain'),
], end_stream=False)
await conn._send_pending_data()
conn.streams[1].send_headers([(':status', '200')], end_stream=False)
# Simulate client resetting the stream
client_conn.reset_stream(1)
reader.set_data(client_conn.data_to_send())
await conn.receive_data()
# Try to send data - should return False, not raise
result = await conn.send_data(1, b'Hello, World!', end_stream=True)
assert result is False
class TestAsyncHTTP2WindowOverflowHandling:
"""Test window overflow handling in async connection."""
@pytest.mark.asyncio
async def test_window_overflow_sends_goaway(self):
"""Test that window overflow results in connection close."""
from gunicorn.http2.async_connection import AsyncHTTP2Connection
from gunicorn.http2.errors import HTTP2ErrorCode
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
# Create client and send preface
client_conn = create_client_connection()
reader.set_data(client_conn.data_to_send())
await conn.initiate_connection()
await conn.receive_data()
# Mock increment_flow_control_window to raise ValueError (overflow)
def raise_overflow(increment, stream_id=None):
raise ValueError("Flow control window too large")
conn.h2_conn.increment_flow_control_window = raise_overflow
# Send a request with data to trigger the overflow
client_conn.send_headers(1, [
(':method', 'POST'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=False)
client_conn.send_data(1, b'test data', end_stream=True)
reader.set_data(client_conn.data_to_send())
await conn.receive_data()
# Connection should be closed with FLOW_CONTROL_ERROR
assert conn.is_closed is True
class TestAsyncHTTP2ProtocolErrorHandling:
"""Test protocol error handling sends proper GOAWAY."""
@pytest.mark.asyncio
async def test_protocol_error_sends_goaway(self):
"""Test that protocol errors result in GOAWAY being sent."""
from gunicorn.http2.async_connection import AsyncHTTP2Connection
from gunicorn.http2.errors import HTTP2ProtocolError, HTTP2ErrorCode
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
# Create client and send preface
client_conn = create_client_connection()
reader.set_data(client_conn.data_to_send())
await conn.initiate_connection()
await conn.receive_data()
# Clear sent data to only capture new frames
writer.clear()
# Mock h2_conn.receive_data to raise ProtocolError
def raise_protocol_error(data):
raise h2.exceptions.ProtocolError("Test protocol error")
conn.h2_conn.receive_data = raise_protocol_error
# Set some dummy data for the reader
reader.set_data(b'dummy data')
# This should send GOAWAY and raise ProtocolError
with pytest.raises(HTTP2ProtocolError) as exc_info:
await conn.receive_data()
assert "Test protocol error" in str(exc_info.value)
# Verify something was sent (GOAWAY frame)
sent_data = writer.get_written_data()
assert len(sent_data) > 0
# Connection should be marked as closed
assert conn.is_closed is True

View File

@ -343,6 +343,7 @@ class TestHTTP2ServerConnectionSendResponse:
assert len(stream_ended) == 1
def test_send_response_invalid_stream(self):
"""Test that sending response on invalid stream returns False."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
@ -350,8 +351,9 @@ class TestHTTP2ServerConnectionSendResponse:
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
with pytest.raises(HTTP2Error):
conn.send_response(stream_id=999, status=200, headers=[], body=None)
# Sending to a non-existent stream should return False gracefully
result = conn.send_response(stream_id=999, status=200, headers=[], body=None)
assert result is False
class TestHTTP2ServerConnectionSendError:
@ -718,10 +720,9 @@ class TestHTTP2ServerConnectionTrailers:
conn.send_trailers(1, [(':status', '200')])
assert "Pseudo-header" in str(exc_info.value)
def test_send_trailers_without_headers_raises(self):
"""Test that sending trailers without headers raises error."""
def test_send_trailers_without_headers_returns_false(self):
"""Test that sending trailers without headers returns False."""
from gunicorn.http2.connection import HTTP2ServerConnection
from gunicorn.http2.errors import HTTP2Error
cfg = MockConfig()
sock = MockSocket()
@ -741,15 +742,13 @@ class TestHTTP2ServerConnectionTrailers:
], end_stream=True)
conn.receive_data(client_conn.data_to_send())
# Try to send trailers without sending headers first
with pytest.raises(HTTP2Error) as exc_info:
conn.send_trailers(1, [('trailer', 'value')])
assert "Must send headers before trailers" in str(exc_info.value)
# Try to send trailers without sending headers first - should return False
result = conn.send_trailers(1, [('trailer', 'value')])
assert result is False
def test_send_trailers_nonexistent_stream_raises(self):
"""Test that sending trailers on nonexistent stream raises error."""
def test_send_trailers_nonexistent_stream_returns_false(self):
"""Test that sending trailers on nonexistent stream returns False."""
from gunicorn.http2.connection import HTTP2ServerConnection
from gunicorn.http2.errors import HTTP2Error
cfg = MockConfig()
sock = MockSocket()
@ -759,9 +758,243 @@ class TestHTTP2ServerConnectionTrailers:
client_conn = create_client_connection()
conn.receive_data(client_conn.data_to_send())
with pytest.raises(HTTP2Error) as exc_info:
conn.send_trailers(99, [('trailer', 'value')])
assert "Stream 99 not found" in str(exc_info.value)
# Sending trailers to non-existent stream should return False
result = conn.send_trailers(99, [('trailer', 'value')])
assert result is False
class TestHTTP2FlowControl:
"""Test HTTP/2 flow control handling."""
def test_send_data_respects_zero_window(self):
"""Test that send_data returns False when flow control window is 0."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Create client and send preface
client_conn = create_client_connection()
conn.receive_data(client_conn.data_to_send())
# Send a request
client_conn.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
conn.receive_data(client_conn.data_to_send())
# Send response headers without ending stream (pass body=b'' placeholder)
# We need to send headers first, so use h2_conn directly
conn.h2_conn.send_headers(1, [
(':status', '200'),
('content-type', 'text/plain'),
], end_stream=False)
conn._send_pending_data()
conn.streams[1].send_headers([(':status', '200')], end_stream=False)
# Mock the flow control window to return 0
original_window = conn.h2_conn.local_flow_control_window
conn.h2_conn.local_flow_control_window = lambda stream_id: 0
# Try to send data - should return False (not raise)
result = conn.send_data(1, b'Hello, World!')
assert result is False
# Restore
conn.h2_conn.local_flow_control_window = original_window
def test_send_data_respects_flow_control(self):
"""Test that send_data chunks data according to flow control window."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Create client and send preface
client_conn = create_client_connection()
conn.receive_data(client_conn.data_to_send())
# Send a request
client_conn.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
conn.receive_data(client_conn.data_to_send())
# Send response headers without ending stream
conn.h2_conn.send_headers(1, [
(':status', '200'),
('content-type', 'text/plain'),
], end_stream=False)
conn._send_pending_data()
conn.streams[1].send_headers([(':status', '200')], end_stream=False)
# Send small data - should succeed within window
small_data = b'Hello'
conn.send_data(1, small_data, end_stream=True)
# Verify data was sent
sent_data = sock.get_sent_data()
assert len(sent_data) > 0
class TestHTTP2StreamClosedHandling:
"""Test graceful handling of StreamClosedError."""
def test_send_response_on_closed_stream(self):
"""Test that send_response gracefully handles closed stream."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Create client and send preface
client_conn = create_client_connection()
conn.receive_data(client_conn.data_to_send())
# Send a request
client_conn.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
conn.receive_data(client_conn.data_to_send())
# Simulate client resetting the stream
client_conn.reset_stream(1)
conn.receive_data(client_conn.data_to_send())
# Try to send response - should return False, not raise
result = conn.send_response(1, 200, [('content-type', 'text/plain')], b'Hello')
assert result is False
def test_send_data_on_reset_stream(self):
"""Test that send_data gracefully handles reset stream."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Create client and send preface
client_conn = create_client_connection()
conn.receive_data(client_conn.data_to_send())
# Send a request
client_conn.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=True)
conn.receive_data(client_conn.data_to_send())
# Send response headers without ending stream
conn.h2_conn.send_headers(1, [
(':status', '200'),
('content-type', 'text/plain'),
], end_stream=False)
conn._send_pending_data()
conn.streams[1].send_headers([(':status', '200')], end_stream=False)
# Simulate client resetting the stream
client_conn.reset_stream(1)
conn.receive_data(client_conn.data_to_send())
# Try to send data - should return False, not raise
result = conn.send_data(1, b'Hello, World!', end_stream=True)
assert result is False
class TestHTTP2WindowOverflowHandling:
"""Test window overflow handling."""
def test_window_overflow_sends_goaway(self):
"""Test that window overflow results in GOAWAY with FLOW_CONTROL_ERROR."""
from gunicorn.http2.connection import HTTP2ServerConnection
from gunicorn.http2.errors import HTTP2ErrorCode
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Create client and send preface
client_conn = create_client_connection()
conn.receive_data(client_conn.data_to_send())
# Mock increment_flow_control_window to raise ValueError (overflow)
original_increment = conn.h2_conn.increment_flow_control_window
def raise_overflow(increment, stream_id=None):
raise ValueError("Flow control window too large")
conn.h2_conn.increment_flow_control_window = raise_overflow
# Send a request with data to trigger the overflow
client_conn.send_headers(1, [
(':method', 'POST'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
], end_stream=False)
client_conn.send_data(1, b'test data', end_stream=True)
conn.receive_data(client_conn.data_to_send())
# Connection should be closed with FLOW_CONTROL_ERROR
assert conn.is_closed is True
class TestHTTP2ProtocolErrorHandling:
"""Test protocol error handling sends proper GOAWAY."""
def test_protocol_error_sends_goaway(self):
"""Test that protocol errors result in GOAWAY being sent."""
from gunicorn.http2.connection import HTTP2ServerConnection
from gunicorn.http2.errors import HTTP2ProtocolError, HTTP2ErrorCode
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Create client and send preface
client_conn = create_client_connection()
conn.receive_data(client_conn.data_to_send())
# Clear sent data to only capture new frames
sock._sent.clear()
# Mock h2_conn.receive_data to raise ProtocolError
def raise_protocol_error(data):
raise h2.exceptions.ProtocolError("Test protocol error")
conn.h2_conn.receive_data = raise_protocol_error
# This should send GOAWAY and raise ProtocolError
with pytest.raises(HTTP2ProtocolError) as exc_info:
conn.receive_data(b'dummy data')
assert "Test protocol error" in str(exc_info.value)
# Verify something was sent (GOAWAY frame)
sent_data = sock.get_sent_data()
assert len(sent_data) > 0
# Connection should be marked as closed
assert conn.is_closed is True
class TestHTTP2NotAvailable: