Merge pull request #3513 from benoitc/feature/asgi-performance-optimizations

perf(asgi): optimize HTTP parser for improved performance
This commit is contained in:
Benoit Chesneau 2026-02-20 10:17:38 +01:00 committed by GitHub
commit eb6b69377c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 454 additions and 58 deletions

View File

@ -1,6 +1,18 @@
<span id="news"></span>
# Changelog
## unreleased
### Performance
- **ASGI HTTP Parser Optimizations**: Improve ASGI worker HTTP parsing performance
- Read chunks in 64-byte blocks instead of 1 byte at a time for chunk size lines and trailers
- Reuse BytesIO buffers with truncate/seek instead of creating new objects (reduces GC pressure)
- Use `bytearray.find()` directly instead of converting to bytes first
- Use index-based iteration for header parsing instead of `list.pop(0)` (O(1) vs O(n))
---
## 25.1.0 - 2026-02-13
### New Features

View File

@ -8,7 +8,6 @@ Async version of gunicorn/http/message.py for ASGI workers.
Reuses the parsing logic from the sync version, adapted for async I/O.
"""
import io
import ipaddress
import re
import socket
@ -151,26 +150,23 @@ class AsyncRequest:
self._parse_request_line(line)
# Headers
data = bytes(buf)
# Headers - use bytearray.find() directly to avoid bytes() conversions
while True:
idx = data.find(b"\r\n\r\n")
done = data[:2] == b"\r\n"
idx = buf.find(b"\r\n\r\n")
done = buf[:2] == b"\r\n"
if idx < 0 and not done:
await self._read_into(buf)
data = bytes(buf)
if len(data) > self.max_buffer_headers:
if len(buf) > self.max_buffer_headers:
raise LimitRequestHeaders("max buffer headers")
else:
break
if done:
self.unreader.unread(data[2:])
self.unreader.unread(bytes(buf[2:]))
else:
self.headers = self._parse_headers(data[:idx], from_trailer=False)
self.unreader.unread(data[idx + 4:])
self.headers = self._parse_headers(bytes(buf[:idx]), from_trailer=False)
self.unreader.unread(bytes(buf[idx + 4:]))
self._set_body_reader()
@ -182,21 +178,23 @@ class AsyncRequest:
buf.extend(data)
async def _read_line(self, buf, limit=0):
"""Read a line from buffer, returning (line, remaining_buffer)."""
data = bytes(buf)
"""Read a line from buffer, returning (line, remaining_buffer).
Uses bytearray.find() directly to avoid repeated bytes() conversions.
"""
while True:
idx = data.find(b"\r\n")
idx = buf.find(b"\r\n")
if idx >= 0:
if idx > limit > 0:
raise LimitRequestLine(idx, limit)
break
if len(data) - 2 > limit > 0:
raise LimitRequestLine(len(data), limit)
if len(buf) - 2 > limit > 0:
raise LimitRequestLine(len(buf), limit)
await self._read_into(buf)
data = bytes(buf)
return (data[:idx], bytearray(data[idx + 2:]))
line = bytes(buf[:idx])
remaining = bytearray(buf[idx + 2:])
return (line, remaining)
async def _handle_proxy_protocol(self, buf, mode):
"""Handle PROXY protocol detection and parsing.
@ -422,11 +420,16 @@ class AsyncRequest:
raise InvalidHTTPVersion(self.version)
def _parse_headers(self, data, from_trailer=False):
"""Parse HTTP headers from raw data."""
"""Parse HTTP headers from raw data.
Uses index-based iteration instead of list.pop(0) for O(1) access.
"""
cfg = self.cfg
headers = []
lines = [bytes_to_str(line) for line in data.split(b"\r\n")]
num_lines = len(lines)
i = 0
# Handle scheme headers
scheme_header = False
@ -440,11 +443,12 @@ class AsyncRequest:
secure_scheme_headers = cfg.secure_scheme_headers
forwarder_headers = cfg.forwarder_headers
while lines:
while i < num_lines:
if len(headers) >= self.limit_request_fields:
raise LimitRequestHeaders("limit request headers fields")
curr = lines.pop(0)
curr = lines[i]
i += 1
header_length = len(curr) + len("\r\n")
if curr.find(":") <= 0:
raise InvalidHeader(curr)
@ -457,11 +461,12 @@ class AsyncRequest:
name = name.upper()
value = [value.strip(" \t")]
# Consume value continuation lines
while lines and lines[0].startswith((" ", "\t")):
# Consume value continuation lines using index-based iteration
while i < num_lines and lines[i].startswith((" ", "\t")):
if not self.cfg.permit_obsolete_folding:
raise ObsoleteFolding(name)
curr = lines.pop(0)
curr = lines[i]
i += 1
header_length += len(curr) + len("\r\n")
if header_length > self.limit_request_field_size > 0:
raise LimitRequestHeaders("limit request headers fields size")
@ -676,29 +681,48 @@ class AsyncRequest:
raise InvalidHeader("Missing chunk terminator")
async def _read_chunk_size_line(self):
"""Read a chunk size line."""
buf = io.BytesIO()
"""Read a chunk size line.
Performance optimization: reads 64-byte chunks instead of 1 byte at a time,
then pushes back any excess data after finding the line terminator.
"""
buf = bytearray()
while True:
data = await self.unreader.read(1)
data = await self.unreader.read(64)
if not data:
raise NoMoreData()
buf.write(data)
if buf.getvalue().endswith(b"\r\n"):
return buf.getvalue()[:-2]
buf.extend(data)
idx = buf.find(b"\r\n")
if idx >= 0:
# Push back any data after the line
if idx + 2 < len(buf):
self.unreader.unread(bytes(buf[idx + 2:]))
return bytes(buf[:idx])
async def _skip_trailers(self):
"""Skip trailer headers after chunked body."""
buf = io.BytesIO()
"""Skip trailer headers after chunked body.
Performance optimization: reads 64-byte chunks instead of 1 byte at a time,
then pushes back any excess data after finding the trailer terminator.
"""
buf = bytearray()
while True:
data = await self.unreader.read(1)
data = await self.unreader.read(64)
if not data:
return
buf.write(data)
content = buf.getvalue()
if content.endswith(b"\r\n\r\n"):
# Could parse trailers here if needed
buf.extend(data)
# Check for empty trailer (just CRLF)
if buf[:2] == b"\r\n":
# Push back remaining data
if len(buf) > 2:
self.unreader.unread(bytes(buf[2:]))
return
if content == b"\r\n":
# Check for full trailer terminator
idx = buf.find(b"\r\n\r\n")
if idx >= 0:
# Push back data after the trailer
if idx + 4 < len(buf):
self.unreader.unread(bytes(buf[idx + 4:]))
return
async def drain_body(self):

