gunicorn/tests/test_http2_async_connection.py
Benoit Chesneau 4e3245a0df fix(http2): achieve 100% h2spec compliance (146/146 tests)
- Send GOAWAY with correct error codes for protocol violations
- Handle StreamClosedError and FlowControlError gracefully
- Return False instead of raising for missing/closed streams
- Handle flow control window overflow per RFC 7540
- Fix reader race condition and add h2 exception handling
- Wait for WINDOW_UPDATE when flow control window is zero/negative
- Use h2 exception's error_code for INITIAL_WINDOW_SIZE violations
2026-01-27 15:42:42 +01:00

1017 lines
33 KiB
Python

# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for async HTTP/2 server connection."""
import asyncio
import pytest
from unittest import mock
from io import BytesIO
# Check if h2 is available for integration tests
try:
import h2.connection
import h2.config
import h2.events
H2_AVAILABLE = True
except ImportError:
H2_AVAILABLE = False
from gunicorn.http2.errors import (
HTTP2Error, HTTP2ConnectionError
)
# Module-wide marker: skip every test in this file when the optional h2
# package could not be imported above (H2_AVAILABLE is False).
pytestmark = pytest.mark.skipif(not H2_AVAILABLE, reason="h2 library not available")
class MockConfig:
    """Stand-in for the gunicorn config object carrying only HTTP/2 settings."""

    def __init__(self):
        # Plain instance attributes so individual tests can override them.
        vars(self).update(
            http2_max_concurrent_streams=100,
            http2_initial_window_size=65535,
            http2_max_frame_size=16384,
            http2_max_header_list_size=65536,
        )
class MockAsyncReader:
    """In-memory replacement for an asyncio StreamReader used by the tests."""

    def __init__(self, data=b''):
        self._eof = False
        self._buffer = BytesIO(data)

    async def read(self, n=-1):
        """Return up to ``n`` bytes from the buffer; ``b''`` once it is drained."""
        chunk = self._buffer.read(n)
        if chunk or not self._eof:
            return chunk
        # Buffer drained and EOF flagged: report end-of-stream explicitly.
        return b''

    def set_data(self, data):
        """Replace whatever is pending with ``data``."""
        self._buffer = BytesIO(data)

    def set_eof(self):
        """Flag end-of-stream and discard any pending bytes."""
        self._buffer = BytesIO(b'')
        self._eof = True
class MockAsyncWriter:
    """In-memory replacement for an asyncio StreamWriter used by the tests."""

    def __init__(self):
        self._closed = False
        self._drained = False
        self._buffer = bytearray()

    def write(self, data):
        """Append ``data`` to the captured output; raise OSError once closed."""
        if not self._closed:
            self._buffer.extend(data)
            return
        raise OSError("Writer is closed")

    async def drain(self):
        """Record that a drain was awaited; nothing is actually flushed."""
        self._drained = True

    def close(self):
        """Mark the writer closed so later writes fail."""
        self._closed = True

    async def wait_closed(self):
        """Return immediately; there is no transport to wait for."""
        return None

    def get_written_data(self):
        """Return an immutable copy of everything written so far."""
        return bytes(self._buffer)

    def clear(self):
        """Forget everything written so far."""
        del self._buffer[:]
def create_client_connection():
    """Return a client-side h2 connection on which initiate_connection() has run.

    The tests use the returned object to generate the client bytes they feed
    to the server connection under test.
    """
    client = h2.connection.H2Connection(
        config=h2.config.H2Configuration(client_side=True),
    )
    client.initiate_connection()
    return client
