perf(asgi): optimize HTTP parser for improved performance

- Read chunk size lines and trailers in 64-byte blocks instead of 1 byte
  at a time, pushing back excess data to the unreader buffer
- Reuse BytesIO buffers with truncate/seek instead of creating new
  objects to reduce GC pressure in AsyncUnreader
- Use bytearray.find() directly instead of converting to bytes first
  in header parsing loop
- Use index-based iteration for header parsing instead of list.pop(0)
  which is O(n) per pop vs O(1) for index access

Add tests for the optimized parsing code paths.
This commit is contained in:
Benoit Chesneau 2026-02-18 10:00:46 +01:00
parent 2d4310116d
commit b3b98b1322
4 changed files with 454 additions and 57 deletions

View File

@ -1,6 +1,18 @@
<span id="news"></span>
# Changelog
## unreleased
### Performance
- **ASGI HTTP Parser Optimizations**: Improve ASGI worker HTTP parsing performance
- Read chunks in 64-byte blocks instead of 1 byte at a time for chunk size lines and trailers
- Reuse BytesIO buffers with truncate/seek instead of creating new objects (reduces GC pressure)
- Use `bytearray.find()` directly instead of converting to bytes first
- Use index-based iteration for header parsing instead of `list.pop(0)` (O(1) vs O(n))
---
## 25.1.0 - 2026-02-13
### New Features

View File

