diff --git a/gunicorn/asgi/parser.py b/gunicorn/asgi/parser.py index 03197d31..8c1bdbfe 100644 --- a/gunicorn/asgi/parser.py +++ b/gunicorn/asgi/parser.py @@ -3,935 +3,12 @@ # See the NOTICE for more information. """ -Unified HTTP parser interface for ASGI workers. +HTTP parser for ASGI workers. -Provides a common interface for both fast C parser (gunicorn_h1c) -and the pure Python parser, with incremental (push-based) parsing. +Provides callback-based parsing using either the fast C parser (gunicorn_h1c) +or the pure Python PythonProtocol fallback. """ -import re -import ipaddress -import socket -import struct - -from gunicorn.http.errors import ( - InvalidHeader, InvalidHeaderName, NoMoreData, - InvalidRequestLine, InvalidRequestMethod, InvalidHTTPVersion, - LimitRequestLine, LimitRequestHeaders, - UnsupportedTransferCoding, ObsoleteFolding, - InvalidProxyLine, InvalidProxyHeader, ForbiddenProxyRequest, - InvalidSchemeHeaders, ExpectationFailed, -) -from gunicorn.http.message import PP_V2_SIGNATURE, PPCommand, PPFamily, PPProtocol -from gunicorn.util import bytes_to_str, split_request_uri - -MAX_REQUEST_LINE = 8190 -MAX_HEADERS = 32768 -DEFAULT_MAX_HEADERFIELD_SIZE = 8190 - -# Reuse regex patterns -RFC9110_5_6_2_TOKEN_SPECIALS = r"!#$%&'*+-.^_`|~" -TOKEN_RE = re.compile(r"[%s0-9a-zA-Z]+" % (re.escape(RFC9110_5_6_2_TOKEN_SPECIALS))) -METHOD_BADCHAR_RE = re.compile("[a-z#]") -VERSION_RE = re.compile(r"HTTP/(\d)\.(\d)") -RFC9110_5_5_INVALID_AND_DANGEROUS = re.compile(r"[\0\r\n]") - - -def _ip_in_allow_list(ip_str, allow_list, networks): - """Check if IP address is in the allow list.""" - if '*' in allow_list: - return True - try: - ip = ipaddress.ip_address(ip_str) - except ValueError: - return False - for network in networks: - if ip in network: - return True - return False - - -class ParseResult: - """Result of header parsing. - - Headers are stored as bytes tuples for performance: - - headers_bytes: list of (name_bytes_lowercase, value_bytes) - - headers: list of (name_str_uppercase, value_str) for compatibility - """ - - __slots__ = ( - 'method', 'uri', 'path', 'query', 'fragment', 'version', - 'headers', 'headers_bytes', 'scheme', 'content_length', 'chunked', - 'keep_alive', 'consumed', 'proxy_protocol_info', - 'must_close', 'expect_100_continue', - ) - - def __init__(self): - self.method = None - self.uri = None - self.path = None - self.query = None - self.fragment = None - self.version = None - self.headers = [] # (name_str_uppercase, value_str) for compatibility - self.headers_bytes = [] # (name_bytes_lowercase, value_bytes) for ASGI scope - self.scheme = "http" - self.content_length = 0 - self.chunked = False - self.keep_alive = True - self.consumed = 0 - self.proxy_protocol_info = None - self.must_close = False - self.expect_100_continue = False - - -class HttpParser: - """Unified incremental HTTP parser. - - Works with both gunicorn_h1c (fast C extension) and pure Python parsing. - Designed for push-based parsing where data arrives via data_received(). - """ - - # Class-level cache for fast parser availability (import check is expensive) - _fast_available = None - _h1c_module = None - - def __init__(self, cfg, peer_addr, is_ssl=False, req_number=1, is_trusted_proxy=False): - """Initialize the parser. - - Args: - cfg: gunicorn config object - peer_addr: client address tuple (host, port) - is_ssl: whether this is an SSL connection - req_number: request number on this connection (for proxy protocol) - is_trusted_proxy: whether peer is in forwarded_allow_ips (pre-computed) - """ - self.cfg = cfg - self.peer_addr = peer_addr - self.is_ssl = is_ssl - self.req_number = req_number - self._is_trusted_proxy = is_trusted_proxy - self._result = None - - # Limits - self.limit_request_line = cfg.limit_request_line - if self.limit_request_line < 0 or self.limit_request_line >= MAX_REQUEST_LINE: - self.limit_request_line = MAX_REQUEST_LINE - - self.limit_request_fields = cfg.limit_request_fields - if self.limit_request_fields <= 0 or self.limit_request_fields > MAX_HEADERS: - self.limit_request_fields = MAX_HEADERS - - self.limit_request_field_size = cfg.limit_request_field_size - if self.limit_request_field_size < 0: - self.limit_request_field_size = DEFAULT_MAX_HEADERFIELD_SIZE - - max_header_field_size = self.limit_request_field_size or DEFAULT_MAX_HEADERFIELD_SIZE - self.max_buffer_headers = self.limit_request_fields * (max_header_field_size + 2) + 4 - - # Use cached fast parser check (import is expensive, do once per process) - self._use_fast = self._check_fast_available() - - def _check_fast_available(self): - """Check if fast C parser is available (cached at class level).""" - parser_setting = getattr(self.cfg, 'http_parser', 'auto') - if parser_setting == 'python': - return False - - # Use class-level cache to avoid repeated import checks - if HttpParser._fast_available is None: - try: - import gunicorn_h1c - HttpParser._fast_available = True - HttpParser._h1c_module = gunicorn_h1c - except ImportError: - HttpParser._fast_available = False - - if not HttpParser._fast_available and parser_setting == 'fast': - raise RuntimeError("gunicorn_h1c not installed but http_parser='fast'") - - return HttpParser._fast_available - - def feed(self, buffer): - """Parse buffer incrementally. - - Args: - buffer: bytearray containing received data - - Returns: - ParseResult if headers are complete, None if more data needed - - Raises: - Various HTTP parsing errors for malformed requests - """ - if self._use_fast: - return self._feed_fast(buffer) - else: - return self._feed_python(buffer) - - def _feed_fast(self, buffer): - """Parse using fast C parser with optimized API. - - Uses parse_request_fast() which: - - Accepts bytearray directly (no bytes() copy) - - Returns pre-computed content_length, has_chunked, connection_close - - Returns headers as bytes tuples (no intermediate conversion) - """ - h1c = HttpParser._h1c_module - try: - # Use parse_request_fast - accepts bytearray directly - req = h1c.parse_request_fast(buffer) - - # Build ParseResult from fast request object - pr = ParseResult() - - # Method and path (bytes -> str) - pr.method = bytes_to_str(req.method) - pr.uri = bytes_to_str(req.path) - - # Parse path/query from URI - try: - parts = split_request_uri(pr.uri) - pr.path = parts.path or "" - pr.query = parts.query or "" - pr.fragment = parts.fragment or "" - except ValueError: - pr.path = pr.uri - pr.query = "" - pr.fragment = "" - - pr.version = (1, req.minor_version) - pr.consumed = req.consumed - - # Headers - store both bytes (for ASGI scope) and strings (for compatibility) - # gunicorn_h1c returns headers as (name_bytes, value_bytes) - headers_bytes = [] - headers_str = [] - for n, v in req.headers: - # ASGI requires lowercase header names - headers_bytes.append((n.lower(), v)) - # Compatibility: uppercase string names - headers_str.append((bytes_to_str(n).upper(), bytes_to_str(v))) - pr.headers_bytes = headers_bytes - pr.headers = headers_str - - # Use pre-computed body info from C parser - pr.content_length = req.content_length if req.content_length >= 0 else 0 - pr.chunked = req.has_chunked - - # Validate Transfer-Encoding per RFC 7230 - if pr.chunked: - # Chunked requires HTTP/1.1+ - if req.minor_version < 1: - raise InvalidHeader("TRANSFER-ENCODING") - # Chunked with Content-Length is invalid - if req.content_length >= 0: - raise InvalidHeader("CONTENT-LENGTH") - pr.content_length = -1 - - # connection_close: -1 = not set, 0 = keep-alive, 1 = close - if req.connection_close == 1: - pr.must_close = True - pr.keep_alive = False - elif req.connection_close == 0: - pr.must_close = False - pr.keep_alive = True - else: - # Not set - default based on HTTP version - pr.keep_alive = req.minor_version >= 1 - pr.must_close = False - - pr.scheme = "https" if self.is_ssl else "http" - - # Apply scheme headers for trusted proxies - if self._is_trusted_proxy: - self._apply_scheme_headers(pr) - - self._result = pr - return pr - - except h1c.IncompleteError: - return None - except h1c.ParseError as e: - # Map to gunicorn HTTP errors - raise InvalidRequestLine(str(e)) - - def _feed_python(self, buffer): - """Parse using pure Python parser.""" - # Handle proxy protocol on first request - mode = self.cfg.proxy_protocol - proxy_info = None - buf_offset = 0 - - if mode != "off" and self.req_number == 1: - # Check for proxy protocol - if len(buffer) < 12: - return None # Need more data - - if mode in ("v2", "auto") and buffer[:12] == PP_V2_SIGNATURE: - self._proxy_protocol_access_check() - consumed, proxy_info = self._parse_proxy_v2(buffer) - if consumed is None: - return None # Need more data - buf_offset = consumed - - elif mode in ("v1", "auto") and buffer[:6] == b"PROXY ": - self._proxy_protocol_access_check() - consumed, proxy_info = self._parse_proxy_v1(buffer) - if consumed is None: - return None # Need more data - buf_offset = consumed - - # Find request line - idx = buffer.find(b"\r\n", buf_offset) - if idx < 0: - if len(buffer) - buf_offset > self.limit_request_line: - raise LimitRequestLine(len(buffer) - buf_offset, self.limit_request_line) - return None # Need more data - - line_len = idx - buf_offset - if line_len > self.limit_request_line: - raise LimitRequestLine(line_len, self.limit_request_line) - - request_line = bytes(buffer[buf_offset:idx]) - headers_start = idx + 2 - - # Find end of headers - headers_end = buffer.find(b"\r\n\r\n", headers_start) - if headers_end < 0: - # Check for empty headers case - if buffer[headers_start:headers_start + 2] == b"\r\n": - headers_end = headers_start - else: - if len(buffer) - headers_start > self.max_buffer_headers: - raise LimitRequestHeaders("max buffer headers") - return None # Need more data - - # Parse request line - pr = ParseResult() - pr.proxy_protocol_info = proxy_info - self._parse_request_line(request_line, pr) - - # Parse headers (if any) - if buffer[headers_start:headers_start + 2] == b"\r\n": - # Empty headers - pr.consumed = headers_start + 2 - pr.headers_bytes = [] - else: - headers_data = bytes(buffer[headers_start:headers_end]) - pr.headers = self._parse_headers(headers_data) - # Also generate bytes headers for ASGI scope - pr.headers_bytes = [ - (n.lower().encode('latin-1'), v.encode('latin-1')) - for n, v in pr.headers - ] - pr.consumed = headers_end + 4 - - # Set scheme - pr.scheme = "https" if self.is_ssl else "http" - - # Check for scheme headers from trusted proxy - self._apply_scheme_headers(pr) - - # Parse body info - self._parse_body_info(pr) - - # Determine keep-alive - pr.keep_alive = self._should_keep_alive(pr) - - self._result = pr - return pr - - def _proxy_protocol_access_check(self): - """Check if proxy protocol is allowed from this peer.""" - if isinstance(self.peer_addr, tuple): - if not _ip_in_allow_list( - self.peer_addr[0], - self.cfg.proxy_allow_ips, - self.cfg.proxy_allow_networks() - ): - raise ForbiddenProxyRequest(self.peer_addr[0]) - - def _parse_proxy_v1(self, buffer): - """Parse PROXY protocol v1 (text format). - - Returns (consumed, info) or (None, None) if incomplete. - """ - idx = buffer.find(b"\r\n") - if idx < 0: - return None, None - - line = bytes_to_str(bytes(buffer[:idx])) - bits = line.split(" ") - - if len(bits) != 6: - raise InvalidProxyLine(line) - - proto = bits[1] - s_addr = bits[2] - d_addr = bits[3] - - if proto not in ["TCP4", "TCP6"]: - raise InvalidProxyLine("protocol '%s' not supported" % proto) - - if proto == "TCP4": - try: - socket.inet_pton(socket.AF_INET, s_addr) - socket.inet_pton(socket.AF_INET, d_addr) - except OSError: - raise InvalidProxyLine(line) - elif proto == "TCP6": - try: - socket.inet_pton(socket.AF_INET6, s_addr) - socket.inet_pton(socket.AF_INET6, d_addr) - except OSError: - raise InvalidProxyLine(line) - - try: - s_port = int(bits[4]) - d_port = int(bits[5]) - except ValueError: - raise InvalidProxyLine("invalid port %s" % line) - - if not ((0 <= s_port <= 65535) and (0 <= d_port <= 65535)): - raise InvalidProxyLine("invalid port %s" % line) - - info = { - "proxy_protocol": proto, - "client_addr": s_addr, - "client_port": s_port, - "proxy_addr": d_addr, - "proxy_port": d_port - } - - return idx + 2, info - - def _parse_proxy_v2(self, buffer): - """Parse PROXY protocol v2 (binary format). - - Returns (consumed, info) or (None, None) if incomplete. - """ - if len(buffer) < 16: - return None, None - - ver_cmd = buffer[12] - fam_proto = buffer[13] - length = struct.unpack(">H", bytes(buffer[14:16]))[0] - - version = (ver_cmd & 0xF0) >> 4 - if version != 2: - raise InvalidProxyHeader("unsupported version %d" % version) - - command = ver_cmd & 0x0F - if command not in (PPCommand.LOCAL, PPCommand.PROXY): - raise InvalidProxyHeader("unsupported command %d" % command) - - total_size = 16 + length - if len(buffer) < total_size: - return None, None - - if command == PPCommand.LOCAL: - info = { - "proxy_protocol": "LOCAL", - "client_addr": None, - "client_port": None, - "proxy_addr": None, - "proxy_port": None - } - return total_size, info - - family = (fam_proto & 0xF0) >> 4 - protocol = fam_proto & 0x0F - - if protocol != PPProtocol.STREAM: - raise InvalidProxyHeader("only TCP protocol is supported") - - addr_data = bytes(buffer[16:16 + length]) - - if family == PPFamily.INET: - if length < 12: - raise InvalidProxyHeader("insufficient address data for IPv4") - s_addr = socket.inet_ntop(socket.AF_INET, addr_data[0:4]) - d_addr = socket.inet_ntop(socket.AF_INET, addr_data[4:8]) - s_port = struct.unpack(">H", addr_data[8:10])[0] - d_port = struct.unpack(">H", addr_data[10:12])[0] - proto = "TCP4" - - elif family == PPFamily.INET6: - if length < 36: - raise InvalidProxyHeader("insufficient address data for IPv6") - s_addr = socket.inet_ntop(socket.AF_INET6, addr_data[0:16]) - d_addr = socket.inet_ntop(socket.AF_INET6, addr_data[16:32]) - s_port = struct.unpack(">H", addr_data[32:34])[0] - d_port = struct.unpack(">H", addr_data[34:36])[0] - proto = "TCP6" - - elif family == PPFamily.UNSPEC: - info = { - "proxy_protocol": "UNSPEC", - "client_addr": None, - "client_port": None, - "proxy_addr": None, - "proxy_port": None - } - return total_size, info - - else: - raise InvalidProxyHeader("unsupported address family %d" % family) - - info = { - "proxy_protocol": proto, - "client_addr": s_addr, - "client_port": s_port, - "proxy_addr": d_addr, - "proxy_port": d_port - } - - return total_size, info - - def _parse_request_line(self, line_bytes, result): - """Parse the HTTP request line.""" - bits = [bytes_to_str(bit) for bit in line_bytes.split(b" ", 2)] - if len(bits) != 3: - raise InvalidRequestLine(bytes_to_str(line_bytes)) - - # Method - result.method = bits[0] - - if not self.cfg.permit_unconventional_http_method: - if METHOD_BADCHAR_RE.search(result.method): - raise InvalidRequestMethod(result.method) - if not 3 <= len(bits[0]) <= 20: - raise InvalidRequestMethod(result.method) - - if not TOKEN_RE.fullmatch(result.method): - raise InvalidRequestMethod(result.method) - - if self.cfg.casefold_http_method: - result.method = result.method.upper() - - # URI - result.uri = bits[1] - if len(result.uri) == 0: - raise InvalidRequestLine(bytes_to_str(line_bytes)) - - try: - parts = split_request_uri(result.uri) - except ValueError: - raise InvalidRequestLine(bytes_to_str(line_bytes)) - - result.path = parts.path or "" - result.query = parts.query or "" - result.fragment = parts.fragment or "" - - # Version - match = VERSION_RE.fullmatch(bits[2]) - if match is None: - raise InvalidHTTPVersion(bits[2]) - - result.version = (int(match.group(1)), int(match.group(2))) - if not (1, 0) <= result.version < (2, 0): - if not self.cfg.permit_unconventional_http_version: - raise InvalidHTTPVersion(result.version) - - def _parse_headers(self, data): - """Parse HTTP headers from raw data.""" - headers = [] - lines = [bytes_to_str(line) for line in data.split(b"\r\n")] - num_lines = len(lines) - i = 0 - - while i < num_lines: - if len(headers) >= self.limit_request_fields: - raise LimitRequestHeaders("limit request headers fields") - - curr = lines[i] - i += 1 - header_length = len(curr) + len("\r\n") - - if curr.find(":") <= 0: - raise InvalidHeader(curr) - - name, value = curr.split(":", 1) - if self.cfg.strip_header_spaces: - name = name.rstrip(" \t") - - if not TOKEN_RE.fullmatch(name): - raise InvalidHeaderName(name) - - name = name.upper() - value = [value.strip(" \t")] - - # Handle obsolete folding - while i < num_lines and lines[i].startswith((" ", "\t")): - if not self.cfg.permit_obsolete_folding: - raise ObsoleteFolding(name) - curr = lines[i] - i += 1 - header_length += len(curr) + len("\r\n") - if header_length > self.limit_request_field_size > 0: - raise LimitRequestHeaders("limit request headers fields size") - value.append(curr.strip("\t ")) - - value = " ".join(value) - - if RFC9110_5_5_INVALID_AND_DANGEROUS.search(value): - raise InvalidHeader(name) - - if header_length > self.limit_request_field_size > 0: - raise LimitRequestHeaders("limit request headers fields size") - - # Handle underscore in header names - if "_" in name: - forwarder_headers = self.cfg.forwarder_headers - if name in forwarder_headers or "*" in forwarder_headers: - pass - elif self.cfg.header_map == "dangerous": - pass - elif self.cfg.header_map == "drop": - continue - else: - raise InvalidHeaderName(name) - - headers.append((name, value)) - - return headers - - def _apply_scheme_headers(self, result): - """Apply scheme headers from trusted proxy.""" - if not isinstance(self.peer_addr, tuple): - return - - # Use pre-computed trusted proxy check (avoids IP parsing on every request) - if not self._is_trusted_proxy: - return - - secure_scheme_headers = self.cfg.secure_scheme_headers - scheme_header = False - - for name, value in result.headers: - if name == "EXPECT": - if value.lower() == "100-continue": - if result.version >= (1, 1): - result.expect_100_continue = True - else: - raise ExpectationFailed(value) - - if name in secure_scheme_headers: - secure = value == secure_scheme_headers[name] - scheme = "https" if secure else "http" - if scheme_header: - if scheme != result.scheme: - raise InvalidSchemeHeaders() - else: - scheme_header = True - result.scheme = scheme - - def _parse_body_info(self, result): - """Parse Content-Length and Transfer-Encoding from headers.""" - chunked = False - content_length = None - - for name, value in result.headers: - if name == "CONTENT-LENGTH": - if content_length is not None: - raise InvalidHeader("CONTENT-LENGTH") - content_length = value - - elif name == "TRANSFER-ENCODING": - vals = [v.strip() for v in value.split(',')] - for val in vals: - if val.lower() == "chunked": - if chunked: - raise InvalidHeader("TRANSFER-ENCODING") - chunked = True - elif val.lower() == "identity": - if chunked: - raise InvalidHeader("TRANSFER-ENCODING") - elif val.lower() in ('compress', 'deflate', 'gzip'): - if chunked: - raise InvalidHeader("TRANSFER-ENCODING") - result.must_close = True - else: - raise UnsupportedTransferCoding(value) - - if chunked: - if result.version < (1, 1): - raise InvalidHeader("TRANSFER-ENCODING") - if content_length is not None: - raise InvalidHeader("CONTENT-LENGTH") - result.chunked = True - result.content_length = -1 - elif content_length is not None: - try: - if str(content_length).isnumeric(): - result.content_length = int(content_length) - else: - raise InvalidHeader("CONTENT-LENGTH") - except ValueError: - raise InvalidHeader("CONTENT-LENGTH") - - if result.content_length < 0: - raise InvalidHeader("CONTENT-LENGTH") - else: - result.content_length = 0 - - def _should_keep_alive(self, result): - """Determine if connection should be kept alive.""" - if result.must_close: - return False - - for name, value in result.headers: - if name == "CONNECTION": - v = value.lower().strip(" \t") - if v == "close": - return False - elif v == "keep-alive": - return True - break - - return result.version > (1, 0) - - def reset(self): - """Reset parser state for next request on keep-alive connection.""" - self._result = None - self.req_number += 1 - - -class FastAsyncRequest: - """Fast async HTTP request wrapper. - - Wraps a ParseResult from HttpParser and provides async body reading. - This is a lightweight adapter that allows protocol.py to use the fast - parser while maintaining compatibility with the existing interface. - """ - - __slots__ = ( - 'method', 'uri', 'path', 'query', 'fragment', 'version', - 'headers', 'headers_bytes', 'scheme', 'content_length', 'chunked', - 'must_close', 'proxy_protocol_info', - '_reader', '_buffer', '_body_remaining', '_body_reader', - '_expect_100_continue', - ) - - def __init__(self, parse_result, reader, buffer, consumed): - """Initialize from a ParseResult. - - Args: - parse_result: ParseResult from HttpParser.feed() - reader: asyncio.StreamReader for body reading - buffer: bytearray buffer with remaining data after headers - consumed: bytes consumed from buffer by parser - """ - # Copy attributes from ParseResult - self.method = parse_result.method - self.uri = parse_result.uri - self.path = parse_result.path - self.query = parse_result.query - self.fragment = parse_result.fragment - self.version = parse_result.version - self.headers = parse_result.headers - self.headers_bytes = parse_result.headers_bytes # Pre-computed bytes headers - self.scheme = parse_result.scheme - self.content_length = parse_result.content_length - self.chunked = parse_result.chunked - self.must_close = parse_result.must_close - self.proxy_protocol_info = parse_result.proxy_protocol_info - self._expect_100_continue = parse_result.expect_100_continue - - # Body reading state - self._reader = reader - # Keep remaining data after headers in buffer - self._buffer = bytearray(buffer[consumed:]) - if self.chunked: - self._body_remaining = -1 - elif self.content_length: - self._body_remaining = self.content_length - else: - self._body_remaining = 0 - self._body_reader = None - - def should_close(self): - """Check if connection should be closed after this request.""" - if self.must_close: - return True - for name, value in self.headers: - if name == "CONNECTION": - v = value.lower().strip(" \t") - if v == "close": - return True - elif v == "keep-alive": - return False - break - return self.version <= (1, 0) - - def get_header(self, name): - """Get a header value by name (case-insensitive).""" - name = name.upper() - for h, v in self.headers: - if h == name: - return v - return None - - async def read_body(self, size=8192): - """Read a chunk of the request body. - - Args: - size: Maximum bytes to read - - Returns: - bytes: Body data, empty bytes when body is exhausted - """ - if self._body_remaining == 0: - return b"" - - if self.chunked: - return await self._read_chunked_body(size) - else: - return await self._read_length_body(size) - - async def _read_length_body(self, size): - """Read from a length-delimited body.""" - if self._body_remaining <= 0: - return b"" - - to_read = min(size, self._body_remaining) - - # First, use data from our buffer - if self._buffer: - if len(self._buffer) <= to_read: - data = bytes(self._buffer) - self._buffer.clear() - else: - data = bytes(self._buffer[:to_read]) - del self._buffer[:to_read] - self._body_remaining -= len(data) - return data - - # Read from stream - try: - data = await self._reader.read(to_read) - if data: - self._body_remaining -= len(data) - return data - except Exception: - return b"" - - async def _read_chunked_body(self, size): - """Read from a chunked body.""" - if self._body_reader is None: - self._body_reader = self._chunked_body_reader() - - try: - return await anext(self._body_reader) - except StopAsyncIteration: - self._body_remaining = 0 - return b"" - - async def _chunked_body_reader(self): - """Async generator for reading chunked body.""" - while True: - # Read chunk size line - size_line = await self._read_until_crlf() - # Parse chunk size (handle extensions) - chunk_size, *_ = size_line.split(b";", 1) - if _: - chunk_size = chunk_size.rstrip(b" \t") - - if any(n not in b"0123456789abcdefABCDEF" for n in chunk_size): - raise InvalidHeader("Invalid chunk size") - if len(chunk_size) == 0: - raise InvalidHeader("Invalid chunk size") - - chunk_size = int(chunk_size, 16) - - if chunk_size == 0: - # Final chunk - skip trailers and final CRLF - await self._skip_trailers() - return - - # Read chunk data - remaining = chunk_size - while remaining > 0: - data = await self._read_data(min(remaining, 8192)) - if not data: - raise NoMoreData() - remaining -= len(data) - yield data - - # Skip chunk terminating CRLF - crlf = await self._read_data(2) - if crlf != b"\r\n": - # May have partial read - while len(crlf) < 2: - more = await self._read_data(2 - len(crlf)) - if not more: - break - crlf += more - - async def _read_data(self, size): - """Read data from buffer or stream.""" - if self._buffer: - if len(self._buffer) <= size: - data = bytes(self._buffer) - self._buffer.clear() - return data - else: - data = bytes(self._buffer[:size]) - del self._buffer[:size] - return data - try: - return await self._reader.read(size) - except Exception: - return b"" - - async def _read_until_crlf(self): - """Read bytes until CRLF.""" - result = bytearray() - while True: - # Check buffer first - if self._buffer: - idx = self._buffer.find(b"\r\n") - if idx >= 0: - result.extend(self._buffer[:idx]) - del self._buffer[:idx + 2] - return bytes(result) - result.extend(self._buffer) - self._buffer.clear() - - # Read more data - try: - data = await self._reader.read(64) - except Exception: - break - if not data: - break - idx = data.find(b"\r\n") - if idx >= 0: - result.extend(data[:idx]) - # Put remaining data back in buffer - remaining = data[idx + 2:] - if remaining: - self._buffer.extend(remaining) - return bytes(result) - result.extend(data) - - return bytes(result) - - async def _skip_trailers(self): - """Skip trailer headers after chunked body.""" - while True: - line = await self._read_until_crlf() - if not line: - return - - async def drain_body(self): - """Drain any unread body data.""" - while True: - data = await self.read_body(8192) - if not data: - break - class ParseError(Exception): """Error raised during HTTP parsing.""" diff --git a/gunicorn/asgi/protocol.py b/gunicorn/asgi/protocol.py index 2a239f8b..ed320a32 100644 --- a/gunicorn/asgi/protocol.py +++ b/gunicorn/asgi/protocol.py @@ -16,7 +16,7 @@ import time from gunicorn.asgi.unreader import AsyncUnreader from gunicorn.asgi.parser import ( - HttpParser, FastAsyncRequest, PythonProtocol, CallbackRequest, ParseError + PythonProtocol, CallbackRequest, ParseError ) from gunicorn.asgi.uwsgi import AsyncUWSGIRequest from gunicorn.http.errors import NoMoreData @@ -159,41 +159,14 @@ class ASGIResponseInfo: self.headers.append((name, value)) -class BufferReader: - """Minimal async reader using protocol's direct buffer. - - Provides the read() interface that FastAsyncRequest expects, - but uses direct buffering instead of StreamReader. - """ - - __slots__ = ('_protocol',) - - def __init__(self, protocol): - self._protocol = protocol - - async def read(self, n): - """Read up to n bytes from the buffer.""" - p = self._protocol - - # Fast path: data already available - if p._buffer: - return p._consume_buffer(n) - - # Wait for data - if not await p._wait_for_data(): - return b"" - - return p._consume_buffer(n) - - class BodyReceiver: - """Fast body receiver using Future-based waiting. + """Body receiver for callback-based parsers. - Avoids asyncio.create_task overhead by using a single Future for waiting. - Supports direct chunk feeding for callback-based parsers. + Body chunks are fed directly via the feed() method from parser callbacks. + Uses Future-based waiting for efficient async receive(). """ - __slots__ = ('_chunks', '_complete', '_body_finished', '_closed', '_waiter', '_loop', + __slots__ = ('_chunks', '_complete', '_body_finished', '_closed', '_waiter', 'request', 'protocol') def __init__(self, request, protocol): @@ -204,7 +177,6 @@ class BodyReceiver: self._body_finished = False # True after returning more_body=False self._closed = False self._waiter = None - self._loop = None def feed(self, chunk): """Feed a body chunk directly (called by parser callback).""" @@ -257,63 +229,55 @@ class BodyReceiver: self._closed = True return {"type": "http.disconnect"} - # Need to read body from request (legacy path until Phase 3/4) - # Use direct await instead of create_task + wait + # Wait for body chunk to arrive via callback try: - chunk = await self._read_with_disconnect_check() - if chunk: - return {"type": "http.request", "body": chunk, "more_body": True} - else: - self._complete = True + await self._wait_for_data() + + # Check what arrived + if self._closed: + return {"type": "http.disconnect"} + + if self._chunks: + chunk = self._chunks.pop(0) + more = bool(self._chunks) or not self._complete + if not more: + self._body_finished = True + return {"type": "http.request", "body": chunk, "more_body": more} + + if self._complete: self._body_finished = True return {"type": "http.request", "body": b"", "more_body": False} + + # Timeout or other condition - return empty with more_body=True + return {"type": "http.request", "body": b"", "more_body": True} + except asyncio.CancelledError: return {"type": "http.disconnect"} - except Exception: - self._complete = True - self._body_finished = True - return {"type": "http.request", "body": b"", "more_body": False} - async def _read_with_disconnect_check(self): - """Read body using event-based waiting (no polling). + async def _wait_for_data(self): + """Wait for body data to arrive via callback.""" + if self._chunks or self._complete or self._closed: + return - Uses the protocol's data event to wait for incoming data, - avoiding the latency and CPU overhead of periodic polling. - """ - while not self._closed and not self.protocol._closed: - # Try to read available data - try: - chunk = await self.request.read_body(65536) - return chunk - except Exception: - # If read fails, check if we should continue - if self._closed or self.protocol._closed: - return None - # Wait for more data using the protocol's data event - if self.protocol._data_event: - self.protocol._data_event.clear() - # Use wait_for with longer timeout for disconnect detection - try: - await asyncio.wait_for( - self.protocol._data_event.wait(), - timeout=30.0 # 30s timeout for disconnect detection - ) - except asyncio.TimeoutError: - # Check connection state and continue - continue - else: - # No event available, brief sleep as fallback - await asyncio.sleep(0.001) - return None + # Create a new waiter + loop = asyncio.get_event_loop() + self._waiter = loop.create_future() + + try: + # Wait with timeout for data or completion + await asyncio.wait_for(self._waiter, timeout=30.0) + except asyncio.TimeoutError: + pass + finally: + self._waiter = None class ASGIProtocol(asyncio.Protocol): """HTTP/1.1 protocol handler for ASGI applications. Handles connection lifecycle, request parsing, and ASGI app invocation. - Uses direct buffering instead of StreamReader for better performance. - Supports both pull-based (HttpParser) and callback-based (H1CProtocol/PythonProtocol) - parsing modes. + Uses callback-based parsing (H1CProtocol/PythonProtocol) for efficient + incremental parsing in data_received(). """ # Class-level cache for H1CProtocol availability @@ -336,26 +300,21 @@ class ASGIProtocol(asyncio.Protocol): self._closed = False self._body_receiver = None # Set per-request for disconnect signaling - # Direct buffering (replaces StreamReader for HTTP/1.1) - self._buffer = bytearray() - self._data_event = None # Lazy init to avoid event loop issues - # Response buffering for write batching self._response_buffer = None # Backpressure control self._reading_paused = False - self._max_buffer_size = 65536 * 4 # 256KB max buffer + self._max_buffer_size = 65536 * 4 # 256KB max buffer (HTTP/2 only) # Keep-alive timer self._keepalive_handle = None - # Callback parser state (used when use_callback_parser=True) + # Callback parser state self._callback_parser = None self._request_ready = None # Event signaling headers complete self._current_request = None # Request built from parser state self._is_ssl = False - self._use_callback_parser = False # Write flow control self._flow_control = None @@ -377,26 +336,18 @@ class ASGIProtocol(asyncio.Protocol): ) return - # HTTP/1.x connection + # HTTP/1.x connection - always use callback parser self._is_ssl = ssl_object is not None - self._data_event = asyncio.Event() self.writer = transport # Setup flow control for HTTP/1.x self._flow_control = FlowControl(transport) transport.set_write_buffer_limits(high=HIGH_WATER_LIMIT) - # Check if callback parser should be used - self._use_callback_parser = self._should_use_callback_parser() - - if self._use_callback_parser: - # Callback parser mode - setup request ready event - self._request_ready = asyncio.Event() - self._setup_callback_parser() - self._task = self.worker.loop.create_task(self._handle_connection_callback()) - else: - # Pull-based parser mode (existing fast path) - self._task = self.worker.loop.create_task(self._handle_connection()) + # Setup callback parser with request ready event + self._request_ready = asyncio.Event() + self._setup_callback_parser() + self._task = self.worker.loop.create_task(self._handle_connection()) @classmethod def _check_h1c_protocol_available(cls): @@ -410,27 +361,27 @@ class ASGIProtocol(asyncio.Protocol): cls._h1c_available = False return cls._h1c_available - def _should_use_callback_parser(self): - """Determine if callback parser should be used.""" + def _setup_callback_parser(self): + """Create callback parser based on http_parser setting. + + Parser selection: + - auto: Use H1CProtocol if available, else PythonProtocol + - fast: Require H1CProtocol (error if unavailable) + - python: Use PythonProtocol only + """ parser_setting = getattr(self.cfg, 'http_parser', 'auto') - # Currently default to pull-based parser for stability - # Set http_parser='callback' to enable callback mode - if parser_setting == 'callback': - return True - if parser_setting == 'fast-callback': - # Only use callback mode if H1CProtocol is available - return self._check_h1c_protocol_available() - - return False - - def _setup_callback_parser(self): - """Create callback parser with request building handlers.""" - # Select parser implementation - if self._check_h1c_protocol_available(): - parser_class = ASGIProtocol._h1c_protocol_class - else: + if parser_setting == 'python': parser_class = PythonProtocol + elif parser_setting == 'fast': + if not self._check_h1c_protocol_available(): + raise RuntimeError("gunicorn_h1c required for http_parser='fast'") + parser_class = ASGIProtocol._h1c_protocol_class + else: # auto + if self._check_h1c_protocol_available(): + parser_class = ASGIProtocol._h1c_protocol_class + else: + parser_class = PythonProtocol # Create parser with callbacks self._callback_parser = parser_class( @@ -471,33 +422,23 @@ class ASGIProtocol(asyncio.Protocol): if self.reader: # HTTP/2 path - use StreamReader self.reader.feed_data(data) - elif self._use_callback_parser and self._callback_parser: - # Callback parser path - feed directly to parser + elif self._callback_parser: + # HTTP/1.x path - feed directly to callback parser try: self._callback_parser.feed(data) except ParseError as e: self._send_error_response(400, str(e)) self._close_transport() return - else: - # HTTP/1.x path - direct buffer (faster) - self._buffer.extend(data) - if self._data_event is not None: - self._data_event.set() # Backpressure: pause reading if buffer is too large if not self._reading_paused and self._is_buffer_full(): self._pause_reading() def _is_buffer_full(self): - """Check if internal buffer is full.""" - if self.reader: - # HTTP/2 path - if hasattr(self.reader, '_buffer'): - return len(self.reader._buffer) > self._max_buffer_size - else: - # HTTP/1.x path - return len(self._buffer) > self._max_buffer_size + """Check if internal buffer is full (HTTP/2 only).""" + if self.reader and hasattr(self.reader, '_buffer'): + return len(self.reader._buffer) > self._max_buffer_size return False def _pause_reading(self): @@ -518,33 +459,6 @@ class ASGIProtocol(asyncio.Protocol): except (AttributeError, RuntimeError): pass - async def _wait_for_data(self): - """Wait for data to arrive in the buffer. - - Returns True if data is available, False if connection closed. - """ - if self._buffer: - return True - if self._closed: - return False - if self._data_event is None: - return False - - self._data_event.clear() - await self._data_event.wait() - return bool(self._buffer) and not self._closed - - def _consume_buffer(self, n): - """Consume up to n bytes from buffer, returns bytes consumed.""" - if n >= len(self._buffer): - data = bytes(self._buffer) - self._buffer.clear() - return data - else: - data = bytes(self._buffer[:n]) - del self._buffer[:n] - return data - def _arm_keepalive_timer(self): """Arm keepalive timeout timer after response completion.""" if self._keepalive_handle: @@ -639,125 +553,34 @@ class ASGIProtocol(asyncio.Protocol): pass async def _handle_connection(self): - """Main request handling loop for this connection.""" - try: - peername = self.transport.get_extra_info('peername') - sockname = self.transport.get_extra_info('sockname') + """Main request handling loop using callback-based parser. - # Check protocol type - use old path for uWSGI - protocol_type = getattr(self.cfg, 'protocol', 'http') - if protocol_type == 'uwsgi': - await self._handle_connection_uwsgi(peername, sockname) - return - - # Fast path: use HttpParser for HTTP protocol - await self._handle_connection_fast(peername, sockname) - - except asyncio.CancelledError: - pass - except Exception as e: - self.log.exception("Error handling connection: %s", e) - finally: - self._close_transport() - - async def _handle_connection_fast(self, peername, sockname): - """Fast HTTP connection handling using HttpParser.""" - # Check if peer is trusted proxy once per connection - is_trusted = _check_trusted_proxy( - peername, - self.cfg.forwarded_allow_ips, - self.cfg.forwarded_allow_networks() - ) - - # Get SSL state - ssl_object = self.transport.get_extra_info('ssl_object') - is_ssl = ssl_object is not None - - # Create parser and buffer - parser = HttpParser( - self.cfg, peername, is_ssl=is_ssl, - req_number=1, is_trusted_proxy=is_trusted - ) - buffer = bytearray() - - while not self._closed: - self.req_count += 1 - - # Cancel keepalive timer when new request starts - self._cancel_keepalive_timer() - - try: - # Parse request using fast parser - request = await self._parse_request_fast( - parser, buffer, peername - ) - except NoMoreData: - # Client disconnected - break - - # Check for WebSocket upgrade - if self._is_websocket_upgrade(request): - await self._handle_websocket(request, sockname, peername) - break # WebSocket takes over the connection - - # Handle HTTP request - keepalive = await self._handle_http_request( - request, sockname, peername - ) - - # Increment worker request count - self.worker.nr += 1 - - # Check max_requests - if self.worker.nr >= self.worker.max_requests: - self.log.info("Autorestarting worker after current request.") - self.worker.alive = False - keepalive = False - - if not keepalive or not self.worker.alive: - break - - # Check connection limits for keepalive - if not self.cfg.keepalive: - break - - # Drain any unread body before next request - await request.drain_body() - - # Resume reading if paused during body consumption - self._resume_reading() - - # Reset parser for next request (keep trusted proxy check) - parser.reset() - - # Arm keepalive timer between requests - self._arm_keepalive_timer() - - async def _handle_connection_callback(self): - """Handle HTTP connection using callback-based parser. - - This mode uses synchronous parsing in data_received(), avoiding - the async overhead of the pull-based parsing loop. The parser - fires callbacks when headers and body data are available, and - the main loop waits on events rather than actively parsing. + Uses synchronous parsing in data_received(), avoiding the async + overhead of pull-based parsing. The parser fires callbacks when + headers and body data are available, and this loop waits on + events rather than actively parsing. """ try: peername = self.transport.get_extra_info('peername') sockname = self.transport.get_extra_info('sockname') + # Check protocol type - use separate path for uWSGI + protocol_type = getattr(self.cfg, 'protocol', 'http') + if protocol_type == 'uwsgi': + await self._handle_connection_uwsgi(peername, sockname) + return + while not self._closed: self.req_count += 1 self._cancel_keepalive_timer() - # Wait for headers to be parsed (callback sets this event) - self._request_ready.clear() - self._current_request = None - - # Wait for request or timeout/disconnect - try: - await self._request_ready.wait() - except asyncio.CancelledError: - break + # Wait for headers to be parsed (callback sets the event and _current_request) + # Don't clear if request already arrived (data_received ran before us) + if not self._request_ready.is_set(): + try: + await self._request_ready.wait() + except asyncio.CancelledError: + break if self._closed or self._current_request is None: break @@ -797,9 +620,10 @@ class ASGIProtocol(asyncio.Protocol): if self._callback_parser: self._callback_parser.reset() - # Clear request state + # Clear request state for next iteration self._current_request = None self._body_receiver = None + self._request_ready.clear() # Arm keepalive timer between requests self._arm_keepalive_timer() @@ -811,53 +635,6 @@ class ASGIProtocol(asyncio.Protocol): finally: self._close_transport() - async def _parse_request_fast(self, parser, buffer, peername): - """Parse request using fast HttpParser with direct buffering. - - Returns a FastAsyncRequest wrapping the ParseResult. - Uses protocol's direct buffer instead of StreamReader for speed. - """ - # Use protocol's direct buffer (self._buffer) instead of local buffer - # The local 'buffer' param is kept for parser state - - # Create buffer reader for body reading (wraps protocol buffer) - buffer_reader = BufferReader(self) - - # Read data until we have complete headers - while True: - # Sync buffer with protocol buffer - if self._buffer: - buffer.extend(self._buffer) - self._buffer.clear() - - # Try to parse current buffer - if buffer: - try: - result = parser.feed(buffer) - if result is not None: - # Headers complete - create request wrapper - # Remaining data after headers stays in local buffer - # then gets copied to protocol buffer for body reading - request = FastAsyncRequest( - result, buffer_reader, buffer, result.consumed - ) - # Clear consumed data from buffer - del buffer[:result.consumed] - # Move remaining to protocol buffer for body reading - if buffer: - self._buffer.extend(buffer) - buffer.clear() - return request - except Exception as e: - # Re-raise HTTP parsing errors - if 'incomplete' not in str(e).lower(): - raise - - # Need more data - wait for it - if not await self._wait_for_data(): - raise NoMoreData(bytes(buffer)) - # Data is now in self._buffer, loop will sync it - async def _handle_connection_uwsgi(self, peername, sockname): """Handle uWSGI protocol connections (legacy path).""" unreader = AsyncUnreader(self.reader) diff --git a/gunicorn/config.py b/gunicorn/config.py index 4403e9cb..997a9830 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -2775,14 +2775,14 @@ def validate_asgi_lifespan(val): def validate_http_parser(val): """Validate http_parser setting. - Accepts: auto, fast, python, callback, fast-callback + Accepts: auto, fast, python """ if val is None: return "auto" if not isinstance(val, str): raise TypeError("http_parser must be a string") val = val.lower().strip() - valid_values = ("auto", "fast", "python", "callback", "fast-callback") + valid_values = ("auto", "fast", "python") if val not in valid_values: raise ValueError("http_parser must be one of: %s" % ", ".join(valid_values)) return val @@ -2869,20 +2869,17 @@ class HttpParser(Setting): validator = validate_http_parser default = "auto" desc = """\ - HTTP parser implementation. + HTTP parser implementation for ASGI workers. - Pull-based parsers (used in request handling loop): - - auto: Use gunicorn_h1c if available, otherwise pure Python (default) - - fast: Require gunicorn_h1c C extension (fail if unavailable) - - python: Force pure Python parser + - auto: Use H1CProtocol if gunicorn_h1c is available, else PythonProtocol (default) + - fast: Require H1CProtocol from gunicorn_h1c (fail if unavailable) + - python: Force pure Python PythonProtocol parser - Callback-based parsers (parsing in data_received, lower overhead): - - callback: Use callback parser (H1CProtocol if available, else PythonProtocol) - - fast-callback: Require H1CProtocol callback parser (fail if unavailable) + ASGI workers use callback-based parsing in data_received() for efficient + incremental parsing. The gunicorn_h1c C extension provides significantly + faster HTTP parsing using picohttpparser with SIMD optimizations. - The gunicorn_h1c C extension provides significantly faster HTTP - parsing using picohttpparser with SIMD optimizations. Install it - with: pip install gunicorn[fast] + Install it with: pip install gunicorn[fast] .. versionadded:: 25.0.0 """