From fe18960cd1090957abcd3d1e68788f7aff58d324 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 25 Jan 2026 16:31:09 +0100 Subject: [PATCH] Add HTTP/2 worker integration Integrate HTTP/2 support with gunicorn workers: - sync worker: Add warning that HTTP/2 is not supported - gthread worker: Full HTTP/2 support - ALPN negotiation with explicit handshake for deferred SSL - HTTP/2 connection lifecycle management - Per-stream request handling with WSGI - AsyncHTTP2Connection: Async version for ASGI workers - Same features as sync version with async/await - Proper flow control with chunked data sending - ASGI worker: HTTP/2 support via AsyncHTTP2Connection - AsyncWorker base: HTTP/2 connection handling - tornado worker: Add warning that HTTP/2 is not supported Also exports helper functions from http2 module. --- gunicorn/asgi/protocol.py | 233 +++++++++++++++- gunicorn/http2/__init__.py | 28 ++ gunicorn/http2/async_connection.py | 412 +++++++++++++++++++++++++++++ gunicorn/workers/base_async.py | 105 ++++++++ gunicorn/workers/gthread.py | 123 ++++++++- gunicorn/workers/gtornado.py | 8 + gunicorn/workers/sync.py | 8 + 7 files changed, 914 insertions(+), 3 deletions(-) create mode 100644 gunicorn/http2/async_connection.py diff --git a/gunicorn/asgi/protocol.py b/gunicorn/asgi/protocol.py index dfceab9d..4ef73a7e 100644 --- a/gunicorn/asgi/protocol.py +++ b/gunicorn/asgi/protocol.py @@ -5,11 +5,12 @@ """ ASGI protocol handler for gunicorn. -Implements asyncio.Protocol to handle HTTP/1.x connections and dispatch -to ASGI applications. +Implements asyncio.Protocol to handle HTTP/1.x and HTTP/2 connections +and dispatch to ASGI applications. """ import asyncio +import ssl from datetime import datetime from gunicorn.asgi.unreader import AsyncUnreader @@ -61,6 +62,18 @@ class ASGIProtocol(asyncio.Protocol): self.transport = transport self.worker.nr_conns += 1 + # Check if HTTP/2 was negotiated via ALPN + ssl_object = transport.get_extra_info('ssl_object') + if ssl_object and hasattr(ssl_object, 'selected_alpn_protocol'): + alpn = ssl_object.selected_alpn_protocol() + if alpn == 'h2': + # HTTP/2 connection - switch to HTTP/2 handler + self._task = self.worker.loop.create_task( + self._handle_http2_connection(transport, ssl_object) + ) + return + + # HTTP/1.x connection # Create stream reader/writer self.reader = asyncio.StreamReader() self.writer = transport @@ -482,3 +495,219 @@ class ASGIProtocol(asyncio.Protocol): except Exception: pass self._closed = True + + async def _handle_http2_connection(self, transport, ssl_object): + """Handle an HTTP/2 connection.""" + try: + from gunicorn.http2.async_connection import AsyncHTTP2Connection + + peername = transport.get_extra_info('peername') + sockname = transport.get_extra_info('sockname') + + # Create async reader/writer from transport + reader = asyncio.StreamReader() + protocol = asyncio.StreamReaderProtocol(reader) + writer = asyncio.StreamWriter( + transport, protocol, reader, self.worker.loop + ) + + # Create HTTP/2 connection handler + h2_conn = AsyncHTTP2Connection( + self.cfg, reader, writer, peername + ) + await h2_conn.initiate_connection() + + # Store for data_received + self.reader = reader + self._h2_conn = h2_conn + + # Main loop - receive and handle requests + while not h2_conn.is_closed and self.worker.alive: + try: + requests = await h2_conn.receive_data(timeout=1.0) + except asyncio.TimeoutError: + continue + except Exception as e: + self.log.debug("HTTP/2 receive error: %s", e) + break + + for req in requests: + try: + await self._handle_http2_request( + req, h2_conn, sockname, peername + ) + except Exception as e: + self.log.exception("Error handling HTTP/2 request") + try: + await h2_conn.send_error( + req.stream.stream_id, 500, str(e) + ) + except Exception: + pass + finally: + h2_conn.cleanup_stream(req.stream.stream_id) + + # Increment worker request count + self.worker.nr += len(requests) + + # Check max_requests + if self.worker.nr >= self.worker.max_requests: + self.log.info("Autorestarting worker after current request.") + self.worker.alive = False + break + + except asyncio.CancelledError: + pass + except Exception as e: + self.log.exception("HTTP/2 connection error: %s", e) + finally: + if hasattr(self, '_h2_conn'): + try: + await self._h2_conn.close() + except Exception: + pass + self._close_transport() + + async def _handle_http2_request(self, request, h2_conn, sockname, peername): + """Handle a single HTTP/2 request.""" + stream_id = request.stream.stream_id + scope = self._build_http2_scope(request, sockname, peername) + + response_started = False + response_complete = False + exc_to_raise = None + + response_status = 500 + response_headers = [] + response_body = b'' + + async def receive(): + # For HTTP/2, the body is already buffered in the stream + body = request.body.read() + return { + "type": "http.request", + "body": body, + "more_body": False, + } + + async def send(message): + nonlocal response_started, response_complete, exc_to_raise + nonlocal response_status, response_headers, response_body + + msg_type = message["type"] + + if msg_type == "http.response.start": + if response_started: + exc_to_raise = RuntimeError("Response already started") + return + response_started = True + response_status = message["status"] + response_headers = message.get("headers", []) + + elif msg_type == "http.response.body": + if not response_started: + exc_to_raise = RuntimeError("Response not started") + return + if response_complete: + exc_to_raise = RuntimeError("Response already complete") + return + + body = message.get("body", b"") + more_body = message.get("more_body", False) + + if body: + response_body += body + + if not more_body: + response_complete = True + + # Build environ for logging + environ = self._build_http2_environ(request, sockname, peername) + request_start = datetime.now() + + try: + self.cfg.pre_request(self.worker, request) + await self.app(scope, receive, send) + + if exc_to_raise is not None: + raise exc_to_raise + + # Send response via HTTP/2 + if response_started: + # Convert headers to list of tuples + headers = [] + for name, value in response_headers: + if isinstance(name, bytes): + name = name.decode("latin-1") + if isinstance(value, bytes): + value = value.decode("latin-1") + headers.append((name, value)) + + await h2_conn.send_response( + stream_id, response_status, headers, response_body + ) + else: + await h2_conn.send_error(stream_id, 500, "Internal Server Error") + response_status = 500 + + except Exception: + self.log.exception("Error in ASGI application") + if not response_started: + await h2_conn.send_error(stream_id, 500, "Internal Server Error") + response_status = 500 + finally: + try: + request_time = datetime.now() - request_start + resp = ASGIResponseInfo( + response_status, response_headers, len(response_body) + ) + self.log.access(resp, request, environ, request_time) + self.cfg.post_request(self.worker, request, environ, resp) + except Exception: + self.log.exception("Exception in post_request hook") + + def _build_http2_scope(self, request, sockname, peername): + """Build ASGI HTTP scope from HTTP/2 request.""" + headers = [] + for name, value in request.headers: + headers.append(( + name.lower().encode("latin-1"), + value.encode("latin-1") + )) + + scope = { + "type": "http", + "asgi": {"version": "3.0", "spec_version": "2.4"}, + "http_version": "2", + "method": request.method, + "scheme": request.scheme, + "path": request.path, + "raw_path": request.path.encode("latin-1") if request.path else b"", + "query_string": request.query.encode("latin-1") if request.query else b"", + "root_path": self.cfg.root_path or "", + "headers": headers, + "server": sockname if sockname else None, + "client": peername if peername else None, + } + + if hasattr(self.worker, 'state'): + scope["state"] = self.worker.state + + return scope + + def _build_http2_environ(self, request, sockname, peername): + """Build minimal environ dict for access logging.""" + environ = { + "REQUEST_METHOD": request.method, + "RAW_URI": request.uri, + "PATH_INFO": request.path, + "QUERY_STRING": request.query or "", + "SERVER_PROTOCOL": "HTTP/2", + "REMOTE_ADDR": peername[0] if peername else "-", + } + + for name, value in request.headers: + key = "HTTP_" + name.replace("-", "_") + environ[key] = value + + return environ diff --git a/gunicorn/http2/__init__.py b/gunicorn/http2/__init__.py index 40cb14c8..6670fc68 100644 --- a/gunicorn/http2/__init__.py +++ b/gunicorn/http2/__init__.py @@ -51,8 +51,36 @@ def get_h2_version(): return _h2_version +def get_http2_connection_class(): + """Get the HTTP2ServerConnection class if h2 is available. + + Returns: + HTTP2ServerConnection class, or raises HTTP2NotAvailable + """ + if not is_http2_available(): + from .errors import HTTP2NotAvailable + raise HTTP2NotAvailable() + from .connection import HTTP2ServerConnection + return HTTP2ServerConnection + + +def get_async_http2_connection_class(): + """Get the AsyncHTTP2Connection class if h2 is available. + + Returns: + AsyncHTTP2Connection class, or raises HTTP2NotAvailable + """ + if not is_http2_available(): + from .errors import HTTP2NotAvailable + raise HTTP2NotAvailable() + from .async_connection import AsyncHTTP2Connection + return AsyncHTTP2Connection + + __all__ = [ 'is_http2_available', 'get_h2_version', + 'get_http2_connection_class', + 'get_async_http2_connection_class', 'H2_MIN_VERSION', ] diff --git a/gunicorn/http2/async_connection.py b/gunicorn/http2/async_connection.py new file mode 100644 index 00000000..3c2f08ea --- /dev/null +++ b/gunicorn/http2/async_connection.py @@ -0,0 +1,412 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +Async HTTP/2 server connection implementation for ASGI workers. + +Uses the hyper-h2 library for HTTP/2 protocol handling with +asyncio for non-blocking I/O. +""" + +import asyncio +from io import BytesIO + +from .errors import ( + HTTP2Error, HTTP2ProtocolError, HTTP2ConnectionError, + HTTP2NotAvailable, +) +from .stream import HTTP2Stream +from .request import HTTP2Request + + +# Import h2 lazily to allow graceful fallback +_h2 = None +_h2_config = None +_h2_events = None +_h2_exceptions = None +_h2_settings = None + + +def _import_h2(): + """Lazily import h2 library components.""" + global _h2, _h2_config, _h2_events, _h2_exceptions, _h2_settings + + if _h2 is not None: + return + + try: + import h2.connection as _h2 + import h2.config as _h2_config + import h2.events as _h2_events + import h2.exceptions as _h2_exceptions + import h2.settings as _h2_settings + except ImportError: + raise HTTP2NotAvailable() + + +class AsyncHTTP2Connection: + """Async HTTP/2 server-side connection handler for ASGI. + + Manages the HTTP/2 connection state and multiplexed streams + using asyncio for non-blocking I/O operations. + """ + + # Default buffer size for socket reads + READ_BUFFER_SIZE = 65536 + + def __init__(self, cfg, reader, writer, client_addr): + """Initialize an async HTTP/2 server connection. + + Args: + cfg: Gunicorn configuration object + reader: asyncio StreamReader + writer: asyncio StreamWriter + client_addr: Client address tuple (host, port) + + Raises: + HTTP2NotAvailable: If h2 library is not installed + """ + _import_h2() + + self.cfg = cfg + self.reader = reader + self.writer = writer + self.client_addr = client_addr + + # Active streams indexed by stream ID + self.streams = {} + + # Queue of completed requests for the worker + self._request_queue = asyncio.Queue() + + # Connection settings from config + self.initial_window_size = cfg.http2_initial_window_size + self.max_concurrent_streams = cfg.http2_max_concurrent_streams + self.max_frame_size = cfg.http2_max_frame_size + self.max_header_list_size = cfg.http2_max_header_list_size + + # Initialize h2 connection + config = _h2_config.H2Configuration( + client_side=False, + header_encoding='utf-8', + ) + self.h2_conn = _h2.H2Connection(config=config) + + # Connection state + self._closed = False + self._initialized = False + self._receive_task = None + + async def initiate_connection(self): + """Send initial HTTP/2 settings to client. + + Should be called after the SSL handshake completes and + before processing any data. + """ + if self._initialized: + return + + # Update local settings before initiating + self.h2_conn.update_settings({ + _h2_settings.SettingCodes.MAX_CONCURRENT_STREAMS: self.max_concurrent_streams, + _h2_settings.SettingCodes.INITIAL_WINDOW_SIZE: self.initial_window_size, + _h2_settings.SettingCodes.MAX_FRAME_SIZE: self.max_frame_size, + _h2_settings.SettingCodes.MAX_HEADER_LIST_SIZE: self.max_header_list_size, + }) + + self.h2_conn.initiate_connection() + await self._send_pending_data() + self._initialized = True + + async def receive_data(self, timeout=None): + """Receive data and return completed requests. + + Args: + timeout: Optional timeout in seconds for read operation + + Returns: + list: List of HTTP2Request objects for completed requests + + Raises: + HTTP2ConnectionError: On protocol or connection errors + asyncio.TimeoutError: If timeout expires + """ + try: + if timeout is not None: + data = await asyncio.wait_for( + self.reader.read(self.READ_BUFFER_SIZE), + timeout=timeout + ) + else: + data = await self.reader.read(self.READ_BUFFER_SIZE) + except (OSError, IOError) as e: + raise HTTP2ConnectionError(f"Socket read error: {e}") + + if not data: + # Connection closed by peer + self._closed = True + return [] + + # Feed data to h2 + try: + events = self.h2_conn.receive_data(data) + except _h2_exceptions.ProtocolError as e: + raise HTTP2ProtocolError(str(e)) + + # Process events + completed_requests = [] + for event in events: + request = self._handle_event(event) + if request is not None: + completed_requests.append(request) + + # Send any pending data (WINDOW_UPDATE, etc.) + await self._send_pending_data() + + return completed_requests + + def _handle_event(self, event): + """Handle a single h2 event. + + Args: + event: h2 event object + + Returns: + HTTP2Request if a request is complete, None otherwise + """ + if isinstance(event, _h2_events.RequestReceived): + return self._handle_request_received(event) + + elif isinstance(event, _h2_events.DataReceived): + return self._handle_data_received(event) + + elif isinstance(event, _h2_events.StreamEnded): + return self._handle_stream_ended(event) + + elif isinstance(event, _h2_events.StreamReset): + self._handle_stream_reset(event) + + elif isinstance(event, _h2_events.WindowUpdated): + pass # Flow control update, handled by h2 + + elif isinstance(event, _h2_events.PriorityUpdated): + pass # Priority update, could be used for scheduling + + elif isinstance(event, _h2_events.SettingsAcknowledged): + pass # Settings ACK received + + elif isinstance(event, _h2_events.ConnectionTerminated): + self._handle_connection_terminated(event) + + elif isinstance(event, _h2_events.TrailersReceived): + return self._handle_trailers_received(event) + + return None + + def _handle_request_received(self, event): + """Handle RequestReceived event (HEADERS frame).""" + stream_id = event.stream_id + headers = event.headers + + # Create new stream + stream = HTTP2Stream(stream_id, self) + self.streams[stream_id] = stream + + # Process headers + stream.receive_headers(headers, end_stream=False) + return None + + def _handle_data_received(self, event): + """Handle DataReceived event.""" + stream_id = event.stream_id + data = event.data + + stream = self.streams.get(stream_id) + if stream is None: + return None + + stream.receive_data(data, end_stream=False) + + # Increment flow control windows (only if data received) + if len(data) > 0: + # Update stream-level window + self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id) + # Update connection-level window + self.h2_conn.increment_flow_control_window(len(data), stream_id=None) + + return None + + def _handle_stream_ended(self, event): + """Handle StreamEnded event.""" + stream_id = event.stream_id + stream = self.streams.get(stream_id) + + if stream is None: + return None + + stream.request_complete = True + return HTTP2Request(stream, self.cfg, self.client_addr) + + def _handle_stream_reset(self, event): + """Handle StreamReset event.""" + stream_id = event.stream_id + stream = self.streams.get(stream_id) + + if stream is not None: + stream.reset(event.error_code) + + def _handle_connection_terminated(self, event): + """Handle ConnectionTerminated event.""" + self._closed = True + + def _handle_trailers_received(self, event): + """Handle TrailersReceived event.""" + stream_id = event.stream_id + stream = self.streams.get(stream_id) + + if stream is None: + return None + + stream.receive_trailers(event.headers) + return HTTP2Request(stream, self.cfg, self.client_addr) + + async def send_response(self, stream_id, status, headers, body=None): + """Send a response on a stream. + + Args: + stream_id: The stream ID to respond on + status: HTTP status code (int) + headers: List of (name, value) header tuples + body: Optional response body bytes + """ + stream = self.streams.get(stream_id) + if stream is None: + raise HTTP2Error(f"Stream {stream_id} not found") + + # Build response headers with :status pseudo-header + response_headers = [(':status', str(status))] + for name, value in headers: + response_headers.append((name.lower(), str(value))) + + end_stream = body is None or len(body) == 0 + + # Send headers + self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream) + stream.send_headers(response_headers, end_stream=end_stream) + await self._send_pending_data() + + # Send body if present + if body and len(body) > 0: + await self.send_data(stream_id, body, end_stream=True) + + async def send_data(self, stream_id, data, end_stream=False): + """Send data on a stream. + + Args: + stream_id: The stream ID + data: Body data bytes + end_stream: Whether this ends the stream + """ + stream = self.streams.get(stream_id) + if stream is None: + raise HTTP2Error(f"Stream {stream_id} not found") + + # Send data in chunks respecting flow control and max frame size + data_to_send = data + while data_to_send: + # Get available window size for this stream + available = self.h2_conn.local_flow_control_window(stream_id) + # Also respect max frame size + chunk_size = min(available, self.max_frame_size, len(data_to_send)) + + if chunk_size <= 0: + # No window available, send what we have and wait + await self._send_pending_data() + # Try again - the window might open after ACKs + available = self.h2_conn.local_flow_control_window(stream_id) + if available <= 0: + # Still no window, just try to send a small chunk + chunk_size = min(self.max_frame_size, len(data_to_send)) + + chunk = data_to_send[:chunk_size] + data_to_send = data_to_send[chunk_size:] + + # Only set end_stream on the final chunk + is_final = end_stream and len(data_to_send) == 0 + + self.h2_conn.send_data(stream_id, chunk, end_stream=is_final) + await self._send_pending_data() + + stream.send_data(data, end_stream=end_stream) + + async def send_error(self, stream_id, status_code, message=None): + """Send an error response on a stream.""" + body = message.encode() if message else b'' + headers = [('content-length', str(len(body)))] + if body: + headers.append(('content-type', 'text/plain; charset=utf-8')) + + await self.send_response(stream_id, status_code, headers, body) + + async def reset_stream(self, stream_id, error_code=0x8): + """Reset a stream with RST_STREAM.""" + stream = self.streams.get(stream_id) + if stream is not None: + stream.reset(error_code) + + self.h2_conn.reset_stream(stream_id, error_code=error_code) + await self._send_pending_data() + + async def close(self, error_code=0x0, last_stream_id=None): + """Close the connection gracefully with GOAWAY.""" + if self._closed: + return + + self._closed = True + + if last_stream_id is None: + last_stream_id = max(self.streams.keys()) if self.streams else 0 + + try: + self.h2_conn.close_connection(error_code=error_code) + await self._send_pending_data() + except Exception: + pass + + try: + self.writer.close() + await self.writer.wait_closed() + except Exception: + pass + + async def _send_pending_data(self): + """Send any pending data from h2 to the socket.""" + data = self.h2_conn.data_to_send() + if data: + try: + self.writer.write(data) + await self.writer.drain() + except (OSError, IOError) as e: + self._closed = True + raise HTTP2ConnectionError(f"Socket write error: {e}") + + @property + def is_closed(self): + """Check if connection is closed.""" + return self._closed + + def cleanup_stream(self, stream_id): + """Remove a stream after processing is complete.""" + self.streams.pop(stream_id, None) + + def __repr__(self): + return ( + f"" + ) + + +__all__ = ['AsyncHTTP2Connection'] diff --git a/gunicorn/workers/base_async.py b/gunicorn/workers/base_async.py index 22ea09ab..c78c8646 100644 --- a/gunicorn/workers/base_async.py +++ b/gunicorn/workers/base_async.py @@ -11,6 +11,7 @@ import sys from gunicorn import http from gunicorn.http import wsgi from gunicorn import util +from gunicorn import sock as gunicorn_sock from gunicorn.workers import base ALREADY_HANDLED = object() @@ -32,6 +33,14 @@ class AsyncWorker(base.Worker): def handle(self, listener, client, addr): req = None try: + # Check if HTTP/2 was negotiated (for SSL connections) + is_http2 = gunicorn_sock.is_http2_negotiated(client) + + if is_http2: + # Handle HTTP/2 connection + self.handle_http2(listener, client, addr) + return + parser = http.get_parser(self.cfg, client, addr) try: listener_name = listener.getsockname() @@ -86,6 +95,102 @@ class AsyncWorker(base.Worker): finally: util.close(client) + def handle_http2(self, listener, client, addr): + """Handle an HTTP/2 connection. + + Processes multiplexed HTTP/2 streams until the connection closes. + """ + listener_name = listener.getsockname() + + try: + h2_conn = http.get_parser(self.cfg, client, addr, http2_connection=True) + h2_conn.initiate_connection() + + while not h2_conn.is_closed and self.alive: + try: + requests = h2_conn.receive_data() + except http.errors.NoMoreData: + self.log.debug("HTTP/2 connection closed by client") + break + + for req in requests: + try: + self.handle_http2_request(listener_name, req, client, addr, h2_conn) + except Exception as e: + self.log.exception("Error handling HTTP/2 request") + try: + h2_conn.send_error(req.stream.stream_id, 500, str(e)) + except Exception: + pass + finally: + h2_conn.cleanup_stream(req.stream.stream_id) + + except ssl.SSLError as e: + if e.args[0] == ssl.SSL_ERROR_EOF: + self.log.debug("HTTP/2 SSL connection closed") + else: + self.log.debug("HTTP/2 SSL error: %s", e) + except OSError as e: + if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN): + self.log.exception("HTTP/2 socket error") + except Exception as e: + self.log.exception("HTTP/2 connection error: %s", e) + + def handle_http2_request(self, listener_name, req, sock, addr, h2_conn): + """Handle a single HTTP/2 request.""" + stream_id = req.stream.stream_id + request_start = datetime.now() + environ = {} + resp = None + + try: + self.cfg.pre_request(self, req) + resp, environ = wsgi.create(req, sock, addr, listener_name, self.cfg) + environ["wsgi.multithread"] = True + environ["HTTP_VERSION"] = "2" + + self.nr += 1 + if self.nr >= self.max_requests: + if self.alive: + self.log.info("Autorestarting worker after current request.") + self.alive = False + + # Run WSGI app + respiter = self.wsgi(environ, resp.start_response) + if self.is_already_handled(respiter): + return + + # Collect response body + response_body = b'' + try: + if hasattr(respiter, '__iter__'): + for item in respiter: + if item: + response_body += item + finally: + if hasattr(respiter, "close"): + respiter.close() + + # Send response via HTTP/2 + h2_conn.send_response( + stream_id, + resp.status_code, + resp.headers, + response_body + ) + + request_time = datetime.now() - request_start + self.log.access(resp, req, environ, request_time) + + except Exception: + self.log.exception("Error handling HTTP/2 request") + raise + finally: + try: + self.cfg.post_request(self, req, environ, resp) + except Exception: + self.log.exception("Exception in post_request hook") + def handle_request(self, listener_name, req, sock, addr): request_start = datetime.now() environ = {} diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 1665f4e6..b9d210de 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -41,6 +41,7 @@ class TConn: self.timeout = None self.parser = None self.initialized = False + self.is_http2 = False # set the socket to non blocking self.sock.setblocking(False) @@ -57,7 +58,21 @@ class TConn: if self.cfg.is_ssl: self.sock = sock.ssl_wrap_socket(self.sock, self.cfg) - # initialize the parser + # Complete the handshake to ensure ALPN negotiation is done + # (needed if do_handshake_on_connect is False) + if not self.cfg.do_handshake_on_connect: + self.sock.do_handshake() + + # Check if HTTP/2 was negotiated via ALPN + if sock.is_http2_negotiated(self.sock): + self.is_http2 = True + self.parser = http.get_parser( + self.cfg, self.sock, self.client, http2_connection=True + ) + self.parser.initiate_connection() + return + + # initialize the HTTP/1.x parser self.parser = http.get_parser(self.cfg, self.sock, self.client) def set_timeout(self): @@ -354,6 +369,10 @@ class ThreadWorker(base.Worker): # (ENOTCONN from ssl_wrap_socket would crash main thread otherwise) conn.init() + # HTTP/2 connections require special handling + if conn.is_http2: + return self.handle_http2(conn) + req = next(conn.parser) if not req: return False @@ -391,6 +410,108 @@ class ThreadWorker(base.Worker): return False + def handle_http2(self, conn): + """Handle an HTTP/2 connection. Runs in a worker thread. + + HTTP/2 connections are persistent and multiplex multiple streams. + We handle all streams until the connection is closed. + + Returns: + False (HTTP/2 connections don't use keepalive polling) + """ + h2_conn = conn.parser # HTTP2ServerConnection + + try: + while not h2_conn.is_closed and self.alive: + # Receive data and get completed requests + requests = h2_conn.receive_data() + + for req in requests: + try: + self.handle_http2_request(req, conn, h2_conn) + except Exception as e: + self.log.exception("Error handling HTTP/2 request") + try: + h2_conn.send_error(req.stream.stream_id, 500, str(e)) + except Exception: + pass + finally: + # Cleanup stream after processing + h2_conn.cleanup_stream(req.stream.stream_id) + + # Check if we need to close + if not self.alive: + h2_conn.close() + break + + except http.errors.NoMoreData: + self.log.debug("HTTP/2 connection closed by client") + except ssl.SSLError as e: + if e.args[0] == ssl.SSL_ERROR_EOF: + self.log.debug("HTTP/2 SSL connection closed") + else: + self.log.debug("HTTP/2 SSL error: %s", e) + except OSError as e: + if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN): + self.log.exception("HTTP/2 socket error") + except Exception as e: + self.log.exception("HTTP/2 connection error") + + return False + + def handle_http2_request(self, req, conn, h2_conn): + """Handle a single HTTP/2 request/stream.""" + environ = {} + resp = None + stream_id = req.stream.stream_id + + try: + self.cfg.pre_request(self, req) + request_start = datetime.now() + + # Create WSGI environ + resp, environ = wsgi.create(req, conn.sock, conn.client, + conn.server, self.cfg) + environ["wsgi.multithread"] = True + environ["HTTP_VERSION"] = "2" # Indicate HTTP/2 + + self.nr += 1 + if self.nr >= self.max_requests: + if self.alive: + self.log.info("Autorestarting worker after current request.") + self.alive = False + + # Run WSGI app + respiter = self.wsgi(environ, resp.start_response) + + # Collect response body + response_body = b'' + try: + if hasattr(respiter, '__iter__'): + for item in respiter: + if item: + response_body += item + finally: + if hasattr(respiter, "close"): + respiter.close() + + # Send response via HTTP/2 + h2_conn.send_response( + stream_id, + resp.status_code, + resp.headers, + response_body + ) + + request_time = datetime.now() - request_start + self.log.access(resp, req, environ, request_time) + + finally: + try: + self.cfg.post_request(self, req, environ, resp) + except Exception: + self.log.exception("Exception in post_request hook") + def handle_request(self, req, conn): environ = {} resp = None diff --git a/gunicorn/workers/gtornado.py b/gunicorn/workers/gtornado.py index cac0f925..2c4b5c58 100644 --- a/gunicorn/workers/gtornado.py +++ b/gunicorn/workers/gtornado.py @@ -77,6 +77,14 @@ class TornadoWorker(Worker): self.alive = True self.server_alive = False + # Warn if HTTP/2 is requested - tornado worker doesn't support it + if 'h2' in self.cfg.http_protocols: + self.log.warning( + "HTTP/2 is not supported by the tornado worker. " + "Use gthread, gevent, eventlet, or asgi workers for HTTP/2 support. " + "Falling back to HTTP/1.1 only." + ) + self.callbacks = [] self.callbacks.append(PeriodicCallback(self.watchdog, 1000)) self.callbacks.append(PeriodicCallback(self.heartbeat, 1000)) diff --git a/gunicorn/workers/sync.py b/gunicorn/workers/sync.py index 99dbdaac..763f8652 100644 --- a/gunicorn/workers/sync.py +++ b/gunicorn/workers/sync.py @@ -114,6 +114,14 @@ class SyncWorker(base.Worker): # use the CPU for nothing. This minimal timeout prevent it. timeout = self.timeout or 0.5 + # Warn if HTTP/2 is requested - sync worker doesn't support it + if 'h2' in self.cfg.http_protocols: + self.log.warning( + "HTTP/2 is not supported by the sync worker. " + "Use gthread, gevent, eventlet, or asgi workers for HTTP/2 support. " + "Falling back to HTTP/1.1 only." + ) + # self.socket appears to lose its blocking status after # we fork in the arbiter. Reset it here. for s in self.sockets: