gunicorn/gunicorn/asgi/parser.py
Benoit Chesneau 1f8e60c199 Add finish() method to ASGI callback parser for EOF handling
Handle chunked encoding edge case where connection closes before
final CRLF after zero-chunk. Skip WSGI-specific tests (casefold,
underscore headers) that don't apply to ASGI.
2026-03-26 12:13:50 +01:00

878 lines
30 KiB
Python

#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
HTTP parser for ASGI workers.
Provides callback-based parsing using either the fast C parser (gunicorn_h1c)
or the pure Python PythonProtocol fallback.
"""
import struct
from enum import IntEnum
class ParseError(Exception):
    """Common ancestor of every error this parser raises for bad input."""


class InvalidProxyLine(ParseError):
    """The PROXY protocol v1 (text) line is malformed."""


class InvalidProxyHeader(ParseError):
    """The PROXY protocol v2 (binary) header is malformed."""
# PROXY protocol v2 constants
# 12-byte magic prefix that opens every PROXY v2 header; the bytes spell
# "\r\n\r\n\0\r\nQUIT\n". The parser compares the first 12 buffered bytes
# against this value to detect a v2 header.
PP_V2_SIGNATURE = b"\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A"
class PPCommand(IntEnum):
    """Command nibble (low 4 bits of byte 12) of a PROXY protocol v2 header."""

    LOCAL = 0
    PROXY = 1
class PPFamily(IntEnum):
    """Address-family nibble (high 4 bits of byte 13) of a PROXY v2 header."""

    UNSPEC = 0
    INET = 1  # IPv4
    INET6 = 2  # IPv6
    UNIX = 3
class PPProtocol(IntEnum):
    """Transport nibble (low 4 bits of byte 13) of a PROXY v2 header."""

    UNSPEC = 0
    STREAM = 1  # TCP
    DGRAM = 2  # UDP
class LimitRequestLine(ParseError):
    """The request line is longer than the configured limit."""


class LimitRequestHeaders(ParseError):
    """Too many header fields, or a single header field is too large."""


class InvalidRequestLine(ParseError):
    """The request line does not have the METHOD, target, version shape."""


class InvalidRequestMethod(ParseError):
    """The HTTP method is not acceptable."""


class InvalidHTTPVersion(ParseError):
    """The HTTP version string is not acceptable."""


class InvalidHeaderName(ParseError):
    """A header field name is not a valid token."""


class InvalidHeader(ParseError):
    """A header field is malformed or carries an invalid value."""


class UnsupportedTransferCoding(ParseError):
    """A Transfer-Encoding value names a coding the parser does not know."""


class InvalidChunkSize(ParseError):
    """Chunked transfer encoding framing (chunk-size line) is invalid."""
class PythonProtocol:
    """Callback-based HTTP/1.1 request parser (pure Python fallback).

    Mirrors the H1CProtocol interface so the C extension and this pure
    Python implementation can be swapped for one another.

    Bytes are pushed in with :meth:`feed`; they are appended to an internal
    buffer and the callbacks are fired synchronously as each part of the
    request is recognised. Every callback is optional.

    Callbacks:
        on_message_begin: () -> None - Called when request starts
        on_url: (url: bytes) -> None - Called with request URL/path
        on_header: (name: bytes, value: bytes) -> None - Called for each
            header (the name is lowercased)
        on_headers_complete: () -> bool - Called when headers done
            (return True to skip body)
        on_body: (chunk: bytes) -> None - Called with body data chunks
        on_message_complete: () -> None - Called when request is complete

    Attributes populated while parsing: ``method`` and ``path`` (bytes),
    ``http_version`` (tuple), ``headers`` (list of ``(name, value)`` byte
    pairs with lowercased names), ``content_length``, ``is_chunked``,
    ``should_keep_alive`` and ``is_complete``.
    """

    __slots__ = (
        '_on_message_begin', '_on_url', '_on_header',
        '_on_headers_complete', '_on_body', '_on_message_complete',
        '_state', '_buffer', '_headers_list',
        'method', 'path', 'http_version', 'headers',
        'content_length', 'is_chunked', 'should_keep_alive', 'is_complete',
        '_body_remaining', '_skip_body',
        '_chunk_state', '_chunk_size', '_chunk_remaining',
        '_limit_request_line', '_limit_request_fields', '_limit_request_field_size',
        '_permit_unconventional_http_method', '_permit_unconventional_http_version',
        '_header_count',
        '_proxy_protocol', '_proxy_protocol_info', '_proxy_protocol_done',
    )

    def __init__(
        self,
        on_message_begin=None,
        on_url=None,
        on_header=None,
        on_headers_complete=None,
        on_body=None,
        on_message_complete=None,
        limit_request_line=8190,
        limit_request_fields=100,
        limit_request_field_size=8190,
        permit_unconventional_http_method=False,
        permit_unconventional_http_version=False,
        proxy_protocol='off',
    ):
        """Create a parser positioned at the start of a connection.

        Args:
            on_message_begin, on_url, on_header, on_headers_complete,
            on_body, on_message_complete: optional callbacks (see the
                class docstring for their signatures).
            limit_request_line: maximum request-line length in bytes
                (excluding CRLF); a value <= 0 disables the check.
            limit_request_fields: maximum number of header fields; a value
                <= 0 disables the check.
            limit_request_field_size: maximum size of one header line
                including its CRLF; a value <= 0 disables the check.
            permit_unconventional_http_method: skip the extra method
                restrictions applied by ``_is_valid_method``.
            permit_unconventional_http_version: also accept ``HTTP/1.<n>``
                versions other than 1.0 and 1.1.
            proxy_protocol: ``'off'`` to start directly at the request
                line; ``'v1'``, ``'v2'`` or ``'auto'`` to first look for a
                PROXY protocol header of that kind.
        """
        self._on_message_begin = on_message_begin
        self._on_url = on_url
        self._on_header = on_header
        self._on_headers_complete = on_headers_complete
        self._on_body = on_body
        self._on_message_complete = on_message_complete

        # Store limits
        self._limit_request_line = limit_request_line
        self._limit_request_fields = limit_request_fields
        self._limit_request_field_size = limit_request_field_size
        self._permit_unconventional_http_method = permit_unconventional_http_method
        self._permit_unconventional_http_version = permit_unconventional_http_version
        self._header_count = 0

        # Proxy protocol
        self._proxy_protocol = proxy_protocol
        self._proxy_protocol_info = None
        self._proxy_protocol_done = (proxy_protocol == 'off')

        # Parser state: proxy_protocol, request_line, headers, body,
        # chunked, complete
        self._state = 'proxy_protocol' if proxy_protocol != 'off' else 'request_line'
        self._buffer = bytearray()
        self._headers_list = []

        # Request info (populated during parsing)
        self.method = None
        self.path = None
        self.http_version = None
        self.headers = []
        self.content_length = None
        self.is_chunked = False
        self.should_keep_alive = True
        self.is_complete = False

        # Body state
        self._body_remaining = 0
        self._skip_body = False

        # Chunked transfer state: size, data, crlf, trailer
        self._chunk_state = 'size'
        self._chunk_size = 0
        self._chunk_remaining = 0

    def feed(self, data):
        """Buffer *data* and parse as far as possible, firing callbacks.

        Args:
            data: bytes or bytearray of incoming data

        Raises:
            ParseError: If the PROXY header or HTTP request is malformed
        """
        self._buffer.extend(data)
        while self._buffer:
            state = self._state
            if state == 'proxy_protocol':
                advanced = self._parse_proxy_protocol()
            elif state == 'request_line':
                advanced = self._parse_request_line()
            elif state == 'headers':
                advanced = self._parse_headers()
            elif state == 'body':
                advanced = self._parse_body()
            elif state == 'chunked':
                advanced = self._parse_chunked_body()
            else:
                # 'complete': bytes after the message stay in the buffer.
                break
            if not advanced:
                # The current step needs more input than is buffered.
                break

    @property
    def proxy_protocol_info(self):
        """Return the parsed PROXY protocol info dict, or None if none was parsed."""
        return self._proxy_protocol_info

    def reset(self):
        """Reset for next request (keepalive).

        Returns the parser to the request-line state and clears all
        per-request fields. PROXY protocol settings and info are left
        untouched.

        NOTE(review): the input buffer is cleared as well, so bytes that
        were fed after the end of the previous message (e.g. a pipelined
        request) are discarded - confirm callers do not rely on them.
        """
        self._state = 'request_line'
        self._buffer.clear()
        self._headers_list = []
        self.method = None
        self.path = None
        self.http_version = None
        self.headers = []
        self.content_length = None
        self.is_chunked = False
        self.should_keep_alive = True
        self.is_complete = False
        self._body_remaining = 0
        self._skip_body = False
        self._chunk_state = 'size'
        self._chunk_size = 0
        self._chunk_remaining = 0
        self._header_count = 0

    def finish(self):
        """Mark parsing complete for EOF handling.

        Call when no more data will be received. If the zero-size chunk of
        a chunked body has been seen but the terminating empty line has
        not, the message is treated as complete and ``on_message_complete``
        fires. In every other state this is a no-op.
        """
        if self._state == 'chunked' and self._chunk_state == 'trailer':
            # All body data received, just missing final CRLF
            self._state = 'complete'
            self.is_complete = True
            if self._on_message_complete:
                self._on_message_complete()

    def _finish_proxy_protocol(self, proto, client_addr=None, client_port=None,
                               proxy_addr=None, proxy_port=None):
        """Record the parsed PROXY info and move on to the request line.

        Always returns True so parsers can ``return`` the call directly.
        """
        self._proxy_protocol_info = {
            'proxy_protocol': proto,
            'client_addr': client_addr,
            'client_port': client_port,
            'proxy_addr': proxy_addr,
            'proxy_port': proxy_port,
        }
        self._proxy_protocol_done = True
        self._state = 'request_line'
        return True

    def _parse_proxy_protocol(self):
        """Parse PROXY protocol header if enabled.

        Returns True if parsing is complete (or the data is not a PROXY
        header for the configured mode), False if more data is needed.
        """
        # Need at least 12 bytes to detect v2 signature or check for v1 prefix
        if len(self._buffer) < 12:
            return False

        mode = self._proxy_protocol

        # Check for v2 signature first
        if mode in ('v2', 'auto') and self._buffer[:12] == PP_V2_SIGNATURE:
            return self._parse_proxy_protocol_v2()

        # Check for v1 prefix
        if mode in ('v1', 'auto') and self._buffer[:6] == b'PROXY ':
            return self._parse_proxy_protocol_v1()

        # Not proxy protocol - continue with normal parsing
        self._proxy_protocol_done = True
        self._state = 'request_line'
        return True

    def _parse_proxy_protocol_v1(self):
        """Parse PROXY protocol v1 (text format).

        Format: PROXY <PROTO> <SRC_ADDR> <DST_ADDR> <SRC_PORT> <DST_PORT>\\r\\n

        Returns True once the line has been consumed, False if the line
        terminator has not arrived yet.

        Raises:
            InvalidProxyLine: the line is too long, has the wrong number
                of fields, an unknown protocol, or an invalid port.
        """
        idx = self._buffer.find(b'\r\n')
        if idx == -1:
            # Need more data - v1 header can be up to 107 bytes
            if len(self._buffer) > 107:
                raise InvalidProxyLine("PROXY v1 header too long")
            return False
        # Apply the same 107-byte cap (CRLF included) when the whole line
        # arrived at once, so acceptance does not depend on segmentation.
        if idx + 2 > 107:
            raise InvalidProxyLine("PROXY v1 header too long")

        line = bytes(self._buffer[:idx]).decode('latin-1')
        del self._buffer[:idx + 2]

        parts = line.split(' ')
        if len(parts) < 2:
            raise InvalidProxyLine("Invalid PROXY v1 line")

        proto = parts[1].upper()
        if proto == 'UNKNOWN':
            # Unknown protocol - no address info
            return self._finish_proxy_protocol('UNKNOWN')

        if proto not in ('TCP4', 'TCP6'):
            raise InvalidProxyLine("Unknown PROXY v1 protocol: %s" % proto)
        if len(parts) != 6:
            raise InvalidProxyLine("Invalid PROXY v1 line for %s" % proto)

        s_addr, d_addr = parts[2], parts[3]
        try:
            s_port = int(parts[4])
            d_port = int(parts[5])
        except ValueError as exc:
            raise InvalidProxyLine("Invalid PROXY v1 port: %s" % exc) from exc
        if not (0 <= s_port <= 65535 and 0 <= d_port <= 65535):
            raise InvalidProxyLine("Invalid PROXY v1 port range")

        return self._finish_proxy_protocol(proto, s_addr, s_port, d_addr, d_port)

    def _parse_proxy_protocol_v2(self):
        """Parse PROXY protocol v2 (binary format).

        Layout read here: 12-byte signature, version/command byte,
        family/protocol byte, big-endian 16-bit payload length, payload.

        Returns True once the whole header has been consumed, False if
        more data is needed.

        Raises:
            InvalidProxyHeader: unsupported version, command or address
                family, or an address payload that is too short.
        """
        # Need at least 16 bytes for header
        if len(self._buffer) < 16:
            return False

        ver_cmd = self._buffer[12]
        fam_prot = self._buffer[13]
        length = struct.unpack('>H', bytes(self._buffer[14:16]))[0]

        version = (ver_cmd & 0xF0) >> 4
        if version != 2:
            raise InvalidProxyHeader("Unsupported PROXY v2 version: %d" % version)

        command = ver_cmd & 0x0F
        if command not in (PPCommand.LOCAL, PPCommand.PROXY):
            raise InvalidProxyHeader("Unsupported PROXY v2 command: %d" % command)

        # Wait until the complete header (fixed part + payload) is buffered
        total_size = 16 + length
        if len(self._buffer) < total_size:
            return False

        addr_data = bytes(self._buffer[16:total_size])
        del self._buffer[:total_size]

        if command == PPCommand.LOCAL:
            # LOCAL carries no usable address information
            return self._finish_proxy_protocol('LOCAL')

        family = (fam_prot & 0xF0) >> 4
        protocol = fam_prot & 0x0F
        is_stream = protocol == PPProtocol.STREAM

        if family == PPFamily.UNSPEC:
            return self._finish_proxy_protocol('UNSPEC')

        if family == PPFamily.INET:
            # 4 + 4 address bytes, then two 16-bit ports
            if len(addr_data) < 12:
                raise InvalidProxyHeader("Invalid PROXY v2 IPv4 address data")
            s_addr = '.'.join(str(b) for b in addr_data[:4])
            d_addr = '.'.join(str(b) for b in addr_data[4:8])
            s_port, d_port = struct.unpack('>HH', addr_data[8:12])
            proto = 'TCP4' if is_stream else 'UDP4'
        elif family == PPFamily.INET6:
            # 16 + 16 address bytes, then two 16-bit ports
            if len(addr_data) < 36:
                raise InvalidProxyHeader("Invalid PROXY v2 IPv6 address data")
            words = struct.unpack('>16H', addr_data[:32])
            # Addresses are rendered as eight uncompressed hex groups
            s_addr = ':'.join('%x' % w for w in words[:8])
            d_addr = ':'.join('%x' % w for w in words[8:])
            s_port, d_port = struct.unpack('>HH', addr_data[32:36])
            proto = 'TCP6' if is_stream else 'UDP6'
        else:
            raise InvalidProxyHeader("Unsupported PROXY v2 address family: %d" % family)

        return self._finish_proxy_protocol(proto, s_addr, s_port, d_addr, d_port)

    def _parse_request_line(self):
        """Parse the request line, return True if complete.

        On success sets ``method``, ``path`` and ``http_version``, fires
        ``on_message_begin`` and ``on_url`` and switches to header parsing.

        Raises:
            LimitRequestLine: the line exceeds ``limit_request_line``.
            InvalidRequestLine: the line is not ``METHOD target version``.
            InvalidRequestMethod: the method fails validation.
            InvalidHTTPVersion: the version is not accepted.
        """
        limit = self._limit_request_line
        idx = self._buffer.find(b'\r\n')
        if idx == -1:
            # Fail early instead of buffering without bound. The "+ 1"
            # allows for a trailing CR whose LF has not arrived yet; any
            # longer buffer would fail the length check below anyway.
            if limit > 0 and len(self._buffer) > limit + 1:
                raise LimitRequestLine("Request line is too large")
            return False

        if limit > 0 and idx > limit:
            raise LimitRequestLine("Request line is too large")

        line = bytes(self._buffer[:idx])
        del self._buffer[:idx + 2]

        # Parse: METHOD PATH HTTP/x.y
        parts = line.split(b' ', 2)
        if len(parts) != 3:
            raise InvalidRequestLine("Invalid request line")

        self.method, self.path, version = parts

        if not self._permit_unconventional_http_method:
            if not self._is_valid_method(self.method):
                raise InvalidRequestMethod(self.method.decode('latin-1'))

        if version == b'HTTP/1.1':
            self.http_version = (1, 1)
        elif version == b'HTTP/1.0':
            self.http_version = (1, 0)
        else:
            if not self._permit_unconventional_http_version:
                raise InvalidHTTPVersion(version.decode('latin-1'))
            # Other HTTP/1.x versions are accepted only when permitted
            if not version.startswith(b'HTTP/1.'):
                raise InvalidHTTPVersion(version.decode('latin-1'))
            try:
                minor = int(version[7:])
            except ValueError:
                raise InvalidHTTPVersion(version.decode('latin-1')) from None
            self.http_version = (1, minor)

        if self._on_message_begin:
            self._on_message_begin()
        if self._on_url:
            self._on_url(self.path)

        self._state = 'headers'
        return True

    def _parse_headers(self):
        """Parse header lines, return True once the header block is complete.

        Each header is appended to the internal list as
        ``(lowercased name, value)`` and reported through ``on_header``.
        The empty line ending the block triggers ``_finalize_headers``.

        Raises:
            LimitRequestHeaders: a field is too large or there are too many.
            InvalidHeader: a line has no colon or the value contains
                NUL, CR or LF.
            InvalidHeaderName: the field name is not a valid token.
        """
        size_limit = self._limit_request_field_size
        count_limit = self._limit_request_fields
        while True:
            idx = self._buffer.find(b'\r\n')
            if idx == -1:
                # Fail early instead of buffering without bound: a pending
                # line already longer than the limit can never pass the
                # size check below once its CRLF arrives.
                if size_limit > 0 and len(self._buffer) > size_limit:
                    raise LimitRequestHeaders("Request header field is too large")
                return False

            line = bytes(self._buffer[:idx])
            del self._buffer[:idx + 2]

            if not line:
                # Empty line = end of headers
                self._finalize_headers()
                return True

            # Check header field size limit (include CRLF in size to match WSGI parser)
            if size_limit > 0 and len(line) + 2 > size_limit:
                raise LimitRequestHeaders("Request header field is too large")

            # Check header count limit
            self._header_count += 1
            if count_limit > 0 and self._header_count > count_limit:
                raise LimitRequestHeaders("Too many headers")

            colon = line.find(b':')
            if colon == -1:
                raise InvalidHeader("Missing colon in header")

            # The name is validated exactly as received. Whitespace before
            # the colon, or a leading-whitespace (obs-fold) line, must be
            # rejected rather than silently normalised, because peers that
            # treat it differently enable request smuggling.
            name = line[:colon]
            if not self._is_valid_token(name):
                raise InvalidHeaderName(name.decode('latin-1'))

            # Only SP / HTAB are optional whitespace around a field value;
            # stripping anything else would hide a bare CR/LF from the
            # check that follows.
            value = line[colon + 1:].strip(b' \t')
            if self._has_invalid_header_chars(value):
                raise InvalidHeader("Invalid characters in header value")

            # Store lowercase name for internal use
            name_lower = name.lower()
            self._headers_list.append((name_lower, value))

            if self._on_header:
                self._on_header(name_lower, value)

    def _finalize_headers(self):
        """Validate the header block and choose how the body is read.

        Validates headers for request smuggling vulnerabilities:
        - Rejects duplicate Content-Length headers
        - Rejects Content-Length values that are not plain decimal digits
        - Rejects requests with both Content-Length and chunked
          Transfer-Encoding
        - Rejects chunked Transfer-Encoding in HTTP/1.0
        - Rejects stacked chunked encoding
        - Validates Transfer-Encoding values

        Also derives ``should_keep_alive``, calls ``on_headers_complete``
        (a true return value skips the body) and, when there is no body
        to read, completes the message immediately.

        Raises:
            InvalidHeader: one of the checks above failed.
            UnsupportedTransferCoding: unknown Transfer-Encoding value.
        """
        self.headers = self._headers_list

        content_length = None
        chunked = False

        for name, value in self.headers:
            if name == b'content-length':
                # Reject duplicate Content-Length headers (request smuggling vector)
                if content_length is not None:
                    raise InvalidHeader("Duplicate Content-Length header")
                # Content-Length is 1*DIGIT. int() alone would also accept
                # "+5", "1_0" and similar spellings that other HTTP
                # implementations read differently.
                if not value.isdigit():
                    if value[:1] == b'-' and value[1:].isdigit():
                        raise InvalidHeader("Negative Content-Length")
                    raise InvalidHeader("Invalid Content-Length value")
                try:
                    content_length = int(value)
                except ValueError:
                    # e.g. more digits than the interpreter is willing to convert
                    raise InvalidHeader("Invalid Content-Length value") from None
            elif name == b'transfer-encoding':
                # Properly parse comma-separated Transfer-Encoding values
                # per RFC 9112 Section 6.1
                for coding in value.split(b','):
                    coding = coding.strip()
                    coding_lower = coding.lower()
                    if coding_lower == b'chunked':
                        # Reject stacked chunked encoding (request smuggling vector)
                        if chunked:
                            raise InvalidHeader("Stacked chunked encoding")
                        chunked = True
                    elif coding_lower == b'identity':
                        # identity after chunked is invalid
                        if chunked:
                            raise InvalidHeader("Invalid Transfer-Encoding after chunked")
                    elif coding_lower in (b'compress', b'deflate', b'gzip'):
                        # Compression after chunked is invalid
                        if chunked:
                            raise InvalidHeader("Invalid Transfer-Encoding after chunked")
                        # Mark connection for close (unsupported but valid)
                        self.should_keep_alive = False
                    else:
                        # Reject unknown transfer codings
                        raise UnsupportedTransferCoding(coding.decode('latin-1'))
            elif name == b'connection':
                connection = value.lower()
                if b'close' in connection:
                    self.should_keep_alive = False
                elif b'keep-alive' in connection:
                    self.should_keep_alive = True

        # Security checks for request smuggling prevention
        if chunked:
            # Reject chunked in HTTP/1.0 (RFC 9112 Section 6.1)
            if self.http_version < (1, 1):
                raise InvalidHeader("Chunked encoding not allowed in HTTP/1.0")
            # Reject Content-Length with Transfer-Encoding (request smuggling vector)
            if content_length is not None:
                raise InvalidHeader("Content-Length with Transfer-Encoding")
            self.is_chunked = True
            self.content_length = None
            self._body_remaining = -1  # Chunked mode
        elif content_length is not None:
            self.content_length = content_length
            self._body_remaining = content_length
        else:
            # No body
            self.content_length = None
            self._body_remaining = 0

        # HTTP/1.0 defaults to close: keep the connection only if a
        # Connection header explicitly asked for keep-alive.
        if self.http_version == (1, 0) and self.should_keep_alive:
            has_keepalive = any(
                name == b'connection' and b'keep-alive' in value.lower()
                for name, value in self.headers
            )
            if not has_keepalive:
                self.should_keep_alive = False

        if self._on_headers_complete:
            self._skip_body = self._on_headers_complete()

        # Determine next state
        if not self._skip_body and self.is_chunked:
            self._state = 'chunked'
            self._chunk_state = 'size'
        elif not self._skip_body and self.content_length and self.content_length > 0:
            self._state = 'body'
        else:
            # Body skipped by the callback, or there is no body at all
            self._state = 'complete'
            self.is_complete = True
            if self._on_message_complete:
                self._on_message_complete()

    def _parse_body(self):
        """Parse Content-Length delimited body.

        Hands at most the remaining body bytes to ``on_body`` and completes
        the message once none remain. Returns False when there is nothing
        to consume, True otherwise.
        """
        if not self._buffer or self._body_remaining <= 0:
            return False

        take = min(len(self._buffer), self._body_remaining)
        chunk = bytes(self._buffer[:take])
        del self._buffer[:take]
        self._body_remaining -= take

        if self._on_body:
            self._on_body(chunk)

        if self._body_remaining <= 0:
            self._state = 'complete'
            self.is_complete = True
            if self._on_message_complete:
                self._on_message_complete()

        return True

    def _parse_chunked_body(self):
        """Parse chunked transfer encoding.

        Cycles through the sub-states ``size`` -> ``data`` -> ``crlf`` for
        each chunk and ``trailer`` after the zero-size chunk. Returns True
        when the message is complete, False when more data is needed.

        Raises:
            InvalidChunkSize: the chunk-size line is empty, contains
                whitespace or non-hex characters, or chunk data is not
                followed by CRLF.
        """
        while self._buffer:
            phase = self._chunk_state

            if phase == 'size':
                # Looking for chunk size line
                idx = self._buffer.find(b'\r\n')
                if idx == -1:
                    return False

                size_line = bytes(self._buffer[:idx])
                del self._buffer[:idx + 2]

                # Drop chunk extensions (e.g., "5;ext=value")
                size_line = size_line.partition(b';')[0]

                # Strict validation: reject leading/trailing whitespace
                # to prevent parser desync (request smuggling vector)
                if size_line != size_line.strip():
                    raise InvalidChunkSize("Whitespace in chunk size")
                if not size_line:
                    raise InvalidChunkSize("Empty chunk size")

                # Hex digits only; int(x, 16) alone would also accept
                # signs, underscores and a "0x" prefix.
                for c in size_line:
                    if c not in b'0123456789abcdefABCDEF':
                        raise InvalidChunkSize("Invalid character in chunk size")

                self._chunk_size = int(size_line, 16)

                if self._chunk_size == 0:
                    # Final chunk - skip trailers
                    self._chunk_state = 'trailer'
                else:
                    self._chunk_remaining = self._chunk_size
                    self._chunk_state = 'data'

            elif phase == 'data':
                # Reading chunk data
                to_read = min(len(self._buffer), self._chunk_remaining)
                chunk = bytes(self._buffer[:to_read])
                del self._buffer[:to_read]
                self._chunk_remaining -= to_read

                if self._on_body:
                    self._on_body(chunk)

                if self._chunk_remaining == 0:
                    # Need to consume trailing CRLF
                    self._chunk_state = 'crlf'

            elif phase == 'crlf':
                if len(self._buffer) < 2:
                    return False
                # The two bytes after chunk data must be CRLF. Skipping
                # them unchecked would let a peer shift the framing.
                # InvalidChunkSize is the chunk-framing ParseError
                # available in this module.
                if self._buffer[:2] != b'\r\n':
                    raise InvalidChunkSize("Missing CRLF after chunk data")
                del self._buffer[:2]
                self._chunk_state = 'size'

            elif phase == 'trailer':
                # Skip trailer headers
                idx = self._buffer.find(b'\r\n')
                if idx == -1:
                    return False

                line = bytes(self._buffer[:idx])
                del self._buffer[:idx + 2]

                if not line:
                    # Empty line = end of trailers
                    self._state = 'complete'
                    self.is_complete = True
                    if self._on_message_complete:
                        self._on_message_complete()
                    return True

        return False

    def _is_valid_method(self, method):
        """Check if method is a valid token with conventional restrictions.

        Conventional means: 3 to 20 bytes long, no lowercase ASCII letters
        and no ``#``, in addition to being a valid token.
        """
        if not method:
            return False
        # Check length (3-20 chars)
        if not 3 <= len(method) <= 20:
            return False
        # Check for lowercase or # (unconventional)
        for c in method:
            if c in b'abcdefghijklmnopqrstuvwxyz#':
                return False
        return self._is_valid_token(method)

    def _is_valid_token(self, data):
        """Check if data contains only RFC 9110 token characters.

        Empty input is not a token. Every byte must be a visible ASCII
        character (0x21-0x7e) that is not one of the delimiters.
        """
        if not data:
            return False
        for c in data:
            if c < 0x21 or c > 0x7e:
                return False
            # RFC 9110 delimiters: "(),/:;<=>?@[\]{}
            if c in b'"(),/:;<=>?@[\\]{}"':
                return False
        return True

    def _has_invalid_header_chars(self, value):
        """Check for NUL, CR, LF in header value."""
        return b'\x00' in value or b'\r' in value or b'\n' in value
class CallbackRequest:
    """Request object built from callback parser state.

    Works with both H1CProtocol (C extension) and PythonProtocol.

    ``headers`` holds ``(UPPERCASE name, value)`` string pairs while
    ``headers_bytes`` keeps the parser's original byte pairs.
    """

    __slots__ = (
        'method', 'uri', 'path', 'query', 'fragment', 'version',
        'headers', 'headers_bytes', 'scheme', 'raw_path',
        'content_length', 'chunked', 'must_close',
        'proxy_protocol_info', '_expect_100_continue',
    )

    def __init__(self):
        """Create an empty request; use :meth:`from_parser` to populate it."""
        self.method = None
        self.uri = None
        self.path = None
        self.query = None
        self.fragment = None
        self.version = None
        self.headers = []
        self.headers_bytes = []
        self.scheme = "http"
        self.raw_path = b''
        self.content_length = 0
        self.chunked = False
        self.must_close = False
        self.proxy_protocol_info = None
        self._expect_100_continue = False

    @classmethod
    def from_parser(cls, parser, is_ssl=False):
        """Build request from callback parser state.

        Args:
            parser: H1CProtocol or PythonProtocol instance whose headers
                have been parsed
            is_ssl: Whether connection is SSL/TLS

        Returns:
            CallbackRequest instance
        """
        from urllib.parse import unquote_to_bytes

        req = cls()
        req.method = parser.method.decode('ascii')

        # Split the request target at the first "?".
        # Per ASGI spec:
        # - path: percent-decoded UTF-8 string
        # - raw_path: original bytes as received
        raw_url = parser.path
        path_part, _, query_part = raw_url.partition(b'?')
        req.raw_path = path_part
        req.path = unquote_to_bytes(path_part).decode('utf-8', errors='replace')
        req.query = query_part.decode('latin-1')  # '' when there is no "?"
        req.uri = raw_url.decode('latin-1')
        req.fragment = ''
        req.version = parser.http_version

        # Headers - store both bytes (for ASGI scope) and strings (for compatibility)
        req.headers_bytes = list(parser.headers)
        req.headers = [
            (n.decode('latin-1').upper(), v.decode('latin-1'))
            for n, v in parser.headers
        ]

        req.scheme = 'https' if is_ssl else 'http'
        req.content_length = parser.content_length or 0
        req.chunked = parser.is_chunked
        req.must_close = not parser.should_keep_alive

        # Carry over PROXY protocol details when the parser exposes them;
        # getattr keeps parsers without that attribute working.
        req.proxy_protocol_info = getattr(parser, 'proxy_protocol_info', None)

        # Check for Expect: 100-continue
        req._expect_100_continue = any(
            name == b'expect' and value.lower() == b'100-continue'
            for name, value in parser.headers
        )

        return req

    def should_close(self):
        """Check if connection should be closed after this request.

        ``must_close`` wins. Otherwise the first Connection header decides
        when it is exactly ``close`` or ``keep-alive``; in every other
        case the connection closes for HTTP/1.0 and older.
        """
        if self.must_close:
            return True
        for name, value in self.headers:
            if name != "CONNECTION":
                continue
            token = value.lower().strip(" \t")
            if token == "close":
                return True
            if token == "keep-alive":
                return False
            # Only the first Connection header is consulted
            break
        return self.version <= (1, 0)

    def get_header(self, name):
        """Get the first value of a header by name (case-insensitive), or None."""
        wanted = name.upper()
        for header_name, header_value in self.headers:
            if header_name == wanted:
                return header_value
        return None