feat(http2): add stream priority support (RFC 7540 Section 5.3)

This commit is contained in:
Benoit Chesneau 2026-01-27 11:44:33 +01:00
parent 251d8ebe51
commit 655716a181
12 changed files with 636 additions and 2 deletions

View File

@ -186,6 +186,73 @@ async def app(scope, receive, send):
Early hints are only sent to HTTP/1.1+ clients. HTTP/1.0 clients silently
ignore the callback since they don't support 1xx responses.
## Stream Priority
HTTP/2 allows clients to indicate the relative priority of streams using PRIORITY frames
(RFC 7540 Section 5.3). Gunicorn tracks stream priorities and exposes them to both
WSGI and ASGI applications.
### Accessing Priority in WSGI
Priority information is available in the WSGI environ for HTTP/2 requests:
```python
def app(environ, start_response):
# Access stream priority (HTTP/2 only)
weight = environ.get('gunicorn.http2.priority_weight')
depends_on = environ.get('gunicorn.http2.priority_depends_on')
if weight is not None:
# This is an HTTP/2 request with priority info
# Higher weight = client considers this more important
print(f"Request priority: weight={weight}, depends_on={depends_on}")
start_response('200 OK', [('Content-Type', 'text/plain')])
return [b'OK']
```
| Environ Key | Range | Default | Description |
|-------------|-------|---------|-------------|
| `gunicorn.http2.priority_weight` | 1-256 | 16 | Higher weight = more resources |
| `gunicorn.http2.priority_depends_on` | Stream ID | 0 | Parent stream (0 = root) |
### Accessing Priority in ASGI
For ASGI applications, priority is available in the scope's `extensions` dict:
```python
async def app(scope, receive, send):
if scope["type"] == "http":
# Check for HTTP/2 priority extension
extensions = scope.get("extensions", {})
priority = extensions.get("http.response.priority")
if priority:
weight = priority["weight"] # 1-256
depends_on = priority["depends_on"] # Parent stream ID
print(f"Request priority: weight={weight}, depends_on={depends_on}")
await send({
"type": "http.response.start",
"status": 200,
"headers": [(b"content-type", b"text/plain")],
})
await send({
"type": "http.response.body",
"body": b"OK",
})
```
| Extension Key | Field | Range | Default | Description |
|---------------|-------|-------|---------|-------------|
| `http.response.priority` | `weight` | 1-256 | 16 | Higher weight = more resources |
| `http.response.priority` | `depends_on` | Stream ID | 0 | Parent stream (0 = root) |
!!! note
Stream priority is advisory. Applications can use it for scheduling decisions,
but Gunicorn does not enforce priority-based request ordering. Priority
information is only present for HTTP/2 requests.
## Production Deployment
### With Nginx

View File

@ -360,6 +360,15 @@ class ASGIProtocol(asyncio.Protocol):
if hasattr(self.worker, 'state'):
scope["state"] = self.worker.state
# Add HTTP/2 priority extension if available
if hasattr(request, 'priority_weight'):
scope["extensions"] = {
"http.response.priority": {
"weight": request.priority_weight,
"depends_on": request.priority_depends_on,
}
}
return scope
def _build_environ(self, request, sockname, peername):
@ -743,6 +752,15 @@ class ASGIProtocol(asyncio.Protocol):
if hasattr(self.worker, 'state'):
scope["state"] = self.worker.state
# Add HTTP/2 priority extension
if hasattr(request, 'priority_weight'):
scope["extensions"] = {
"http.response.priority": {
"weight": request.priority_weight,
"depends_on": request.priority_depends_on,
}
}
return scope
def _build_http2_environ(self, request, sockname, peername):

View File

@ -250,6 +250,11 @@ def create(req, sock, client, server, cfg):
# Add wsgi.early_hints callback for sending 103 Early Hints
environ['wsgi.early_hints'] = _make_early_hints_callback(req, sock, resp)
# Add HTTP/2 stream priority if available
if hasattr(req, 'priority_weight'):
environ['gunicorn.http2.priority_weight'] = req.priority_weight
environ['gunicorn.http2.priority_depends_on'] = req.priority_depends_on
return resp, environ

View File