class TestAsyncHTTP2ConnectionInit:
    """Constructor behaviour of AsyncHTTP2Connection."""

    def test_basic_initialization(self):
        """A new connection keeps its collaborators and starts with no streams."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        config = MockConfig()
        source, sink = MockAsyncReader(), MockAsyncWriter()
        connection = AsyncHTTP2Connection(config, source, sink, ('127.0.0.1', 12345))

        # The exact objects passed in must be exposed on the connection.
        assert connection.cfg is config
        assert connection.reader is source
        assert connection.writer is sink
        assert connection.client_addr == ('127.0.0.1', 12345)
        # Initial state before initiate_connection() has been called.
        assert connection.streams == {}
        assert connection.is_closed is False
        assert connection._initialized is False

    def test_settings_from_config(self):
        """max_concurrent_streams mirrors the value set on the config."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        config = MockConfig()
        config.http2_max_concurrent_streams = 50
        connection = AsyncHTTP2Connection(
            config, MockAsyncReader(), MockAsyncWriter(), ('127.0.0.1', 12345)
        )

        assert connection.max_concurrent_streams == 50
class TestAsyncHTTP2ConnectionInitiate:
    """Behaviour of AsyncHTTP2Connection.initiate_connection()."""

    @staticmethod
    def _build():
        """Return ``(connection, writer)`` wired to fresh in-memory mocks."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        sink = MockAsyncWriter()
        connection = AsyncHTTP2Connection(
            MockConfig(), MockAsyncReader(), sink, ('127.0.0.1', 12345)
        )
        return connection, sink

    @pytest.mark.asyncio
    async def test_initiate_connection(self):
        """Initiating marks the connection initialized and writes some bytes."""
        connection, sink = self._build()

        await connection.initiate_connection()

        assert connection._initialized is True
        assert len(sink.get_written_data()) > 0

    @pytest.mark.asyncio
    async def test_initiate_connection_idempotent(self):
        """A second initiate call must not write any additional bytes."""
        connection, sink = self._build()

        lengths = []
        for _ in range(2):
            await connection.initiate_connection()
            lengths.append(len(sink.get_written_data()))

        assert lengths[0] == lengths[1]
class TestAsyncHTTP2ConnectionReceiveData:
    """Behaviour of receive_data(): EOF, request parsing and timeouts."""

    @staticmethod
    def _build(reader=None):
        """Return ``(connection, reader, writer)``; ``reader`` defaults to a mock."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        source = MockAsyncReader() if reader is None else reader
        sink = MockAsyncWriter()
        connection = AsyncHTTP2Connection(
            MockConfig(), source, sink, ('127.0.0.1', 12345)
        )
        return connection, source, sink

    @pytest.mark.asyncio
    async def test_receive_empty_data_closes_connection(self):
        """An empty read from an EOF reader closes the connection."""
        connection, source, _ = self._build()
        source.set_eof()
        await connection.initiate_connection()

        parsed = await connection.receive_data()

        assert connection.is_closed is True
        assert parsed == []

    @pytest.mark.asyncio
    async def test_receive_simple_get_request(self):
        """A bodiless GET on stream 1 is surfaced as exactly one request."""
        connection, source, sink = self._build()
        await connection.initiate_connection()

        # Feed the client's initial bytes, then hand the server output back.
        peer = create_client_connection()
        source.set_data(peer.data_to_send())
        await connection.receive_data()
        reply = sink.get_written_data()
        if reply:
            peer.receive_data(reply)

        # The client issues the GET.
        get_headers = [
            (':method', 'GET'),
            (':path', '/async-test'),
            (':scheme', 'https'),
            (':authority', 'localhost'),
        ]
        peer.send_headers(stream_id=1, headers=get_headers, end_stream=True)
        source.set_data(peer.data_to_send())

        parsed = await connection.receive_data()

        assert len(parsed) == 1
        first = parsed[0]
        assert first.method == 'GET'
        assert first.path == '/async-test'

    @pytest.mark.asyncio
    async def test_receive_with_timeout(self):
        """With data already available, a timeout argument does not interfere."""
        connection, source, _ = self._build()
        await connection.initiate_connection()

        peer = create_client_connection()
        source.set_data(peer.data_to_send())

        # Data is ready, so this must return well inside the 5 second budget.
        await connection.receive_data(timeout=5.0)

    @pytest.mark.asyncio
    async def test_receive_timeout_raises(self):
        """A read that outlasts the timeout surfaces as an exception."""

        async def never_ready(n):
            # Sleeps far longer than the 0.01s timeout used below.
            await asyncio.sleep(10)
            return b''

        stalled = mock.Mock()
        stalled.read = mock.AsyncMock(side_effect=never_ready)
        connection, _, _ = self._build(reader=stalled)
        await connection.initiate_connection()

        # Either the raw asyncio timeout or the connection error is accepted.
        accepted = (asyncio.TimeoutError, HTTP2ConnectionError)
        with pytest.raises(accepted):
            await connection.receive_data(timeout=0.01)
