Integrate callback parsers into ASGI protocol

Add callback parser support to ASGIProtocol:
- Add _handle_connection_callback() for callback-based parsing
- Add parser callbacks: _on_headers_complete, _on_body, _on_message_complete
- Update data_received() to feed callback parser
- Add _setup_callback_parser() with H1CProtocol/PythonProtocol selection

Add http_parser config options:
- callback: Use callback parser (H1CProtocol if available, else PythonProtocol)
- fast-callback: Require H1CProtocol callback parser

Callback parsing moves HTTP parsing to data_received(), reducing async
overhead in the request handling loop.
This commit is contained in:
Benoit Chesneau 2026-03-21 23:24:49 +01:00
parent 23c7210b67
commit 22bdca22e1
2 changed files with 192 additions and 7 deletions

View File

@ -15,7 +15,9 @@ import ipaddress
import time
from gunicorn.asgi.unreader import AsyncUnreader
from gunicorn.asgi.parser import HttpParser, FastAsyncRequest
from gunicorn.asgi.parser import (
HttpParser, FastAsyncRequest, PythonProtocol, CallbackRequest, ParseError
)
from gunicorn.asgi.uwsgi import AsyncUWSGIRequest
from gunicorn.http.errors import NoMoreData
from gunicorn.uwsgi.errors import UWSGIParseException
@ -253,8 +255,14 @@ class ASGIProtocol(asyncio.Protocol):
Handles connection lifecycle, request parsing, and ASGI app invocation.
Uses direct buffering instead of StreamReader for better performance.
Supports both pull-based (HttpParser) and callback-based (H1CProtocol/PythonProtocol)
parsing modes.
"""
# Class-level cache for H1CProtocol availability
_h1c_available = None
_h1c_protocol_class = None
def __init__(self, worker):
self.worker = worker
self.cfg = worker.cfg
@ -285,6 +293,13 @@ class ASGIProtocol(asyncio.Protocol):
# Keep-alive timer
self._keepalive_handle = None
# Callback parser state (used when use_callback_parser=True)
self._callback_parser = None
self._request_ready = None # Event signaling headers complete
self._current_request = None # Request built from parser state
self._is_ssl = False
self._use_callback_parser = False
def connection_made(self, transport):
"""Called when a connection is established."""
self.transport = transport
@ -302,18 +317,104 @@ class ASGIProtocol(asyncio.Protocol):
)
return
# HTTP/1.x connection - use direct buffering (faster)
# HTTP/1.x connection
self._is_ssl = ssl_object is not None
self._data_event = asyncio.Event()
self.writer = transport
# Start handling requests
self._task = self.worker.loop.create_task(self._handle_connection())
# Check if callback parser should be used
self._use_callback_parser = self._should_use_callback_parser()
if self._use_callback_parser:
# Callback parser mode - setup request ready event
self._request_ready = asyncio.Event()
self._setup_callback_parser()
self._task = self.worker.loop.create_task(self._handle_connection_callback())
else:
# Pull-based parser mode (existing fast path)
self._task = self.worker.loop.create_task(self._handle_connection())
@classmethod
def _check_h1c_protocol_available(cls):
"""Check if H1CProtocol is available (cached at class level)."""
if cls._h1c_available is None:
try:
from gunicorn_h1c import H1CProtocol
cls._h1c_available = True
cls._h1c_protocol_class = H1CProtocol
except ImportError:
cls._h1c_available = False
return cls._h1c_available
def _should_use_callback_parser(self):
"""Determine if callback parser should be used."""
parser_setting = getattr(self.cfg, 'http_parser', 'auto')
# Currently default to pull-based parser for stability
# Set http_parser='callback' to enable callback mode
if parser_setting == 'callback':
return True
if parser_setting == 'fast-callback':
# Only use callback mode if H1CProtocol is available
return self._check_h1c_protocol_available()
return False
def _setup_callback_parser(self):
"""Create callback parser with request building handlers."""
# Select parser implementation
if self._check_h1c_protocol_available():
parser_class = ASGIProtocol._h1c_protocol_class
else:
parser_class = PythonProtocol
# Create parser with callbacks
self._callback_parser = parser_class(
on_headers_complete=self._on_headers_complete,
on_body=self._on_body,
on_message_complete=self._on_message_complete,
)
def _on_headers_complete(self):
"""Callback: request headers are complete."""
# Build request from parser state
self._current_request = CallbackRequest.from_parser(
self._callback_parser, is_ssl=self._is_ssl
)
# Create body receiver for this request
self._body_receiver = BodyReceiver(self._current_request, self)
# Signal that request is ready for processing
if self._request_ready:
self._request_ready.set()
# Return True for HEAD to skip body parsing
return self._callback_parser.method == b'HEAD'
def _on_body(self, chunk):
"""Callback: received body data chunk."""
if self._body_receiver:
self._body_receiver.feed(chunk)
def _on_message_complete(self):
"""Callback: request is fully received."""
if self._body_receiver:
self._body_receiver.set_complete()
def data_received(self, data):
"""Called when data is received on the connection."""
if self.reader:
# HTTP/2 path - use StreamReader
self.reader.feed_data(data)
elif self._use_callback_parser and self._callback_parser:
# Callback parser path - feed directly to parser
try:
self._callback_parser.feed(data)
except ParseError as e:
self._send_error_response(400, str(e))
self._close_transport()
return
else:
# HTTP/1.x path - direct buffer (faster)
self._buffer.extend(data)
@ -558,6 +659,84 @@ class ASGIProtocol(asyncio.Protocol):
# Arm keepalive timer between requests
self._arm_keepalive_timer()
async def _handle_connection_callback(self):
"""Handle HTTP connection using callback-based parser.
This mode uses synchronous parsing in data_received(), avoiding
the async overhead of the pull-based parsing loop. The parser
fires callbacks when headers and body data are available, and
the main loop waits on events rather than actively parsing.
"""
try:
peername = self.transport.get_extra_info('peername')
sockname = self.transport.get_extra_info('sockname')
while not self._closed:
self.req_count += 1
self._cancel_keepalive_timer()
# Wait for headers to be parsed (callback sets this event)
self._request_ready.clear()
self._current_request = None
# Wait for request or timeout/disconnect
try:
await self._request_ready.wait()
except asyncio.CancelledError:
break
if self._closed or self._current_request is None:
break
request = self._current_request
# Check for WebSocket upgrade
if self._is_websocket_upgrade(request):
await self._handle_websocket(request, sockname, peername)
break # WebSocket takes over the connection
# Handle HTTP request
keepalive = await self._handle_http_request(
request, sockname, peername
)
# Increment worker request count
self.worker.nr += 1
# Check max_requests
if self.worker.nr >= self.worker.max_requests:
self.log.info("Autorestarting worker after current request.")
self.worker.alive = False
keepalive = False
if not keepalive or not self.worker.alive:
break
# Check connection limits for keepalive
if not self.cfg.keepalive:
break
# Resume reading if paused during body consumption
self._resume_reading()
# Reset parser for next request
if self._callback_parser:
self._callback_parser.reset()
# Clear request state
self._current_request = None
self._body_receiver = None
# Arm keepalive timer between requests
self._arm_keepalive_timer()
except asyncio.CancelledError:
pass
except Exception as e:
self.log.exception("Error handling connection: %s", e)
finally:
self._close_transport()
async def _parse_request_fast(self, parser, buffer, peername):
"""Parse request using fast HttpParser with direct buffering.