View File

@ -16,6 +16,9 @@ class AsyncUnreader:
This class wraps an asyncio StreamReader and provides the ability
to "unread" data back into a buffer for re-parsing.
Performance optimization: Reuses BytesIO buffer with truncate/seek
instead of creating new objects to reduce GC pressure.
"""
def __init__(self, reader, max_chunk=8192):
@ -28,6 +31,25 @@ class AsyncUnreader:
self.reader = reader
self.buf = io.BytesIO()
self.max_chunk = max_chunk
self._buf_start = 0 # Start position of valid data in buffer
def _reset_buffer(self):
"""Reset buffer for reuse instead of creating new BytesIO."""
self.buf.seek(0)
self.buf.truncate(0)
self._buf_start = 0
def _get_buffered_data(self):
"""Get all buffered data and reset buffer."""
self.buf.seek(self._buf_start)
data = self.buf.read()
self._reset_buffer()
return data
def _buffer_size(self):
"""Get size of buffered data."""
end = self.buf.seek(0, io.SEEK_END)
return end - self._buf_start
async def read(self, size=None):
"""Read data from the stream, using buffered data first.
@ -48,31 +70,39 @@ class AsyncUnreader:
if size < 0:
size = None
# Move to end to check buffer size
self.buf.seek(0, io.SEEK_END)
buf_size = self._buffer_size()
# If no size specified, return buffered data or read chunk
if size is None and self.buf.tell():
ret = self.buf.getvalue()
self.buf = io.BytesIO()
return ret
if size is None and buf_size > 0:
return self._get_buffered_data()
if size is None:
chunk = await self._read_chunk()
return chunk
# Read until we have enough data
while self.buf.tell() < size:
while buf_size < size:
chunk = await self._read_chunk()
if not chunk:
ret = self.buf.getvalue()
self.buf = io.BytesIO()
return ret
return self._get_buffered_data()
self.buf.seek(0, io.SEEK_END)
self.buf.write(chunk)
buf_size += len(chunk)
data = self.buf.getvalue()
self.buf = io.BytesIO()
self.buf.write(data[size:])
return data[:size]
# We have enough data - extract what we need
self.buf.seek(self._buf_start)
data = self.buf.read(size)
# Update start position instead of creating new buffer
self._buf_start += size
# If buffer is getting large with consumed data, compact it
if self._buf_start > 8192:
remaining = self.buf.read() # Read from current position
self._reset_buffer()
if remaining:
self.buf.write(remaining)
return data
async def _read_chunk(self):
"""Read a chunk of data from the underlying stream."""
@ -86,15 +116,20 @@ class AsyncUnreader:
Args:
data: bytes to push back
Note: This prepends data to the buffer so it will be read first.
"""
if data:
self.buf.seek(0, io.SEEK_END)
# Get existing buffered data
self.buf.seek(self._buf_start)
existing = self.buf.read()
# Reset and write new data first, then existing
self._reset_buffer()
self.buf.write(data)
if existing:
self.buf.write(existing)
def has_buffered_data(self):
"""Check if there's data in the pushback buffer."""
pos = self.buf.tell()
self.buf.seek(0, io.SEEK_END)
has_data = self.buf.tell() > 0
self.buf.seek(pos)
return has_data
return self._buffer_size() > 0