@ -191,7 +191,7 @@ class AsyncHTTP2Connection:
pass # Flow control update, handled by h2
elif isinstance(event, _h2_events.PriorityUpdated):
pass # Priority update, could be used for scheduling
self._handle_priority_updated(event)
elif isinstance(event, _h2_events.SettingsAcknowledged):
pass # Settings ACK received
@ -270,6 +270,20 @@ class AsyncHTTP2Connection:
stream.receive_trailers(event.headers)
return HTTP2Request(stream, self.cfg, self.client_addr)
def _handle_priority_updated(self, event):
"""Handle PriorityUpdated event (PRIORITY frame).
Args:
event: PriorityUpdated event with priority info
"""
stream = self.streams.get(event.stream_id)
if stream is not None:
stream.update_priority(
weight=event.weight,
depends_on=event.depends_on,
exclusive=event.exclusive
)
async def send_informational(self, stream_id, status, headers):
"""Send an informational response (1xx) on a stream.

View File

@ -185,7 +185,7 @@ class HTTP2ServerConnection:
pass # Flow control update, handled by h2
elif isinstance(event, _h2_events.PriorityUpdated):
pass # Priority update, could be used for scheduling
self._handle_priority_updated(event)
elif isinstance(event, _h2_events.SettingsAcknowledged):
pass # Settings ACK received
@ -308,6 +308,20 @@ class HTTP2ServerConnection:
# Trailers always end the request
return HTTP2Request(stream, self.cfg, self.client_addr)
def _handle_priority_updated(self, event):
"""Handle PriorityUpdated event (PRIORITY frame).
Args:
event: PriorityUpdated event with priority info
"""
stream = self.streams.get(event.stream_id)
if stream is not None:
stream.update_priority(
weight=event.weight,
depends_on=event.depends_on,
exclusive=event.exclusive
)
def send_informational(self, stream_id, status, headers):
"""Send an informational response (1xx) on a stream.

View File

@ -161,6 +161,10 @@ class HTTP2Request:
# HTTP/2 does not use proxy protocol through the data stream
self.proxy_protocol_info = None
# Stream priority (RFC 7540 Section 5.3)
self.priority_weight = stream.priority_weight
self.priority_depends_on = stream.priority_depends_on
def force_close(self):
"""Force the connection to close after this request."""
self.must_close = True

View File

@ -63,6 +63,11 @@ class HTTP2Stream:
# Trailers
self.trailers = None
# Stream priority (RFC 7540 Section 5.3)
self.priority_weight = 16
self.priority_depends_on = 0
self.priority_exclusive = False
@property
def is_client_stream(self):
"""Check if this is a client-initiated stream (odd stream ID)."""
@ -210,6 +215,21 @@ class HTTP2Stream:
self.response_complete = True
self.request_complete = True
def update_priority(self, weight=None, depends_on=None, exclusive=None):
"""Update stream priority from PRIORITY frame.
Args:
weight: Priority weight (1-256), higher = more resources
depends_on: Stream ID this stream depends on
exclusive: Whether this is an exclusive dependency
"""
if weight is not None:
self.priority_weight = max(1, min(256, weight))
if depends_on is not None:
self.priority_depends_on = depends_on
if exclusive is not None:
self.priority_exclusive = exclusive
def _half_close_local(self):
"""Transition to half-closed (local) state."""
if self.state == StreamState.OPEN:

View File