class TestAsyncHTTP2ConnectionSendResponse:
    """Behaviour of AsyncHTTP2Connection.send_response()."""

    @staticmethod
    def _build():
        """Return ``(connection, reader, writer)`` wired to in-memory mocks."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        source, sink = MockAsyncReader(), MockAsyncWriter()
        connection = AsyncHTTP2Connection(
            MockConfig(), source, sink, ('127.0.0.1', 12345)
        )
        return connection, source, sink

    @pytest.mark.asyncio
    async def test_send_simple_response(self):
        """The response body reaches the client as a single DATA event."""
        connection, source, sink = self._build()
        await connection.initiate_connection()

        # Exchange initial bytes, then open stream 1 with a bodiless GET.
        peer = create_client_connection()
        source.set_data(peer.data_to_send())
        await connection.receive_data()
        peer.receive_data(sink.get_written_data())
        get_headers = [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https'),
            (':authority', 'localhost'),
        ]
        peer.send_headers(1, get_headers, end_stream=True)
        source.set_data(peer.data_to_send())
        await connection.receive_data()

        # Start from an empty buffer so only response bytes are replayed.
        sink.clear()
        await connection.send_response(
            stream_id=1,
            status=200,
            headers=[('content-type', 'text/plain')],
            body=b'Async Hello!',
        )

        received = peer.receive_data(sink.get_written_data())
        bodies = [
            evt.data for evt in received
            if isinstance(evt, h2.events.DataReceived)
        ]
        assert len(bodies) == 1
        assert bodies[0] == b'Async Hello!'

    @pytest.mark.asyncio
    async def test_send_response_invalid_stream(self):
        """Responding on an unknown stream id returns False instead of raising."""
        connection, _, _ = self._build()
        await connection.initiate_connection()

        # Stream 999 was never opened by any client.
        outcome = await connection.send_response(
            stream_id=999, status=200, headers=[], body=None
        )

        assert outcome is False
class TestAsyncHTTP2ConnectionSendData:
    """Body delivery as observed by an h2 client."""

    @pytest.mark.asyncio
    async def test_send_data(self):
        """The full body arrives at the client, possibly split across DATA events.

        NOTE(review): despite its name this test sends the body through
        send_response(); it never calls send_data() directly.
        """
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        source, sink = MockAsyncReader(), MockAsyncWriter()
        connection = AsyncHTTP2Connection(
            MockConfig(), source, sink, ('127.0.0.1', 12345)
        )
        await connection.initiate_connection()

        # Exchange initial bytes, then open stream 1 with a bodiless GET.
        peer = create_client_connection()
        source.set_data(peer.data_to_send())
        await connection.receive_data()
        peer.receive_data(sink.get_written_data())
        get_headers = [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https'),
            (':authority', 'localhost'),
        ]
        peer.send_headers(1, get_headers, end_stream=True)
        source.set_data(peer.data_to_send())
        await connection.receive_data()

        # Start from an empty buffer so only response bytes are replayed.
        sink.clear()
        await connection.send_response(
            stream_id=1,
            status=200,
            headers=[('content-type', 'text/plain')],
            body=b'chunk1chunk2',
        )

        received = peer.receive_data(sink.get_written_data())
        pieces = [
            evt.data for evt in received
            if isinstance(evt, h2.events.DataReceived)
        ]
        assert len(pieces) >= 1
        assert b''.join(pieces) == b'chunk1chunk2'
def get_h2_header_value(headers_list, name):
    """Return the value of the first header called ``name``, or None.

    ``headers_list`` is an iterable of ``(name, value)`` pairs. Names and
    values given as bytes are decoded before comparison / return; other
    values are passed through unchanged.
    """
    def _text(item):
        return item.decode() if isinstance(item, bytes) else item

    for raw_name, raw_value in headers_list:
        if _text(raw_name) != name:
            continue
        return _text(raw_value)
    return None
class TestAsyncHTTP2ConnectionSendError:
    """Behaviour of AsyncHTTP2Connection.send_error()."""

    @pytest.mark.asyncio
    async def test_send_error(self):
        """send_error() produces one response whose :status is the given code."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        source, sink = MockAsyncReader(), MockAsyncWriter()
        connection = AsyncHTTP2Connection(
            MockConfig(), source, sink, ('127.0.0.1', 12345)
        )
        await connection.initiate_connection()

        # Exchange initial bytes, then open stream 1 with a bodiless GET.
        peer = create_client_connection()
        source.set_data(peer.data_to_send())
        await connection.receive_data()
        peer.receive_data(sink.get_written_data())
        get_headers = [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https'),
            (':authority', 'localhost'),
        ]
        peer.send_headers(1, get_headers, end_stream=True)
        source.set_data(peer.data_to_send())
        await connection.receive_data()

        # Start from an empty buffer so only the error response is replayed.
        sink.clear()
        await connection.send_error(
            stream_id=1, status_code=500, message="Internal Error"
        )

        received = peer.receive_data(sink.get_written_data())
        responses = [
            evt for evt in received
            if isinstance(evt, h2.events.ResponseReceived)
        ]
        assert len(responses) == 1
        assert get_h2_header_value(responses[0].headers, ':status') == '500'
class TestAsyncHTTP2ConnectionResetStream:
    """Behaviour of AsyncHTTP2Connection.reset_stream()."""

    @pytest.mark.asyncio
    async def test_reset_stream(self):
        """Resetting an open stream is seen by the client as one StreamReset."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        source, sink = MockAsyncReader(), MockAsyncWriter()
        connection = AsyncHTTP2Connection(
            MockConfig(), source, sink, ('127.0.0.1', 12345)
        )
        await connection.initiate_connection()

        peer = create_client_connection()
        source.set_data(peer.data_to_send())
        await connection.receive_data()
        peer.receive_data(sink.get_written_data())

        # The request leaves stream 1 open on the client side (no END_STREAM).
        get_headers = [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https'),
            (':authority', 'localhost'),
        ]
        peer.send_headers(1, get_headers, end_stream=False)
        source.set_data(peer.data_to_send())
        await connection.receive_data()

        # Start from an empty buffer so only the reset is replayed.
        sink.clear()
        await connection.reset_stream(stream_id=1, error_code=0x8)

        received = peer.receive_data(sink.get_written_data())
        resets = [
            evt for evt in received
            if isinstance(evt, h2.events.StreamReset)
        ]
        assert len(resets) == 1
class TestAsyncHTTP2ConnectionClose:
    """Behaviour of AsyncHTTP2Connection.close()."""

    @staticmethod
    def _build():
        """Return ``(connection, reader, writer)`` wired to in-memory mocks."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        source, sink = MockAsyncReader(), MockAsyncWriter()
        connection = AsyncHTTP2Connection(
            MockConfig(), source, sink, ('127.0.0.1', 12345)
        )
        return connection, source, sink

    @pytest.mark.asyncio
    async def test_close_connection(self):
        """close() flags the connection closed and closes the writer."""
        connection, source, sink = self._build()
        await connection.initiate_connection()

        peer = create_client_connection()
        source.set_data(peer.data_to_send())
        await connection.receive_data()
        sink.clear()

        await connection.close()

        assert connection.is_closed is True
        assert sink._closed is True

    @pytest.mark.asyncio
    async def test_close_idempotent(self):
        """Calling close() twice in a row must not raise."""
        connection, _, _ = self._build()
        await connection.initiate_connection()

        for _ in range(2):
            await connection.close()