@ -151,26 +151,23 @@ class AsyncRequest:
self._parse_request_line(line)
# Headers
data = bytes(buf)
# Headers - use bytearray.find() directly to avoid bytes() conversions
while True:
idx = data.find(b"\r\n\r\n")
done = data[:2] == b"\r\n"
idx = buf.find(b"\r\n\r\n")
done = buf[:2] == b"\r\n"
if idx < 0 and not done:
await self._read_into(buf)
data = bytes(buf)
if len(data) > self.max_buffer_headers:
if len(buf) > self.max_buffer_headers:
raise LimitRequestHeaders("max buffer headers")
else:
break
if done:
self.unreader.unread(data[2:])
self.unreader.unread(bytes(buf[2:]))
else:
self.headers = self._parse_headers(data[:idx], from_trailer=False)
self.unreader.unread(data[idx + 4:])
self.headers = self._parse_headers(bytes(buf[:idx]), from_trailer=False)
self.unreader.unread(bytes(buf[idx + 4:]))
self._set_body_reader()
@ -182,21 +179,23 @@ class AsyncRequest:
buf.extend(data)
async def _read_line(self, buf, limit=0):
"""Read a line from buffer, returning (line, remaining_buffer)."""
data = bytes(buf)
"""Read a line from buffer, returning (line, remaining_buffer).
Uses bytearray.find() directly to avoid repeated bytes() conversions.
"""
while True:
idx = data.find(b"\r\n")
idx = buf.find(b"\r\n")
if idx >= 0:
if idx > limit > 0:
raise LimitRequestLine(idx, limit)
break
if len(data) - 2 > limit > 0:
raise LimitRequestLine(len(data), limit)
if len(buf) - 2 > limit > 0:
raise LimitRequestLine(len(buf), limit)
await self._read_into(buf)
data = bytes(buf)
return (data[:idx], bytearray(data[idx + 2:]))
line = bytes(buf[:idx])
remaining = bytearray(buf[idx + 2:])
return (line, remaining)
async def _handle_proxy_protocol(self, buf, mode):
"""Handle PROXY protocol detection and parsing.
@ -422,11 +421,16 @@ class AsyncRequest:
raise InvalidHTTPVersion(self.version)
def _parse_headers(self, data, from_trailer=False):
"""Parse HTTP headers from raw data."""
"""Parse HTTP headers from raw data.
Uses index-based iteration instead of list.pop(0) for O(1) access.
"""
cfg = self.cfg
headers = []
lines = [bytes_to_str(line) for line in data.split(b"\r\n")]
num_lines = len(lines)
i = 0
# Handle scheme headers
scheme_header = False
@ -440,11 +444,12 @@ class AsyncRequest:
secure_scheme_headers = cfg.secure_scheme_headers
forwarder_headers = cfg.forwarder_headers
while lines:
while i < num_lines:
if len(headers) >= self.limit_request_fields:
raise LimitRequestHeaders("limit request headers fields")
curr = lines.pop(0)
curr = lines[i]
i += 1
header_length = len(curr) + len("\r\n")
if curr.find(":") <= 0:
raise InvalidHeader(curr)
@ -457,11 +462,12 @@ class AsyncRequest:
name = name.upper()
value = [value.strip(" \t")]
# Consume value continuation lines
while lines and lines[0].startswith((" ", "\t")):
# Consume value continuation lines using index-based iteration
while i < num_lines and lines[i].startswith((" ", "\t")):
if not self.cfg.permit_obsolete_folding:
raise ObsoleteFolding(name)
curr = lines.pop(0)
curr = lines[i]
i += 1
header_length += len(curr) + len("\r\n")
if header_length > self.limit_request_field_size > 0:
raise LimitRequestHeaders("limit request headers fields size")
@ -676,29 +682,48 @@ class AsyncRequest:
raise InvalidHeader("Missing chunk terminator")
async def _read_chunk_size_line(self):
"""Read a chunk size line."""
buf = io.BytesIO()
"""Read a chunk size line.
Performance optimization: reads 64-byte chunks instead of 1 byte at a time,
then pushes back any excess data after finding the line terminator.
"""
buf = bytearray()
while True:
data = await self.unreader.read(1)
data = await self.unreader.read(64)
if not data:
raise NoMoreData()
buf.write(data)
if buf.getvalue().endswith(b"\r\n"):
return buf.getvalue()[:-2]
buf.extend(data)
idx = buf.find(b"\r\n")
if idx >= 0:
# Push back any data after the line
if idx + 2 < len(buf):
self.unreader.unread(bytes(buf[idx + 2:]))
return bytes(buf[:idx])
async def _skip_trailers(self):
"""Skip trailer headers after chunked body."""
buf = io.BytesIO()
"""Skip trailer headers after chunked body.
Performance optimization: reads 64-byte chunks instead of 1 byte at a time,
then pushes back any excess data after finding the trailer terminator.
"""
buf = bytearray()
while True:
data = await self.unreader.read(1)
data = await self.unreader.read(64)
if not data:
return
buf.write(data)
content = buf.getvalue()
if content.endswith(b"\r\n\r\n"):
# Could parse trailers here if needed
buf.extend(data)
# Check for empty trailer (just CRLF)
if buf[:2] == b"\r\n":
# Push back remaining data
if len(buf) > 2:
self.unreader.unread(bytes(buf[2:]))
return
if content == b"\r\n":
# Check for full trailer terminator
idx = buf.find(b"\r\n\r\n")
if idx >= 0:
# Push back data after the trailer
if idx + 4 < len(buf):
self.unreader.unread(bytes(buf[idx + 4:]))
return
async def drain_body(self):

View File

@ -16,6 +16,9 @@ class AsyncUnreader:
This class wraps an asyncio StreamReader and provides the ability
to "unread" data back into a buffer for re-parsing.
Performance optimization: Reuses BytesIO buffer with truncate/seek
instead of creating new objects to reduce GC pressure.
"""
def __init__(self, reader, max_chunk=8192):
@ -28,6 +31,25 @@ class AsyncUnreader:
self.reader = reader
self.buf = io.BytesIO()
self.max_chunk = max_chunk
self._buf_start = 0 # Start position of valid data in buffer
def _reset_buffer(self):
"""Reset buffer for reuse instead of creating new BytesIO."""
self.buf.seek(0)
self.buf.truncate(0)
self._buf_start = 0
def _get_buffered_data(self):
"""Get all buffered data and reset buffer."""
self.buf.seek(self._buf_start)
data = self.buf.read()
self._reset_buffer()
return data
def _buffer_size(self):
"""Get size of buffered data."""
end = self.buf.seek(0, io.SEEK_END)
return end - self._buf_start
async def read(self, size=None):
"""Read data from the stream, using buffered data first.
@ -48,31 +70,39 @@ class AsyncUnreader:
if size < 0:
size = None
# Move to end to check buffer size
self.buf.seek(0, io.SEEK_END)
buf_size = self._buffer_size()
# If no size specified, return buffered data or read chunk
if size is None and self.buf.tell():
ret = self.buf.getvalue()
self.buf = io.BytesIO()
return ret
if size is None and buf_size > 0:
return self._get_buffered_data()
if size is None:
chunk = await self._read_chunk()
return chunk
# Read until we have enough data
while self.buf.tell() < size:
while buf_size < size:
chunk = await self._read_chunk()
if not chunk:
ret = self.buf.getvalue()
self.buf = io.BytesIO()
return ret
return self._get_buffered_data()
self.buf.seek(0, io.SEEK_END)
self.buf.write(chunk)
buf_size += len(chunk)
data = self.buf.getvalue()
self.buf = io.BytesIO()
self.buf.write(data[size:])
return data[:size]
# We have enough data - extract what we need
self.buf.seek(self._buf_start)
data = self.buf.read(size)
# Update start position instead of creating new buffer
self._buf_start += size
# If buffer is getting large with consumed data, compact it
if self._buf_start > 8192:
remaining = self.buf.read() # Read from current position
self._reset_buffer()
if remaining:
self.buf.write(remaining)
return data
async def _read_chunk(self):
"""Read a chunk of data from the underlying stream."""
@ -86,15 +116,20 @@ class AsyncUnreader:
Args:
data: bytes to push back
Note: This prepends data to the buffer so it will be read first.
"""
if data:
self.buf.seek(0, io.SEEK_END)
# Get existing buffered data
self.buf.seek(self._buf_start)
existing = self.buf.read()
# Reset and write new data first, then existing
self._reset_buffer()
self.buf.write(data)
if existing:
self.buf.write(existing)
def has_buffered_data(self):
"""Check if there's data in the pushback buffer."""
pos = self.buf.tell()
self.buf.seek(0, io.SEEK_END)
has_data = self.buf.tell() > 0
self.buf.seek(pos)
return has_data
return self._buffer_size() > 0

325
tests/test_asgi_parser.py Normal file
View File

@ -0,0 +1,325 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
Tests for ASGI HTTP parser optimizations.
"""
import asyncio
import ipaddress
import pytest
from gunicorn.asgi.unreader import AsyncUnreader
from gunicorn.asgi.message import AsyncRequest
class MockStreamReader:
"""Mock asyncio.StreamReader for testing."""
def __init__(self, data):
self.data = data
self.pos = 0
async def read(self, size=-1):
if self.pos >= len(self.data):
return b""
if size < 0:
result = self.data[self.pos:]
self.pos = len(self.data)
else:
result = self.data[self.pos:self.pos + size]
self.pos += size
return result
class MockConfig:
"""Mock gunicorn config for testing."""
def __init__(self):
self.is_ssl = False
self.proxy_protocol = "off"
self.proxy_allow_ips = ["127.0.0.1"]
self.forwarded_allow_ips = ["127.0.0.1"]
self._proxy_allow_networks = None
self._forwarded_allow_networks = None
self.secure_scheme_headers = {}
self.forwarder_headers = []
self.limit_request_line = 8190
self.limit_request_fields = 100
self.limit_request_field_size = 8190
self.permit_unconventional_http_method = False
self.permit_unconventional_http_version = False
self.permit_obsolete_folding = False
self.casefold_http_method = False
self.strip_header_spaces = False
self.header_map = "refuse"
def forwarded_allow_networks(self):
if self._forwarded_allow_networks is None:
self._forwarded_allow_networks = [
ipaddress.ip_network(addr)
for addr in self.forwarded_allow_ips
if addr != "*"
]
return self._forwarded_allow_networks
def proxy_allow_networks(self):
if self._proxy_allow_networks is None:
self._proxy_allow_networks = [
ipaddress.ip_network(addr)
for addr in self.proxy_allow_ips
if addr != "*"
]
return self._proxy_allow_networks
# Optimized Chunk Reading Tests
@pytest.mark.asyncio
async def test_chunk_size_line_reading():
"""Test optimized chunk size line reading."""
# Simulate chunked body with chunk size line
data = b"a\r\nhello body\r\n0\r\n\r\n"
reader = MockStreamReader(data)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = AsyncRequest(cfg, unreader, ("127.0.0.1", 8000))
# Access the private method for testing
line = await req._read_chunk_size_line()
assert line == b"a"
@pytest.mark.asyncio
async def test_skip_trailers_empty():
"""Test skipping empty trailers."""
data = b"\r\n"
reader = MockStreamReader(data)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = AsyncRequest(cfg, unreader, ("127.0.0.1", 8000))
# Should not raise
await req._skip_trailers()
@pytest.mark.asyncio
async def test_skip_trailers_with_headers():
"""Test skipping trailers with actual headers."""
data = b"X-Checksum: abc123\r\n\r\n"
reader = MockStreamReader(data)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = AsyncRequest(cfg, unreader, ("127.0.0.1", 8000))
# Should not raise
await req._skip_trailers()
# Buffer Reuse Tests
@pytest.mark.asyncio
async def test_unreader_buffer_reuse():
"""Test that AsyncUnreader reuses buffers efficiently."""
data = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
reader = MockStreamReader(data)
unreader = AsyncUnreader(reader)
# Read in chunks
chunk1 = await unreader.read(10)
assert chunk1 == b"GET / HTTP"
# Read more
chunk2 = await unreader.read(10)
assert chunk2 == b"/1.1\r\nHost"
# Unread some data
unreader.unread(b"/1.1\r\nHost")
# Read again - should get unreaded data
chunk3 = await unreader.read(10)
assert chunk3 == b"/1.1\r\nHost"
@pytest.mark.asyncio
async def test_unreader_unread_prepends():
"""Test that unread prepends data."""
data = b"original"
reader = MockStreamReader(data)
unreader = AsyncUnreader(reader)
# Read some data first
await unreader.read(4) # "orig"
# Unread something different
unreader.unread(b"NEW")
# Should read the new data first
result = await unreader.read(3)
assert result == b"NEW"
# Header Parsing Optimization Tests
@pytest.mark.asyncio
async def test_header_parsing_index_iteration():
"""Test that header parsing uses index-based iteration."""
raw_request = (
b"GET / HTTP/1.1\r\n"
b"Host: example.com\r\n"
b"Content-Type: text/plain\r\n"
b"X-Custom: value\r\n"
b"\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert req.method == "GET"
assert req.path == "/"
assert len(req.headers) == 3
assert ("HOST", "example.com") in req.headers
assert ("CONTENT-TYPE", "text/plain") in req.headers
assert ("X-CUSTOM", "value") in req.headers
@pytest.mark.asyncio
async def test_many_headers_performance():
"""Test parsing request with many headers."""
headers = []
for i in range(50):
headers.append(f"X-Header-{i}: value-{i}\r\n")
raw_request = (
b"GET / HTTP/1.1\r\n"
+ "".join(headers).encode()
+ b"\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert len(req.headers) == 50
# Bytearray Find Optimization Tests
@pytest.mark.asyncio
async def test_bytearray_find_optimization():
"""Test that bytearray.find() is used instead of bytes().find()."""
raw_request = (
b"GET /path?query=value HTTP/1.1\r\n"
b"Host: example.com\r\n"
b"Content-Length: 5\r\n"
b"\r\n"
b"hello"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert req.method == "GET"
assert req.path == "/path"
assert req.query == "query=value"
assert req.content_length == 5
# Chunked Body Tests with Optimized Reading
@pytest.mark.asyncio
async def test_chunked_body_optimized_reading():
"""Test reading chunked body with optimized chunk reading."""
raw_request = (
b"POST / HTTP/1.1\r\n"
b"Host: example.com\r\n"
b"Transfer-Encoding: chunked\r\n"
b"\r\n"
b"5\r\nhello\r\n"
b"6\r\n world\r\n"
b"0\r\n\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert req.chunked is True
assert req.content_length is None
# Read body
body_parts = []
while True:
chunk = await req.read_body(1024)
if not chunk:
break
body_parts.append(chunk)
body = b"".join(body_parts)
assert body == b"hello world"
@pytest.mark.asyncio
async def test_chunked_body_with_extension():
"""Test reading chunked body with chunk extensions."""
raw_request = (
b"POST / HTTP/1.1\r\n"
b"Host: example.com\r\n"
b"Transfer-Encoding: chunked\r\n"
b"\r\n"
b"5;ext=value\r\nhello\r\n"
b"0\r\n\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
chunk = await req.read_body(1024)
assert chunk == b"hello"
# Edge Cases
@pytest.mark.asyncio
async def test_empty_headers():
"""Test request with no headers."""
raw_request = (
b"GET / HTTP/1.1\r\n"
b"\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert req.method == "GET"
assert len(req.headers) == 0
@pytest.mark.asyncio
async def test_large_header_value():
"""Test request with large header value."""
large_value = "x" * 4000 # Within default limit
raw_request = (
b"GET / HTTP/1.1\r\n"
+ f"X-Large-Header: {large_value}\r\n".encode()
+ b"\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert req.get_header("X-Large-Header") == large_value