From 0ca0d0cb02bd2355cee49c9bd2e67b4ee8238002 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 22 Mar 2026 00:01:17 +0100 Subject: [PATCH] Fix body polling and HTTP/2 request streaming - Replace 100ms polling with event-based waiting in BodyReceiver - Stream HTTP/2 request bodies instead of buffering entire uploads - Add timeout handling for disconnect detection --- gunicorn/asgi/protocol.py | 83 ++++++++++++++++++++++++++++++++------- 1 file changed, 68 insertions(+), 15 deletions(-) diff --git a/gunicorn/asgi/protocol.py b/gunicorn/asgi/protocol.py index 75f2a550..2a239f8b 100644 --- a/gunicorn/asgi/protocol.py +++ b/gunicorn/asgi/protocol.py @@ -275,18 +275,35 @@ class BodyReceiver: return {"type": "http.request", "body": b"", "more_body": False} async def _read_with_disconnect_check(self): - """Read body with periodic disconnect checks (avoids task creation).""" - # Use wait_for with short timeout to check disconnect periodically + """Read body using event-based waiting (no polling). + + Uses the protocol's data event to wait for incoming data, + avoiding the latency and CPU overhead of periodic polling. + """ while not self._closed and not self.protocol._closed: + # Try to read available data try: - chunk = await asyncio.wait_for( - self.request.read_body(65536), - timeout=0.1 - ) + chunk = await self.request.read_body(65536) return chunk - except asyncio.TimeoutError: - # Check disconnect and retry - continue + except Exception: + # If read fails, check if we should continue + if self._closed or self.protocol._closed: + return None + # Wait for more data using the protocol's data event + if self.protocol._data_event: + self.protocol._data_event.clear() + # Use wait_for with longer timeout for disconnect detection + try: + await asyncio.wait_for( + self.protocol._data_event.wait(), + timeout=30.0 # 30s timeout for disconnect detection + ) + except asyncio.TimeoutError: + # Check connection state and continue + continue + else: + # No event available, brief sleep as fallback + await asyncio.sleep(0.001) return None @@ -1423,10 +1440,12 @@ class ASGIProtocol(asyncio.Protocol): async def _handle_http2_request(self, request, h2_conn, sockname, peername): """Handle a single HTTP/2 request with streaming support. - Streams response body chunks immediately instead of buffering, - enabling SSE, streaming downloads, and other real-time use cases. + Streams both request and response body chunks immediately, + avoiding buffering entire uploads and enabling SSE, streaming + downloads, and other real-time use cases. """ stream_id = request.stream.stream_id + stream = h2_conn.streams.get(stream_id) scope = self._build_http2_scope(request, sockname, peername) response_started = False @@ -1437,13 +1456,47 @@ class ASGIProtocol(asyncio.Protocol): response_headers = [] response_sent = 0 + # Track if we've finished receiving body + body_received = False + async def receive(): - # For HTTP/2, the body is already buffered in the stream - body = request.body.read() + nonlocal body_received + + # Check if stream is closed or missing + if stream is None or stream.state.name == "CLOSED": + return {"type": "http.disconnect"} + + # First call: if body already complete (small requests), return it + if not body_received and stream.request_complete and not stream._body_chunks: + body_received = True + body = stream.get_request_body() + return { + "type": "http.request", + "body": body, + "more_body": False, + } + + # Streaming: read next chunk + try: + chunk = await asyncio.wait_for( + stream.read_body_chunk(), + timeout=30.0 + ) + except asyncio.TimeoutError: + return {"type": "http.disconnect"} + + if chunk is None: + body_received = True + return { + "type": "http.request", + "body": b"", + "more_body": False, + } + return { "type": "http.request", - "body": body, - "more_body": False, + "body": chunk, + "more_body": not stream._body_complete, } async def send(message):