From 7953c2585b6b8d00f3e8a8517786211a84afa560 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Thu, 2 Apr 2026 23:55:27 +0200 Subject: [PATCH] Fix ASGI disconnect handling for Django-style apps BodyReceiver.receive() now blocks after body is finished until actual disconnect, instead of returning http.disconnect immediately. This fixes Django's listen_for_disconnect task thinking client disconnected early. Adds regression tests for the fix. Fixes #3484 --- gunicorn/asgi/protocol.py | 50 ++++++- tests/test_asgi_compliance.py | 266 ++++++++++++++++++++++++++++++++++ 2 files changed, 310 insertions(+), 6 deletions(-) diff --git a/gunicorn/asgi/protocol.py b/gunicorn/asgi/protocol.py index 7722018c..c8715a13 100644 --- a/gunicorn/asgi/protocol.py +++ b/gunicorn/asgi/protocol.py @@ -201,8 +201,14 @@ class BodyReceiver: async def receive(self): # pylint: disable=too-many-return-statements """ASGI receive callable - returns body chunks or disconnect.""" - # Already disconnected or body finished - if self._closed or self._body_finished: + # Already disconnected + if self._closed: + return {"type": "http.disconnect"} + + # Body finished but not disconnected - wait for actual disconnect + # This is needed for frameworks like Django that listen for disconnect + if self._body_finished: + await self._wait_for_disconnect() return {"type": "http.disconnect"} # Fast path: chunk already available @@ -270,6 +276,33 @@ class BodyReceiver: finally: self._waiter = None + async def _wait_for_disconnect(self): + """Wait for connection to close after body is finished. + + This is needed for ASGI apps (like Django) that call receive() + to listen for client disconnect after the request body is consumed. + """ + if self._closed: + return + + # Check protocol closed state first + if self.protocol._closed: + self._closed = True + return + + # Create a new waiter to wait for disconnect + loop = asyncio.get_event_loop() + self._waiter = loop.create_future() + + try: + # Wait indefinitely for disconnect (or until cancelled) + await self._waiter + except asyncio.CancelledError: + pass + finally: + self._waiter = None + self._closed = True + class ASGIProtocol(asyncio.Protocol): """HTTP/1.1 protocol handler for ASGI applications. @@ -942,10 +975,15 @@ class ASGIProtocol(asyncio.Protocol): self.log.debug("Request cancelled (client disconnected)") return False except Exception: - self.log.exception("Error in ASGI application") - if not response_started: - self._send_error_response(500, "Internal Server Error") - response_status = 500 + # If response was already completely sent, this is likely a + # disconnect-related exception (e.g. Django's RequestAborted) + if response_complete: + self.log.debug("Exception after response complete (client disconnected)") + else: + self.log.exception("Error in ASGI application") + if not response_started: + self._send_error_response(500, "Internal Server Error") + response_status = 500 return False finally: # Clear the body receiver reference diff --git a/tests/test_asgi_compliance.py b/tests/test_asgi_compliance.py index 5e97ad0e..3290fe43 100644 --- a/tests/test_asgi_compliance.py +++ b/tests/test_asgi_compliance.py @@ -816,6 +816,7 @@ class TestHTTPDisconnectEvent: When body is complete and disconnect is signaled, receive() should return {"type": "http.disconnect"}. """ + import asyncio from gunicorn.asgi.protocol import BodyReceiver protocol = self._create_protocol() @@ -833,9 +834,274 @@ class TestHTTPDisconnectEvent: assert msg1["type"] == "http.request" assert msg1["more_body"] is False + # Signal disconnect (simulating connection_lost) + # After the fix, receive() waits for actual disconnect signal + body_receiver.signal_disconnect() + # Now receive should return disconnect msg2 = await body_receiver.receive() # Per ASGI spec, disconnect message only has 'type' assert msg2 == {"type": "http.disconnect"} assert len(msg2) == 1 + + +# ============================================================================ +# BodyReceiver Disconnect Regression Tests +# https://github.com/benoitc/gunicorn/issues/3484 +# ============================================================================ + +class TestBodyReceiverDisconnect: + """Regression tests for BodyReceiver._wait_for_disconnect() behavior. + + The original bug: BodyReceiver.receive() immediately returned + `http.disconnect` when `_body_finished` was True, but Django (and other + ASGI frameworks) call `receive()` to listen for client disconnect AFTER + the response is sent. This caused Django's `listen_for_disconnect` task + to think the client disconnected before the response could be sent. + + The fix: After body is finished, receive() now calls _wait_for_disconnect() + which blocks until signal_disconnect() is called or the waiter is cancelled. + """ + + def _create_protocol(self): + """Create an ASGIProtocol instance for testing.""" + from gunicorn.asgi.protocol import ASGIProtocol + + worker = mock.Mock() + worker.cfg = Config() + worker.log = mock.Mock() + worker.asgi = mock.Mock() + worker.nr_conns = 1 + worker.loop = mock.Mock() + + protocol = ASGIProtocol(worker) + protocol._closed = False + return protocol + + @pytest.mark.asyncio + async def test_body_receiver_waits_for_disconnect_after_body_finished(self): + """Test that receive() blocks after body is finished until disconnect is signaled. + + This tests the core regression fix: after body is complete, calling receive() + should NOT immediately return http.disconnect. It should block until the + connection actually closes. + """ + from gunicorn.asgi.protocol import BodyReceiver + + protocol = self._create_protocol() + + # Create a request with no body (body finishes immediately) + mock_request = mock.Mock() + mock_request.content_length = 0 + mock_request.chunked = False + + body_receiver = BodyReceiver(mock_request, protocol) + + # Get the initial body message (empty body, more_body=False) + msg1 = await body_receiver.receive() + assert msg1["type"] == "http.request" + assert msg1["body"] == b"" + assert msg1["more_body"] is False + + # At this point, _body_finished is True + assert body_receiver._body_finished is True + assert body_receiver._closed is False + + # Now calling receive() should block, not return immediately + # We test this by starting receive() as a task and verifying it doesn't complete + import asyncio + + receive_task = asyncio.create_task(body_receiver.receive()) + + # Give the task a moment to start + await asyncio.sleep(0.01) + + # Task should NOT be done yet (it's waiting for disconnect) + assert not receive_task.done() + + # Now signal disconnect + body_receiver.signal_disconnect() + + # Task should complete now + msg2 = await asyncio.wait_for(receive_task, timeout=1.0) + assert msg2 == {"type": "http.disconnect"} + + @pytest.mark.asyncio + async def test_body_receiver_immediate_disconnect_if_already_closed(self): + """Test that receive() immediately returns http.disconnect if already closed. + + If signal_disconnect() has already been called before receive(), + it should return http.disconnect immediately without blocking. + """ + from gunicorn.asgi.protocol import BodyReceiver + + protocol = self._create_protocol() + + # Create a request with body + mock_request = mock.Mock() + mock_request.content_length = 100 + mock_request.chunked = False + + body_receiver = BodyReceiver(mock_request, protocol) + + # Signal disconnect BEFORE calling receive + body_receiver.signal_disconnect() + assert body_receiver._closed is True + + # receive() should return disconnect immediately + msg = await body_receiver.receive() + assert msg == {"type": "http.disconnect"} + + @pytest.mark.asyncio + async def test_body_receiver_respects_protocol_closed_state(self): + """Test that receive() checks protocol._closed state. + + If the protocol is closed but signal_disconnect wasn't called, + receive() should still detect the disconnect. + """ + from gunicorn.asgi.protocol import BodyReceiver + + protocol = self._create_protocol() + + mock_request = mock.Mock() + mock_request.content_length = 0 + mock_request.chunked = False + + body_receiver = BodyReceiver(mock_request, protocol) + + # Consume the body first + msg1 = await body_receiver.receive() + assert msg1["type"] == "http.request" + assert msg1["more_body"] is False + + # Mark protocol as closed + protocol._closed = True + + # Start receive task - should detect protocol closure + import asyncio + + receive_task = asyncio.create_task(body_receiver.receive()) + + # Give it a moment + await asyncio.sleep(0.01) + + # Wake up the waiter by signaling disconnect + body_receiver.signal_disconnect() + + msg2 = await asyncio.wait_for(receive_task, timeout=1.0) + assert msg2 == {"type": "http.disconnect"} + + @pytest.mark.asyncio + async def test_asgi_app_with_disconnect_listener(self): + """Test Django-style ASGI app pattern that listens for disconnect. + + This simulates a real-world scenario where an ASGI app: + 1. Reads the request body + 2. Sends a response + 3. Calls receive() to wait for client disconnect (background task) + + The bug caused step 3 to return immediately with http.disconnect, + making Django think the client disconnected mid-response. + """ + from gunicorn.asgi.protocol import BodyReceiver + + protocol = self._create_protocol() + + # Simulate a POST request with body + mock_request = mock.Mock() + mock_request.content_length = 13 + mock_request.chunked = False + + body_receiver = BodyReceiver(mock_request, protocol) + + # Simulate sending body data via callback + body_receiver.feed(b"Hello, World!") + body_receiver.set_complete() + + # Step 1: App reads the body + msg1 = await body_receiver.receive() + assert msg1["type"] == "http.request" + assert msg1["body"] == b"Hello, World!" + assert msg1["more_body"] is False + + # At this point body is finished + assert body_receiver._body_finished is True + + # Step 2: App would send response here (simulated) + response_sent = True + + # Step 3: App starts listening for disconnect (like Django does) + import asyncio + + disconnect_received = asyncio.Event() + + async def listen_for_disconnect(): + """Simulates Django's disconnect listener task.""" + msg = await body_receiver.receive() + if msg["type"] == "http.disconnect": + disconnect_received.set() + return msg + + listener_task = asyncio.create_task(listen_for_disconnect()) + + # Give listener task time to start waiting + await asyncio.sleep(0.01) + + # Listener should be blocked waiting, not done + assert not listener_task.done() + assert response_sent # Response was sent before disconnect detected + + # Simulate client closing connection after receiving response + body_receiver.signal_disconnect() + + # Now listener should complete + msg = await asyncio.wait_for(listener_task, timeout=1.0) + assert msg == {"type": "http.disconnect"} + assert disconnect_received.is_set() + + @pytest.mark.asyncio + async def test_body_receiver_cancellation_during_wait(self): + """Test that receive() handles cancellation while waiting for disconnect. + + When the ASGI task is cancelled (e.g., timeout), the waiting receive() + catches the CancelledError, marks itself as closed, and the cancellation + propagates up from the await. The body receiver is marked as closed + to ensure subsequent calls return disconnect immediately. + """ + from gunicorn.asgi.protocol import BodyReceiver + + protocol = self._create_protocol() + + mock_request = mock.Mock() + mock_request.content_length = 0 + mock_request.chunked = False + + body_receiver = BodyReceiver(mock_request, protocol) + + # Consume body + await body_receiver.receive() + + import asyncio + + receive_task = asyncio.create_task(body_receiver.receive()) + + # Let it start waiting + await asyncio.sleep(0.01) + assert not receive_task.done() + + # Cancel the task + receive_task.cancel() + + # Wait for the task to finish - it may raise CancelledError + # or return disconnect depending on timing + try: + msg = await receive_task + # If it returns, it should be a disconnect message + assert msg == {"type": "http.disconnect"} + except asyncio.CancelledError: + # Cancellation propagated - this is also valid + pass + + # Body receiver should be marked as closed after cancellation + assert body_receiver._closed is True