From 22bdca22e1314626719cf389567847b2e090c7d0 Mon Sep 17 00:00:00 2001
From: Benoit Chesneau
Date: Sat, 21 Mar 2026 23:24:49 +0100
Subject: [PATCH] Integrate callback parsers into ASGI protocol

Add callback parser support to ASGIProtocol:
- Add _handle_connection_callback() for callback-based parsing
- Add parser callbacks: _on_headers_complete, _on_body, _on_message_complete
- Update data_received() to feed callback parser
- Add _setup_callback_parser() with H1CProtocol/PythonProtocol selection

Add http_parser config options:
- callback: Use callback parser (H1CProtocol if available, else PythonProtocol)
- fast-callback: Require H1CProtocol callback parser

Callback parsing moves HTTP parsing to data_received(), reducing
async overhead in the request handling loop.
---
Review notes (this section is not part of the commit message):

- The line structure of this patch was restored from a whitespace-collapsed
  copy. Hunk line counts were re-checked against the hunk headers, but the
  indentation of context lines is inferred from Python syntax and should be
  confirmed against the tree before applying. The blob ids on the "index"
  lines are the original ones and were not recomputed.
- _handle_connection_callback(): _cancel_keepalive_timer() used to run at
  the top of the loop, immediately after _arm_keepalive_timer() at the end
  of the previous iteration with no await in between, so the timer was
  always cancelled before the connection started waiting for the next
  request. It is now cancelled once the wait on _request_ready ends.
- NOTE(review): http_parser=fast-callback is described as "fail if
  unavailable", but _should_use_callback_parser() returns False when
  H1CProtocol cannot be imported and connection_made() then starts the
  pull-based loop. Confirm which behaviour is intended.
- NOTE(review): _request_ready is cleared unconditionally at the top of the
  loop; a request signalled before that point would be discarded. Confirm
  whether the parser can complete headers for a following request before
  reset() is called.

 gunicorn/asgi/protocol.py | 191 +++++++++++++++++++++++++++++++++++++-
 gunicorn/config.py        |  12 ++-
 2 files changed, 196 insertions(+), 7 deletions(-)

diff --git a/gunicorn/asgi/protocol.py b/gunicorn/asgi/protocol.py
index e2a2f13d..29d8a384 100644
--- a/gunicorn/asgi/protocol.py
+++ b/gunicorn/asgi/protocol.py
@@ -15,7 +15,9 @@ import ipaddress
 import time
 
 from gunicorn.asgi.unreader import AsyncUnreader
-from gunicorn.asgi.parser import HttpParser, FastAsyncRequest
+from gunicorn.asgi.parser import (
+    HttpParser, FastAsyncRequest, PythonProtocol, CallbackRequest, ParseError
+)
 from gunicorn.asgi.uwsgi import AsyncUWSGIRequest
 from gunicorn.http.errors import NoMoreData
 from gunicorn.uwsgi.errors import UWSGIParseException
@@ -253,8 +255,14 @@ class ASGIProtocol(asyncio.Protocol):
 
     Handles connection lifecycle, request parsing, and ASGI app invocation.
     Uses direct buffering instead of StreamReader for better performance.
+    Supports both pull-based (HttpParser) and callback-based (H1CProtocol/PythonProtocol)
+    parsing modes.
     """
 
+    # Class-level cache for H1CProtocol availability
+    _h1c_available = None
+    _h1c_protocol_class = None
+
     def __init__(self, worker):
         self.worker = worker
         self.cfg = worker.cfg
@@ -285,6 +293,13 @@ class ASGIProtocol(asyncio.Protocol):
         # Keep-alive timer
         self._keepalive_handle = None
 
+        # Callback parser state (used when use_callback_parser=True)
+        self._callback_parser = None
+        self._request_ready = None  # Event signaling headers complete
+        self._current_request = None  # Request built from parser state
+        self._is_ssl = False
+        self._use_callback_parser = False
+
     def connection_made(self, transport):
         """Called when a connection is established."""
         self.transport = transport
@@ -302,18 +317,104 @@ class ASGIProtocol(asyncio.Protocol):
                 )
                 return
 
-        # HTTP/1.x connection - use direct buffering (faster)
+        # HTTP/1.x connection
+        self._is_ssl = ssl_object is not None
         self._data_event = asyncio.Event()
         self.writer = transport
 
-        # Start handling requests
-        self._task = self.worker.loop.create_task(self._handle_connection())
+        # Check if callback parser should be used
+        self._use_callback_parser = self._should_use_callback_parser()
+
+        if self._use_callback_parser:
+            # Callback parser mode - setup request ready event
+            self._request_ready = asyncio.Event()
+            self._setup_callback_parser()
+            self._task = self.worker.loop.create_task(self._handle_connection_callback())
+        else:
+            # Pull-based parser mode (existing fast path)
+            self._task = self.worker.loop.create_task(self._handle_connection())
+
+    @classmethod
+    def _check_h1c_protocol_available(cls):
+        """Check if H1CProtocol is available (cached at class level)."""
+        if cls._h1c_available is None:
+            try:
+                from gunicorn_h1c import H1CProtocol
+                cls._h1c_available = True
+                cls._h1c_protocol_class = H1CProtocol
+            except ImportError:
+                cls._h1c_available = False
+        return cls._h1c_available
+
+    def _should_use_callback_parser(self):
+        """Determine if callback parser should be used."""
+        parser_setting = getattr(self.cfg, 'http_parser', 'auto')
+
+        # Currently default to pull-based parser for stability
+        # Set http_parser='callback' to enable callback mode
+        if parser_setting == 'callback':
+            return True
+        if parser_setting == 'fast-callback':
+            # Only use callback mode if H1CProtocol is available
+            return self._check_h1c_protocol_available()
+
+        return False
+
+    def _setup_callback_parser(self):
+        """Create callback parser with request building handlers."""
+        # Select parser implementation
+        if self._check_h1c_protocol_available():
+            parser_class = ASGIProtocol._h1c_protocol_class
+        else:
+            parser_class = PythonProtocol
+
+        # Create parser with callbacks
+        self._callback_parser = parser_class(
+            on_headers_complete=self._on_headers_complete,
+            on_body=self._on_body,
+            on_message_complete=self._on_message_complete,
+        )
+
+    def _on_headers_complete(self):
+        """Callback: request headers are complete."""
+        # Build request from parser state
+        self._current_request = CallbackRequest.from_parser(
+            self._callback_parser, is_ssl=self._is_ssl
+        )
+
+        # Create body receiver for this request
+        self._body_receiver = BodyReceiver(self._current_request, self)
+
+        # Signal that request is ready for processing
+        if self._request_ready:
+            self._request_ready.set()
+
+        # Return True for HEAD to skip body parsing
+        return self._callback_parser.method == b'HEAD'
+
+    def _on_body(self, chunk):
+        """Callback: received body data chunk."""
+        if self._body_receiver:
+            self._body_receiver.feed(chunk)
+
+    def _on_message_complete(self):
+        """Callback: request is fully received."""
+        if self._body_receiver:
+            self._body_receiver.set_complete()
 
     def data_received(self, data):
         """Called when data is received on the connection."""
         if self.reader:
             # HTTP/2 path - use StreamReader
             self.reader.feed_data(data)
+        elif self._use_callback_parser and self._callback_parser:
+            # Callback parser path - feed directly to parser
+            try:
+                self._callback_parser.feed(data)
+            except ParseError as e:
+                self._send_error_response(400, str(e))
+                self._close_transport()
+                return
         else:
             # HTTP/1.x path - direct buffer (faster)
             self._buffer.extend(data)
@@ -558,6 +659,88 @@ class ASGIProtocol(asyncio.Protocol):
                 # Arm keepalive timer between requests
                 self._arm_keepalive_timer()
 
+    async def _handle_connection_callback(self):
+        """Handle HTTP connection using callback-based parser.
+
+        This mode uses synchronous parsing in data_received(), avoiding
+        the async overhead of the pull-based parsing loop. The parser
+        fires callbacks when headers and body data are available, and
+        the main loop waits on events rather than actively parsing.
+        """
+        try:
+            peername = self.transport.get_extra_info('peername')
+            sockname = self.transport.get_extra_info('sockname')
+
+            while not self._closed:
+                self.req_count += 1
+
+                # Wait for headers to be parsed (callback sets this event)
+                self._request_ready.clear()
+                self._current_request = None
+
+                # Wait for request or timeout/disconnect. The keepalive
+                # timer armed after the previous request has to stay armed
+                # while the connection is idle, so it is cancelled only once
+                # the wait is over (whatever the outcome).
+                try:
+                    await self._request_ready.wait()
+                except asyncio.CancelledError:
+                    break
+                finally:
+                    self._cancel_keepalive_timer()
+
+                if self._closed or self._current_request is None:
+                    break
+
+                request = self._current_request
+
+                # Check for WebSocket upgrade
+                if self._is_websocket_upgrade(request):
+                    await self._handle_websocket(request, sockname, peername)
+                    break  # WebSocket takes over the connection
+
+                # Handle HTTP request
+                keepalive = await self._handle_http_request(
+                    request, sockname, peername
+                )
+
+                # Increment worker request count
+                self.worker.nr += 1
+
+                # Check max_requests
+                if self.worker.nr >= self.worker.max_requests:
+                    self.log.info("Autorestarting worker after current request.")
+                    self.worker.alive = False
+                    keepalive = False
+
+                if not keepalive or not self.worker.alive:
+                    break
+
+                # Check connection limits for keepalive
+                if not self.cfg.keepalive:
+                    break
+
+                # Resume reading if paused during body consumption
+                self._resume_reading()
+
+                # Reset parser for next request
+                if self._callback_parser:
+                    self._callback_parser.reset()
+
+                # Clear request state
+                self._current_request = None
+                self._body_receiver = None
+
+                # Arm keepalive timer between requests
+                self._arm_keepalive_timer()
+
+        except asyncio.CancelledError:
+            pass
+        except Exception as e:
+            self.log.exception("Error handling connection: %s", e)
+        finally:
+            self._close_transport()
+
     async def _parse_request_fast(self, parser, buffer, peername):
         """Parse request using fast HttpParser with direct buffering.
 
diff --git a/gunicorn/config.py b/gunicorn/config.py
index 1c72ed65..4403e9cb 100644
--- a/gunicorn/config.py
+++ b/gunicorn/config.py
@@ -2775,15 +2775,16 @@ def validate_asgi_lifespan(val):
 def validate_http_parser(val):
     """Validate http_parser setting.
 
-    Accepts: auto, fast, python
+    Accepts: auto, fast, python, callback, fast-callback
     """
     if val is None:
         return "auto"
     if not isinstance(val, str):
         raise TypeError("http_parser must be a string")
     val = val.lower().strip()
-    if val not in ("auto", "fast", "python"):
-        raise ValueError("http_parser must be: auto, fast, or python")
+    valid_values = ("auto", "fast", "python", "callback", "fast-callback")
+    if val not in valid_values:
+        raise ValueError("http_parser must be one of: %s" % ", ".join(valid_values))
     return val
 
 
@@ -2870,10 +2871,15 @@ class HttpParser(Setting):
     desc = """\
         HTTP parser implementation.
 
+        Pull-based parsers (used in request handling loop):
         - auto: Use gunicorn_h1c if available, otherwise pure Python (default)
         - fast: Require gunicorn_h1c C extension (fail if unavailable)
         - python: Force pure Python parser
 
+        Callback-based parsers (parsing in data_received, lower overhead):
+        - callback: Use callback parser (H1CProtocol if available, else PythonProtocol)
+        - fast-callback: Require H1CProtocol callback parser (fail if unavailable)
+
         The gunicorn_h1c C extension provides significantly faster HTTP parsing
         using picohttpparser with SIMD optimizations. Install it with:
         pip install gunicorn[fast]