325
tests/test_asgi_parser.py Normal file
View File

@ -0,0 +1,325 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
Tests for ASGI HTTP parser optimizations.
"""
import asyncio
import ipaddress
import pytest
from gunicorn.asgi.unreader import AsyncUnreader
from gunicorn.asgi.message import AsyncRequest
class MockStreamReader:
"""Mock asyncio.StreamReader for testing."""
def __init__(self, data):
self.data = data
self.pos = 0
async def read(self, size=-1):
if self.pos >= len(self.data):
return b""
if size < 0:
result = self.data[self.pos:]
self.pos = len(self.data)
else:
result = self.data[self.pos:self.pos + size]
self.pos += size
return result
class MockConfig:
"""Mock gunicorn config for testing."""
def __init__(self):
self.is_ssl = False
self.proxy_protocol = "off"
self.proxy_allow_ips = ["127.0.0.1"]
self.forwarded_allow_ips = ["127.0.0.1"]
self._proxy_allow_networks = None
self._forwarded_allow_networks = None
self.secure_scheme_headers = {}
self.forwarder_headers = []
self.limit_request_line = 8190
self.limit_request_fields = 100
self.limit_request_field_size = 8190
self.permit_unconventional_http_method = False
self.permit_unconventional_http_version = False
self.permit_obsolete_folding = False
self.casefold_http_method = False
self.strip_header_spaces = False
self.header_map = "refuse"
def forwarded_allow_networks(self):
if self._forwarded_allow_networks is None:
self._forwarded_allow_networks = [
ipaddress.ip_network(addr)
for addr in self.forwarded_allow_ips
if addr != "*"
]
return self._forwarded_allow_networks
def proxy_allow_networks(self):
if self._proxy_allow_networks is None:
self._proxy_allow_networks = [
ipaddress.ip_network(addr)
for addr in self.proxy_allow_ips
if addr != "*"
]
return self._proxy_allow_networks
# Optimized Chunk Reading Tests
@pytest.mark.asyncio
async def test_chunk_size_line_reading():
"""Test optimized chunk size line reading."""
# Simulate chunked body with chunk size line
data = b"a\r\nhello body\r\n0\r\n\r\n"
reader = MockStreamReader(data)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = AsyncRequest(cfg, unreader, ("127.0.0.1", 8000))
# Access the private method for testing
line = await req._read_chunk_size_line()
assert line == b"a"
@pytest.mark.asyncio
async def test_skip_trailers_empty():
"""Test skipping empty trailers."""
data = b"\r\n"
reader = MockStreamReader(data)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = AsyncRequest(cfg, unreader, ("127.0.0.1", 8000))
# Should not raise
await req._skip_trailers()
@pytest.mark.asyncio
async def test_skip_trailers_with_headers():
"""Test skipping trailers with actual headers."""
data = b"X-Checksum: abc123\r\n\r\n"
reader = MockStreamReader(data)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = AsyncRequest(cfg, unreader, ("127.0.0.1", 8000))
# Should not raise
await req._skip_trailers()
# Buffer Reuse Tests
@pytest.mark.asyncio
async def test_unreader_buffer_reuse():
"""Test that AsyncUnreader reuses buffers efficiently."""
data = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
reader = MockStreamReader(data)
unreader = AsyncUnreader(reader)
# Read in chunks
chunk1 = await unreader.read(10)
assert chunk1 == b"GET / HTTP"
# Read more
chunk2 = await unreader.read(10)
assert chunk2 == b"/1.1\r\nHost"
# Unread some data
unreader.unread(b"/1.1\r\nHost")
# Read again - should get unreaded data
chunk3 = await unreader.read(10)
assert chunk3 == b"/1.1\r\nHost"
@pytest.mark.asyncio
async def test_unreader_unread_prepends():
"""Test that unread prepends data."""
data = b"original"
reader = MockStreamReader(data)
unreader = AsyncUnreader(reader)
# Read some data first
await unreader.read(4) # "orig"
# Unread something different
unreader.unread(b"NEW")
# Should read the new data first
result = await unreader.read(3)
assert result == b"NEW"
# Header Parsing Optimization Tests
@pytest.mark.asyncio
async def test_header_parsing_index_iteration():
"""Test that header parsing uses index-based iteration."""
raw_request = (
b"GET / HTTP/1.1\r\n"
b"Host: example.com\r\n"
b"Content-Type: text/plain\r\n"
b"X-Custom: value\r\n"
b"\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert req.method == "GET"
assert req.path == "/"
assert len(req.headers) == 3
assert ("HOST", "example.com") in req.headers
assert ("CONTENT-TYPE", "text/plain") in req.headers
assert ("X-CUSTOM", "value") in req.headers
@pytest.mark.asyncio
async def test_many_headers_performance():
"""Test parsing request with many headers."""
headers = []
for i in range(50):
headers.append(f"X-Header-{i}: value-{i}\r\n")
raw_request = (
b"GET / HTTP/1.1\r\n"
+ "".join(headers).encode()
+ b"\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert len(req.headers) == 50
# Bytearray Find Optimization Tests
@pytest.mark.asyncio
async def test_bytearray_find_optimization():
"""Test that bytearray.find() is used instead of bytes().find()."""
raw_request = (
b"GET /path?query=value HTTP/1.1\r\n"
b"Host: example.com\r\n"
b"Content-Length: 5\r\n"
b"\r\n"
b"hello"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert req.method == "GET"
assert req.path == "/path"
assert req.query == "query=value"
assert req.content_length == 5
# Chunked Body Tests with Optimized Reading
@pytest.mark.asyncio
async def test_chunked_body_optimized_reading():
"""Test reading chunked body with optimized chunk reading."""
raw_request = (
b"POST / HTTP/1.1\r\n"
b"Host: example.com\r\n"
b"Transfer-Encoding: chunked\r\n"
b"\r\n"
b"5\r\nhello\r\n"
b"6\r\n world\r\n"
b"0\r\n\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert req.chunked is True
assert req.content_length is None
# Read body
body_parts = []
while True:
chunk = await req.read_body(1024)
if not chunk:
break
body_parts.append(chunk)
body = b"".join(body_parts)
assert body == b"hello world"
@pytest.mark.asyncio
async def test_chunked_body_with_extension():
"""Test reading chunked body with chunk extensions."""
raw_request = (
b"POST / HTTP/1.1\r\n"
b"Host: example.com\r\n"
b"Transfer-Encoding: chunked\r\n"
b"\r\n"
b"5;ext=value\r\nhello\r\n"
b"0\r\n\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
chunk = await req.read_body(1024)
assert chunk == b"hello"
# Edge Cases
@pytest.mark.asyncio
async def test_empty_headers():
"""Test request with no headers."""
raw_request = (
b"GET / HTTP/1.1\r\n"
b"\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert req.method == "GET"
assert len(req.headers) == 0
@pytest.mark.asyncio
async def test_large_header_value():
"""Test request with large header value."""
large_value = "x" * 4000 # Within default limit
raw_request = (
b"GET / HTTP/1.1\r\n"
+ f"X-Large-Header: {large_value}\r\n".encode()
+ b"\r\n"
)
reader = MockStreamReader(raw_request)
unreader = AsyncUnreader(reader)
cfg = MockConfig()
req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000))
assert req.get_header("X-Large-Header") == large_value