Patch summary: when an HTTP/2 stream ends, also mark its body as complete.

Both connection classes touched here (AsyncHTTP2Connection and
HTTP2ServerConnection) now set stream._body_complete = True and signal
stream._body_event (only when one has been created) next to the existing
stream.request_complete = True. The new test class in
tests/test_http2_connection.py covers the flag, the event signal, the
case where no event exists, and a streaming read that must not return the
body twice.

diff --git a/gunicorn/http2/async_connection.py b/gunicorn/http2/async_connection.py
index e2d67988..b738e286 100644
--- a/gunicorn/http2/async_connection.py
+++ b/gunicorn/http2/async_connection.py
@@ -278,7 +278,14 @@ class AsyncHTTP2Connection:
         if stream is None:
             return None
 
+        # Mark stream as request complete and body complete so the
+        # receive() closure's _body_complete guard fires, preventing
+        # the fast path from re-reading already-consumed data from BytesIO.
         stream.request_complete = True
+        stream._body_complete = True
+        if stream._body_event:
+            stream._body_event.set()
+
         return HTTP2Request(stream, self.cfg, self.client_addr)
 
     def _handle_stream_reset(self, event):
diff --git a/gunicorn/http2/connection.py b/gunicorn/http2/connection.py
index b332b2f1..a37fb8cb 100644
--- a/gunicorn/http2/connection.py
+++ b/gunicorn/http2/connection.py
@@ -289,8 +289,13 @@ class HTTP2ServerConnection:
         if stream is None:
             return None
 
-        # Mark stream as request complete
+        # Mark stream as request complete and body complete so the
+        # receive() closure's _body_complete guard fires, preventing
+        # the fast path from re-reading already-consumed data from BytesIO.
         stream.request_complete = True
+        stream._body_complete = True
+        if stream._body_event:
+            stream._body_event.set()
 
         # Create request object
         return HTTP2Request(stream, self.cfg, self.client_addr)
diff --git a/tests/test_http2_connection.py b/tests/test_http2_connection.py
index 2785c9b4..963cc0f2 100644
--- a/tests/test_http2_connection.py
+++ b/tests/test_http2_connection.py
@@ -997,6 +997,193 @@ class TestHTTP2ProtocolErrorHandling:
         assert conn.is_closed is True
 
 
+class TestHTTP2StreamEndedBodyComplete:
+    """Test that _handle_stream_ended sets _body_complete on the stream."""
+
+    def test_stream_ended_sets_body_complete(self):
+        """_handle_stream_ended must set stream._body_complete = True."""
+        from gunicorn.http2.connection import HTTP2ServerConnection
+
+        cfg = MockConfig()
+        sock = MockSocket()
+        conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
+        conn.initiate_connection()
+
+        client = create_client_connection()
+        client_preface = client.data_to_send()
+        conn.receive_data(client_preface)
+
+        server_data = sock.get_sent_data()
+        if server_data:
+            client.receive_data(server_data)
+
+        # Client sends POST with body (separate HEADERS and DATA frames)
+        client.send_headers(
+            stream_id=1,
+            headers=[
+                (':method', 'POST'),
+                (':path', '/test'),
+                (':scheme', 'https'),
+                (':authority', 'localhost'),
+                ('content-type', 'application/json'),
+            ],
+            end_stream=False,
+        )
+        client.send_data(stream_id=1, data=b'{"input": "test"}', end_stream=True)
+        request_data = client.data_to_send()
+
+        requests = conn.receive_data(request_data)
+
+        assert len(requests) == 1
+        stream = conn.streams.get(1)
+        assert stream is not None
+        assert stream._body_complete is True
+        assert stream.request_complete is True
+
+    def test_stream_ended_signals_body_event(self):
+        """_handle_stream_ended must signal _body_event if it exists."""
+        import asyncio
+        from gunicorn.http2.connection import HTTP2ServerConnection
+
+        cfg = MockConfig()
+        sock = MockSocket()
+        conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
+        conn.initiate_connection()
+
+        client = create_client_connection()
+        client_preface = client.data_to_send()
+        conn.receive_data(client_preface)
+
+        server_data = sock.get_sent_data()
+        if server_data:
+            client.receive_data(server_data)
+
+        # Client sends headers without end_stream to create the stream
+        client.send_headers(
+            stream_id=1,
+            headers=[
+                (':method', 'POST'),
+                (':path', '/test'),
+                (':scheme', 'https'),
+                (':authority', 'localhost'),
+            ],
+            end_stream=False,
+        )
+        headers_data = client.data_to_send()
+        conn.receive_data(headers_data)
+
+        # Manually set _body_event on the stream (simulates read_body_chunk
+        # having been called, which lazy-inits the event)
+        stream = conn.streams.get(1)
+        assert stream is not None
+        stream._body_event = asyncio.Event()
+
+        # Now send data + end_stream
+        client.send_data(stream_id=1, data=b'body', end_stream=True)
+        request_data = client.data_to_send()
+        conn.receive_data(request_data)
+
+        assert stream._body_event.is_set()
+
+    def test_stream_ended_without_body_event_does_not_raise(self):
+        """_handle_stream_ended must not raise when _body_event is None."""
+        from gunicorn.http2.connection import HTTP2ServerConnection
+
+        cfg = MockConfig()
+        sock = MockSocket()
+        conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
+        conn.initiate_connection()
+
+        client = create_client_connection()
+        client_preface = client.data_to_send()
+        conn.receive_data(client_preface)
+
+        server_data = sock.get_sent_data()
+        if server_data:
+            client.receive_data(server_data)
+
+        # Send GET with end_stream (no body, _body_event never initialised)
+        client.send_headers(
+            stream_id=1,
+            headers=[
+                (':method', 'GET'),
+                (':path', '/test'),
+                (':scheme', 'https'),
+                (':authority', 'localhost'),
+            ],
+            end_stream=True,
+        )
+        request_data = client.data_to_send()
+
+        # Should not raise even though _body_event is None
+        requests = conn.receive_data(request_data)
+        assert len(requests) == 1
+
+    @pytest.mark.asyncio
+    async def test_h2_post_body_not_duplicated(self):
+        """Full flow: streaming read must not re-read body from BytesIO.
+
+        Simulates what the receive() closure in protocol.py does:
+        1. read_body_chunk() returns the body
+        2. read_body_chunk() returns None (body complete)
+        3. Total bytes received == original body length (not doubled)
+        """
+        from gunicorn.http2.connection import HTTP2ServerConnection
+
+        cfg = MockConfig()
+        sock = MockSocket()
+        conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
+        conn.initiate_connection()
+
+        client = create_client_connection()
+        client_preface = client.data_to_send()
+        conn.receive_data(client_preface)
+
+        server_data = sock.get_sent_data()
+        if server_data:
+            client.receive_data(server_data)
+
+        body = b'{"input": ["hello world"]}'
+        client.send_headers(
+            stream_id=1,
+            headers=[
+                (':method', 'POST'),
+                (':path', '/embeddings'),
+                (':scheme', 'https'),
+                (':authority', 'localhost'),
+                ('content-type', 'application/json'),
+                ('content-length', str(len(body))),
+            ],
+            end_stream=False,
+        )
+        client.send_data(stream_id=1, data=body, end_stream=True)
+        request_data = client.data_to_send()
+
+        requests = conn.receive_data(request_data)
+        assert len(requests) == 1
+
+        stream = conn.streams.get(1)
+
+        # Simulate what receive() does: read chunks via read_body_chunk()
+        received = bytearray()
+        while True:
+            chunk = await stream.read_body_chunk()
+            if chunk is None:
+                break
+            received.extend(chunk)
+
+        # The critical assertion: body must not be duplicated
+        assert bytes(received) == body
+        assert len(received) == len(body)
+
+        # _body_complete must be True so receive() knows to stop
+        assert stream._body_complete is True
+
+        # BytesIO must still have the data (for get_request_body compatibility)
+        # but read_body_chunk returning None prevents the fast path in receive()
+        # from ever being reached because body_received gets set to True
+
+
 class TestHTTP2NotAvailable:
     """Test behavior when h2 is not available."""
 