class TestAsyncHTTP2ConnectionCleanup:
    """Behaviour of AsyncHTTP2Connection.cleanup_stream()."""

    @pytest.mark.asyncio
    async def test_cleanup_stream(self):
        """cleanup_stream() removes the stream from the connection's table."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        source, sink = MockAsyncReader(), MockAsyncWriter()
        connection = AsyncHTTP2Connection(
            MockConfig(), source, sink, ('127.0.0.1', 12345)
        )
        await connection.initiate_connection()

        # Exchange initial bytes, then open stream 1 with a bodiless GET.
        peer = create_client_connection()
        source.set_data(peer.data_to_send())
        await connection.receive_data()
        peer.receive_data(sink.get_written_data())
        get_headers = [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https'),
            (':authority', 'localhost'),
        ]
        peer.send_headers(1, get_headers, end_stream=True)
        source.set_data(peer.data_to_send())
        await connection.receive_data()

        # The stream must be tracked before cleanup and gone afterwards.
        assert 1 in connection.streams
        connection.cleanup_stream(1)
        assert 1 not in connection.streams
class TestAsyncHTTP2ConnectionRepr:
    """String representation of AsyncHTTP2Connection."""

    def test_repr(self):
        """repr() contains the class name and a ``streams=`` field."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        connection = AsyncHTTP2Connection(
            MockConfig(), MockAsyncReader(), MockAsyncWriter(), ('127.0.0.1', 12345)
        )

        text = repr(connection)

        assert "AsyncHTTP2Connection" in text
        assert "streams=" in text
class TestAsyncHTTP2ConnectionSocketErrors:
    """Translation of OSError from the transport into HTTP2ConnectionError."""

    @pytest.mark.asyncio
    async def test_read_error_raises_connection_error(self):
        """An OSError from reader.read() surfaces as HTTP2ConnectionError."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        failing_reader = mock.Mock()
        failing_reader.read = mock.AsyncMock(side_effect=OSError("Connection reset"))
        connection = AsyncHTTP2Connection(
            MockConfig(), failing_reader, MockAsyncWriter(), ('127.0.0.1', 12345)
        )
        await connection.initiate_connection()

        with pytest.raises(HTTP2ConnectionError):
            await connection.receive_data()

    @pytest.mark.asyncio
    async def test_write_error_raises_connection_error(self):
        """An OSError from writer.write() surfaces as HTTP2ConnectionError."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        # write() is synchronous on a stream writer, drain() is awaited.
        failing_writer = mock.Mock()
        failing_writer.write = mock.Mock(side_effect=OSError("Broken pipe"))
        failing_writer.drain = mock.AsyncMock()
        connection = AsyncHTTP2Connection(
            MockConfig(), MockAsyncReader(), failing_writer, ('127.0.0.1', 12345)
        )

        with pytest.raises(HTTP2ConnectionError):
            await connection.initiate_connection()
class TestAsyncHTTP2ConnectionPriority:
    """Handling of PRIORITY frames by the async connection."""

    @staticmethod
    async def _handshake():
        """Feed the client's initial bytes; return ``(connection, reader, writer, client)``."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        source, sink = MockAsyncReader(), MockAsyncWriter()
        connection = AsyncHTTP2Connection(
            MockConfig(), source, sink, ('127.0.0.1', 12345)
        )
        # The h2 client produces the frames fed to the server side.
        peer = create_client_connection()
        source.set_data(peer.data_to_send())
        await connection.initiate_connection()
        await connection.receive_data()
        return connection, source, sink, peer

    @pytest.mark.asyncio
    async def test_handle_priority_updated_existing_stream(self):
        """A PRIORITY frame updates the weight recorded on an existing stream."""
        connection, source, sink, peer = await self._handshake()
        sink.clear()

        # Open stream 1 with request headers.
        get_headers = [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https'),
            (':authority', 'localhost'),
        ]
        peer.send_headers(1, get_headers)
        source.set_data(peer.data_to_send())
        await connection.receive_data()

        assert 1 in connection.streams
        tracked = connection.streams[1]
        # Values expected before any PRIORITY frame arrives.
        assert tracked.priority_weight == 16
        assert tracked.priority_depends_on == 0

        # Re-prioritize the stream from the client side.
        peer.prioritize(1, weight=128, depends_on=0, exclusive=False)
        source.set_data(peer.data_to_send())
        await connection.receive_data()

        assert tracked.priority_weight == 128

    @pytest.mark.asyncio
    async def test_handle_priority_updated_nonexistent_stream(self):
        """A PRIORITY frame for an unknown stream is processed without raising."""
        connection, source, _, peer = await self._handshake()

        # Stream 99 was never opened.
        peer.prioritize(99, weight=64, depends_on=0, exclusive=False)
        source.set_data(peer.data_to_send())

        # Completing without an exception is the whole assertion.
        await connection.receive_data()
class TestAsyncHTTP2ConnectionTrailers:
    """Response trailer support in the async connection."""

    @staticmethod
    async def _handshake():
        """Feed the client's initial bytes; return ``(connection, reader, writer, client)``."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        source, sink = MockAsyncReader(), MockAsyncWriter()
        connection = AsyncHTTP2Connection(
            MockConfig(), source, sink, ('127.0.0.1', 12345)
        )
        peer = create_client_connection()
        source.set_data(peer.data_to_send())
        await connection.initiate_connection()
        await connection.receive_data()
        return connection, source, sink, peer

    @staticmethod
    async def _request_stream_one(connection, source, peer):
        """Deliver a bodiless GET on stream 1 to ``connection``."""
        get_headers = [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https'),
            (':authority', 'localhost'),
        ]
        peer.send_headers(1, get_headers, end_stream=True)
        source.set_data(peer.data_to_send())
        await connection.receive_data()

    @pytest.mark.asyncio
    async def test_send_trailers_after_headers_and_body(self):
        """Trailers sent after headers and body complete the response."""
        connection, source, sink, peer = await self._handshake()
        sink.clear()
        await self._request_stream_one(connection, source, peer)

        # Drive h2 and the stream object by hand so the stream stays open
        # after the headers and after the body.
        tracked = connection.streams[1]
        head = [(':status', '200'), ('content-type', 'text/plain')]
        connection.h2_conn.send_headers(1, head, end_stream=False)
        tracked.send_headers(head, end_stream=False)
        await connection._send_pending_data()

        connection.h2_conn.send_data(1, b'Hello World', end_stream=False)
        tracked.send_data(b'Hello World', end_stream=False)
        await connection._send_pending_data()

        tail = [('grpc-status', '0'), ('grpc-message', 'OK')]
        await connection.send_trailers(1, tail)

        # The stream records both completion and the trailers it sent.
        assert tracked.response_complete is True
        assert tracked.response_trailers == [('grpc-status', '0'), ('grpc-message', 'OK')]

    @pytest.mark.asyncio
    async def test_send_trailers_pseudo_header_raises(self):
        """Pseudo-headers are rejected in trailers with an HTTP2Error."""
        connection, source, _, peer = await self._handshake()
        await self._request_stream_one(connection, source, peer)

        await connection.send_response(1, 200, [('content-type', 'text/plain')], None)

        # ':status' is a pseudo-header and must not appear in trailers.
        with pytest.raises(HTTP2Error) as caught:
            await connection.send_trailers(1, [(':status', '200')])
        assert "Pseudo-header" in str(caught.value)

    @pytest.mark.asyncio
    async def test_send_trailers_without_headers_returns_false(self):
        """Trailers sent before any response headers yield False."""
        connection, source, _, peer = await self._handshake()
        await self._request_stream_one(connection, source, peer)

        # No response headers were sent on stream 1 before this call.
        outcome = await connection.send_trailers(1, [('trailer', 'value')])

        assert outcome is False
class TestAsyncHTTP2FlowControl:
    """Test async HTTP/2 flow control handling."""

    @staticmethod
    async def _stream_with_response_headers():
        """Open stream 1 and send response headers without ending the stream.

        Returns:
            ``(conn, writer)`` - the connection under test and the mock
            writer capturing everything it has written so far.
        """
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        cfg = MockConfig()
        reader = MockAsyncReader()
        writer = MockAsyncWriter()
        conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))

        # Create client and send preface
        client_conn = create_client_connection()
        reader.set_data(client_conn.data_to_send())
        await conn.initiate_connection()
        await conn.receive_data()

        # Send a request
        client_conn.send_headers(1, [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https'),
            (':authority', 'localhost'),
        ], end_stream=True)
        reader.set_data(client_conn.data_to_send())
        await conn.receive_data()

        # Send response headers without ending stream
        conn.h2_conn.send_headers(1, [
            (':status', '200'),
            ('content-type', 'text/plain'),
        ], end_stream=False)
        await conn._send_pending_data()
        conn.streams[1].send_headers([(':status', '200')], end_stream=False)
        return conn, writer

    @pytest.mark.asyncio
    async def test_send_data_respects_zero_window(self):
        """Test that send_data returns False when flow control window is 0."""
        conn, _ = await self._stream_with_response_headers()

        # Replace the window lookup so every stream reports a window of 0.
        original_window = conn.h2_conn.local_flow_control_window
        conn.h2_conn.local_flow_control_window = lambda stream_id: 0
        try:
            # Try to send data - should return False (not raise)
            result = await conn.send_data(1, b'Hello, World!')
        finally:
            # Restore even if send_data raises, so the patch never leaks.
            conn.h2_conn.local_flow_control_window = original_window

        assert result is False

    @pytest.mark.asyncio
    async def test_send_data_respects_flow_control(self):
        """Test that send_data writes a payload that fits in the window."""
        conn, writer = await self._stream_with_response_headers()

        # Discard the SETTINGS/HEADERS bytes written during setup; otherwise
        # the assertion below would pass even if send_data wrote nothing.
        writer.clear()

        # Send small data - should succeed within window
        small_data = b'Hello'
        await conn.send_data(1, small_data, end_stream=True)

        # DATA frame payloads are carried verbatim, so the body bytes must
        # appear in what was written after the clear.
        sent_data = writer.get_written_data()
        assert small_data in sent_data
class TestAsyncHTTP2StreamClosedHandling:
    """Sending on a stream the client has already reset."""

    @staticmethod
    async def _requested_stream():
        """Feed a bodiless GET on stream 1; return ``(connection, reader, client)``."""
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        source, sink = MockAsyncReader(), MockAsyncWriter()
        connection = AsyncHTTP2Connection(
            MockConfig(), source, sink, ('127.0.0.1', 12345)
        )

        # Client's initial bytes first, then the request itself.
        peer = create_client_connection()
        source.set_data(peer.data_to_send())
        await connection.initiate_connection()
        await connection.receive_data()

        get_headers = [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https'),
            (':authority', 'localhost'),
        ]
        peer.send_headers(1, get_headers, end_stream=True)
        source.set_data(peer.data_to_send())
        await connection.receive_data()
        return connection, source, peer

    @staticmethod
    async def _client_resets(connection, source, peer):
        """Have the client reset stream 1 and let the server process it."""
        peer.reset_stream(1)
        source.set_data(peer.data_to_send())
        await connection.receive_data()

    @pytest.mark.asyncio
    async def test_send_response_on_closed_stream(self):
        """send_response() on a reset stream returns False instead of raising."""
        connection, source, peer = await self._requested_stream()
        await self._client_resets(connection, source, peer)

        outcome = await connection.send_response(
            1, 200, [('content-type', 'text/plain')], b'Hello'
        )

        assert outcome is False

    @pytest.mark.asyncio
    async def test_send_data_on_reset_stream(self):
        """send_data() on a reset stream returns False instead of raising."""
        connection, source, peer = await self._requested_stream()

        # Response headers go out first, leaving the stream open for a body.
        head = [
            (':status', '200'),
            ('content-type', 'text/plain'),
        ]
        connection.h2_conn.send_headers(1, head, end_stream=False)
        await connection._send_pending_data()
        connection.streams[1].send_headers([(':status', '200')], end_stream=False)

        await self._client_resets(connection, source, peer)

        outcome = await connection.send_data(1, b'Hello, World!', end_stream=True)

        assert outcome is False