@ -641,3 +641,108 @@ class TestASGIConfig:
cfg = Config()
cfg.set('root_path', '/api/v1')
assert cfg.root_path == '/api/v1'
# ============================================================================
# HTTP/2 Priority Tests
# ============================================================================
class TestASGIHTTP2Priority:
"""Test HTTP/2 priority in ASGI scope."""
def test_http2_priority_in_scope(self):
"""Test that HTTP/2 priority is added to ASGI scope extensions."""
from gunicorn.asgi.protocol import ASGIProtocol
worker = mock.Mock()
worker.cfg = Config()
worker.log = mock.Mock()
worker.asgi = mock.Mock()
protocol = ASGIProtocol(worker)
# Create mock HTTP/2 request with priority
request = mock.Mock()
request.method = "GET"
request.path = "/test"
request.query = ""
request.version = (2, 0)
request.scheme = "https"
request.headers = [("HOST", "localhost")]
request.priority_weight = 128
request.priority_depends_on = 3
scope = protocol._build_http_scope(
request,
("127.0.0.1", 8443),
("127.0.0.1", 12345),
)
assert "extensions" in scope
assert "http.response.priority" in scope["extensions"]
assert scope["extensions"]["http.response.priority"]["weight"] == 128
assert scope["extensions"]["http.response.priority"]["depends_on"] == 3
def test_http2_priority_in_http2_scope(self):
"""Test that HTTP/2 priority is in _build_http2_scope."""
from gunicorn.asgi.protocol import ASGIProtocol
worker = mock.Mock()
worker.cfg = Config()
worker.log = mock.Mock()
worker.asgi = mock.Mock()
protocol = ASGIProtocol(worker)
# Create mock HTTP/2 request with priority
request = mock.Mock()
request.method = "POST"
request.path = "/api/data"
request.query = "id=1"
request.uri = "/api/data?id=1"
request.scheme = "https"
request.headers = [("HOST", "localhost"), ("CONTENT-TYPE", "application/json")]
request.priority_weight = 256
request.priority_depends_on = 1
scope = protocol._build_http2_scope(
request,
("127.0.0.1", 8443),
("127.0.0.1", 12345),
)
assert scope["http_version"] == "2"
assert "extensions" in scope
assert "http.response.priority" in scope["extensions"]
assert scope["extensions"]["http.response.priority"]["weight"] == 256
assert scope["extensions"]["http.response.priority"]["depends_on"] == 1
def test_no_priority_for_http1_requests(self):
"""Test that HTTP/1.1 requests don't have priority extensions."""
from gunicorn.asgi.protocol import ASGIProtocol
worker = mock.Mock()
worker.cfg = Config()
worker.log = mock.Mock()
worker.asgi = mock.Mock()
protocol = ASGIProtocol(worker)
# Create mock HTTP/1.1 request (no priority attributes)
request = mock.Mock(spec=['method', 'path', 'query', 'version',
'scheme', 'headers'])
request.method = "GET"
request.path = "/test"
request.query = ""
request.version = (1, 1)
request.scheme = "http"
request.headers = [("HOST", "localhost")]
scope = protocol._build_http_scope(
request,
("127.0.0.1", 8000),
("127.0.0.1", 12345),
)
# HTTP/1.1 requests should not have extensions with priority
assert "extensions" not in scope or "http.response.priority" not in scope.get("extensions", {})

View File

@ -559,3 +559,82 @@ class TestAsyncHTTP2ConnectionSocketErrors:
with pytest.raises(HTTP2ConnectionError):
await conn.initiate_connection()
class TestAsyncHTTP2ConnectionPriority:
"""Test async HTTP/2 priority handling."""
@pytest.mark.asyncio
async def test_handle_priority_updated_existing_stream(self):
"""Test handling priority update for existing stream."""
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
# Create a client connection to generate frames
client_conn = create_client_connection()
client_data = client_conn.data_to_send()
# Set up reader with client preface
reader.set_data(client_data)
await conn.initiate_connection()
await conn.receive_data()
writer.clear()
# Send a request to create a stream
client_conn.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
])
request_data = client_conn.data_to_send()
reader.set_data(request_data)
await conn.receive_data()
# Verify stream was created
assert 1 in conn.streams
stream = conn.streams[1]
# Default priority values
assert stream.priority_weight == 16
assert stream.priority_depends_on == 0
# Send a PRIORITY frame
client_conn.prioritize(1, weight=128, depends_on=0, exclusive=False)
priority_data = client_conn.data_to_send()
reader.set_data(priority_data)
await conn.receive_data()
# Verify priority was updated
assert stream.priority_weight == 128
@pytest.mark.asyncio
async def test_handle_priority_updated_nonexistent_stream(self):
"""Test that priority update for nonexistent stream is ignored."""
from gunicorn.http2.async_connection import AsyncHTTP2Connection
cfg = MockConfig()
reader = MockAsyncReader()
writer = MockAsyncWriter()
conn = AsyncHTTP2Connection(cfg, reader, writer, ('127.0.0.1', 12345))
# Create a client connection
client_conn = create_client_connection()
client_data = client_conn.data_to_send()
reader.set_data(client_data)
await conn.initiate_connection()
await conn.receive_data()
# Send a PRIORITY frame for a stream that doesn't exist
client_conn.prioritize(99, weight=64, depends_on=0, exclusive=False)
priority_data = client_conn.data_to_send()
reader.set_data(priority_data)
# Should not raise
await conn.receive_data()

