mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-01 18:21:30 +08:00
Add HTTP/2 core protocol implementation
Core classes for HTTP/2 server-side protocol handling: - HTTP2Stream: Stream state management matching RFC 7540 Section 5.1 - StreamState enum for proper lifecycle tracking - Request/response tracking and body buffering - Pseudo-header extraction for :method, :path, etc. - Proper state transitions for half-close semantics - HTTP2Request: Request interface compatibility layer - Wraps HTTP2Stream for worker consumption - HTTP2Body provides file-like interface for request body - Converts HTTP/2 pseudo-headers to standard attributes - Transforms lowercase headers to uppercase for WSGI - Adds HOST header from :authority pseudo-header - HTTP2ServerConnection: h2 library integration - Lazy import of h2 for graceful degradation - Connection initialization with configurable settings - Stream management for concurrent requests - Event handling for HEADERS, DATA, RST_STREAM, GOAWAY - Response sending with proper frame generation - Flow control window management with chunked data sending - get_parser() extension for HTTP/2 dispatch
This commit is contained in:
parent
c711d9fb6f
commit
89a0a46722
@ -6,21 +6,30 @@ from gunicorn.http.message import Message, Request
|
||||
from gunicorn.http.parser import RequestParser
|
||||
|
||||
|
||||
def get_parser(cfg, source, source_addr):
|
||||
def get_parser(cfg, source, source_addr, http2_connection=False):
|
||||
"""Get appropriate parser based on protocol config.
|
||||
|
||||
Args:
|
||||
cfg: Gunicorn config object
|
||||
source: Socket or iterable source
|
||||
source_addr: Source address tuple or None
|
||||
http2_connection: If True, create HTTP/2 connection handler
|
||||
|
||||
Returns:
|
||||
Parser instance (RequestParser or UWSGIParser)
|
||||
Parser instance (RequestParser, UWSGIParser, or HTTP2ServerConnection)
|
||||
"""
|
||||
# HTTP/2 connection
|
||||
if http2_connection:
|
||||
from gunicorn.http2.connection import HTTP2ServerConnection
|
||||
return HTTP2ServerConnection(cfg, source, source_addr)
|
||||
|
||||
# uWSGI protocol
|
||||
protocol = getattr(cfg, 'protocol', 'http')
|
||||
if protocol == 'uwsgi':
|
||||
from gunicorn.uwsgi.parser import UWSGIParser
|
||||
return UWSGIParser(cfg, source, source_addr)
|
||||
|
||||
# Default HTTP/1.x
|
||||
return RequestParser(cfg, source, source_addr)
|
||||
|
||||
|
||||
|
||||
475
gunicorn/http2/connection.py
Normal file
475
gunicorn/http2/connection.py
Normal file
@ -0,0 +1,475 @@
|
||||
# -*- coding: utf-8 -
|
||||
#
|
||||
# This file is part of gunicorn released under the MIT license.
|
||||
# See the NOTICE for more information.
|
||||
|
||||
"""
|
||||
HTTP/2 server connection implementation.
|
||||
|
||||
Uses the hyper-h2 library for HTTP/2 protocol handling.
|
||||
"""
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
from .errors import (
|
||||
HTTP2Error, HTTP2ProtocolError, HTTP2ConnectionError,
|
||||
HTTP2NotAvailable,
|
||||
)
|
||||
from .stream import HTTP2Stream
|
||||
from .request import HTTP2Request
|
||||
|
||||
|
||||
# Import h2 lazily to allow graceful fallback
|
||||
_h2 = None
|
||||
_h2_config = None
|
||||
_h2_events = None
|
||||
_h2_exceptions = None
|
||||
_h2_settings = None
|
||||
|
||||
|
||||
def _import_h2():
|
||||
"""Lazily import h2 library components."""
|
||||
global _h2, _h2_config, _h2_events, _h2_exceptions, _h2_settings
|
||||
|
||||
if _h2 is not None:
|
||||
return
|
||||
|
||||
try:
|
||||
import h2.connection as _h2
|
||||
import h2.config as _h2_config
|
||||
import h2.events as _h2_events
|
||||
import h2.exceptions as _h2_exceptions
|
||||
import h2.settings as _h2_settings
|
||||
except ImportError:
|
||||
raise HTTP2NotAvailable()
|
||||
|
||||
|
||||
class HTTP2ServerConnection:
|
||||
"""HTTP/2 server-side connection handler.
|
||||
|
||||
Manages the HTTP/2 connection state and multiplexed streams.
|
||||
This class wraps the h2 library and provides a higher-level
|
||||
interface for gunicorn workers.
|
||||
"""
|
||||
|
||||
# Default buffer size for socket reads
|
||||
READ_BUFFER_SIZE = 65536
|
||||
|
||||
def __init__(self, cfg, sock, client_addr):
|
||||
"""Initialize an HTTP/2 server connection.
|
||||
|
||||
Args:
|
||||
cfg: Gunicorn configuration object
|
||||
sock: SSL socket with completed handshake
|
||||
client_addr: Client address tuple (host, port)
|
||||
|
||||
Raises:
|
||||
HTTP2NotAvailable: If h2 library is not installed
|
||||
"""
|
||||
_import_h2()
|
||||
|
||||
self.cfg = cfg
|
||||
self.sock = sock
|
||||
self.client_addr = client_addr
|
||||
|
||||
# Active streams indexed by stream ID
|
||||
self.streams = {}
|
||||
|
||||
# Completed requests ready for processing
|
||||
self._pending_requests = []
|
||||
|
||||
# Connection settings from config
|
||||
self.initial_window_size = cfg.http2_initial_window_size
|
||||
self.max_concurrent_streams = cfg.http2_max_concurrent_streams
|
||||
self.max_frame_size = cfg.http2_max_frame_size
|
||||
self.max_header_list_size = cfg.http2_max_header_list_size
|
||||
|
||||
# Initialize h2 connection
|
||||
config = _h2_config.H2Configuration(
|
||||
client_side=False,
|
||||
header_encoding='utf-8',
|
||||
)
|
||||
self.h2_conn = _h2.H2Connection(config=config)
|
||||
|
||||
# Read buffer for partial frames
|
||||
self._read_buffer = BytesIO()
|
||||
|
||||
# Connection state
|
||||
self._closed = False
|
||||
self._initialized = False
|
||||
|
||||
def initiate_connection(self):
|
||||
"""Send initial HTTP/2 settings to client.
|
||||
|
||||
Should be called after the SSL handshake completes and
|
||||
before processing any data.
|
||||
"""
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
# Update local settings before initiating
|
||||
self.h2_conn.update_settings({
|
||||
_h2_settings.SettingCodes.MAX_CONCURRENT_STREAMS: self.max_concurrent_streams,
|
||||
_h2_settings.SettingCodes.INITIAL_WINDOW_SIZE: self.initial_window_size,
|
||||
_h2_settings.SettingCodes.MAX_FRAME_SIZE: self.max_frame_size,
|
||||
_h2_settings.SettingCodes.MAX_HEADER_LIST_SIZE: self.max_header_list_size,
|
||||
})
|
||||
|
||||
self.h2_conn.initiate_connection()
|
||||
self._send_pending_data()
|
||||
self._initialized = True
|
||||
|
||||
def receive_data(self, data=None):
|
||||
"""Process received data and return completed requests.
|
||||
|
||||
Args:
|
||||
data: Optional bytes to process. If None, reads from socket.
|
||||
|
||||
Returns:
|
||||
list: List of HTTP2Request objects for completed requests
|
||||
|
||||
Raises:
|
||||
HTTP2ConnectionError: On protocol or connection errors
|
||||
"""
|
||||
if data is None:
|
||||
try:
|
||||
data = self.sock.recv(self.READ_BUFFER_SIZE)
|
||||
except (OSError, IOError) as e:
|
||||
raise HTTP2ConnectionError(f"Socket read error: {e}")
|
||||
|
||||
if not data:
|
||||
# Connection closed by peer
|
||||
self._closed = True
|
||||
return []
|
||||
|
||||
# Feed data to h2
|
||||
try:
|
||||
events = self.h2_conn.receive_data(data)
|
||||
except _h2_exceptions.ProtocolError as e:
|
||||
raise HTTP2ProtocolError(str(e))
|
||||
|
||||
# Process events
|
||||
completed_requests = []
|
||||
for event in events:
|
||||
request = self._handle_event(event)
|
||||
if request is not None:
|
||||
completed_requests.append(request)
|
||||
|
||||
# Send any pending data (WINDOW_UPDATE, etc.)
|
||||
self._send_pending_data()
|
||||
|
||||
return completed_requests
|
||||
|
||||
def _handle_event(self, event):
|
||||
"""Handle a single h2 event.
|
||||
|
||||
Args:
|
||||
event: h2 event object
|
||||
|
||||
Returns:
|
||||
HTTP2Request if a request is complete, None otherwise
|
||||
"""
|
||||
if isinstance(event, _h2_events.RequestReceived):
|
||||
return self._handle_request_received(event)
|
||||
|
||||
elif isinstance(event, _h2_events.DataReceived):
|
||||
return self._handle_data_received(event)
|
||||
|
||||
elif isinstance(event, _h2_events.StreamEnded):
|
||||
return self._handle_stream_ended(event)
|
||||
|
||||
elif isinstance(event, _h2_events.StreamReset):
|
||||
self._handle_stream_reset(event)
|
||||
|
||||
elif isinstance(event, _h2_events.WindowUpdated):
|
||||
pass # Flow control update, handled by h2
|
||||
|
||||
elif isinstance(event, _h2_events.PriorityUpdated):
|
||||
pass # Priority update, could be used for scheduling
|
||||
|
||||
elif isinstance(event, _h2_events.SettingsAcknowledged):
|
||||
pass # Settings ACK received
|
||||
|
||||
elif isinstance(event, _h2_events.ConnectionTerminated):
|
||||
self._handle_connection_terminated(event)
|
||||
|
||||
elif isinstance(event, _h2_events.TrailersReceived):
|
||||
return self._handle_trailers_received(event)
|
||||
|
||||
return None
|
||||
|
||||
def _handle_request_received(self, event):
|
||||
"""Handle RequestReceived event (HEADERS frame).
|
||||
|
||||
Args:
|
||||
event: RequestReceived event with headers
|
||||
|
||||
Returns:
|
||||
HTTP2Request if stream ended with headers, None otherwise
|
||||
"""
|
||||
stream_id = event.stream_id
|
||||
headers = event.headers
|
||||
|
||||
# Create new stream
|
||||
stream = HTTP2Stream(stream_id, self)
|
||||
self.streams[stream_id] = stream
|
||||
|
||||
# Process headers
|
||||
stream.receive_headers(headers, end_stream=False)
|
||||
|
||||
# Check if this was a GET/HEAD with no body
|
||||
# The StreamEnded event will come separately
|
||||
return None
|
||||
|
||||
def _handle_data_received(self, event):
|
||||
"""Handle DataReceived event.
|
||||
|
||||
Args:
|
||||
event: DataReceived event with body data
|
||||
|
||||
Returns:
|
||||
None (request completion handled by StreamEnded)
|
||||
"""
|
||||
stream_id = event.stream_id
|
||||
data = event.data
|
||||
|
||||
stream = self.streams.get(stream_id)
|
||||
if stream is None:
|
||||
# Stream was reset or doesn't exist
|
||||
return None
|
||||
|
||||
stream.receive_data(data, end_stream=False)
|
||||
|
||||
# Increment flow control windows (only if data received)
|
||||
if len(data) > 0:
|
||||
# Update stream-level window
|
||||
self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id)
|
||||
# Update connection-level window
|
||||
self.h2_conn.increment_flow_control_window(len(data), stream_id=None)
|
||||
# Send WINDOW_UPDATE frames immediately
|
||||
self._send_pending_data()
|
||||
|
||||
return None
|
||||
|
||||
def _handle_stream_ended(self, event):
|
||||
"""Handle StreamEnded event.
|
||||
|
||||
Args:
|
||||
event: StreamEnded event
|
||||
|
||||
Returns:
|
||||
HTTP2Request for the completed request
|
||||
"""
|
||||
stream_id = event.stream_id
|
||||
stream = self.streams.get(stream_id)
|
||||
|
||||
if stream is None:
|
||||
return None
|
||||
|
||||
# Mark stream as request complete
|
||||
stream.request_complete = True
|
||||
|
||||
# Create request object
|
||||
return HTTP2Request(stream, self.cfg, self.client_addr)
|
||||
|
||||
def _handle_stream_reset(self, event):
|
||||
"""Handle StreamReset event (RST_STREAM frame).
|
||||
|
||||
Args:
|
||||
event: StreamReset event
|
||||
"""
|
||||
stream_id = event.stream_id
|
||||
stream = self.streams.get(stream_id)
|
||||
|
||||
if stream is not None:
|
||||
stream.reset(event.error_code)
|
||||
# Keep stream in dict for potential cleanup
|
||||
|
||||
def _handle_connection_terminated(self, event):
|
||||
"""Handle ConnectionTerminated event (GOAWAY frame).
|
||||
|
||||
Args:
|
||||
event: ConnectionTerminated event
|
||||
"""
|
||||
self._closed = True
|
||||
# Could log event.error_code and event.additional_data
|
||||
|
||||
def _handle_trailers_received(self, event):
|
||||
"""Handle TrailersReceived event.
|
||||
|
||||
Args:
|
||||
event: TrailersReceived event with trailer headers
|
||||
|
||||
Returns:
|
||||
HTTP2Request if this completes the request
|
||||
"""
|
||||
stream_id = event.stream_id
|
||||
stream = self.streams.get(stream_id)
|
||||
|
||||
if stream is None:
|
||||
return None
|
||||
|
||||
stream.receive_trailers(event.headers)
|
||||
|
||||
# Trailers always end the request
|
||||
return HTTP2Request(stream, self.cfg, self.client_addr)
|
||||
|
||||
def send_response(self, stream_id, status, headers, body=None):
|
||||
"""Send a response on a stream.
|
||||
|
||||
Args:
|
||||
stream_id: The stream ID to respond on
|
||||
status: HTTP status code (int)
|
||||
headers: List of (name, value) header tuples
|
||||
body: Optional response body bytes
|
||||
|
||||
Raises:
|
||||
HTTP2Error: If stream not found or in invalid state
|
||||
"""
|
||||
stream = self.streams.get(stream_id)
|
||||
if stream is None:
|
||||
raise HTTP2Error(f"Stream {stream_id} not found")
|
||||
|
||||
# Build response headers with :status pseudo-header
|
||||
response_headers = [(':status', str(status))]
|
||||
for name, value in headers:
|
||||
# HTTP/2 headers must be lowercase
|
||||
response_headers.append((name.lower(), str(value)))
|
||||
|
||||
end_stream = body is None or len(body) == 0
|
||||
|
||||
# Send headers
|
||||
self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream)
|
||||
stream.send_headers(response_headers, end_stream=end_stream)
|
||||
self._send_pending_data()
|
||||
|
||||
# Send body if present
|
||||
if body and len(body) > 0:
|
||||
self.send_data(stream_id, body, end_stream=True)
|
||||
|
||||
def send_data(self, stream_id, data, end_stream=False):
|
||||
"""Send data on a stream.
|
||||
|
||||
Args:
|
||||
stream_id: The stream ID
|
||||
data: Body data bytes
|
||||
end_stream: Whether this ends the stream
|
||||
|
||||
Raises:
|
||||
HTTP2Error: If stream not found or in invalid state
|
||||
"""
|
||||
stream = self.streams.get(stream_id)
|
||||
if stream is None:
|
||||
raise HTTP2Error(f"Stream {stream_id} not found")
|
||||
|
||||
# Send data in chunks respecting flow control and max frame size
|
||||
data_to_send = data
|
||||
while data_to_send:
|
||||
# Get available window size for this stream
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
# Also respect max frame size
|
||||
chunk_size = min(available, self.max_frame_size, len(data_to_send))
|
||||
|
||||
if chunk_size <= 0:
|
||||
# No window available, send what we have and wait
|
||||
self._send_pending_data()
|
||||
# Try again - the window might open after ACKs
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
if available <= 0:
|
||||
# Still no window, just try to send a small chunk
|
||||
chunk_size = min(self.max_frame_size, len(data_to_send))
|
||||
|
||||
chunk = data_to_send[:chunk_size]
|
||||
data_to_send = data_to_send[chunk_size:]
|
||||
|
||||
# Only set end_stream on the final chunk
|
||||
is_final = end_stream and len(data_to_send) == 0
|
||||
|
||||
self.h2_conn.send_data(stream_id, chunk, end_stream=is_final)
|
||||
self._send_pending_data()
|
||||
|
||||
stream.send_data(data, end_stream=end_stream)
|
||||
|
||||
def send_error(self, stream_id, status_code, message=None):
|
||||
"""Send an error response on a stream.
|
||||
|
||||
Args:
|
||||
stream_id: The stream ID
|
||||
status_code: HTTP status code
|
||||
message: Optional error message body
|
||||
"""
|
||||
body = message.encode() if message else b''
|
||||
headers = [('content-length', str(len(body)))]
|
||||
if body:
|
||||
headers.append(('content-type', 'text/plain; charset=utf-8'))
|
||||
|
||||
self.send_response(stream_id, status_code, headers, body)
|
||||
|
||||
def reset_stream(self, stream_id, error_code=0x8):
|
||||
"""Reset a stream with RST_STREAM.
|
||||
|
||||
Args:
|
||||
stream_id: The stream ID to reset
|
||||
error_code: HTTP/2 error code (default: CANCEL)
|
||||
"""
|
||||
stream = self.streams.get(stream_id)
|
||||
if stream is not None:
|
||||
stream.reset(error_code)
|
||||
|
||||
self.h2_conn.reset_stream(stream_id, error_code=error_code)
|
||||
self._send_pending_data()
|
||||
|
||||
def close(self, error_code=0x0, last_stream_id=None):
|
||||
"""Close the connection gracefully with GOAWAY.
|
||||
|
||||
Args:
|
||||
error_code: HTTP/2 error code (default: NO_ERROR)
|
||||
last_stream_id: Last processed stream ID (default: highest)
|
||||
"""
|
||||
if self._closed:
|
||||
return
|
||||
|
||||
self._closed = True
|
||||
|
||||
if last_stream_id is None:
|
||||
# Use highest stream ID we've seen
|
||||
last_stream_id = max(self.streams.keys()) if self.streams else 0
|
||||
|
||||
try:
|
||||
self.h2_conn.close_connection(error_code=error_code)
|
||||
self._send_pending_data()
|
||||
except Exception:
|
||||
pass # Best effort
|
||||
|
||||
def _send_pending_data(self):
|
||||
"""Send any pending data from h2 to the socket."""
|
||||
data = self.h2_conn.data_to_send()
|
||||
if data:
|
||||
try:
|
||||
self.sock.sendall(data)
|
||||
except (OSError, IOError) as e:
|
||||
self._closed = True
|
||||
raise HTTP2ConnectionError(f"Socket write error: {e}")
|
||||
|
||||
@property
|
||||
def is_closed(self):
|
||||
"""Check if connection is closed."""
|
||||
return self._closed
|
||||
|
||||
def cleanup_stream(self, stream_id):
|
||||
"""Remove a stream after processing is complete.
|
||||
|
||||
Args:
|
||||
stream_id: The stream ID to clean up
|
||||
"""
|
||||
self.streams.pop(stream_id, None)
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
f"<HTTP2ServerConnection "
|
||||
f"streams={len(self.streams)} "
|
||||
f"closed={self._closed}>"
|
||||
)
|
||||
|
||||
|
||||
__all__ = ['HTTP2ServerConnection']
|
||||
230
gunicorn/http2/request.py
Normal file
230
gunicorn/http2/request.py
Normal file
@ -0,0 +1,230 @@
|
||||
# -*- coding: utf-8 -
|
||||
#
|
||||
# This file is part of gunicorn released under the MIT license.
|
||||
# See the NOTICE for more information.
|
||||
|
||||
"""
|
||||
HTTP/2 request wrapper.
|
||||
|
||||
Provides a Request-compatible interface for HTTP/2 streams.
|
||||
"""
|
||||
|
||||
from io import BytesIO
|
||||
from urllib.parse import unquote, urlparse
|
||||
|
||||
from gunicorn.http.body import Body, LengthReader
|
||||
from gunicorn.util import split_request_uri
|
||||
|
||||
|
||||
class HTTP2Body:
|
||||
"""Body wrapper for HTTP/2 request data.
|
||||
|
||||
Provides a file-like interface to the request body,
|
||||
compatible with gunicorn's Body class expectations.
|
||||
"""
|
||||
|
||||
def __init__(self, data):
|
||||
"""Initialize with body data.
|
||||
|
||||
Args:
|
||||
data: bytes containing the request body
|
||||
"""
|
||||
self._data = BytesIO(data)
|
||||
self._len = len(data)
|
||||
|
||||
def read(self, size=None):
|
||||
"""Read data from the body.
|
||||
|
||||
Args:
|
||||
size: Number of bytes to read, or None for all remaining
|
||||
|
||||
Returns:
|
||||
bytes: The requested data
|
||||
"""
|
||||
if size is None:
|
||||
return self._data.read()
|
||||
return self._data.read(size)
|
||||
|
||||
def readline(self, size=None):
|
||||
"""Read a line from the body.
|
||||
|
||||
Args:
|
||||
size: Maximum bytes to read
|
||||
|
||||
Returns:
|
||||
bytes: A line of data
|
||||
"""
|
||||
if size is None:
|
||||
return self._data.readline()
|
||||
return self._data.readline(size)
|
||||
|
||||
def readlines(self, hint=None):
|
||||
"""Read all lines from the body.
|
||||
|
||||
Args:
|
||||
hint: Approximate byte count hint
|
||||
|
||||
Returns:
|
||||
list: List of lines
|
||||
"""
|
||||
return self._data.readlines(hint)
|
||||
|
||||
def __iter__(self):
|
||||
"""Iterate over lines in the body."""
|
||||
return iter(self._data)
|
||||
|
||||
def __len__(self):
|
||||
"""Return the content length."""
|
||||
return self._len
|
||||
|
||||
def close(self):
|
||||
"""Close the body stream."""
|
||||
self._data.close()
|
||||
|
||||
|
||||
class HTTP2Request:
|
||||
"""HTTP/2 request wrapper compatible with gunicorn Request interface.
|
||||
|
||||
Wraps an HTTP2Stream to provide the same interface as the HTTP/1.x
|
||||
Request class, allowing workers to handle HTTP/2 requests using
|
||||
existing code paths.
|
||||
"""
|
||||
|
||||
def __init__(self, stream, cfg, peer_addr):
|
||||
"""Initialize from an HTTP/2 stream.
|
||||
|
||||
Args:
|
||||
stream: HTTP2Stream instance with received headers/body
|
||||
cfg: Gunicorn configuration object
|
||||
peer_addr: Client address tuple (host, port)
|
||||
"""
|
||||
self.stream = stream
|
||||
self.cfg = cfg
|
||||
self.peer_addr = peer_addr
|
||||
self.remote_addr = peer_addr
|
||||
|
||||
# HTTP/2 version tuple
|
||||
self.version = (2, 0)
|
||||
|
||||
# Parse pseudo-headers
|
||||
pseudo = stream.get_pseudo_headers()
|
||||
self.method = pseudo.get(':method', 'GET')
|
||||
self.scheme = pseudo.get(':scheme', 'https')
|
||||
authority = pseudo.get(':authority', '')
|
||||
path = pseudo.get(':path', '/')
|
||||
|
||||
# Parse the path into components
|
||||
self.uri = path
|
||||
try:
|
||||
parts = split_request_uri(path)
|
||||
self.path = parts.path or ""
|
||||
self.query = parts.query or ""
|
||||
self.fragment = parts.fragment or ""
|
||||
except ValueError:
|
||||
self.path = path
|
||||
self.query = ""
|
||||
self.fragment = ""
|
||||
|
||||
# Store authority for Host header equivalent
|
||||
self._authority = authority
|
||||
|
||||
# Convert HTTP/2 headers to HTTP/1.1 style
|
||||
# HTTP/2 headers are lowercase, convert to uppercase for WSGI
|
||||
self.headers = []
|
||||
for name, value in stream.get_regular_headers():
|
||||
# Convert to uppercase for WSGI compatibility
|
||||
self.headers.append((name.upper(), value))
|
||||
|
||||
# Add Host header if not present (from :authority)
|
||||
if authority and not any(h[0] == 'HOST' for h in self.headers):
|
||||
self.headers.append(('HOST', authority))
|
||||
|
||||
# Trailers (if any)
|
||||
self.trailers = []
|
||||
if stream.trailers:
|
||||
self.trailers = [
|
||||
(name.upper(), value)
|
||||
for name, value in stream.trailers
|
||||
]
|
||||
|
||||
# Body - HTTP/2 streams have complete body data
|
||||
body_data = stream.get_request_body()
|
||||
self.body = HTTP2Body(body_data)
|
||||
|
||||
# Connection state
|
||||
self.must_close = False
|
||||
self._expected_100_continue = False
|
||||
|
||||
# Request numbering (for logging)
|
||||
self.req_number = stream.stream_id
|
||||
|
||||
# HTTP/2 does not use proxy protocol through the data stream
|
||||
self.proxy_protocol_info = None
|
||||
|
||||
def force_close(self):
|
||||
"""Force the connection to close after this request."""
|
||||
self.must_close = True
|
||||
|
||||
def should_close(self):
|
||||
"""Check if connection should close after this request.
|
||||
|
||||
HTTP/2 connections are persistent by design, but we may still
|
||||
need to close if explicitly requested.
|
||||
|
||||
Returns:
|
||||
bool: True if connection should close
|
||||
"""
|
||||
if self.must_close:
|
||||
return True
|
||||
# HTTP/2 connections are persistent, don't close by default
|
||||
return False
|
||||
|
||||
def get_header(self, name):
|
||||
"""Get a header value by name.
|
||||
|
||||
Args:
|
||||
name: Header name (case-insensitive)
|
||||
|
||||
Returns:
|
||||
str: Header value, or None if not found
|
||||
"""
|
||||
name = name.upper()
|
||||
for h_name, h_value in self.headers:
|
||||
if h_name == name:
|
||||
return h_value
|
||||
return None
|
||||
|
||||
@property
|
||||
def content_length(self):
|
||||
"""Get the Content-Length header value.
|
||||
|
||||
Returns:
|
||||
int: Content length, or None if not set
|
||||
"""
|
||||
cl = self.get_header('CONTENT-LENGTH')
|
||||
if cl is not None:
|
||||
try:
|
||||
return int(cl)
|
||||
except ValueError:
|
||||
pass
|
||||
return None
|
||||
|
||||
@property
|
||||
def content_type(self):
|
||||
"""Get the Content-Type header value.
|
||||
|
||||
Returns:
|
||||
str: Content type, or None if not set
|
||||
"""
|
||||
return self.get_header('CONTENT-TYPE')
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
f"<HTTP2Request "
|
||||
f"method={self.method} "
|
||||
f"path={self.path} "
|
||||
f"stream_id={self.stream.stream_id}>"
|
||||
)
|
||||
|
||||
|
||||
__all__ = ['HTTP2Request', 'HTTP2Body']
|
||||
279
gunicorn/http2/stream.py
Normal file
279
gunicorn/http2/stream.py
Normal file
@ -0,0 +1,279 @@
|
||||
# -*- coding: utf-8 -
|
||||
#
|
||||
# This file is part of gunicorn released under the MIT license.
|
||||
# See the NOTICE for more information.
|
||||
|
||||
"""
|
||||
HTTP/2 stream state management.
|
||||
|
||||
Each HTTP/2 stream represents a single request/response exchange.
|
||||
"""
|
||||
|
||||
from enum import Enum, auto
|
||||
from io import BytesIO
|
||||
|
||||
from .errors import HTTP2StreamError, HTTP2StreamClosed
|
||||
|
||||
|
||||
class StreamState(Enum):
|
||||
"""HTTP/2 stream states as defined in RFC 7540 Section 5.1."""
|
||||
|
||||
IDLE = auto()
|
||||
RESERVED_LOCAL = auto()
|
||||
RESERVED_REMOTE = auto()
|
||||
OPEN = auto()
|
||||
HALF_CLOSED_LOCAL = auto()
|
||||
HALF_CLOSED_REMOTE = auto()
|
||||
CLOSED = auto()
|
||||
|
||||
|
||||
class HTTP2Stream:
|
||||
"""Represents a single HTTP/2 stream.
|
||||
|
||||
Manages stream state, headers, and body data for a single
|
||||
request/response exchange within an HTTP/2 connection.
|
||||
"""
|
||||
|
||||
def __init__(self, stream_id, connection):
|
||||
"""Initialize an HTTP/2 stream.
|
||||
|
||||
Args:
|
||||
stream_id: The unique stream identifier (odd for client-initiated)
|
||||
connection: The parent HTTP2ServerConnection
|
||||
"""
|
||||
self.stream_id = stream_id
|
||||
self.connection = connection
|
||||
|
||||
# Stream state
|
||||
self.state = StreamState.IDLE
|
||||
|
||||
# Request data
|
||||
self.request_headers = []
|
||||
self.request_body = BytesIO()
|
||||
self.request_complete = False
|
||||
|
||||
# Response data
|
||||
self.response_started = False
|
||||
self.response_headers_sent = False
|
||||
self.response_complete = False
|
||||
|
||||
# Flow control
|
||||
self.window_size = connection.initial_window_size
|
||||
|
||||
# Trailers
|
||||
self.trailers = None
|
||||
|
||||
@property
|
||||
def is_client_stream(self):
|
||||
"""Check if this is a client-initiated stream (odd stream ID)."""
|
||||
return self.stream_id % 2 == 1
|
||||
|
||||
@property
|
||||
def is_server_stream(self):
|
||||
"""Check if this is a server-initiated stream (even stream ID)."""
|
||||
return self.stream_id % 2 == 0
|
||||
|
||||
@property
|
||||
def can_receive(self):
|
||||
"""Check if this stream can receive data."""
|
||||
return self.state in (
|
||||
StreamState.OPEN,
|
||||
StreamState.HALF_CLOSED_LOCAL,
|
||||
)
|
||||
|
||||
@property
|
||||
def can_send(self):
|
||||
"""Check if this stream can send data."""
|
||||
return self.state in (
|
||||
StreamState.OPEN,
|
||||
StreamState.HALF_CLOSED_REMOTE,
|
||||
)
|
||||
|
||||
def receive_headers(self, headers, end_stream=False):
|
||||
"""Process received HEADERS frame.
|
||||
|
||||
Args:
|
||||
headers: List of (name, value) tuples
|
||||
end_stream: True if END_STREAM flag is set
|
||||
|
||||
Raises:
|
||||
HTTP2StreamError: If headers received in invalid state
|
||||
"""
|
||||
if self.state == StreamState.IDLE:
|
||||
self.state = StreamState.OPEN
|
||||
elif self.state not in (StreamState.OPEN, StreamState.HALF_CLOSED_LOCAL):
|
||||
raise HTTP2StreamError(
|
||||
self.stream_id,
|
||||
f"Cannot receive headers in state {self.state.name}"
|
||||
)
|
||||
|
||||
self.request_headers.extend(headers)
|
||||
|
||||
if end_stream:
|
||||
self._half_close_remote()
|
||||
self.request_complete = True
|
||||
|
||||
def receive_data(self, data, end_stream=False):
|
||||
"""Process received DATA frame.
|
||||
|
||||
Args:
|
||||
data: Bytes received
|
||||
end_stream: True if END_STREAM flag is set
|
||||
|
||||
Raises:
|
||||
HTTP2StreamError: If data received in invalid state
|
||||
"""
|
||||
if not self.can_receive:
|
||||
raise HTTP2StreamError(
|
||||
self.stream_id,
|
||||
f"Cannot receive data in state {self.state.name}"
|
||||
)
|
||||
|
||||
self.request_body.write(data)
|
||||
|
||||
if end_stream:
|
||||
self._half_close_remote()
|
||||
self.request_complete = True
|
||||
|
||||
def receive_trailers(self, trailers):
|
||||
"""Process received trailing headers.
|
||||
|
||||
Args:
|
||||
trailers: List of (name, value) tuples
|
||||
"""
|
||||
if not self.can_receive:
|
||||
raise HTTP2StreamError(
|
||||
self.stream_id,
|
||||
f"Cannot receive trailers in state {self.state.name}"
|
||||
)
|
||||
|
||||
self.trailers = trailers
|
||||
self._half_close_remote()
|
||||
self.request_complete = True
|
||||
|
||||
def send_headers(self, headers, end_stream=False):
|
||||
"""Mark headers as sent.
|
||||
|
||||
Args:
|
||||
headers: List of (name, value) tuples to send
|
||||
end_stream: True if this completes the response
|
||||
|
||||
Raises:
|
||||
HTTP2StreamError: If headers cannot be sent in current state
|
||||
"""
|
||||
if not self.can_send:
|
||||
raise HTTP2StreamError(
|
||||
self.stream_id,
|
||||
f"Cannot send headers in state {self.state.name}"
|
||||
)
|
||||
|
||||
self.response_started = True
|
||||
self.response_headers_sent = True
|
||||
|
||||
if end_stream:
|
||||
self._half_close_local()
|
||||
self.response_complete = True
|
||||
|
||||
def send_data(self, data, end_stream=False):
|
||||
"""Mark data as sent.
|
||||
|
||||
Args:
|
||||
data: Bytes to send
|
||||
end_stream: True if this completes the response
|
||||
|
||||
Raises:
|
||||
HTTP2StreamError: If data cannot be sent in current state
|
||||
"""
|
||||
if not self.can_send:
|
||||
raise HTTP2StreamError(
|
||||
self.stream_id,
|
||||
f"Cannot send data in state {self.state.name}"
|
||||
)
|
||||
|
||||
if end_stream:
|
||||
self._half_close_local()
|
||||
self.response_complete = True
|
||||
|
||||
def reset(self, error_code=0x8):
|
||||
"""Reset this stream with RST_STREAM.
|
||||
|
||||
Args:
|
||||
error_code: HTTP/2 error code (default: CANCEL)
|
||||
"""
|
||||
self.state = StreamState.CLOSED
|
||||
self.response_complete = True
|
||||
self.request_complete = True
|
||||
|
||||
def close(self):
|
||||
"""Close this stream normally."""
|
||||
self.state = StreamState.CLOSED
|
||||
self.response_complete = True
|
||||
self.request_complete = True
|
||||
|
||||
def _half_close_local(self):
|
||||
"""Transition to half-closed (local) state."""
|
||||
if self.state == StreamState.OPEN:
|
||||
self.state = StreamState.HALF_CLOSED_LOCAL
|
||||
elif self.state == StreamState.HALF_CLOSED_REMOTE:
|
||||
self.state = StreamState.CLOSED
|
||||
else:
|
||||
raise HTTP2StreamError(
|
||||
self.stream_id,
|
||||
f"Cannot half-close local in state {self.state.name}"
|
||||
)
|
||||
|
||||
def _half_close_remote(self):
|
||||
"""Transition to half-closed (remote) state."""
|
||||
if self.state == StreamState.OPEN:
|
||||
self.state = StreamState.HALF_CLOSED_REMOTE
|
||||
elif self.state == StreamState.HALF_CLOSED_LOCAL:
|
||||
self.state = StreamState.CLOSED
|
||||
else:
|
||||
raise HTTP2StreamError(
|
||||
self.stream_id,
|
||||
f"Cannot half-close remote in state {self.state.name}"
|
||||
)
|
||||
|
||||
def get_request_body(self):
|
||||
"""Get the complete request body.
|
||||
|
||||
Returns:
|
||||
bytes: The request body data
|
||||
"""
|
||||
return self.request_body.getvalue()
|
||||
|
||||
def get_pseudo_headers(self):
|
||||
"""Extract HTTP/2 pseudo-headers from request headers.
|
||||
|
||||
Returns:
|
||||
dict: Mapping of pseudo-header names to values
|
||||
(e.g., {':method': 'GET', ':path': '/'})
|
||||
"""
|
||||
pseudo = {}
|
||||
for name, value in self.request_headers:
|
||||
if name.startswith(':'):
|
||||
pseudo[name] = value
|
||||
return pseudo
|
||||
|
||||
def get_regular_headers(self):
|
||||
"""Get regular (non-pseudo) headers from request.
|
||||
|
||||
Returns:
|
||||
list: List of (name, value) tuples for regular headers
|
||||
"""
|
||||
return [
|
||||
(name, value)
|
||||
for name, value in self.request_headers
|
||||
if not name.startswith(':')
|
||||
]
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
f"<HTTP2Stream id={self.stream_id} "
|
||||
f"state={self.state.name} "
|
||||
f"req_complete={self.request_complete} "
|
||||
f"resp_complete={self.response_complete}>"
|
||||
)
|
||||
|
||||
|
||||
__all__ = ['HTTP2Stream', 'StreamState']
|
||||
Loading…
x
Reference in New Issue
Block a user