class TestAsyncHTTP2WindowOverflowHandling:
    """Test window overflow handling in async connection."""

    @pytest.mark.asyncio
    async def test_window_overflow_sends_goaway(self):
        """Test that a flow-control window overflow closes the connection.

        Only ``is_closed`` is asserted here; the bytes written to the peer
        are not inspected.
        """
        from gunicorn.http2.async_connection import AsyncHTTP2Connection

        cfg = MockConfig()
        reader = MockAsyncReader()
        writer = MockAsyncWriter()
        conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))

        # Create client and send preface
        client_conn = create_client_connection()
        reader.set_data(client_conn.data_to_send())
        await conn.initiate_connection()
        await conn.receive_data()

        # Mock increment_flow_control_window to raise ValueError (overflow)
        def raise_overflow(increment, stream_id=None):
            raise ValueError("Flow control window too large")

        conn.h2_conn.increment_flow_control_window = raise_overflow

        # Send a request with data to trigger the overflow
        client_conn.send_headers(1, [
            (':method', 'POST'),
            (':path', '/'),
            (':scheme', 'https'),
            (':authority', 'localhost'),
        ], end_stream=False)
        client_conn.send_data(1, b'test data', end_stream=True)
        reader.set_data(client_conn.data_to_send())
        await conn.receive_data()

        # The overflow must leave the connection closed.
        assert conn.is_closed is True
class TestAsyncHTTP2ProtocolErrorHandling:
    """Test protocol error handling sends proper GOAWAY."""

    @pytest.mark.asyncio
    async def test_protocol_error_sends_goaway(self):
        """Test that protocol errors result in GOAWAY being sent.

        An h2 ProtocolError raised while parsing must be re-raised as
        HTTP2ProtocolError, some bytes must be written to the peer, and the
        connection must end up closed.
        """
        # Import the exception explicitly rather than relying on
        # ``import h2.connection`` having loaded ``h2.exceptions`` as a
        # side effect.
        from h2.exceptions import ProtocolError as H2ProtocolError
        from gunicorn.http2.async_connection import AsyncHTTP2Connection
        from gunicorn.http2.errors import HTTP2ProtocolError

        cfg = MockConfig()
        reader = MockAsyncReader()
        writer = MockAsyncWriter()
        conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))

        # Create client and send preface
        client_conn = create_client_connection()
        reader.set_data(client_conn.data_to_send())
        await conn.initiate_connection()
        await conn.receive_data()

        # Clear sent data to only capture new frames
        writer.clear()

        # Mock h2_conn.receive_data to raise ProtocolError
        def raise_protocol_error(data):
            raise H2ProtocolError("Test protocol error")

        conn.h2_conn.receive_data = raise_protocol_error

        # Any non-empty payload works: the patched parser raises regardless.
        reader.set_data(b'dummy data')

        # This should send GOAWAY and raise HTTP2ProtocolError
        with pytest.raises(HTTP2ProtocolError) as exc_info:
            await conn.receive_data()
        assert "Test protocol error" in str(exc_info.value)

        # Verify something was sent (GOAWAY frame)
        sent_data = writer.get_written_data()
        assert len(sent_data) > 0

        # Connection should be marked as closed
        assert conn.is_closed is True