diff --git a/docs/content/news.md b/docs/content/news.md index 0ee83ad3..6315a88c 100644 --- a/docs/content/news.md +++ b/docs/content/news.md @@ -1,6 +1,18 @@ # Changelog +## unreleased + +### Performance + +- **ASGI HTTP Parser Optimizations**: Improve ASGI worker HTTP parsing performance + - Read chunks in 64-byte blocks instead of 1 byte at a time for chunk size lines and trailers + - Reuse BytesIO buffers with truncate/seek instead of creating new objects (reduces GC pressure) + - Use `bytearray.find()` directly instead of converting to bytes first + - Use index-based iteration for header parsing instead of `list.pop(0)` (O(1) vs O(n)) + +--- + ## 25.1.0 - 2026-02-13 ### New Features diff --git a/gunicorn/asgi/message.py b/gunicorn/asgi/message.py index 73a6e7da..45afb851 100644 --- a/gunicorn/asgi/message.py +++ b/gunicorn/asgi/message.py @@ -8,7 +8,6 @@ Async version of gunicorn/http/message.py for ASGI workers. Reuses the parsing logic from the sync version, adapted for async I/O. 
""" -import io import ipaddress import re import socket @@ -151,26 +150,23 @@ class AsyncRequest: self._parse_request_line(line) - # Headers - data = bytes(buf) - + # Headers - use bytearray.find() directly to avoid bytes() conversions while True: - idx = data.find(b"\r\n\r\n") - done = data[:2] == b"\r\n" + idx = buf.find(b"\r\n\r\n") + done = buf[:2] == b"\r\n" if idx < 0 and not done: await self._read_into(buf) - data = bytes(buf) - if len(data) > self.max_buffer_headers: + if len(buf) > self.max_buffer_headers: raise LimitRequestHeaders("max buffer headers") else: break if done: - self.unreader.unread(data[2:]) + self.unreader.unread(bytes(buf[2:])) else: - self.headers = self._parse_headers(data[:idx], from_trailer=False) - self.unreader.unread(data[idx + 4:]) + self.headers = self._parse_headers(bytes(buf[:idx]), from_trailer=False) + self.unreader.unread(bytes(buf[idx + 4:])) self._set_body_reader() @@ -182,21 +178,23 @@ class AsyncRequest: buf.extend(data) async def _read_line(self, buf, limit=0): - """Read a line from buffer, returning (line, remaining_buffer).""" - data = bytes(buf) + """Read a line from buffer, returning (line, remaining_buffer). + Uses bytearray.find() directly to avoid repeated bytes() conversions. + """ while True: - idx = data.find(b"\r\n") + idx = buf.find(b"\r\n") if idx >= 0: if idx > limit > 0: raise LimitRequestLine(idx, limit) break - if len(data) - 2 > limit > 0: - raise LimitRequestLine(len(data), limit) + if len(buf) - 2 > limit > 0: + raise LimitRequestLine(len(buf), limit) await self._read_into(buf) - data = bytes(buf) - return (data[:idx], bytearray(data[idx + 2:])) + line = bytes(buf[:idx]) + remaining = bytearray(buf[idx + 2:]) + return (line, remaining) async def _handle_proxy_protocol(self, buf, mode): """Handle PROXY protocol detection and parsing. 
@@ -422,11 +420,16 @@ class AsyncRequest: raise InvalidHTTPVersion(self.version) def _parse_headers(self, data, from_trailer=False): - """Parse HTTP headers from raw data.""" + """Parse HTTP headers from raw data. + + Uses index-based iteration instead of list.pop(0) for O(1) access. + """ cfg = self.cfg headers = [] lines = [bytes_to_str(line) for line in data.split(b"\r\n")] + num_lines = len(lines) + i = 0 # Handle scheme headers scheme_header = False @@ -440,11 +443,12 @@ class AsyncRequest: secure_scheme_headers = cfg.secure_scheme_headers forwarder_headers = cfg.forwarder_headers - while lines: + while i < num_lines: if len(headers) >= self.limit_request_fields: raise LimitRequestHeaders("limit request headers fields") - curr = lines.pop(0) + curr = lines[i] + i += 1 header_length = len(curr) + len("\r\n") if curr.find(":") <= 0: raise InvalidHeader(curr) @@ -457,11 +461,12 @@ class AsyncRequest: name = name.upper() value = [value.strip(" \t")] - # Consume value continuation lines - while lines and lines[0].startswith((" ", "\t")): + # Consume value continuation lines using index-based iteration + while i < num_lines and lines[i].startswith((" ", "\t")): if not self.cfg.permit_obsolete_folding: raise ObsoleteFolding(name) - curr = lines.pop(0) + curr = lines[i] + i += 1 header_length += len(curr) + len("\r\n") if header_length > self.limit_request_field_size > 0: raise LimitRequestHeaders("limit request headers fields size") @@ -676,29 +681,48 @@ class AsyncRequest: raise InvalidHeader("Missing chunk terminator") async def _read_chunk_size_line(self): - """Read a chunk size line.""" - buf = io.BytesIO() + """Read a chunk size line. + + Performance optimization: reads 64-byte chunks instead of 1 byte at a time, + then pushes back any excess data after finding the line terminator. 
+ """ + buf = bytearray() while True: - data = await self.unreader.read(1) + data = await self.unreader.read(64) if not data: raise NoMoreData() - buf.write(data) - if buf.getvalue().endswith(b"\r\n"): - return buf.getvalue()[:-2] + buf.extend(data) + idx = buf.find(b"\r\n") + if idx >= 0: + # Push back any data after the line + if idx + 2 < len(buf): + self.unreader.unread(bytes(buf[idx + 2:])) + return bytes(buf[:idx]) async def _skip_trailers(self): - """Skip trailer headers after chunked body.""" - buf = io.BytesIO() + """Skip trailer headers after chunked body. + + Performance optimization: reads 64-byte chunks instead of 1 byte at a time, + then pushes back any excess data after finding the trailer terminator. + """ + buf = bytearray() while True: - data = await self.unreader.read(1) + data = await self.unreader.read(64) if not data: return - buf.write(data) - content = buf.getvalue() - if content.endswith(b"\r\n\r\n"): - # Could parse trailers here if needed + buf.extend(data) + # Check for empty trailer (just CRLF) + if buf[:2] == b"\r\n": + # Push back remaining data + if len(buf) > 2: + self.unreader.unread(bytes(buf[2:])) return - if content == b"\r\n": + # Check for full trailer terminator + idx = buf.find(b"\r\n\r\n") + if idx >= 0: + # Push back data after the trailer + if idx + 4 < len(buf): + self.unreader.unread(bytes(buf[idx + 4:])) return async def drain_body(self): diff --git a/gunicorn/asgi/unreader.py b/gunicorn/asgi/unreader.py index c8d9aa82..330f56db 100644 --- a/gunicorn/asgi/unreader.py +++ b/gunicorn/asgi/unreader.py @@ -16,6 +16,9 @@ class AsyncUnreader: This class wraps an asyncio StreamReader and provides the ability to "unread" data back into a buffer for re-parsing. + + Performance optimization: Reuses BytesIO buffer with truncate/seek + instead of creating new objects to reduce GC pressure. 
""" def __init__(self, reader, max_chunk=8192): @@ -28,6 +31,25 @@ class AsyncUnreader: self.reader = reader self.buf = io.BytesIO() self.max_chunk = max_chunk + self._buf_start = 0 # Start position of valid data in buffer + + def _reset_buffer(self): + """Reset buffer for reuse instead of creating new BytesIO.""" + self.buf.seek(0) + self.buf.truncate(0) + self._buf_start = 0 + + def _get_buffered_data(self): + """Get all buffered data and reset buffer.""" + self.buf.seek(self._buf_start) + data = self.buf.read() + self._reset_buffer() + return data + + def _buffer_size(self): + """Get size of buffered data.""" + end = self.buf.seek(0, io.SEEK_END) + return end - self._buf_start async def read(self, size=None): """Read data from the stream, using buffered data first. @@ -48,31 +70,39 @@ class AsyncUnreader: if size < 0: size = None - # Move to end to check buffer size - self.buf.seek(0, io.SEEK_END) + buf_size = self._buffer_size() # If no size specified, return buffered data or read chunk - if size is None and self.buf.tell(): - ret = self.buf.getvalue() - self.buf = io.BytesIO() - return ret + if size is None and buf_size > 0: + return self._get_buffered_data() if size is None: chunk = await self._read_chunk() return chunk # Read until we have enough data - while self.buf.tell() < size: + while buf_size < size: chunk = await self._read_chunk() if not chunk: - ret = self.buf.getvalue() - self.buf = io.BytesIO() - return ret + return self._get_buffered_data() + self.buf.seek(0, io.SEEK_END) self.buf.write(chunk) + buf_size += len(chunk) - data = self.buf.getvalue() - self.buf = io.BytesIO() - self.buf.write(data[size:]) - return data[:size] + # We have enough data - extract what we need + self.buf.seek(self._buf_start) + data = self.buf.read(size) + + # Update start position instead of creating new buffer + self._buf_start += size + + # If buffer is getting large with consumed data, compact it + if self._buf_start > 8192: + remaining = self.buf.read() # Read from 
current position + self._reset_buffer() + if remaining: + self.buf.write(remaining) + + return data async def _read_chunk(self): """Read a chunk of data from the underlying stream.""" @@ -86,15 +116,20 @@ class AsyncUnreader: Args: data: bytes to push back + + Note: This prepends data to the buffer so it will be read first. """ if data: - self.buf.seek(0, io.SEEK_END) + # Get existing buffered data + self.buf.seek(self._buf_start) + existing = self.buf.read() + + # Reset and write new data first, then existing + self._reset_buffer() self.buf.write(data) + if existing: + self.buf.write(existing) def has_buffered_data(self): """Check if there's data in the pushback buffer.""" - pos = self.buf.tell() - self.buf.seek(0, io.SEEK_END) - has_data = self.buf.tell() > 0 - self.buf.seek(pos) - return has_data + return self._buffer_size() > 0 diff --git a/tests/test_asgi_parser.py b/tests/test_asgi_parser.py new file mode 100644 index 00000000..15211798 --- /dev/null +++ b/tests/test_asgi_parser.py @@ -0,0 +1,325 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +Tests for ASGI HTTP parser optimizations. 
+""" + +import asyncio +import ipaddress +import pytest + +from gunicorn.asgi.unreader import AsyncUnreader +from gunicorn.asgi.message import AsyncRequest + + +class MockStreamReader: + """Mock asyncio.StreamReader for testing.""" + + def __init__(self, data): + self.data = data + self.pos = 0 + + async def read(self, size=-1): + if self.pos >= len(self.data): + return b"" + if size < 0: + result = self.data[self.pos:] + self.pos = len(self.data) + else: + result = self.data[self.pos:self.pos + size] + self.pos += size + return result + + +class MockConfig: + """Mock gunicorn config for testing.""" + + def __init__(self): + self.is_ssl = False + self.proxy_protocol = "off" + self.proxy_allow_ips = ["127.0.0.1"] + self.forwarded_allow_ips = ["127.0.0.1"] + self._proxy_allow_networks = None + self._forwarded_allow_networks = None + self.secure_scheme_headers = {} + self.forwarder_headers = [] + self.limit_request_line = 8190 + self.limit_request_fields = 100 + self.limit_request_field_size = 8190 + self.permit_unconventional_http_method = False + self.permit_unconventional_http_version = False + self.permit_obsolete_folding = False + self.casefold_http_method = False + self.strip_header_spaces = False + self.header_map = "refuse" + + def forwarded_allow_networks(self): + if self._forwarded_allow_networks is None: + self._forwarded_allow_networks = [ + ipaddress.ip_network(addr) + for addr in self.forwarded_allow_ips + if addr != "*" + ] + return self._forwarded_allow_networks + + def proxy_allow_networks(self): + if self._proxy_allow_networks is None: + self._proxy_allow_networks = [ + ipaddress.ip_network(addr) + for addr in self.proxy_allow_ips + if addr != "*" + ] + return self._proxy_allow_networks + + +# Optimized Chunk Reading Tests + +@pytest.mark.asyncio +async def test_chunk_size_line_reading(): + """Test optimized chunk size line reading.""" + # Simulate chunked body with chunk size line + data = b"a\r\nhello body\r\n0\r\n\r\n" + reader = 
MockStreamReader(data) + unreader = AsyncUnreader(reader) + cfg = MockConfig() + + req = AsyncRequest(cfg, unreader, ("127.0.0.1", 8000)) + # Access the private method for testing + line = await req._read_chunk_size_line() + assert line == b"a" + + +@pytest.mark.asyncio +async def test_skip_trailers_empty(): + """Test skipping empty trailers.""" + data = b"\r\n" + reader = MockStreamReader(data) + unreader = AsyncUnreader(reader) + cfg = MockConfig() + + req = AsyncRequest(cfg, unreader, ("127.0.0.1", 8000)) + # Should not raise + await req._skip_trailers() + + +@pytest.mark.asyncio +async def test_skip_trailers_with_headers(): + """Test skipping trailers with actual headers.""" + data = b"X-Checksum: abc123\r\n\r\n" + reader = MockStreamReader(data) + unreader = AsyncUnreader(reader) + cfg = MockConfig() + + req = AsyncRequest(cfg, unreader, ("127.0.0.1", 8000)) + # Should not raise + await req._skip_trailers() + + +# Buffer Reuse Tests + +@pytest.mark.asyncio +async def test_unreader_buffer_reuse(): + """Test that AsyncUnreader reuses buffers efficiently.""" + data = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n" + reader = MockStreamReader(data) + unreader = AsyncUnreader(reader) + + # Read in chunks + chunk1 = await unreader.read(10) + assert chunk1 == b"GET / HTTP" + + # Read more + chunk2 = await unreader.read(10) + assert chunk2 == b"/1.1\r\nHost" + + # Unread some data + unreader.unread(b"/1.1\r\nHost") + + # Read again - should return the data that was pushed back + chunk3 = await unreader.read(10) + assert chunk3 == b"/1.1\r\nHost" + + +@pytest.mark.asyncio +async def test_unreader_unread_prepends(): + """Test that unread prepends data.""" + data = b"original" + reader = MockStreamReader(data) + unreader = AsyncUnreader(reader) + + # Read some data first + await unreader.read(4) # "orig" + + # Unread something different + unreader.unread(b"NEW") + + # Should read the new data first + result = await unreader.read(3) + assert result == b"NEW" + + +# Header Parsing 
Optimization Tests + +@pytest.mark.asyncio +async def test_header_parsing_index_iteration(): + """Test that header parsing uses index-based iteration.""" + raw_request = ( + b"GET / HTTP/1.1\r\n" + b"Host: example.com\r\n" + b"Content-Type: text/plain\r\n" + b"X-Custom: value\r\n" + b"\r\n" + ) + reader = MockStreamReader(raw_request) + unreader = AsyncUnreader(reader) + cfg = MockConfig() + + req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000)) + + assert req.method == "GET" + assert req.path == "/" + assert len(req.headers) == 3 + assert ("HOST", "example.com") in req.headers + assert ("CONTENT-TYPE", "text/plain") in req.headers + assert ("X-CUSTOM", "value") in req.headers + + +@pytest.mark.asyncio +async def test_many_headers_performance(): + """Test parsing request with many headers.""" + headers = [] + for i in range(50): + headers.append(f"X-Header-{i}: value-{i}\r\n") + + raw_request = ( + b"GET / HTTP/1.1\r\n" + + "".join(headers).encode() + + b"\r\n" + ) + + reader = MockStreamReader(raw_request) + unreader = AsyncUnreader(reader) + cfg = MockConfig() + + req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000)) + + assert len(req.headers) == 50 + + +# Bytearray Find Optimization Tests + +@pytest.mark.asyncio +async def test_bytearray_find_optimization(): + """Test that bytearray.find() is used instead of bytes().find().""" + raw_request = ( + b"GET /path?query=value HTTP/1.1\r\n" + b"Host: example.com\r\n" + b"Content-Length: 5\r\n" + b"\r\n" + b"hello" + ) + reader = MockStreamReader(raw_request) + unreader = AsyncUnreader(reader) + cfg = MockConfig() + + req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000)) + + assert req.method == "GET" + assert req.path == "/path" + assert req.query == "query=value" + assert req.content_length == 5 + + +# Chunked Body Tests with Optimized Reading + +@pytest.mark.asyncio +async def test_chunked_body_optimized_reading(): + """Test reading chunked body with optimized chunk 
reading.""" + raw_request = ( + b"POST / HTTP/1.1\r\n" + b"Host: example.com\r\n" + b"Transfer-Encoding: chunked\r\n" + b"\r\n" + b"5\r\nhello\r\n" + b"6\r\n world\r\n" + b"0\r\n\r\n" + ) + reader = MockStreamReader(raw_request) + unreader = AsyncUnreader(reader) + cfg = MockConfig() + + req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000)) + + assert req.chunked is True + assert req.content_length is None + + # Read body + body_parts = [] + while True: + chunk = await req.read_body(1024) + if not chunk: + break + body_parts.append(chunk) + + body = b"".join(body_parts) + assert body == b"hello world" + + +@pytest.mark.asyncio +async def test_chunked_body_with_extension(): + """Test reading chunked body with chunk extensions.""" + raw_request = ( + b"POST / HTTP/1.1\r\n" + b"Host: example.com\r\n" + b"Transfer-Encoding: chunked\r\n" + b"\r\n" + b"5;ext=value\r\nhello\r\n" + b"0\r\n\r\n" + ) + reader = MockStreamReader(raw_request) + unreader = AsyncUnreader(reader) + cfg = MockConfig() + + req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000)) + + chunk = await req.read_body(1024) + assert chunk == b"hello" + + +# Edge Cases + +@pytest.mark.asyncio +async def test_empty_headers(): + """Test request with no headers.""" + raw_request = ( + b"GET / HTTP/1.1\r\n" + b"\r\n" + ) + reader = MockStreamReader(raw_request) + unreader = AsyncUnreader(reader) + cfg = MockConfig() + + req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000)) + + assert req.method == "GET" + assert len(req.headers) == 0 + + +@pytest.mark.asyncio +async def test_large_header_value(): + """Test request with large header value.""" + large_value = "x" * 4000 # Within default limit + raw_request = ( + b"GET / HTTP/1.1\r\n" + + f"X-Large-Header: {large_value}\r\n".encode() + + b"\r\n" + ) + reader = MockStreamReader(raw_request) + unreader = AsyncUnreader(reader) + cfg = MockConfig() + + req = await AsyncRequest.parse(cfg, unreader, ("127.0.0.1", 8000)) + + assert 
req.get_header("X-Large-Header") == large_value