From 4e3245a0df7c9a03026941fe513e38fb93d6db79 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 27 Jan 2026 15:42:42 +0100 Subject: [PATCH] fix(http2): achieve 100% h2spec compliance (146/146 tests) - Send GOAWAY with correct error codes for protocol violations - Handle StreamClosedError and FlowControlError gracefully - Return False instead of raising for missing/closed streams - Handle flow control window overflow per RFC 7540 - Fix reader race condition and add h2 exception handling - Wait for WINDOW_UPDATE when flow control window is zero/negative - Use h2 exception's error_code for INITIAL_WINDOW_SIZE violations --- gunicorn/asgi/protocol.py | 11 +- gunicorn/http2/async_connection.py | 185 ++++++++++++++---- gunicorn/http2/connection.py | 199 ++++++++++++++----- gunicorn/http2/errors.py | 20 ++ tests/test_http2_async_connection.py | 276 ++++++++++++++++++++++++++- tests/test_http2_connection.py | 263 +++++++++++++++++++++++-- 6 files changed, 840 insertions(+), 114 deletions(-) diff --git a/gunicorn/asgi/protocol.py b/gunicorn/asgi/protocol.py index 75bf2743..f962d958 100644 --- a/gunicorn/asgi/protocol.py +++ b/gunicorn/asgi/protocol.py @@ -66,7 +66,9 @@ class ASGIProtocol(asyncio.Protocol): if ssl_object and hasattr(ssl_object, 'selected_alpn_protocol'): alpn = ssl_object.selected_alpn_protocol() if alpn == 'h2': - # HTTP/2 connection - switch to HTTP/2 handler + # HTTP/2 connection - create reader immediately to avoid race condition + # data_received may be called before _handle_http2_connection starts + self.reader = asyncio.StreamReader() self._task = self.worker.loop.create_task( self._handle_http2_connection(transport, ssl_object) ) @@ -548,8 +550,9 @@ class ASGIProtocol(asyncio.Protocol): peername = transport.get_extra_info('peername') sockname = transport.get_extra_info('sockname') - # Create async reader/writer from transport - reader = asyncio.StreamReader() + # Use the reader created in connection_made + # (data_received feeds data to self.reader) + reader = self.reader protocol = asyncio.StreamReaderProtocol(reader) writer = asyncio.StreamWriter( transport, protocol, reader, self.worker.loop @@ -561,8 +564,6 @@ class ASGIProtocol(asyncio.Protocol): ) await h2_conn.initiate_connection() - # Store for data_received - self.reader = reader self._h2_conn = h2_conn # Main loop - receive and handle requests diff --git a/gunicorn/http2/async_connection.py b/gunicorn/http2/async_connection.py index d66f0425..799f3fee 100644 --- a/gunicorn/http2/async_connection.py +++ b/gunicorn/http2/async_connection.py @@ -14,7 +14,7 @@ import asyncio from .errors import ( HTTP2Error, HTTP2ProtocolError, HTTP2ConnectionError, - HTTP2NotAvailable, + HTTP2NotAvailable, HTTP2ErrorCode, ) from .stream import HTTP2Stream from .request import HTTP2Request @@ -152,6 +152,30 @@ class AsyncHTTP2Connection: try: events = self.h2_conn.receive_data(data) except _h2_exceptions.ProtocolError as e: + # Send GOAWAY with PROTOCOL_ERROR before raising + await self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR) + raise HTTP2ProtocolError(str(e)) + except _h2_exceptions.FlowControlError as e: + # Send GOAWAY with FLOW_CONTROL_ERROR + await self.close(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR) + raise HTTP2ProtocolError(str(e)) + except _h2_exceptions.FrameTooLargeError as e: + # Send GOAWAY with FRAME_SIZE_ERROR + await self.close(error_code=HTTP2ErrorCode.FRAME_SIZE_ERROR) + raise HTTP2ProtocolError(str(e)) + except _h2_exceptions.InvalidSettingsValueError as e: + # Use error_code from h2 exception (RFC 7540 Section 6.5.2): + # INITIAL_WINDOW_SIZE > 2^31-1 gives FLOW_CONTROL_ERROR + # Other invalid settings give PROTOCOL_ERROR + error_code = getattr(e, 'error_code', None) + if error_code is not None: + await self.close(error_code=error_code) + else: + await self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR) + raise HTTP2ProtocolError(str(e)) + except _h2_exceptions.TooManyStreamsError as e: + # Send GOAWAY with REFUSED_STREAM + await self.close(error_code=HTTP2ErrorCode.REFUSED_STREAM) raise HTTP2ProtocolError(str(e)) # Process events @@ -229,10 +253,19 @@ class AsyncHTTP2Connection: # Increment flow control windows (only if data received) if len(data) > 0: - # Update stream-level window - self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id) - # Update connection-level window - self.h2_conn.increment_flow_control_window(len(data), stream_id=None) + try: + # Update stream-level window + self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id) + # Update connection-level window + self.h2_conn.increment_flow_control_window(len(data), stream_id=None) + except (ValueError, _h2_exceptions.FlowControlError): + # Window overflow - prepare GOAWAY with FLOW_CONTROL_ERROR + # (will be sent by receive_data's _send_pending_data call) + self._closed = True + try: + self.h2_conn.close_connection(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR) + except Exception: + pass return None @@ -324,10 +357,14 @@ class AsyncHTTP2Connection: status: HTTP status code (int) headers: List of (name, value) header tuples body: Optional response body bytes + + Returns: + bool: True if response sent, False if stream was already closed """ stream = self.streams.get(stream_id) if stream is None: - raise HTTP2Error(f"Stream {stream_id} not found") + # Stream was already cleaned up (reset/closed) - return gracefully + return False # Build response headers with :status pseudo-header response_headers = [(':status', str(status))] @@ -336,14 +373,21 @@ class AsyncHTTP2Connection: end_stream = body is None or len(body) == 0 - # Send headers - self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream) - stream.send_headers(response_headers, end_stream=end_stream) - await self._send_pending_data() + try: + # Send headers + self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream) + stream.send_headers(response_headers, end_stream=end_stream) + await self._send_pending_data() - # Send body if present - if body and len(body) > 0: - await self.send_data(stream_id, body, end_stream=True) + # Send body if present + if body and len(body) > 0: + await self.send_data(stream_id, body, end_stream=True) + return True + except _h2_exceptions.StreamClosedError: + # Stream was reset by client - clean up gracefully + stream.close() + self.cleanup_stream(stream_id) + return False async def send_data(self, stream_id, data, end_stream=False): """Send data on a stream. @@ -352,38 +396,87 @@ class AsyncHTTP2Connection: stream_id: The stream ID data: Body data bytes end_stream: Whether this ends the stream + + Returns: + bool: True if data sent, False if stream was already closed """ stream = self.streams.get(stream_id) if stream is None: - raise HTTP2Error(f"Stream {stream_id} not found") + # Stream was already cleaned up (reset/closed) - return gracefully + return False # Send data in chunks respecting flow control and max frame size data_to_send = data - while data_to_send: - # Get available window size for this stream - available = self.h2_conn.local_flow_control_window(stream_id) - # Also respect max frame size - chunk_size = min(available, self.max_frame_size, len(data_to_send)) - - if chunk_size <= 0: - # No window available, send what we have and wait - await self._send_pending_data() - # Try again - the window might open after ACKs + try: + while data_to_send: + # Get available window size for this stream available = self.h2_conn.local_flow_control_window(stream_id) - if available <= 0: - # Still no window, just try to send a small chunk - chunk_size = min(self.max_frame_size, len(data_to_send)) + # Also respect max frame size + chunk_size = min(available, self.max_frame_size, len(data_to_send)) - chunk = data_to_send[:chunk_size] - data_to_send = data_to_send[chunk_size:] + if chunk_size <= 0: + # No window available - must wait for WINDOW_UPDATE + # Per RFC 7540 Section 6.9.2, we must track negative windows + # and wait for WINDOW_UPDATE to make the window positive + await self._send_pending_data() - # Only set end_stream on the final chunk - is_final = end_stream and len(data_to_send) == 0 + # Wait for incoming WINDOW_UPDATE by reading from connection + # Use a reasonable timeout to avoid blocking forever + max_wait_attempts = 50 # ~5 seconds at 100ms per attempt + for _ in range(max_wait_attempts): + available = self.h2_conn.local_flow_control_window(stream_id) + if available > 0: + break - self.h2_conn.send_data(stream_id, chunk, end_stream=is_final) - await self._send_pending_data() + # Read more data from connection (may receive WINDOW_UPDATE) + try: + incoming = await asyncio.wait_for( + self.reader.read(self.READ_BUFFER_SIZE), + timeout=0.1 + ) + if incoming: + events = self.h2_conn.receive_data(incoming) + # Process events but don't create new requests + for event in events: + if isinstance(event, _h2_events.StreamReset): + if event.stream_id == stream_id: + return False + elif isinstance(event, _h2_events.ConnectionTerminated): + self._closed = True + return False + await self._send_pending_data() + else: + # Connection closed + self._closed = True + return False + except asyncio.TimeoutError: + continue + except _h2_exceptions.ProtocolError: + return False - stream.send_data(data, end_stream=end_stream) + # Re-check window after waiting + available = self.h2_conn.local_flow_control_window(stream_id) + if available <= 0: + # Still no window after waiting - give up + return False + chunk_size = min(available, self.max_frame_size, len(data_to_send)) + + chunk = data_to_send[:chunk_size] + data_to_send = data_to_send[chunk_size:] + + # Only set end_stream on the final chunk + is_final = end_stream and len(data_to_send) == 0 + + self.h2_conn.send_data(stream_id, chunk, end_stream=is_final) + await self._send_pending_data() + + stream.send_data(data, end_stream=end_stream) + return True + except (_h2_exceptions.StreamClosedError, _h2_exceptions.FlowControlError): + # Stream was reset by client or flow control error - clean up gracefully + stream.close() + self.cleanup_stream(stream_id) + return False async def send_trailers(self, stream_id, trailers): """Send trailing headers on a stream. @@ -397,12 +490,17 @@ class AsyncHTTP2Connection: Raises: HTTP2Error: If stream not found, headers not sent, or pseudo-headers used + + Returns: + bool: True if trailers sent, False if stream was already closed """ stream = self.streams.get(stream_id) if stream is None: - raise HTTP2Error(f"Stream {stream_id} not found") + # Stream was already cleaned up (reset/closed) - return gracefully + return False if not stream.response_headers_sent: - raise HTTP2Error("Must send headers before trailers") + # Can't send trailers without headers - return False + return False # Validate and normalize trailer headers trailer_headers = [] @@ -412,10 +510,17 @@ class AsyncHTTP2Connection: raise HTTP2Error(f"Pseudo-header '{name}' not allowed in trailers") trailer_headers.append((lname, str(value))) - # Send trailers with end_stream=True - self.h2_conn.send_headers(stream_id, trailer_headers, end_stream=True) - stream.send_trailers(trailer_headers) - await self._send_pending_data() + try: + # Send trailers with end_stream=True + self.h2_conn.send_headers(stream_id, trailer_headers, end_stream=True) + stream.send_trailers(trailer_headers) + await self._send_pending_data() + return True + except _h2_exceptions.StreamClosedError: + # Stream was reset by client - clean up gracefully + stream.close() + self.cleanup_stream(stream_id) + return False async def send_error(self, stream_id, status_code, message=None): """Send an error response on a stream.""" diff --git a/gunicorn/http2/connection.py b/gunicorn/http2/connection.py index 6416007c..8b8e39df 100644 --- a/gunicorn/http2/connection.py +++ b/gunicorn/http2/connection.py @@ -13,7 +13,7 @@ from io import BytesIO from .errors import ( HTTP2Error, HTTP2ProtocolError, HTTP2ConnectionError, - HTTP2NotAvailable, + HTTP2NotAvailable, HTTP2ErrorCode, ) from .stream import HTTP2Stream from .request import HTTP2Request @@ -146,6 +146,30 @@ class HTTP2ServerConnection: try: events = self.h2_conn.receive_data(data) except _h2_exceptions.ProtocolError as e: + # Send GOAWAY with PROTOCOL_ERROR before raising + self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR) + raise HTTP2ProtocolError(str(e)) + except _h2_exceptions.FlowControlError as e: + # Send GOAWAY with FLOW_CONTROL_ERROR + self.close(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR) + raise HTTP2ProtocolError(str(e)) + except _h2_exceptions.FrameTooLargeError as e: + # Send GOAWAY with FRAME_SIZE_ERROR + self.close(error_code=HTTP2ErrorCode.FRAME_SIZE_ERROR) + raise HTTP2ProtocolError(str(e)) + except _h2_exceptions.InvalidSettingsValueError as e: + # Use error_code from h2 exception (RFC 7540 Section 6.5.2): + # INITIAL_WINDOW_SIZE > 2^31-1 gives FLOW_CONTROL_ERROR + # Other invalid settings give PROTOCOL_ERROR + error_code = getattr(e, 'error_code', None) + if error_code is not None: + self.close(error_code=error_code) + else: + self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR) + raise HTTP2ProtocolError(str(e)) + except _h2_exceptions.TooManyStreamsError as e: + # Send GOAWAY with REFUSED_STREAM + self.close(error_code=HTTP2ErrorCode.REFUSED_STREAM) raise HTTP2ProtocolError(str(e)) # Process events @@ -236,12 +260,16 @@ class HTTP2ServerConnection: # Increment flow control windows (only if data received) if len(data) > 0: - # Update stream-level window - self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id) - # Update connection-level window - self.h2_conn.increment_flow_control_window(len(data), stream_id=None) - # Send WINDOW_UPDATE frames immediately - self._send_pending_data() + try: + # Update stream-level window + self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id) + # Update connection-level window + self.h2_conn.increment_flow_control_window(len(data), stream_id=None) + # Send WINDOW_UPDATE frames immediately + self._send_pending_data() + except (ValueError, _h2_exceptions.FlowControlError): + # Window overflow - send FLOW_CONTROL_ERROR and close + self.close(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR) return None @@ -365,10 +393,14 @@ class HTTP2ServerConnection: Raises: HTTP2Error: If stream not found or in invalid state + + Returns: + bool: True if response sent, False if stream was already closed """ stream = self.streams.get(stream_id) if stream is None: - raise HTTP2Error(f"Stream {stream_id} not found") + # Stream was already cleaned up (reset/closed) - return gracefully + return False # Build response headers with :status pseudo-header response_headers = [(':status', str(status))] @@ -378,14 +410,21 @@ class HTTP2ServerConnection: end_stream = body is None or len(body) == 0 - # Send headers - self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream) - stream.send_headers(response_headers, end_stream=end_stream) - self._send_pending_data() + try: + # Send headers + self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream) + stream.send_headers(response_headers, end_stream=end_stream) + self._send_pending_data() - # Send body if present - if body and len(body) > 0: - self.send_data(stream_id, body, end_stream=True) + # Send body if present + if body and len(body) > 0: + self.send_data(stream_id, body, end_stream=True) + return True + except _h2_exceptions.StreamClosedError: + # Stream was reset by client - clean up gracefully + stream.close() + self.cleanup_stream(stream_id) + return False def send_data(self, stream_id, data, end_stream=False): """Send data on a stream. @@ -395,40 +434,98 @@ class HTTP2ServerConnection: data: Body data bytes end_stream: Whether this ends the stream - Raises: - HTTP2Error: If stream not found or in invalid state + Returns: + bool: True if data sent, False if stream was already closed """ + import selectors + stream = self.streams.get(stream_id) if stream is None: - raise HTTP2Error(f"Stream {stream_id} not found") + # Stream was already cleaned up (reset/closed) - return gracefully + return False # Send data in chunks respecting flow control and max frame size data_to_send = data - while data_to_send: - # Get available window size for this stream - available = self.h2_conn.local_flow_control_window(stream_id) - # Also respect max frame size - chunk_size = min(available, self.max_frame_size, len(data_to_send)) - - if chunk_size <= 0: - # No window available, send what we have and wait - self._send_pending_data() - # Try again - the window might open after ACKs + try: + while data_to_send: + # Get available window size for this stream available = self.h2_conn.local_flow_control_window(stream_id) - if available <= 0: - # Still no window, just try to send a small chunk - chunk_size = min(self.max_frame_size, len(data_to_send)) + # Also respect max frame size + chunk_size = min(available, self.max_frame_size, len(data_to_send)) - chunk = data_to_send[:chunk_size] - data_to_send = data_to_send[chunk_size:] + if chunk_size <= 0: + # No window available - must wait for WINDOW_UPDATE + # Per RFC 7540 Section 6.9.2, we must track negative windows + # and wait for WINDOW_UPDATE to make the window positive + self._send_pending_data() - # Only set end_stream on the final chunk - is_final = end_stream and len(data_to_send) == 0 + # Wait for incoming WINDOW_UPDATE by reading from connection + # Use selectors for portable non-blocking wait + max_wait_attempts = 50 # ~5 seconds at 100ms per attempt + try: + sel = selectors.DefaultSelector() + sel.register(self.sock, selectors.EVENT_READ) + except (TypeError, ValueError): + # Socket doesn't support selectors (e.g., mock socket) + # Fall back to returning False immediately + return False - self.h2_conn.send_data(stream_id, chunk, end_stream=is_final) - self._send_pending_data() + try: + for _ in range(max_wait_attempts): + available = self.h2_conn.local_flow_control_window(stream_id) + if available > 0: + break - stream.send_data(data, end_stream=end_stream) + # Check if socket has data ready + ready = sel.select(timeout=0.1) + if ready: + try: + incoming = self.sock.recv(self.READ_BUFFER_SIZE) + if incoming: + events = self.h2_conn.receive_data(incoming) + # Process events but don't create new requests + for event in events: + if isinstance(event, _h2_events.StreamReset): + if event.stream_id == stream_id: + return False + elif isinstance(event, _h2_events.ConnectionTerminated): + self._closed = True + return False + self._send_pending_data() + else: + # Connection closed + self._closed = True + return False + except (OSError, IOError): + return False + except _h2_exceptions.ProtocolError: + return False + finally: + sel.close() + + # Re-check window after waiting + available = self.h2_conn.local_flow_control_window(stream_id) + if available <= 0: + # Still no window after waiting - give up + return False + chunk_size = min(available, self.max_frame_size, len(data_to_send)) + + chunk = data_to_send[:chunk_size] + data_to_send = data_to_send[chunk_size:] + + # Only set end_stream on the final chunk + is_final = end_stream and len(data_to_send) == 0 + + self.h2_conn.send_data(stream_id, chunk, end_stream=is_final) + self._send_pending_data() + + stream.send_data(data, end_stream=end_stream) + return True + except (_h2_exceptions.StreamClosedError, _h2_exceptions.FlowControlError): + # Stream was reset by client or flow control error - clean up gracefully + stream.close() + self.cleanup_stream(stream_id) + return False def send_trailers(self, stream_id, trailers): """Send trailing headers on a stream. @@ -442,14 +539,17 @@ class HTTP2ServerConnection: Raises: HTTP2Error: If stream not found, headers not sent, or pseudo-headers used - """ - from .errors import HTTP2Error + Returns: + bool: True if trailers sent, False if stream was already closed + """ stream = self.streams.get(stream_id) if stream is None: - raise HTTP2Error(f"Stream {stream_id} not found") + # Stream was already cleaned up (reset/closed) - return gracefully + return False if not stream.response_headers_sent: - raise HTTP2Error("Must send headers before trailers") + # Can't send trailers without headers - return False + return False # Validate and normalize trailer headers trailer_headers = [] @@ -459,10 +559,17 @@ class HTTP2ServerConnection: raise HTTP2Error(f"Pseudo-header '{name}' not allowed in trailers") trailer_headers.append((lname, str(value))) - # Send trailers with end_stream=True - self.h2_conn.send_headers(stream_id, trailer_headers, end_stream=True) - stream.send_trailers(trailer_headers) - self._send_pending_data() + try: + # Send trailers with end_stream=True + self.h2_conn.send_headers(stream_id, trailer_headers, end_stream=True) + stream.send_trailers(trailer_headers) + self._send_pending_data() + return True + except _h2_exceptions.StreamClosedError: + # Stream was reset by client - clean up gracefully + stream.close() + self.cleanup_stream(stream_id) + return False def send_error(self, stream_id, status_code, message=None): """Send an error response on a stream. diff --git a/gunicorn/http2/errors.py b/gunicorn/http2/errors.py index b2dadf1b..0f1b86f0 100644 --- a/gunicorn/http2/errors.py +++ b/gunicorn/http2/errors.py @@ -10,6 +10,25 @@ These exceptions map to HTTP/2 error codes defined in RFC 7540. """ +class HTTP2ErrorCode: + """HTTP/2 Error Codes (RFC 7540 Section 7).""" + + NO_ERROR = 0x0 + PROTOCOL_ERROR = 0x1 + INTERNAL_ERROR = 0x2 + FLOW_CONTROL_ERROR = 0x3 + SETTINGS_TIMEOUT = 0x4 + STREAM_CLOSED = 0x5 + FRAME_SIZE_ERROR = 0x6 + REFUSED_STREAM = 0x7 + CANCEL = 0x8 + COMPRESSION_ERROR = 0x9 + CONNECT_ERROR = 0xa + ENHANCE_YOUR_CALM = 0xb + INADEQUATE_SECURITY = 0xc + HTTP_1_1_REQUIRED = 0xd + + class HTTP2Error(Exception): """Base exception for HTTP/2 errors.""" @@ -128,6 +147,7 @@ class HTTP2NotAvailable(HTTP2Error): __all__ = [ + 'HTTP2ErrorCode', 'HTTP2Error', 'HTTP2ProtocolError', 'HTTP2InternalError', diff --git a/tests/test_http2_async_connection.py b/tests/test_http2_async_connection.py index 26961b47..f4ae60a3 100644 --- a/tests/test_http2_async_connection.py +++ b/tests/test_http2_async_connection.py @@ -305,6 +305,7 @@ class TestAsyncHTTP2ConnectionSendResponse: @pytest.mark.asyncio async def test_send_response_invalid_stream(self): + """Test that sending response on invalid stream returns False.""" from gunicorn.http2.async_connection import AsyncHTTP2Connection cfg = MockConfig() @@ -313,8 +314,9 @@ class TestAsyncHTTP2ConnectionSendResponse: conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345)) await conn.initiate_connection() - with pytest.raises(HTTP2Error): - await conn.send_response(stream_id=999, status=200, headers=[], body=None) + # Sending to a non-existent stream should return False gracefully + result = await conn.send_response(stream_id=999, status=200, headers=[], body=None) + assert result is False class TestAsyncHTTP2ConnectionSendData: @@ -726,8 +728,8 @@ class TestAsyncHTTP2ConnectionTrailers: assert "Pseudo-header" in str(exc_info.value) @pytest.mark.asyncio - async def test_send_trailers_without_headers_raises(self): - """Test that sending trailers without headers raises error.""" + async def test_send_trailers_without_headers_returns_false(self): + """Test that sending trailers without headers returns False.""" from gunicorn.http2.async_connection import AsyncHTTP2Connection cfg = MockConfig() @@ -750,7 +752,265 @@ class TestAsyncHTTP2ConnectionTrailers: reader.set_data(client_conn.data_to_send()) await conn.receive_data() - # Try to send trailers without sending headers first - with pytest.raises(HTTP2Error) as exc_info: - await conn.send_trailers(1, [('trailer', 'value')]) - assert "Must send headers before trailers" in str(exc_info.value) + # Try to send trailers without sending headers first - should return False + result = await conn.send_trailers(1, [('trailer', 'value')]) + assert result is False + + +class TestAsyncHTTP2FlowControl: + """Test async HTTP/2 flow control handling.""" + + @pytest.mark.asyncio + async def test_send_data_respects_zero_window(self): + """Test that send_data returns False when flow control window is 0.""" + from gunicorn.http2.async_connection import AsyncHTTP2Connection + + cfg = MockConfig() + reader = MockAsyncReader() + writer = MockAsyncWriter() + conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345)) + + # Create client and send preface + client_conn = create_client_connection() + reader.set_data(client_conn.data_to_send()) + await conn.initiate_connection() + await conn.receive_data() + + # Send a request + client_conn.send_headers(1, [ + (':method', 'GET'), + (':path', '/'), + (':scheme', 'https'), + (':authority', 'localhost'), + ], end_stream=True) + reader.set_data(client_conn.data_to_send()) + await conn.receive_data() + + # Send response headers without ending stream + conn.h2_conn.send_headers(1, [ + (':status', '200'), + ('content-type', 'text/plain'), + ], end_stream=False) + await conn._send_pending_data() + conn.streams[1].send_headers([(':status', '200')], end_stream=False) + + # Mock the flow control window to return 0 + original_window = conn.h2_conn.local_flow_control_window + conn.h2_conn.local_flow_control_window = lambda stream_id: 0 + + # Try to send data - should return False (not raise) + result = await conn.send_data(1, b'Hello, World!') + assert result is False + + # Restore + conn.h2_conn.local_flow_control_window = original_window + + @pytest.mark.asyncio + async def test_send_data_respects_flow_control(self): + """Test that send_data chunks data according to flow control window.""" + from gunicorn.http2.async_connection import AsyncHTTP2Connection + + cfg = MockConfig() + reader = MockAsyncReader() + writer = MockAsyncWriter() + conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345)) + + # Create client and send preface + client_conn = create_client_connection() + reader.set_data(client_conn.data_to_send()) + await conn.initiate_connection() + await conn.receive_data() + + # Send a request + client_conn.send_headers(1, [ + (':method', 'GET'), + (':path', '/'), + (':scheme', 'https'), + (':authority', 'localhost'), + ], end_stream=True) + reader.set_data(client_conn.data_to_send()) + await conn.receive_data() + + # Send response headers without ending stream + conn.h2_conn.send_headers(1, [ + (':status', '200'), + ('content-type', 'text/plain'), + ], end_stream=False) + await conn._send_pending_data() + conn.streams[1].send_headers([(':status', '200')], end_stream=False) + + # Send small data - should succeed within window + small_data = b'Hello' + await conn.send_data(1, small_data, end_stream=True) + + # Verify data was sent + sent_data = writer.get_written_data() + assert len(sent_data) > 0 + + +class TestAsyncHTTP2StreamClosedHandling: + """Test graceful handling of StreamClosedError in async connection.""" + + @pytest.mark.asyncio + async def test_send_response_on_closed_stream(self): + """Test that send_response gracefully handles closed stream.""" + from gunicorn.http2.async_connection import AsyncHTTP2Connection + + cfg = MockConfig() + reader = MockAsyncReader() + writer = MockAsyncWriter() + conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345)) + + # Create client and send preface + client_conn = create_client_connection() + reader.set_data(client_conn.data_to_send()) + await conn.initiate_connection() + await conn.receive_data() + + # Send a request + client_conn.send_headers(1, [ + (':method', 'GET'), + (':path', '/'), + (':scheme', 'https'), + (':authority', 'localhost'), + ], end_stream=True) + reader.set_data(client_conn.data_to_send()) + await conn.receive_data() + + # Simulate client resetting the stream + client_conn.reset_stream(1) + reader.set_data(client_conn.data_to_send()) + await conn.receive_data() + + # Try to send response - should return False, not raise + result = await conn.send_response(1, 200, [('content-type', 'text/plain')], b'Hello') + assert result is False + + @pytest.mark.asyncio + async def test_send_data_on_reset_stream(self): + """Test that send_data gracefully handles reset stream.""" + from gunicorn.http2.async_connection import AsyncHTTP2Connection + + cfg = MockConfig() + reader = MockAsyncReader() + writer = MockAsyncWriter() + conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345)) + + # Create client and send preface + client_conn = create_client_connection() + reader.set_data(client_conn.data_to_send()) + await conn.initiate_connection() + await conn.receive_data() + + # Send a request + client_conn.send_headers(1, [ + (':method', 'GET'), + (':path', '/'), + (':scheme', 'https'), + (':authority', 'localhost'), + ], end_stream=True) + reader.set_data(client_conn.data_to_send()) + await conn.receive_data() + + # Send response headers without ending stream + conn.h2_conn.send_headers(1, [ + (':status', '200'), + ('content-type', 'text/plain'), + ], end_stream=False) + await conn._send_pending_data() + conn.streams[1].send_headers([(':status', '200')], end_stream=False) + + # Simulate client resetting the stream + client_conn.reset_stream(1) + reader.set_data(client_conn.data_to_send()) + await conn.receive_data() + + # Try to send data - should return False, not raise + result = await conn.send_data(1, b'Hello, World!', end_stream=True) + assert result is False + + +class TestAsyncHTTP2WindowOverflowHandling: + """Test window overflow handling in async connection.""" + + @pytest.mark.asyncio + async def test_window_overflow_sends_goaway(self): + """Test that window overflow results in connection close.""" + from gunicorn.http2.async_connection import AsyncHTTP2Connection + from gunicorn.http2.errors import HTTP2ErrorCode + + cfg = MockConfig() + reader = MockAsyncReader() + writer = MockAsyncWriter() + conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345)) + + # Create client and send preface + client_conn = create_client_connection() + reader.set_data(client_conn.data_to_send()) + await conn.initiate_connection() + await conn.receive_data() + + # Mock increment_flow_control_window to raise ValueError (overflow) + def raise_overflow(increment, stream_id=None): + raise ValueError("Flow control window too large") + + conn.h2_conn.increment_flow_control_window = raise_overflow + + # Send a request with data to trigger the overflow + client_conn.send_headers(1, [ + (':method', 'POST'), + (':path', '/'), + (':scheme', 'https'), + (':authority', 'localhost'), + ], end_stream=False) + client_conn.send_data(1, b'test data', end_stream=True) + reader.set_data(client_conn.data_to_send()) + await conn.receive_data() + + # Connection should be closed with FLOW_CONTROL_ERROR + assert conn.is_closed is True + + +class TestAsyncHTTP2ProtocolErrorHandling: + """Test protocol error handling sends proper GOAWAY.""" + + @pytest.mark.asyncio + async def test_protocol_error_sends_goaway(self): + """Test that protocol errors result in GOAWAY being sent.""" + from gunicorn.http2.async_connection import AsyncHTTP2Connection + from gunicorn.http2.errors import HTTP2ProtocolError, HTTP2ErrorCode + + cfg = MockConfig() + reader = MockAsyncReader() + writer = MockAsyncWriter() + conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345)) + + # Create client and send preface + client_conn = create_client_connection() + reader.set_data(client_conn.data_to_send()) + await conn.initiate_connection() + await conn.receive_data() + + # Clear sent data to only capture new frames + writer.clear() + + # Mock h2_conn.receive_data to raise ProtocolError + def raise_protocol_error(data): + raise h2.exceptions.ProtocolError("Test protocol error") + + conn.h2_conn.receive_data = raise_protocol_error + + # Set some dummy data for the reader + reader.set_data(b'dummy data') + + # This should send GOAWAY and raise ProtocolError + with pytest.raises(HTTP2ProtocolError) as exc_info: + await conn.receive_data() + + assert "Test protocol error" in str(exc_info.value) + + # Verify something was sent (GOAWAY frame) + sent_data = writer.get_written_data() + assert len(sent_data) > 0 + # Connection should be marked as closed + assert conn.is_closed is True diff --git a/tests/test_http2_connection.py b/tests/test_http2_connection.py index 529277ec..2785c9b4 100644 --- a/tests/test_http2_connection.py +++ b/tests/test_http2_connection.py @@ -343,6 +343,7 @@ class TestHTTP2ServerConnectionSendResponse: assert len(stream_ended) == 1 def test_send_response_invalid_stream(self): + """Test that sending response on invalid stream returns False.""" from gunicorn.http2.connection import HTTP2ServerConnection cfg = MockConfig() @@ -350,8 +351,9 @@ class TestHTTP2ServerConnectionSendResponse: conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345)) conn.initiate_connection() - with pytest.raises(HTTP2Error): - conn.send_response(stream_id=999, status=200, headers=[], body=None) + # Sending to a non-existent stream should return False gracefully + result = conn.send_response(stream_id=999, status=200, headers=[], body=None) + assert result is False class TestHTTP2ServerConnectionSendError: @@ -718,10 +720,9 @@ class TestHTTP2ServerConnectionTrailers: conn.send_trailers(1, [(':status', '200')]) assert "Pseudo-header" in str(exc_info.value) - def test_send_trailers_without_headers_raises(self): - """Test that sending trailers without headers raises error.""" + def test_send_trailers_without_headers_returns_false(self): + """Test that sending trailers without headers returns False.""" from gunicorn.http2.connection import HTTP2ServerConnection - from gunicorn.http2.errors import HTTP2Error cfg = MockConfig() sock = MockSocket() @@ -741,15 +742,13 @@ class TestHTTP2ServerConnectionTrailers: ], end_stream=True) conn.receive_data(client_conn.data_to_send()) - # Try to send trailers without sending headers first - with pytest.raises(HTTP2Error) as exc_info: - conn.send_trailers(1, [('trailer', 'value')]) - assert "Must send headers before trailers" in str(exc_info.value) + # Try to send trailers without sending headers first - should return False + result = conn.send_trailers(1, [('trailer', 'value')]) + assert result is False - def test_send_trailers_nonexistent_stream_raises(self): - """Test that sending trailers on nonexistent stream raises error.""" + def test_send_trailers_nonexistent_stream_returns_false(self): + """Test that sending trailers on nonexistent stream returns False.""" from gunicorn.http2.connection import HTTP2ServerConnection - from gunicorn.http2.errors import HTTP2Error cfg = MockConfig() sock = MockSocket() @@ -759,9 +758,243 @@ class TestHTTP2ServerConnectionTrailers: client_conn = create_client_connection() conn.receive_data(client_conn.data_to_send()) - with pytest.raises(HTTP2Error) as exc_info: - conn.send_trailers(99, [('trailer', 'value')]) - assert "Stream 99 not found" in str(exc_info.value) + # Sending trailers to non-existent stream should return False + result = conn.send_trailers(99, [('trailer', 'value')]) + assert result is False + + +class TestHTTP2FlowControl: + """Test HTTP/2 flow control handling.""" + + def test_send_data_respects_zero_window(self): + """Test that send_data returns False when flow control window is 0.""" + from gunicorn.http2.connection import HTTP2ServerConnection + + cfg = MockConfig() + sock = MockSocket() + conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345)) + conn.initiate_connection() + + # Create client and send preface + client_conn = create_client_connection() + conn.receive_data(client_conn.data_to_send()) + + # Send a request + client_conn.send_headers(1, [ + (':method', 'GET'), + (':path', '/'), + (':scheme', 'https'), + (':authority', 'localhost'), + ], end_stream=True) + conn.receive_data(client_conn.data_to_send()) + + # Send response headers without ending stream (pass body=b'' placeholder) + # We need to send headers first, so use h2_conn directly + conn.h2_conn.send_headers(1, [ + (':status', '200'), + ('content-type', 'text/plain'), + ], end_stream=False) + conn._send_pending_data() + conn.streams[1].send_headers([(':status', '200')], end_stream=False) + + # Mock the flow control window to return 0 + original_window = conn.h2_conn.local_flow_control_window + conn.h2_conn.local_flow_control_window = lambda stream_id: 0 + + # Try to send data - should return False (not raise) + result = conn.send_data(1, b'Hello, World!') + assert result is False + + # Restore + conn.h2_conn.local_flow_control_window = original_window + + def test_send_data_respects_flow_control(self): + """Test that send_data chunks data according to flow control window.""" + from gunicorn.http2.connection import HTTP2ServerConnection + + cfg = MockConfig() + sock = MockSocket() + conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345)) + conn.initiate_connection() + + # Create client and send preface + client_conn = create_client_connection() + conn.receive_data(client_conn.data_to_send()) + + # Send a request + client_conn.send_headers(1, [ + (':method', 'GET'), + (':path', '/'), + (':scheme', 'https'), + (':authority', 'localhost'), + ], end_stream=True) + conn.receive_data(client_conn.data_to_send()) + + # Send response headers without ending stream + conn.h2_conn.send_headers(1, [ + (':status', '200'), + ('content-type', 'text/plain'), + ], end_stream=False) + conn._send_pending_data() + conn.streams[1].send_headers([(':status', '200')], end_stream=False) + + # Send small data - should succeed within window + small_data = b'Hello' + conn.send_data(1, small_data, end_stream=True) + + # Verify data was sent + sent_data = sock.get_sent_data() + assert len(sent_data) > 0 + + +class TestHTTP2StreamClosedHandling: + """Test graceful handling of StreamClosedError.""" + + def test_send_response_on_closed_stream(self): + """Test that send_response gracefully handles closed stream.""" + from gunicorn.http2.connection import HTTP2ServerConnection + + cfg = MockConfig() + sock = MockSocket() + conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345)) + conn.initiate_connection() + + # Create client and send preface + client_conn = create_client_connection() + conn.receive_data(client_conn.data_to_send()) + + # Send a request + client_conn.send_headers(1, [ + (':method', 'GET'), + (':path', '/'), + (':scheme', 'https'), + (':authority', 'localhost'), + ], end_stream=True) + conn.receive_data(client_conn.data_to_send()) + + # Simulate client resetting the stream + client_conn.reset_stream(1) + conn.receive_data(client_conn.data_to_send()) + + # Try to send response - should return False, not raise + result = conn.send_response(1, 200, [('content-type', 'text/plain')], b'Hello') + assert result is False + + def test_send_data_on_reset_stream(self): + """Test that send_data gracefully handles reset stream.""" + from gunicorn.http2.connection import HTTP2ServerConnection + + cfg = MockConfig() + sock = MockSocket() + conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345)) + conn.initiate_connection() + + # Create client and send preface + client_conn = create_client_connection() + conn.receive_data(client_conn.data_to_send()) + + # Send a request + client_conn.send_headers(1, [ + (':method', 'GET'), + (':path', '/'), + (':scheme', 'https'), + (':authority', 'localhost'), + ], end_stream=True) + conn.receive_data(client_conn.data_to_send()) + + # Send response headers without ending stream + conn.h2_conn.send_headers(1, [ + (':status', '200'), + ('content-type', 'text/plain'), + ], end_stream=False) + conn._send_pending_data() + conn.streams[1].send_headers([(':status', '200')], end_stream=False) + + # Simulate client resetting the stream + client_conn.reset_stream(1) + conn.receive_data(client_conn.data_to_send()) + + # Try to send data - should return False, not raise + result = conn.send_data(1, b'Hello, World!', end_stream=True) + assert result is False + + +class TestHTTP2WindowOverflowHandling: + """Test window overflow handling.""" + + def test_window_overflow_sends_goaway(self): + """Test that window overflow results in GOAWAY with FLOW_CONTROL_ERROR.""" + from gunicorn.http2.connection import HTTP2ServerConnection + from gunicorn.http2.errors import HTTP2ErrorCode + + cfg = MockConfig() + sock = MockSocket() + conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345)) + conn.initiate_connection() + + # Create client and send preface + client_conn = create_client_connection() + conn.receive_data(client_conn.data_to_send()) + + # Mock increment_flow_control_window to raise ValueError (overflow) + original_increment = conn.h2_conn.increment_flow_control_window + + def raise_overflow(increment, stream_id=None): + raise ValueError("Flow control window too large") + + conn.h2_conn.increment_flow_control_window = raise_overflow + + # Send a request with data to trigger the overflow + client_conn.send_headers(1, [ + (':method', 'POST'), + (':path', '/'), + (':scheme', 'https'), + (':authority', 'localhost'), + ], end_stream=False) + client_conn.send_data(1, b'test data', end_stream=True) + conn.receive_data(client_conn.data_to_send()) + + # Connection should be closed with FLOW_CONTROL_ERROR + assert conn.is_closed is True + + +class TestHTTP2ProtocolErrorHandling: + """Test protocol error handling sends proper GOAWAY.""" + + def test_protocol_error_sends_goaway(self): + """Test that protocol errors result in GOAWAY being sent.""" + from gunicorn.http2.connection import HTTP2ServerConnection + from gunicorn.http2.errors import HTTP2ProtocolError, HTTP2ErrorCode + + cfg = MockConfig() + sock = MockSocket() + conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345)) + conn.initiate_connection() + + # Create client and send preface + client_conn = create_client_connection() + conn.receive_data(client_conn.data_to_send()) + + # Clear sent data to only capture new frames + sock._sent.clear() + + # Mock h2_conn.receive_data to raise ProtocolError + def raise_protocol_error(data): + raise h2.exceptions.ProtocolError("Test protocol error") + + conn.h2_conn.receive_data = raise_protocol_error + + # This should send GOAWAY and raise ProtocolError + with pytest.raises(HTTP2ProtocolError) as exc_info: + conn.receive_data(b'dummy data') + + assert "Test protocol error" in str(exc_info.value) + + # Verify something was sent (GOAWAY frame) + sent_data = sock.get_sent_data() + assert len(sent_data) > 0 + # Connection should be marked as closed + assert conn.is_closed is True class TestHTTP2NotAvailable: