mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-02 18:51:31 +08:00
Merge pull request #3568 from benleembruggen/fix/h2-stream-ended-body-complete
fix: HTTP/2 ASGI body duplication in async_connection.py
This commit is contained in:
commit
4e9db71aeb
@ -278,7 +278,14 @@ class AsyncHTTP2Connection:
|
|||||||
if stream is None:
|
if stream is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# Mark stream as request complete and body complete so the
|
||||||
|
# receive() closure's _body_complete guard fires, preventing
|
||||||
|
# the fast path from re-reading already-consumed data from BytesIO.
|
||||||
stream.request_complete = True
|
stream.request_complete = True
|
||||||
|
stream._body_complete = True
|
||||||
|
if stream._body_event:
|
||||||
|
stream._body_event.set()
|
||||||
|
|
||||||
return HTTP2Request(stream, self.cfg, self.client_addr)
|
return HTTP2Request(stream, self.cfg, self.client_addr)
|
||||||
|
|
||||||
def _handle_stream_reset(self, event):
|
def _handle_stream_reset(self, event):
|
||||||
|
|||||||
@ -289,8 +289,13 @@ class HTTP2ServerConnection:
|
|||||||
if stream is None:
|
if stream is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Mark stream as request complete
|
# Mark stream as request complete and body complete so the
|
||||||
|
# receive() closure's _body_complete guard fires, preventing
|
||||||
|
# the fast path from re-reading already-consumed data from BytesIO.
|
||||||
stream.request_complete = True
|
stream.request_complete = True
|
||||||
|
stream._body_complete = True
|
||||||
|
if stream._body_event:
|
||||||
|
stream._body_event.set()
|
||||||
|
|
||||||
# Create request object
|
# Create request object
|
||||||
return HTTP2Request(stream, self.cfg, self.client_addr)
|
return HTTP2Request(stream, self.cfg, self.client_addr)
|
||||||
|
|||||||
@ -997,6 +997,193 @@ class TestHTTP2ProtocolErrorHandling:
|
|||||||
assert conn.is_closed is True
|
assert conn.is_closed is True
|
||||||
|
|
||||||
|
|
||||||
|
class TestHTTP2StreamEndedBodyComplete:
|
||||||
|
"""Test that _handle_stream_ended sets _body_complete on the stream."""
|
||||||
|
|
||||||
|
def test_stream_ended_sets_body_complete(self):
|
||||||
|
"""_handle_stream_ended must set stream._body_complete = True."""
|
||||||
|
from gunicorn.http2.connection import HTTP2ServerConnection
|
||||||
|
|
||||||
|
cfg = MockConfig()
|
||||||
|
sock = MockSocket()
|
||||||
|
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
|
||||||
|
conn.initiate_connection()
|
||||||
|
|
||||||
|
client = create_client_connection()
|
||||||
|
client_preface = client.data_to_send()
|
||||||
|
conn.receive_data(client_preface)
|
||||||
|
|
||||||
|
server_data = sock.get_sent_data()
|
||||||
|
if server_data:
|
||||||
|
client.receive_data(server_data)
|
||||||
|
|
||||||
|
# Client sends POST with body (separate HEADERS and DATA frames)
|
||||||
|
client.send_headers(
|
||||||
|
stream_id=1,
|
||||||
|
headers=[
|
||||||
|
(':method', 'POST'),
|
||||||
|
(':path', '/test'),
|
||||||
|
(':scheme', 'https'),
|
||||||
|
(':authority', 'localhost'),
|
||||||
|
('content-type', 'application/json'),
|
||||||
|
],
|
||||||
|
end_stream=False,
|
||||||
|
)
|
||||||
|
client.send_data(stream_id=1, data=b'{"input": "test"}', end_stream=True)
|
||||||
|
request_data = client.data_to_send()
|
||||||
|
|
||||||
|
requests = conn.receive_data(request_data)
|
||||||
|
|
||||||
|
assert len(requests) == 1
|
||||||
|
stream = conn.streams.get(1)
|
||||||
|
assert stream is not None
|
||||||
|
assert stream._body_complete is True
|
||||||
|
assert stream.request_complete is True
|
||||||
|
|
||||||
|
def test_stream_ended_signals_body_event(self):
|
||||||
|
"""_handle_stream_ended must signal _body_event if it exists."""
|
||||||
|
import asyncio
|
||||||
|
from gunicorn.http2.connection import HTTP2ServerConnection
|
||||||
|
|
||||||
|
cfg = MockConfig()
|
||||||
|
sock = MockSocket()
|
||||||
|
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
|
||||||
|
conn.initiate_connection()
|
||||||
|
|
||||||
|
client = create_client_connection()
|
||||||
|
client_preface = client.data_to_send()
|
||||||
|
conn.receive_data(client_preface)
|
||||||
|
|
||||||
|
server_data = sock.get_sent_data()
|
||||||
|
if server_data:
|
||||||
|
client.receive_data(server_data)
|
||||||
|
|
||||||
|
# Client sends headers without end_stream to create the stream
|
||||||
|
client.send_headers(
|
||||||
|
stream_id=1,
|
||||||
|
headers=[
|
||||||
|
(':method', 'POST'),
|
||||||
|
(':path', '/test'),
|
||||||
|
(':scheme', 'https'),
|
||||||
|
(':authority', 'localhost'),
|
||||||
|
],
|
||||||
|
end_stream=False,
|
||||||
|
)
|
||||||
|
headers_data = client.data_to_send()
|
||||||
|
conn.receive_data(headers_data)
|
||||||
|
|
||||||
|
# Manually set _body_event on the stream (simulates read_body_chunk
|
||||||
|
# having been called, which lazy-inits the event)
|
||||||
|
stream = conn.streams.get(1)
|
||||||
|
assert stream is not None
|
||||||
|
stream._body_event = asyncio.Event()
|
||||||
|
|
||||||
|
# Now send data + end_stream
|
||||||
|
client.send_data(stream_id=1, data=b'body', end_stream=True)
|
||||||
|
request_data = client.data_to_send()
|
||||||
|
conn.receive_data(request_data)
|
||||||
|
|
||||||
|
assert stream._body_event.is_set()
|
||||||
|
|
||||||
|
def test_stream_ended_without_body_event_does_not_raise(self):
|
||||||
|
"""_handle_stream_ended must not raise when _body_event is None."""
|
||||||
|
from gunicorn.http2.connection import HTTP2ServerConnection
|
||||||
|
|
||||||
|
cfg = MockConfig()
|
||||||
|
sock = MockSocket()
|
||||||
|
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
|
||||||
|
conn.initiate_connection()
|
||||||
|
|
||||||
|
client = create_client_connection()
|
||||||
|
client_preface = client.data_to_send()
|
||||||
|
conn.receive_data(client_preface)
|
||||||
|
|
||||||
|
server_data = sock.get_sent_data()
|
||||||
|
if server_data:
|
||||||
|
client.receive_data(server_data)
|
||||||
|
|
||||||
|
# Send GET with end_stream (no body, _body_event never initialised)
|
||||||
|
client.send_headers(
|
||||||
|
stream_id=1,
|
||||||
|
headers=[
|
||||||
|
(':method', 'GET'),
|
||||||
|
(':path', '/test'),
|
||||||
|
(':scheme', 'https'),
|
||||||
|
(':authority', 'localhost'),
|
||||||
|
],
|
||||||
|
end_stream=True,
|
||||||
|
)
|
||||||
|
request_data = client.data_to_send()
|
||||||
|
|
||||||
|
# Should not raise even though _body_event is None
|
||||||
|
requests = conn.receive_data(request_data)
|
||||||
|
assert len(requests) == 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_h2_post_body_not_duplicated(self):
|
||||||
|
"""Full flow: streaming read must not re-read body from BytesIO.
|
||||||
|
|
||||||
|
Simulates what the receive() closure in protocol.py does:
|
||||||
|
1. read_body_chunk() returns the body
|
||||||
|
2. read_body_chunk() returns None (body complete)
|
||||||
|
3. Total bytes received == original body length (not doubled)
|
||||||
|
"""
|
||||||
|
from gunicorn.http2.connection import HTTP2ServerConnection
|
||||||
|
|
||||||
|
cfg = MockConfig()
|
||||||
|
sock = MockSocket()
|
||||||
|
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
|
||||||
|
conn.initiate_connection()
|
||||||
|
|
||||||
|
client = create_client_connection()
|
||||||
|
client_preface = client.data_to_send()
|
||||||
|
conn.receive_data(client_preface)
|
||||||
|
|
||||||
|
server_data = sock.get_sent_data()
|
||||||
|
if server_data:
|
||||||
|
client.receive_data(server_data)
|
||||||
|
|
||||||
|
body = b'{"input": ["hello world"]}'
|
||||||
|
client.send_headers(
|
||||||
|
stream_id=1,
|
||||||
|
headers=[
|
||||||
|
(':method', 'POST'),
|
||||||
|
(':path', '/embeddings'),
|
||||||
|
(':scheme', 'https'),
|
||||||
|
(':authority', 'localhost'),
|
||||||
|
('content-type', 'application/json'),
|
||||||
|
('content-length', str(len(body))),
|
||||||
|
],
|
||||||
|
end_stream=False,
|
||||||
|
)
|
||||||
|
client.send_data(stream_id=1, data=body, end_stream=True)
|
||||||
|
request_data = client.data_to_send()
|
||||||
|
|
||||||
|
requests = conn.receive_data(request_data)
|
||||||
|
assert len(requests) == 1
|
||||||
|
|
||||||
|
stream = conn.streams.get(1)
|
||||||
|
|
||||||
|
# Simulate what receive() does: read chunks via read_body_chunk()
|
||||||
|
received = bytearray()
|
||||||
|
while True:
|
||||||
|
chunk = await stream.read_body_chunk()
|
||||||
|
if chunk is None:
|
||||||
|
break
|
||||||
|
received.extend(chunk)
|
||||||
|
|
||||||
|
# The critical assertion: body must not be duplicated
|
||||||
|
assert bytes(received) == body
|
||||||
|
assert len(received) == len(body)
|
||||||
|
|
||||||
|
# _body_complete must be True so receive() knows to stop
|
||||||
|
assert stream._body_complete is True
|
||||||
|
|
||||||
|
# BytesIO must still have the data (for get_request_body compatibility)
|
||||||
|
# but read_body_chunk returning None prevents the fast path in receive()
|
||||||
|
# from ever being reached because body_received gets set to True
|
||||||
|
|
||||||
|
|
||||||
class TestHTTP2NotAvailable:
|
class TestHTTP2NotAvailable:
|
||||||
"""Test behavior when h2 is not available."""
|
"""Test behavior when h2 is not available."""
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user