View File

@ -2775,15 +2775,16 @@ def validate_asgi_lifespan(val):
def validate_http_parser(val):
"""Validate http_parser setting.
Accepts: auto, fast, python
Accepts: auto, fast, python, callback, fast-callback
"""
if val is None:
return "auto"
if not isinstance(val, str):
raise TypeError("http_parser must be a string")
val = val.lower().strip()
if val not in ("auto", "fast", "python"):
raise ValueError("http_parser must be: auto, fast, or python")
valid_values = ("auto", "fast", "python", "callback", "fast-callback")
if val not in valid_values:
raise ValueError("http_parser must be one of: %s" % ", ".join(valid_values))
return val
@ -2870,10 +2871,15 @@ class HttpParser(Setting):
desc = """\
HTTP parser implementation.
Pull-based parsers (used in request handling loop):
- auto: Use gunicorn_h1c if available, otherwise pure Python (default)
- fast: Require gunicorn_h1c C extension (fail if unavailable)
- python: Force pure Python parser
Callback-based parsers (parsing in data_received, lower overhead):
- callback: Use callback parser (H1CProtocol if available, else PythonProtocol)
- fast-callback: Require H1CProtocol callback parser (fail if unavailable)
The gunicorn_h1c C extension provides significantly faster HTTP
parsing using picohttpparser with SIMD optimizations. Install it
with: pip install gunicorn[fast]