From 3bf718ea5221e7a98d519db86580b36bc2296f02 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 3 Feb 2026 01:59:29 +0100 Subject: [PATCH] fix: graceful disconnect handling for ASGI worker Closes #3484 When a client disconnects during an ASGI request, the worker now: 1. Sends http.disconnect message to the app's receive queue 2. Allows a configurable grace period for cleanup (default: 3 seconds) 3. Only cancels the task after the grace period expires This follows the ASGI HTTP Connection Scope spec which defines http.disconnect as the message apps should receive when clients disconnect: https://asgi.readthedocs.io/en/latest/specs/www.html#disconnect-receive-event The grace period prevents CancelledError from propagating to async database operations, allowing SQLAlchemy and other async DB libraries to properly reset their connection pools. New config option: --asgi-disconnect-grace-period (default: 3 seconds) --- docs/content/reference/settings.md | 20 +++ gunicorn/asgi/protocol.py | 64 ++++++++- gunicorn/config.py | 25 ++++ tests/test_asgi_compliance.py | 109 ++++++++++++++- tests/test_asgi_disconnect.py | 204 +++++++++++++++++++++++++++++ 5 files changed, 417 insertions(+), 5 deletions(-) create mode 100644 tests/test_asgi_disconnect.py diff --git a/docs/content/reference/settings.md b/docs/content/reference/settings.md index 454f9300..18023593 100644 --- a/docs/content/reference/settings.md +++ b/docs/content/reference/settings.md @@ -1931,3 +1931,23 @@ or other resources. This setting only affects the ``asgi`` worker type. !!! info "Added in 24.0.0" + +### `asgi_disconnect_grace_period` + +**Command line:** `--asgi-disconnect-grace-period INT` + +**Default:** `3` + +Grace period (seconds) for ASGI apps to handle client disconnects. + +When a client disconnects, the ASGI app receives an http.disconnect +message and has this many seconds to clean up resources (like database +connections) before the request task is cancelled. + +Set to 0 to cancel immediately (not recommended for apps with async +database connections). Apps with long-running database operations may +need to increase this value. + +This setting only affects the ``asgi`` worker type. + +!!! info "Added in 25.0.0" diff --git a/gunicorn/asgi/protocol.py b/gunicorn/asgi/protocol.py index d10a4220..c3b3abfe 100644 --- a/gunicorn/asgi/protocol.py +++ b/gunicorn/asgi/protocol.py @@ -55,6 +55,7 @@ class ASGIProtocol(asyncio.Protocol): # Connection state self._closed = False + self._receive_queue = None # Set per-request for disconnect signaling def connection_made(self, transport): """Called when a connection is established.""" @@ -88,11 +89,42 @@ class ASGIProtocol(asyncio.Protocol): self.reader.feed_data(data) def connection_lost(self, exc): - """Called when the connection is lost or closed.""" + """Called when the connection is lost or closed. + + Instead of immediately cancelling the task, we signal a disconnect + event and send an http.disconnect message to the receive queue. + This allows the ASGI app to clean up resources (like database + connections) gracefully before the task is cancelled. + + See: https://github.com/benoitc/gunicorn/issues/3484 + """ + # Guard against multiple calls (idempotent) + if self._closed: + return + self._closed = True self.worker.nr_conns -= 1 if self.reader: self.reader.feed_eof() + + # Signal disconnect to the app via the receive queue + if self._receive_queue is not None: + self._receive_queue.put_nowait({"type": "http.disconnect"}) + + # Schedule task cancellation after grace period if task doesn't complete + if self._task and not self._task.done(): + grace_period = getattr(self.cfg, 'asgi_disconnect_grace_period', 3) + if grace_period > 0: + self.worker.loop.call_later( + grace_period, + self._cancel_task_if_pending + ) + else: + # Grace period of 0 means cancel immediately + self._task.cancel() + + def _cancel_task_if_pending(self): + """Cancel the task if it's still pending after grace period.""" if self._task and not self._task.done(): self._task.cancel() @@ -214,8 +246,10 @@ class ASGIProtocol(asyncio.Protocol): response_headers = [] response_sent = 0 - # Receive queue for body + # Receive queue for body - stored on self for disconnect signaling receive_queue = asyncio.Queue() + self._receive_queue = receive_queue + body_complete = False # Pre-populate with initial body state if request.content_length == 0 and not request.chunked: @@ -224,17 +258,34 @@ class ASGIProtocol(asyncio.Protocol): "body": b"", "more_body": False, }) + body_complete = True else: # Start body reading task asyncio.create_task(self._read_body_to_queue(request, receive_queue)) async def receive(): - return await receive_queue.get() + nonlocal body_complete + # Check if already disconnected before waiting + if self._closed and body_complete: + return {"type": "http.disconnect"} + + msg = await receive_queue.get() + + # Track when body is complete + if msg.get("type") == "http.request" and not msg.get("more_body", True): + body_complete = True + + return msg async def send(message): nonlocal response_started, response_complete, exc_to_raise nonlocal response_status, response_headers, response_sent, use_chunked + # If client disconnected, silently ignore send attempts + # This allows apps to finish cleanup without errors + if self._closed: + return + msg_type = message["type"] if msg_type == "http.response.informational": @@ -305,6 +356,10 @@ class ASGIProtocol(asyncio.Protocol): await self._send_error_response(500, "Internal Server Error") response_status = 500 + except asyncio.CancelledError: + # Client disconnected - don't log as error, this is normal + self.log.debug("Request cancelled (client disconnected)") + return False except Exception: self.log.exception("Error in ASGI application") if not response_started: @@ -312,6 +367,9 @@ class ASGIProtocol(asyncio.Protocol): response_status = 500 return False finally: + # Clear the receive queue reference + self._receive_queue = None + try: request_time = datetime.now() - request_start # Create response info for logging diff --git a/gunicorn/config.py b/gunicorn/config.py index 91132f16..5e3873c4 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -2849,6 +2849,31 @@ class ASGILifespan(Setting): """ +class ASGIDisconnectGracePeriod(Setting): + name = "asgi_disconnect_grace_period" + section = "Worker Processes" + cli = ["--asgi-disconnect-grace-period"] + meta = "INT" + validator = validate_pos_int + type = int + default = 3 + desc = """\ + Grace period (seconds) for ASGI apps to handle client disconnects. + + When a client disconnects, the ASGI app receives an http.disconnect + message and has this many seconds to clean up resources (like database + connections) before the request task is cancelled. + + Set to 0 to cancel immediately (not recommended for apps with async + database connections). Apps with long-running database operations may + need to increase this value. + + This setting only affects the ``asgi`` worker type. + + .. versionadded:: 25.0.0 + """ + + class RootPath(Setting): name = "root_path" section = "Server Mechanics" diff --git a/tests/test_asgi_compliance.py b/tests/test_asgi_compliance.py index f5f33c1e..b0ac919d 100644 --- a/tests/test_asgi_compliance.py +++ b/tests/test_asgi_compliance.py @@ -9,10 +9,9 @@ Tests that gunicorn's ASGI implementation conforms to the ASGI 3.0 spec: https://asgi.readthedocs.io/en/latest/specs/main.html """ +import asyncio from unittest import mock -import pytest - from gunicorn.config import Config @@ -698,3 +697,109 @@ class TestStateSharing: scope = protocol._build_http_scope(request, None, None) assert "state" not in scope + + +# ============================================================================ +# HTTP Disconnect Event Tests (ASGI Spec Compliance) +# https://asgi.readthedocs.io/en/latest/specs/www.html#disconnect-receive-event +# ============================================================================ + +class TestHTTPDisconnectEvent: + """Test http.disconnect event compliance with ASGI spec. + + Per the ASGI HTTP Connection Scope spec: + - Disconnect event is sent when client closes connection + - Event type MUST be "http.disconnect" + - Apps should receive this event and clean up gracefully + """ + + def _create_protocol(self): + """Create an ASGIProtocol instance for testing.""" + from gunicorn.asgi.protocol import ASGIProtocol + + worker = mock.Mock() + worker.cfg = Config() + worker.log = mock.Mock() + worker.asgi = mock.Mock() + worker.nr_conns = 1 + worker.loop = mock.Mock() + + protocol = ASGIProtocol(worker) + protocol.reader = mock.Mock() + + return protocol + + def test_disconnect_event_type(self): + """Test that disconnect event has correct type per ASGI spec.""" + protocol = self._create_protocol() + protocol._receive_queue = asyncio.Queue() + + # Simulate client disconnect + protocol.connection_lost(None) + + # Get the message from queue + msg = protocol._receive_queue.get_nowait() + + # Per ASGI spec: type MUST be "http.disconnect" + assert msg["type"] == "http.disconnect" + + def test_disconnect_event_sent_on_connection_lost(self): + """Test that http.disconnect is sent when connection is lost.""" + protocol = self._create_protocol() + protocol._receive_queue = asyncio.Queue() + + assert protocol._receive_queue.empty() + + # Simulate client disconnect + protocol.connection_lost(None) + + # Queue should have disconnect message + assert not protocol._receive_queue.empty() + + def test_disconnect_sets_closed_flag(self): + """Test that connection_lost sets the closed flag.""" + protocol = self._create_protocol() + + assert protocol._closed is False + + protocol.connection_lost(None) + + assert protocol._closed is True + + def test_disconnect_allows_graceful_cleanup(self): + """Test that disconnect doesn't immediately cancel task. + + Per ASGI spec, apps should have opportunity to clean up + when they receive http.disconnect. + """ + protocol = self._create_protocol() + + # Create a mock task + mock_task = mock.Mock() + mock_task.done.return_value = False + protocol._task = mock_task + + # Simulate disconnect + protocol.connection_lost(None) + + # Task should NOT be cancelled immediately + mock_task.cancel.assert_not_called() + + # Cancellation should be scheduled after grace period + protocol.worker.loop.call_later.assert_called_once() + + def test_disconnect_message_format(self): + """Test http.disconnect message format per ASGI spec. + + The disconnect message should only contain 'type' key. + """ + protocol = self._create_protocol() + protocol._receive_queue = asyncio.Queue() + + protocol.connection_lost(None) + + msg = protocol._receive_queue.get_nowait() + + # Per ASGI spec, disconnect message only has 'type' + assert msg == {"type": "http.disconnect"} + assert len(msg) == 1 diff --git a/tests/test_asgi_disconnect.py b/tests/test_asgi_disconnect.py new file mode 100644 index 00000000..8423ce7d --- /dev/null +++ b/tests/test_asgi_disconnect.py @@ -0,0 +1,204 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +Tests for ASGI graceful disconnect handling. + +Issue: https://github.com/benoitc/gunicorn/issues/3484 + +When a client disconnects, the ASGI worker should: +1. Send http.disconnect to the receive queue +2. Allow the app a grace period to clean up +3. Only cancel the task after the grace period +""" + +import asyncio +from unittest import mock + +import pytest + +from gunicorn.asgi.protocol import ASGIProtocol + + +class TestASGIGracefulDisconnect: + """Test graceful disconnect handling.""" + + @pytest.fixture + def mock_worker(self): + """Create a mock worker.""" + worker = mock.Mock() + worker.nr_conns = 0 + worker.loop = asyncio.new_event_loop() + worker.cfg = mock.Mock() + worker.cfg.asgi_disconnect_grace_period = 3 + worker.log = mock.Mock() + return worker + + def test_disconnect_sets_closed_flag(self, mock_worker): + """Test that connection_lost sets the closed flag.""" + protocol = ASGIProtocol(mock_worker) + protocol.reader = mock.Mock() + + # Simulate connection made + mock_worker.nr_conns = 1 + + assert protocol._closed is False + + # Simulate connection lost + protocol.connection_lost(None) + + assert protocol._closed is True + + def test_disconnect_sends_message_to_queue(self, mock_worker): + """Test that connection_lost sends http.disconnect to receive queue.""" + protocol = ASGIProtocol(mock_worker) + protocol.reader = mock.Mock() + mock_worker.nr_conns = 1 + + # Create a receive queue (simulating active request) + protocol._receive_queue = asyncio.Queue() + + # Simulate connection lost + protocol.connection_lost(None) + + # Check that disconnect message was sent + assert not protocol._receive_queue.empty() + msg = protocol._receive_queue.get_nowait() + assert msg == {"type": "http.disconnect"} + + def test_disconnect_is_idempotent(self, mock_worker): + """Test that connection_lost can be called multiple times safely.""" + protocol = ASGIProtocol(mock_worker) + protocol.reader = mock.Mock() + mock_worker.nr_conns = 2 # Start with 2 so we can verify only 1 is decremented + + protocol._receive_queue = asyncio.Queue() + + # First call should work + protocol.connection_lost(None) + assert protocol._closed is True + assert mock_worker.nr_conns == 1 + assert protocol._receive_queue.qsize() == 1 + + # Second call should be a no-op + protocol.connection_lost(None) + assert mock_worker.nr_conns == 1 # Should not decrement again + assert protocol._receive_queue.qsize() == 1 # Should not add another message + + def test_disconnect_does_not_cancel_immediately(self, mock_worker): + """Test that connection_lost doesn't cancel task immediately.""" + protocol = ASGIProtocol(mock_worker) + protocol.reader = mock.Mock() + mock_worker.nr_conns = 1 + + # Create a mock task + mock_task = mock.Mock() + mock_task.done.return_value = False + protocol._task = mock_task + + # Simulate connection lost + protocol.connection_lost(None) + + # Task should NOT be cancelled immediately + mock_task.cancel.assert_not_called() + + def test_disconnect_schedules_cancellation(self, mock_worker): + """Test that connection_lost schedules task cancellation.""" + # Use a mock loop for this test to verify call_later was called + mock_loop = mock.Mock() + mock_worker.loop = mock_loop + + protocol = ASGIProtocol(mock_worker) + protocol.reader = mock.Mock() + mock_worker.nr_conns = 1 + + # Create a mock task + mock_task = mock.Mock() + mock_task.done.return_value = False + protocol._task = mock_task + + # Simulate connection lost + protocol.connection_lost(None) + + # call_later should have been called to schedule cancellation + mock_loop.call_later.assert_called_once() + args = mock_loop.call_later.call_args[0] + assert args[0] == mock_worker.cfg.asgi_disconnect_grace_period + assert args[1] == protocol._cancel_task_if_pending + + def test_cancel_task_if_pending_cancels_running_task(self, mock_worker): + """Test that _cancel_task_if_pending cancels a running task.""" + protocol = ASGIProtocol(mock_worker) + + # Create a mock task that's still running + mock_task = mock.Mock() + mock_task.done.return_value = False + protocol._task = mock_task + + protocol._cancel_task_if_pending() + + mock_task.cancel.assert_called_once() + + def test_cancel_task_if_pending_skips_completed_task(self, mock_worker): + """Test that _cancel_task_if_pending doesn't cancel completed tasks.""" + protocol = ASGIProtocol(mock_worker) + + # Create a mock task that's already done + mock_task = mock.Mock() + mock_task.done.return_value = True + protocol._task = mock_task + + protocol._cancel_task_if_pending() + + mock_task.cancel.assert_not_called() + + @pytest.mark.asyncio + async def test_receive_returns_disconnect_when_closed(self, mock_worker): + """Test that receive() returns http.disconnect when connection is closed.""" + protocol = ASGIProtocol(mock_worker) + protocol._closed = True + + # Create receive queue with body complete + receive_queue = asyncio.Queue() + protocol._receive_queue = receive_queue + + # Add initial body message + await receive_queue.put({ + "type": "http.request", + "body": b"", + "more_body": False, + }) + + # Simulate what happens in _handle_http_request + body_complete = False + + async def receive(): + nonlocal body_complete + if protocol._closed and body_complete: + return {"type": "http.disconnect"} + + msg = await receive_queue.get() + + if msg.get("type") == "http.request" and not msg.get("more_body", True): + body_complete = True + + return msg + + # First receive gets the body + msg1 = await receive() + assert msg1["type"] == "http.request" + + # Second receive should get disconnect + msg2 = await receive() + assert msg2["type"] == "http.disconnect" + + +class TestASGIDisconnectGracePeriod: + """Test the grace period configuration.""" + + def test_default_grace_period(self): + """Test that the default grace period is reasonable.""" + from gunicorn.config import Config + cfg = Config() + assert cfg.asgi_disconnect_grace_period == 3