From 89a0a4672227dfe1d7dd751e87b31e599997d652 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 25 Jan 2026 16:29:17 +0100 Subject: [PATCH] Add HTTP/2 core protocol implementation Core classes for HTTP/2 server-side protocol handling: - HTTP2Stream: Stream state management matching RFC 7540 Section 5.1 - StreamState enum for proper lifecycle tracking - Request/response tracking and body buffering - Pseudo-header extraction for :method, :path, etc. - Proper state transitions for half-close semantics - HTTP2Request: Request interface compatibility layer - Wraps HTTP2Stream for worker consumption - HTTP2Body provides file-like interface for request body - Converts HTTP/2 pseudo-headers to standard attributes - Transforms lowercase headers to uppercase for WSGI - Adds HOST header from :authority pseudo-header - HTTP2ServerConnection: h2 library integration - Lazy import of h2 for graceful degradation - Connection initialization with configurable settings - Stream management for concurrent requests - Event handling for HEADERS, DATA, RST_STREAM, GOAWAY - Response sending with proper frame generation - Flow control window management with chunked data sending - get_parser() extension for HTTP/2 dispatch --- gunicorn/http/__init__.py | 13 +- gunicorn/http2/connection.py | 475 +++++++++++++++++++++++++++++++++++ gunicorn/http2/request.py | 230 +++++++++++++++++ gunicorn/http2/stream.py | 279 ++++++++++++++++++++ 4 files changed, 995 insertions(+), 2 deletions(-) create mode 100644 gunicorn/http2/connection.py create mode 100644 gunicorn/http2/request.py create mode 100644 gunicorn/http2/stream.py diff --git a/gunicorn/http/__init__.py b/gunicorn/http/__init__.py index 1d35b7c7..9ca81d39 100644 --- a/gunicorn/http/__init__.py +++ b/gunicorn/http/__init__.py @@ -6,21 +6,30 @@ from gunicorn.http.message import Message, Request from gunicorn.http.parser import RequestParser -def get_parser(cfg, source, source_addr): +def get_parser(cfg, source, source_addr, http2_connection=False): """Get appropriate parser based on protocol config. Args: cfg: Gunicorn config object source: Socket or iterable source source_addr: Source address tuple or None + http2_connection: If True, create HTTP/2 connection handler Returns: - Parser instance (RequestParser or UWSGIParser) + Parser instance (RequestParser, UWSGIParser, or HTTP2ServerConnection) """ + # HTTP/2 connection + if http2_connection: + from gunicorn.http2.connection import HTTP2ServerConnection + return HTTP2ServerConnection(cfg, source, source_addr) + + # uWSGI protocol protocol = getattr(cfg, 'protocol', 'http') if protocol == 'uwsgi': from gunicorn.uwsgi.parser import UWSGIParser return UWSGIParser(cfg, source, source_addr) + + # Default HTTP/1.x return RequestParser(cfg, source, source_addr) diff --git a/gunicorn/http2/connection.py b/gunicorn/http2/connection.py new file mode 100644 index 00000000..dc51014b --- /dev/null +++ b/gunicorn/http2/connection.py @@ -0,0 +1,475 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +HTTP/2 server connection implementation. + +Uses the hyper-h2 library for HTTP/2 protocol handling. +""" + +from io import BytesIO + +from .errors import ( + HTTP2Error, HTTP2ProtocolError, HTTP2ConnectionError, + HTTP2NotAvailable, +) +from .stream import HTTP2Stream +from .request import HTTP2Request + + +# Import h2 lazily to allow graceful fallback +_h2 = None +_h2_config = None +_h2_events = None +_h2_exceptions = None +_h2_settings = None + + +def _import_h2(): + """Lazily import h2 library components.""" + global _h2, _h2_config, _h2_events, _h2_exceptions, _h2_settings + + if _h2 is not None: + return + + try: + import h2.connection as _h2 + import h2.config as _h2_config + import h2.events as _h2_events + import h2.exceptions as _h2_exceptions + import h2.settings as _h2_settings + except ImportError: + raise HTTP2NotAvailable() + + +class HTTP2ServerConnection: + """HTTP/2 server-side connection handler. + + Manages the HTTP/2 connection state and multiplexed streams. + This class wraps the h2 library and provides a higher-level + interface for gunicorn workers. + """ + + # Default buffer size for socket reads + READ_BUFFER_SIZE = 65536 + + def __init__(self, cfg, sock, client_addr): + """Initialize an HTTP/2 server connection. + + Args: + cfg: Gunicorn configuration object + sock: SSL socket with completed handshake + client_addr: Client address tuple (host, port) + + Raises: + HTTP2NotAvailable: If h2 library is not installed + """ + _import_h2() + + self.cfg = cfg + self.sock = sock + self.client_addr = client_addr + + # Active streams indexed by stream ID + self.streams = {} + + # Completed requests ready for processing + self._pending_requests = [] + + # Connection settings from config + self.initial_window_size = cfg.http2_initial_window_size + self.max_concurrent_streams = cfg.http2_max_concurrent_streams + self.max_frame_size = cfg.http2_max_frame_size + self.max_header_list_size = cfg.http2_max_header_list_size + + # Initialize h2 connection + config = _h2_config.H2Configuration( + client_side=False, + header_encoding='utf-8', + ) + self.h2_conn = _h2.H2Connection(config=config) + + # Read buffer for partial frames + self._read_buffer = BytesIO() + + # Connection state + self._closed = False + self._initialized = False + + def initiate_connection(self): + """Send initial HTTP/2 settings to client. + + Should be called after the SSL handshake completes and + before processing any data. + """ + if self._initialized: + return + + # Update local settings before initiating + self.h2_conn.update_settings({ + _h2_settings.SettingCodes.MAX_CONCURRENT_STREAMS: self.max_concurrent_streams, + _h2_settings.SettingCodes.INITIAL_WINDOW_SIZE: self.initial_window_size, + _h2_settings.SettingCodes.MAX_FRAME_SIZE: self.max_frame_size, + _h2_settings.SettingCodes.MAX_HEADER_LIST_SIZE: self.max_header_list_size, + }) + + self.h2_conn.initiate_connection() + self._send_pending_data() + self._initialized = True + + def receive_data(self, data=None): + """Process received data and return completed requests. + + Args: + data: Optional bytes to process. If None, reads from socket. + + Returns: + list: List of HTTP2Request objects for completed requests + + Raises: + HTTP2ConnectionError: On protocol or connection errors + """ + if data is None: + try: + data = self.sock.recv(self.READ_BUFFER_SIZE) + except (OSError, IOError) as e: + raise HTTP2ConnectionError(f"Socket read error: {e}") + + if not data: + # Connection closed by peer + self._closed = True + return [] + + # Feed data to h2 + try: + events = self.h2_conn.receive_data(data) + except _h2_exceptions.ProtocolError as e: + raise HTTP2ProtocolError(str(e)) + + # Process events + completed_requests = [] + for event in events: + request = self._handle_event(event) + if request is not None: + completed_requests.append(request) + + # Send any pending data (WINDOW_UPDATE, etc.) + self._send_pending_data() + + return completed_requests + + def _handle_event(self, event): + """Handle a single h2 event. + + Args: + event: h2 event object + + Returns: + HTTP2Request if a request is complete, None otherwise + """ + if isinstance(event, _h2_events.RequestReceived): + return self._handle_request_received(event) + + elif isinstance(event, _h2_events.DataReceived): + return self._handle_data_received(event) + + elif isinstance(event, _h2_events.StreamEnded): + return self._handle_stream_ended(event) + + elif isinstance(event, _h2_events.StreamReset): + self._handle_stream_reset(event) + + elif isinstance(event, _h2_events.WindowUpdated): + pass # Flow control update, handled by h2 + + elif isinstance(event, _h2_events.PriorityUpdated): + pass # Priority update, could be used for scheduling + + elif isinstance(event, _h2_events.SettingsAcknowledged): + pass # Settings ACK received + + elif isinstance(event, _h2_events.ConnectionTerminated): + self._handle_connection_terminated(event) + + elif isinstance(event, _h2_events.TrailersReceived): + return self._handle_trailers_received(event) + + return None + + def _handle_request_received(self, event): + """Handle RequestReceived event (HEADERS frame). + + Args: + event: RequestReceived event with headers + + Returns: + HTTP2Request if stream ended with headers, None otherwise + """ + stream_id = event.stream_id + headers = event.headers + + # Create new stream + stream = HTTP2Stream(stream_id, self) + self.streams[stream_id] = stream + + # Process headers + stream.receive_headers(headers, end_stream=False) + + # Check if this was a GET/HEAD with no body + # The StreamEnded event will come separately + return None + + def _handle_data_received(self, event): + """Handle DataReceived event. + + Args: + event: DataReceived event with body data + + Returns: + None (request completion handled by StreamEnded) + """ + stream_id = event.stream_id + data = event.data + + stream = self.streams.get(stream_id) + if stream is None: + # Stream was reset or doesn't exist + return None + + stream.receive_data(data, end_stream=False) + + # Increment flow control windows (only if data received) + if len(data) > 0: + # Update stream-level window + self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id) + # Update connection-level window + self.h2_conn.increment_flow_control_window(len(data), stream_id=None) + # Send WINDOW_UPDATE frames immediately + self._send_pending_data() + + return None + + def _handle_stream_ended(self, event): + """Handle StreamEnded event. + + Args: + event: StreamEnded event + + Returns: + HTTP2Request for the completed request + """ + stream_id = event.stream_id + stream = self.streams.get(stream_id) + + if stream is None: + return None + + # Mark stream as request complete + stream.request_complete = True + + # Create request object + return HTTP2Request(stream, self.cfg, self.client_addr) + + def _handle_stream_reset(self, event): + """Handle StreamReset event (RST_STREAM frame). + + Args: + event: StreamReset event + """ + stream_id = event.stream_id + stream = self.streams.get(stream_id) + + if stream is not None: + stream.reset(event.error_code) + # Keep stream in dict for potential cleanup + + def _handle_connection_terminated(self, event): + """Handle ConnectionTerminated event (GOAWAY frame). + + Args: + event: ConnectionTerminated event + """ + self._closed = True + # Could log event.error_code and event.additional_data + + def _handle_trailers_received(self, event): + """Handle TrailersReceived event. + + Args: + event: TrailersReceived event with trailer headers + + Returns: + HTTP2Request if this completes the request + """ + stream_id = event.stream_id + stream = self.streams.get(stream_id) + + if stream is None: + return None + + stream.receive_trailers(event.headers) + + # Trailers always end the request + return HTTP2Request(stream, self.cfg, self.client_addr) + + def send_response(self, stream_id, status, headers, body=None): + """Send a response on a stream. + + Args: + stream_id: The stream ID to respond on + status: HTTP status code (int) + headers: List of (name, value) header tuples + body: Optional response body bytes + + Raises: + HTTP2Error: If stream not found or in invalid state + """ + stream = self.streams.get(stream_id) + if stream is None: + raise HTTP2Error(f"Stream {stream_id} not found") + + # Build response headers with :status pseudo-header + response_headers = [(':status', str(status))] + for name, value in headers: + # HTTP/2 headers must be lowercase + response_headers.append((name.lower(), str(value))) + + end_stream = body is None or len(body) == 0 + + # Send headers + self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream) + stream.send_headers(response_headers, end_stream=end_stream) + self._send_pending_data() + + # Send body if present + if body and len(body) > 0: + self.send_data(stream_id, body, end_stream=True) + + def send_data(self, stream_id, data, end_stream=False): + """Send data on a stream. + + Args: + stream_id: The stream ID + data: Body data bytes + end_stream: Whether this ends the stream + + Raises: + HTTP2Error: If stream not found or in invalid state + """ + stream = self.streams.get(stream_id) + if stream is None: + raise HTTP2Error(f"Stream {stream_id} not found") + + # Send data in chunks respecting flow control and max frame size + data_to_send = data + while data_to_send: + # Get available window size for this stream + available = self.h2_conn.local_flow_control_window(stream_id) + # Also respect max frame size + chunk_size = min(available, self.max_frame_size, len(data_to_send)) + + if chunk_size <= 0: + # No window available, send what we have and wait + self._send_pending_data() + # Try again - the window might open after ACKs + available = self.h2_conn.local_flow_control_window(stream_id) + if available <= 0: + # Still no window, just try to send a small chunk + chunk_size = min(self.max_frame_size, len(data_to_send)) + + chunk = data_to_send[:chunk_size] + data_to_send = data_to_send[chunk_size:] + + # Only set end_stream on the final chunk + is_final = end_stream and len(data_to_send) == 0 + + self.h2_conn.send_data(stream_id, chunk, end_stream=is_final) + self._send_pending_data() + + stream.send_data(data, end_stream=end_stream) + + def send_error(self, stream_id, status_code, message=None): + """Send an error response on a stream. + + Args: + stream_id: The stream ID + status_code: HTTP status code + message: Optional error message body + """ + body = message.encode() if message else b'' + headers = [('content-length', str(len(body)))] + if body: + headers.append(('content-type', 'text/plain; charset=utf-8')) + + self.send_response(stream_id, status_code, headers, body) + + def reset_stream(self, stream_id, error_code=0x8): + """Reset a stream with RST_STREAM. + + Args: + stream_id: The stream ID to reset + error_code: HTTP/2 error code (default: CANCEL) + """ + stream = self.streams.get(stream_id) + if stream is not None: + stream.reset(error_code) + + self.h2_conn.reset_stream(stream_id, error_code=error_code) + self._send_pending_data() + + def close(self, error_code=0x0, last_stream_id=None): + """Close the connection gracefully with GOAWAY. + + Args: + error_code: HTTP/2 error code (default: NO_ERROR) + last_stream_id: Last processed stream ID (default: highest) + """ + if self._closed: + return + + self._closed = True + + if last_stream_id is None: + # Use highest stream ID we've seen + last_stream_id = max(self.streams.keys()) if self.streams else 0 + + try: + self.h2_conn.close_connection(error_code=error_code) + self._send_pending_data() + except Exception: + pass # Best effort + + def _send_pending_data(self): + """Send any pending data from h2 to the socket.""" + data = self.h2_conn.data_to_send() + if data: + try: + self.sock.sendall(data) + except (OSError, IOError) as e: + self._closed = True + raise HTTP2ConnectionError(f"Socket write error: {e}") + + @property + def is_closed(self): + """Check if connection is closed.""" + return self._closed + + def cleanup_stream(self, stream_id): + """Remove a stream after processing is complete. + + Args: + stream_id: The stream ID to clean up + """ + self.streams.pop(stream_id, None) + + def __repr__(self): + return ( + f"" + ) + + +__all__ = ['HTTP2ServerConnection'] diff --git a/gunicorn/http2/request.py b/gunicorn/http2/request.py new file mode 100644 index 00000000..9dbdb752 --- /dev/null +++ b/gunicorn/http2/request.py @@ -0,0 +1,230 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +HTTP/2 request wrapper. + +Provides a Request-compatible interface for HTTP/2 streams. +""" + +from io import BytesIO +from urllib.parse import unquote, urlparse + +from gunicorn.http.body import Body, LengthReader +from gunicorn.util import split_request_uri + + +class HTTP2Body: + """Body wrapper for HTTP/2 request data. + + Provides a file-like interface to the request body, + compatible with gunicorn's Body class expectations. + """ + + def __init__(self, data): + """Initialize with body data. + + Args: + data: bytes containing the request body + """ + self._data = BytesIO(data) + self._len = len(data) + + def read(self, size=None): + """Read data from the body. + + Args: + size: Number of bytes to read, or None for all remaining + + Returns: + bytes: The requested data + """ + if size is None: + return self._data.read() + return self._data.read(size) + + def readline(self, size=None): + """Read a line from the body. + + Args: + size: Maximum bytes to read + + Returns: + bytes: A line of data + """ + if size is None: + return self._data.readline() + return self._data.readline(size) + + def readlines(self, hint=None): + """Read all lines from the body. + + Args: + hint: Approximate byte count hint + + Returns: + list: List of lines + """ + return self._data.readlines(hint) + + def __iter__(self): + """Iterate over lines in the body.""" + return iter(self._data) + + def __len__(self): + """Return the content length.""" + return self._len + + def close(self): + """Close the body stream.""" + self._data.close() + + +class HTTP2Request: + """HTTP/2 request wrapper compatible with gunicorn Request interface. + + Wraps an HTTP2Stream to provide the same interface as the HTTP/1.x + Request class, allowing workers to handle HTTP/2 requests using + existing code paths. + """ + + def __init__(self, stream, cfg, peer_addr): + """Initialize from an HTTP/2 stream. + + Args: + stream: HTTP2Stream instance with received headers/body + cfg: Gunicorn configuration object + peer_addr: Client address tuple (host, port) + """ + self.stream = stream + self.cfg = cfg + self.peer_addr = peer_addr + self.remote_addr = peer_addr + + # HTTP/2 version tuple + self.version = (2, 0) + + # Parse pseudo-headers + pseudo = stream.get_pseudo_headers() + self.method = pseudo.get(':method', 'GET') + self.scheme = pseudo.get(':scheme', 'https') + authority = pseudo.get(':authority', '') + path = pseudo.get(':path', '/') + + # Parse the path into components + self.uri = path + try: + parts = split_request_uri(path) + self.path = parts.path or "" + self.query = parts.query or "" + self.fragment = parts.fragment or "" + except ValueError: + self.path = path + self.query = "" + self.fragment = "" + + # Store authority for Host header equivalent + self._authority = authority + + # Convert HTTP/2 headers to HTTP/1.1 style + # HTTP/2 headers are lowercase, convert to uppercase for WSGI + self.headers = [] + for name, value in stream.get_regular_headers(): + # Convert to uppercase for WSGI compatibility + self.headers.append((name.upper(), value)) + + # Add Host header if not present (from :authority) + if authority and not any(h[0] == 'HOST' for h in self.headers): + self.headers.append(('HOST', authority)) + + # Trailers (if any) + self.trailers = [] + if stream.trailers: + self.trailers = [ + (name.upper(), value) + for name, value in stream.trailers + ] + + # Body - HTTP/2 streams have complete body data + body_data = stream.get_request_body() + self.body = HTTP2Body(body_data) + + # Connection state + self.must_close = False + self._expected_100_continue = False + + # Request numbering (for logging) + self.req_number = stream.stream_id + + # HTTP/2 does not use proxy protocol through the data stream + self.proxy_protocol_info = None + + def force_close(self): + """Force the connection to close after this request.""" + self.must_close = True + + def should_close(self): + """Check if connection should close after this request. + + HTTP/2 connections are persistent by design, but we may still + need to close if explicitly requested. + + Returns: + bool: True if connection should close + """ + if self.must_close: + return True + # HTTP/2 connections are persistent, don't close by default + return False + + def get_header(self, name): + """Get a header value by name. + + Args: + name: Header name (case-insensitive) + + Returns: + str: Header value, or None if not found + """ + name = name.upper() + for h_name, h_value in self.headers: + if h_name == name: + return h_value + return None + + @property + def content_length(self): + """Get the Content-Length header value. + + Returns: + int: Content length, or None if not set + """ + cl = self.get_header('CONTENT-LENGTH') + if cl is not None: + try: + return int(cl) + except ValueError: + pass + return None + + @property + def content_type(self): + """Get the Content-Type header value. + + Returns: + str: Content type, or None if not set + """ + return self.get_header('CONTENT-TYPE') + + def __repr__(self): + return ( + f"" + ) + + +__all__ = ['HTTP2Request', 'HTTP2Body'] diff --git a/gunicorn/http2/stream.py b/gunicorn/http2/stream.py new file mode 100644 index 00000000..223fd441 --- /dev/null +++ b/gunicorn/http2/stream.py @@ -0,0 +1,279 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +HTTP/2 stream state management. + +Each HTTP/2 stream represents a single request/response exchange. +""" + +from enum import Enum, auto +from io import BytesIO + +from .errors import HTTP2StreamError, HTTP2StreamClosed + + +class StreamState(Enum): + """HTTP/2 stream states as defined in RFC 7540 Section 5.1.""" + + IDLE = auto() + RESERVED_LOCAL = auto() + RESERVED_REMOTE = auto() + OPEN = auto() + HALF_CLOSED_LOCAL = auto() + HALF_CLOSED_REMOTE = auto() + CLOSED = auto() + + +class HTTP2Stream: + """Represents a single HTTP/2 stream. + + Manages stream state, headers, and body data for a single + request/response exchange within an HTTP/2 connection. + """ + + def __init__(self, stream_id, connection): + """Initialize an HTTP/2 stream. + + Args: + stream_id: The unique stream identifier (odd for client-initiated) + connection: The parent HTTP2ServerConnection + """ + self.stream_id = stream_id + self.connection = connection + + # Stream state + self.state = StreamState.IDLE + + # Request data + self.request_headers = [] + self.request_body = BytesIO() + self.request_complete = False + + # Response data + self.response_started = False + self.response_headers_sent = False + self.response_complete = False + + # Flow control + self.window_size = connection.initial_window_size + + # Trailers + self.trailers = None + + @property + def is_client_stream(self): + """Check if this is a client-initiated stream (odd stream ID).""" + return self.stream_id % 2 == 1 + + @property + def is_server_stream(self): + """Check if this is a server-initiated stream (even stream ID).""" + return self.stream_id % 2 == 0 + + @property + def can_receive(self): + """Check if this stream can receive data.""" + return self.state in ( + StreamState.OPEN, + StreamState.HALF_CLOSED_LOCAL, + ) + + @property + def can_send(self): + """Check if this stream can send data.""" + return self.state in ( + StreamState.OPEN, + StreamState.HALF_CLOSED_REMOTE, + ) + + def receive_headers(self, headers, end_stream=False): + """Process received HEADERS frame. + + Args: + headers: List of (name, value) tuples + end_stream: True if END_STREAM flag is set + + Raises: + HTTP2StreamError: If headers received in invalid state + """ + if self.state == StreamState.IDLE: + self.state = StreamState.OPEN + elif self.state not in (StreamState.OPEN, StreamState.HALF_CLOSED_LOCAL): + raise HTTP2StreamError( + self.stream_id, + f"Cannot receive headers in state {self.state.name}" + ) + + self.request_headers.extend(headers) + + if end_stream: + self._half_close_remote() + self.request_complete = True + + def receive_data(self, data, end_stream=False): + """Process received DATA frame. + + Args: + data: Bytes received + end_stream: True if END_STREAM flag is set + + Raises: + HTTP2StreamError: If data received in invalid state + """ + if not self.can_receive: + raise HTTP2StreamError( + self.stream_id, + f"Cannot receive data in state {self.state.name}" + ) + + self.request_body.write(data) + + if end_stream: + self._half_close_remote() + self.request_complete = True + + def receive_trailers(self, trailers): + """Process received trailing headers. + + Args: + trailers: List of (name, value) tuples + """ + if not self.can_receive: + raise HTTP2StreamError( + self.stream_id, + f"Cannot receive trailers in state {self.state.name}" + ) + + self.trailers = trailers + self._half_close_remote() + self.request_complete = True + + def send_headers(self, headers, end_stream=False): + """Mark headers as sent. + + Args: + headers: List of (name, value) tuples to send + end_stream: True if this completes the response + + Raises: + HTTP2StreamError: If headers cannot be sent in current state + """ + if not self.can_send: + raise HTTP2StreamError( + self.stream_id, + f"Cannot send headers in state {self.state.name}" + ) + + self.response_started = True + self.response_headers_sent = True + + if end_stream: + self._half_close_local() + self.response_complete = True + + def send_data(self, data, end_stream=False): + """Mark data as sent. + + Args: + data: Bytes to send + end_stream: True if this completes the response + + Raises: + HTTP2StreamError: If data cannot be sent in current state + """ + if not self.can_send: + raise HTTP2StreamError( + self.stream_id, + f"Cannot send data in state {self.state.name}" + ) + + if end_stream: + self._half_close_local() + self.response_complete = True + + def reset(self, error_code=0x8): + """Reset this stream with RST_STREAM. + + Args: + error_code: HTTP/2 error code (default: CANCEL) + """ + self.state = StreamState.CLOSED + self.response_complete = True + self.request_complete = True + + def close(self): + """Close this stream normally.""" + self.state = StreamState.CLOSED + self.response_complete = True + self.request_complete = True + + def _half_close_local(self): + """Transition to half-closed (local) state.""" + if self.state == StreamState.OPEN: + self.state = StreamState.HALF_CLOSED_LOCAL + elif self.state == StreamState.HALF_CLOSED_REMOTE: + self.state = StreamState.CLOSED + else: + raise HTTP2StreamError( + self.stream_id, + f"Cannot half-close local in state {self.state.name}" + ) + + def _half_close_remote(self): + """Transition to half-closed (remote) state.""" + if self.state == StreamState.OPEN: + self.state = StreamState.HALF_CLOSED_REMOTE + elif self.state == StreamState.HALF_CLOSED_LOCAL: + self.state = StreamState.CLOSED + else: + raise HTTP2StreamError( + self.stream_id, + f"Cannot half-close remote in state {self.state.name}" + ) + + def get_request_body(self): + """Get the complete request body. + + Returns: + bytes: The request body data + """ + return self.request_body.getvalue() + + def get_pseudo_headers(self): + """Extract HTTP/2 pseudo-headers from request headers. + + Returns: + dict: Mapping of pseudo-header names to values + (e.g., {':method': 'GET', ':path': '/'}) + """ + pseudo = {} + for name, value in self.request_headers: + if name.startswith(':'): + pseudo[name] = value + return pseudo + + def get_regular_headers(self): + """Get regular (non-pseudo) headers from request. + + Returns: + list: List of (name, value) tuples for regular headers + """ + return [ + (name, value) + for name, value in self.request_headers + if not name.startswith(':') + ] + + def __repr__(self): + return ( + f"" + ) + + +__all__ = ['HTTP2Stream', 'StreamState']