Merge pull request #3617 from benoitc/fix/asgi-bodyreceiver-closed-semantic-split

refactor: split BodyReceiver._closed into transport vs body-wait
This commit is contained in:
Benoit Chesneau 2026-05-03 20:53:20 +02:00 committed by GitHub
commit 8d9d9030ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 55 additions and 20 deletions

View File

@ -166,8 +166,8 @@ class BodyReceiver:
Uses Future-based waiting for efficient async receive().
"""
__slots__ = ('_chunks', '_complete', '_body_finished', '_closed', '_waiter',
'request', 'protocol')
__slots__ = ('_chunks', '_complete', '_body_finished', '_closed',
'_body_wait_expired', '_waiter', 'request', 'protocol')
def __init__(self, request, protocol):
self.request = request
@ -175,7 +175,13 @@ class BodyReceiver:
self._chunks = []
self._complete = False
self._body_finished = False # True after returning more_body=False
# _closed means the client transport has gone away (signal_disconnect
# was called or the protocol detected a disconnect). _body_wait_expired
# means the body did not finish framing within the configured timeout
# but the transport itself may still be open. Both surface as
# http.disconnect to the app, but they are distinct conditions.
self._closed = False
self._body_wait_expired = False
self._waiter = None
def feed(self, chunk):
@ -190,10 +196,15 @@ class BodyReceiver:
self._wake_waiter()
def signal_disconnect(self):
"""Signal that connection has been lost."""
"""Signal that the client transport has gone away."""
self._closed = True
self._wake_waiter()
@property
def _disconnected(self):
"""True when the receiver should yield http.disconnect to the app."""
return self._closed or self._body_wait_expired
def _wake_waiter(self):
"""Wake up any pending receive() call."""
if self._waiter is not None and not self._waiter.done():
@ -201,8 +212,8 @@ class BodyReceiver:
async def receive(self): # pylint: disable=too-many-return-statements
"""ASGI receive callable - returns body chunks or disconnect."""
# Already disconnected
if self._closed:
# Already disconnected (transport closed or body wait timed out)
if self._disconnected:
return {"type": "http.disconnect"}
# Body finished but not disconnected - wait for actual disconnect
@ -248,7 +259,7 @@ class BodyReceiver:
def _build_receive_result(self):
"""Build receive result after waiting for data."""
if self._closed:
if self._disconnected:
return {"type": "http.disconnect"}
if self._chunks:
@ -259,14 +270,14 @@ class BodyReceiver:
return {"type": "http.request", "body": b"", "more_body": False}
# Wait returned without data and the message was not framed complete:
# treat as a client disconnect rather than synthesizing end-of-body
# treat as a body-wait expiry rather than synthesizing end-of-body
# (which would desync the next pipelined request).
self._closed = True
self._body_wait_expired = True
return {"type": "http.disconnect"}
async def _wait_for_data(self):
"""Wait for body data to arrive via callback."""
if self._chunks or self._complete or self._closed:
if self._chunks or self._complete or self._disconnected:
return
# Create a new waiter
@ -284,10 +295,12 @@ class BodyReceiver:
try:
await asyncio.wait_for(self._waiter, timeout=timeout)
except asyncio.TimeoutError:
# No data arrived in time: mark the body receiver as disconnected
# so receive() yields http.disconnect rather than a fake terminal
# http.request with more_body=False.
self._closed = True
# No data arrived in time: mark body-wait as expired so receive()
# yields http.disconnect rather than a fake terminal http.request
# with more_body=False. The transport itself may still be alive;
# _closed stays False so any code keying on transport-disconnect
# only is unaffected.
self._body_wait_expired = True
finally:
self._waiter = None

View File

@ -225,7 +225,10 @@ class TestBodyReceiverIncompleteBody:
"""When _wait_for_data times out and the body is not complete, the
receiver MUST yield http.disconnect rather than synthesize a terminal
http.request with more_body=False that would desync the next
pipelined request."""
pipelined request.
Body-wait expiry sets _body_wait_expired, NOT _closed: the transport
may still be alive; the body just never finished framing."""
from gunicorn.asgi.protocol import ASGIProtocol, BodyReceiver
protocol = ASGIProtocol(mock_worker)
@ -240,7 +243,9 @@ class TestBodyReceiverIncompleteBody:
msg = await receiver.receive()
assert msg == {"type": "http.disconnect"}
assert receiver._closed is True
assert receiver._body_wait_expired is True
assert receiver._closed is False
assert receiver._disconnected is True
@pytest.mark.asyncio
async def test_receive_yields_terminal_request_when_complete(self, mock_worker):
@ -267,13 +272,30 @@ class TestBodyReceiverIncompleteBody:
# more_body may be False since the body is complete
assert msg["more_body"] is False
def test_signal_disconnect_sets_closed_only(self, mock_worker):
"""signal_disconnect is the transport-disconnect path; it must set
_closed without touching _body_wait_expired so the two conditions
remain distinguishable for any code that needs to differentiate."""
from gunicorn.asgi.protocol import ASGIProtocol, BodyReceiver
protocol = ASGIProtocol(mock_worker)
protocol.reader = mock.Mock()
request = mock.Mock()
request.content_length = 0
request.chunked = False
receiver = BodyReceiver(request, protocol)
receiver.signal_disconnect()
assert receiver._closed is True
assert receiver._body_wait_expired is False
assert receiver._disconnected is True
def test_keepalive_gate_refuses_after_receive_timeout(self, mock_worker):
"""The keepalive completion check must NOT treat a receive-timeout
as a framed-complete message: residual body bytes on the wire would
be misparsed as the next pipelined request (smuggling).
BodyReceiver._closed is overloaded across transport-disconnect and
receive-timeout, so the gate keys on _complete only.
be misparsed as the next pipelined request (smuggling). The gate
keys on _complete only.
"""
from gunicorn.asgi.protocol import ASGIProtocol, BodyReceiver
@ -285,7 +307,7 @@ class TestBodyReceiverIncompleteBody:
request.chunked = False
receiver = BodyReceiver(request, protocol)
receiver._closed = True # simulate _wait_for_data timeout
receiver._body_wait_expired = True # simulate _wait_for_data timeout
receiver._complete = False # body never finished framing
# The gate inlined in _handle_connection: refuse keepalive when