View File

@ -568,6 +568,77 @@ class TestHTTP2ServerConnectionRepr:
assert "closed=" in repr_str
class TestHTTP2ServerConnectionPriority:
"""Test HTTP/2 priority handling."""
def test_handle_priority_updated_existing_stream(self):
"""Test handling priority update for existing stream."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Create a client connection to generate frames
client_conn = create_client_connection()
# Get client preface
client_data = client_conn.data_to_send()
# Feed client preface to server
conn.receive_data(client_data)
sock._sent = bytearray()
# Send a request to create a stream
client_conn.send_headers(1, [
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'localhost'),
])
request_data = client_conn.data_to_send()
conn.receive_data(request_data)
# Verify stream was created
assert 1 in conn.streams
stream = conn.streams[1]
# Default priority values
assert stream.priority_weight == 16
assert stream.priority_depends_on == 0
# Send a PRIORITY frame
client_conn.prioritize(1, weight=128, depends_on=0, exclusive=False)
priority_data = client_conn.data_to_send()
conn.receive_data(priority_data)
# Verify priority was updated
assert stream.priority_weight == 128
def test_handle_priority_updated_nonexistent_stream(self):
"""Test that priority update for nonexistent stream is ignored."""
from gunicorn.http2.connection import HTTP2ServerConnection
cfg = MockConfig()
sock = MockSocket()
conn = HTTP2ServerConnection(cfg, sock, ('127.0.0.1', 12345))
conn.initiate_connection()
# Create a client connection
client_conn = create_client_connection()
client_data = client_conn.data_to_send()
conn.receive_data(client_data)
# Send a PRIORITY frame for a stream that doesn't exist
# This should not raise an error
client_conn.prioritize(99, weight=64, depends_on=0, exclusive=False)
priority_data = client_conn.data_to_send()
# Should not raise
conn.receive_data(priority_data)
class TestHTTP2NotAvailable:
"""Test behavior when h2 is not available."""

View File

@ -572,3 +572,150 @@ class TestHTTP2RequestDefaults:
assert req.uri == '/'
assert req.path == '/'
class TestHTTP2RequestPriority:
"""Test HTTP2Request priority attributes."""
def test_default_priority_values(self):
"""Test that request inherits default stream priority."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.receive_headers([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.priority_weight == 16
assert req.priority_depends_on == 0
def test_custom_priority_values(self):
"""Test that request inherits custom stream priority."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=3, connection=conn)
# Update priority before creating request
stream.update_priority(weight=200, depends_on=1)
stream.receive_headers([
(':method', 'POST'),
(':path', '/api/data'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=False)
stream.receive_data(b'{"data": "test"}', end_stream=True)
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('192.168.1.100', 54321))
assert req.priority_weight == 200
assert req.priority_depends_on == 1
def test_priority_reflects_stream_at_request_creation(self):
"""Test that priority reflects stream state when request is created."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.receive_headers([
(':method', 'GET'),
(':path', '/'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
cfg = MockConfig()
# Create request with default priority
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
assert req.priority_weight == 16
# Update stream priority after request was created
stream.update_priority(weight=256)
# Request should still have old value (captured at creation time)
assert req.priority_weight == 16
# Stream has new value
assert stream.priority_weight == 256
class MockWSGIConfig:
"""Mock gunicorn configuration with WSGI-required attributes."""
def __init__(self):
self.errorlog = '-'
self.workers = 1
class TestHTTP2RequestWSGIEnviron:
"""Test HTTP/2 priority in WSGI environ."""
def test_priority_in_wsgi_environ(self):
"""Test that HTTP/2 priority is added to WSGI environ."""
from unittest import mock
from gunicorn.http.wsgi import create
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.update_priority(weight=128, depends_on=3)
stream.receive_headers([
(':method', 'GET'),
(':path', '/test'),
(':scheme', 'https'),
(':authority', 'example.com'),
], end_stream=True)
cfg = MockConfig()
req = HTTP2Request(stream, cfg, ('127.0.0.1', 12345))
# Create a mock socket
mock_sock = mock.Mock()
mock_sock.getsockname.return_value = ('127.0.0.1', 8443)
# Use WSGI config for environ creation
wsgi_cfg = MockWSGIConfig()
# Create WSGI environ
resp, environ = create(req, mock_sock, ('127.0.0.1', 12345), ('127.0.0.1', 8443), wsgi_cfg)
# Verify priority is in environ
assert environ.get('gunicorn.http2.priority_weight') == 128
assert environ.get('gunicorn.http2.priority_depends_on') == 3
def test_priority_not_in_environ_for_http1(self):
"""Test that HTTP/1 requests don't have priority keys."""
from unittest import mock
from gunicorn.http.wsgi import create
# Create a mock HTTP/1 request (no priority attributes)
mock_req = mock.Mock()
mock_req.headers = [('HOST', 'example.com')]
mock_req.scheme = 'https'
mock_req.path = '/test'
mock_req.query = ''
mock_req.fragment = ''
mock_req.method = 'GET'
mock_req.uri = '/test'
mock_req.version = (1, 1)
mock_req._expected_100_continue = False
mock_req.proxy_protocol_info = None
mock_req.body = mock.Mock()
# Remove priority attributes to simulate HTTP/1 request
del mock_req.priority_weight
del mock_req.priority_depends_on
wsgi_cfg = MockWSGIConfig()
mock_sock = mock.Mock()
mock_sock.getsockname.return_value = ('127.0.0.1', 8443)
resp, environ = create(mock_req, mock_sock, ('127.0.0.1', 12345), ('127.0.0.1', 8443), wsgi_cfg)
# HTTP/1 requests should not have priority keys
assert 'gunicorn.http2.priority_weight' not in environ
assert 'gunicorn.http2.priority_depends_on' not in environ

View File

@ -626,3 +626,93 @@ class TestFullStreamLifecycle:
assert stream.state == StreamState.CLOSED
assert stream.request_complete is True
assert stream.response_complete is True
class TestStreamPriority:
"""Test stream priority support (RFC 7540 Section 5.3)."""
def test_default_priority_values(self):
"""Test default priority values."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
assert stream.priority_weight == 16
assert stream.priority_depends_on == 0
assert stream.priority_exclusive is False
def test_update_priority_weight(self):
"""Test updating priority weight."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.update_priority(weight=256)
assert stream.priority_weight == 256
stream.update_priority(weight=1)
assert stream.priority_weight == 1
def test_update_priority_depends_on(self):
"""Test updating priority dependency."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=3, connection=conn)
stream.update_priority(depends_on=1)
assert stream.priority_depends_on == 1
def test_update_priority_exclusive(self):
"""Test updating exclusive flag."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=3, connection=conn)
stream.update_priority(exclusive=True)
assert stream.priority_exclusive is True
stream.update_priority(exclusive=False)
assert stream.priority_exclusive is False
def test_update_priority_all_fields(self):
"""Test updating all priority fields at once."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=5, connection=conn)
stream.update_priority(weight=128, depends_on=1, exclusive=True)
assert stream.priority_weight == 128
assert stream.priority_depends_on == 1
assert stream.priority_exclusive is True
def test_update_priority_partial(self):
"""Test that partial updates don't affect other fields."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
# Set initial values
stream.update_priority(weight=200, depends_on=3, exclusive=True)
# Update only weight
stream.update_priority(weight=100)
assert stream.priority_weight == 100
assert stream.priority_depends_on == 3 # unchanged
assert stream.priority_exclusive is True # unchanged
def test_weight_clamped_to_min(self):
"""Test that weight is clamped to minimum of 1."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.update_priority(weight=0)
assert stream.priority_weight == 1
stream.update_priority(weight=-10)
assert stream.priority_weight == 1
def test_weight_clamped_to_max(self):
"""Test that weight is clamped to maximum of 256."""
conn = MockConnection()
stream = HTTP2Stream(stream_id=1, connection=conn)
stream.update_priority(weight=300)
assert stream.priority_weight == 256
stream.update_priority(weight=1000)
assert stream.priority_weight == 256