mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-01 18:21:30 +08:00
Add HTTP/2 worker integration
Integrate HTTP/2 support with gunicorn workers: - sync worker: Add warning that HTTP/2 is not supported - gthread worker: Full HTTP/2 support - ALPN negotiation with explicit handshake for deferred SSL - HTTP/2 connection lifecycle management - Per-stream request handling with WSGI - AsyncHTTP2Connection: Async version for ASGI workers - Same features as sync version with async/await - Proper flow control with chunked data sending - ASGI worker: HTTP/2 support via AsyncHTTP2Connection - AsyncWorker base: HTTP/2 connection handling - tornado worker: Add warning that HTTP/2 is not supported Also exports helper functions from http2 module.
This commit is contained in:
parent
89a0a46722
commit
fe18960cd1
@ -5,11 +5,12 @@
|
||||
"""
|
||||
ASGI protocol handler for gunicorn.
|
||||
|
||||
Implements asyncio.Protocol to handle HTTP/1.x connections and dispatch
|
||||
to ASGI applications.
|
||||
Implements asyncio.Protocol to handle HTTP/1.x and HTTP/2 connections
|
||||
and dispatch to ASGI applications.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import ssl
|
||||
from datetime import datetime
|
||||
|
||||
from gunicorn.asgi.unreader import AsyncUnreader
|
||||
@ -61,6 +62,18 @@ class ASGIProtocol(asyncio.Protocol):
|
||||
self.transport = transport
|
||||
self.worker.nr_conns += 1
|
||||
|
||||
# Check if HTTP/2 was negotiated via ALPN
|
||||
ssl_object = transport.get_extra_info('ssl_object')
|
||||
if ssl_object and hasattr(ssl_object, 'selected_alpn_protocol'):
|
||||
alpn = ssl_object.selected_alpn_protocol()
|
||||
if alpn == 'h2':
|
||||
# HTTP/2 connection - switch to HTTP/2 handler
|
||||
self._task = self.worker.loop.create_task(
|
||||
self._handle_http2_connection(transport, ssl_object)
|
||||
)
|
||||
return
|
||||
|
||||
# HTTP/1.x connection
|
||||
# Create stream reader/writer
|
||||
self.reader = asyncio.StreamReader()
|
||||
self.writer = transport
|
||||
@ -482,3 +495,219 @@ class ASGIProtocol(asyncio.Protocol):
|
||||
except Exception:
|
||||
pass
|
||||
self._closed = True
|
||||
|
||||
async def _handle_http2_connection(self, transport, ssl_object):
|
||||
"""Handle an HTTP/2 connection."""
|
||||
try:
|
||||
from gunicorn.http2.async_connection import AsyncHTTP2Connection
|
||||
|
||||
peername = transport.get_extra_info('peername')
|
||||
sockname = transport.get_extra_info('sockname')
|
||||
|
||||
# Create async reader/writer from transport
|
||||
reader = asyncio.StreamReader()
|
||||
protocol = asyncio.StreamReaderProtocol(reader)
|
||||
writer = asyncio.StreamWriter(
|
||||
transport, protocol, reader, self.worker.loop
|
||||
)
|
||||
|
||||
# Create HTTP/2 connection handler
|
||||
h2_conn = AsyncHTTP2Connection(
|
||||
self.cfg, reader, writer, peername
|
||||
)
|
||||
await h2_conn.initiate_connection()
|
||||
|
||||
# Store for data_received
|
||||
self.reader = reader
|
||||
self._h2_conn = h2_conn
|
||||
|
||||
# Main loop - receive and handle requests
|
||||
while not h2_conn.is_closed and self.worker.alive:
|
||||
try:
|
||||
requests = await h2_conn.receive_data(timeout=1.0)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except Exception as e:
|
||||
self.log.debug("HTTP/2 receive error: %s", e)
|
||||
break
|
||||
|
||||
for req in requests:
|
||||
try:
|
||||
await self._handle_http2_request(
|
||||
req, h2_conn, sockname, peername
|
||||
)
|
||||
except Exception as e:
|
||||
self.log.exception("Error handling HTTP/2 request")
|
||||
try:
|
||||
await h2_conn.send_error(
|
||||
req.stream.stream_id, 500, str(e)
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
h2_conn.cleanup_stream(req.stream.stream_id)
|
||||
|
||||
# Increment worker request count
|
||||
self.worker.nr += len(requests)
|
||||
|
||||
# Check max_requests
|
||||
if self.worker.nr >= self.worker.max_requests:
|
||||
self.log.info("Autorestarting worker after current request.")
|
||||
self.worker.alive = False
|
||||
break
|
||||
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
self.log.exception("HTTP/2 connection error: %s", e)
|
||||
finally:
|
||||
if hasattr(self, '_h2_conn'):
|
||||
try:
|
||||
await self._h2_conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._close_transport()
|
||||
|
||||
async def _handle_http2_request(self, request, h2_conn, sockname, peername):
|
||||
"""Handle a single HTTP/2 request."""
|
||||
stream_id = request.stream.stream_id
|
||||
scope = self._build_http2_scope(request, sockname, peername)
|
||||
|
||||
response_started = False
|
||||
response_complete = False
|
||||
exc_to_raise = None
|
||||
|
||||
response_status = 500
|
||||
response_headers = []
|
||||
response_body = b''
|
||||
|
||||
async def receive():
|
||||
# For HTTP/2, the body is already buffered in the stream
|
||||
body = request.body.read()
|
||||
return {
|
||||
"type": "http.request",
|
||||
"body": body,
|
||||
"more_body": False,
|
||||
}
|
||||
|
||||
async def send(message):
|
||||
nonlocal response_started, response_complete, exc_to_raise
|
||||
nonlocal response_status, response_headers, response_body
|
||||
|
||||
msg_type = message["type"]
|
||||
|
||||
if msg_type == "http.response.start":
|
||||
if response_started:
|
||||
exc_to_raise = RuntimeError("Response already started")
|
||||
return
|
||||
response_started = True
|
||||
response_status = message["status"]
|
||||
response_headers = message.get("headers", [])
|
||||
|
||||
elif msg_type == "http.response.body":
|
||||
if not response_started:
|
||||
exc_to_raise = RuntimeError("Response not started")
|
||||
return
|
||||
if response_complete:
|
||||
exc_to_raise = RuntimeError("Response already complete")
|
||||
return
|
||||
|
||||
body = message.get("body", b"")
|
||||
more_body = message.get("more_body", False)
|
||||
|
||||
if body:
|
||||
response_body += body
|
||||
|
||||
if not more_body:
|
||||
response_complete = True
|
||||
|
||||
# Build environ for logging
|
||||
environ = self._build_http2_environ(request, sockname, peername)
|
||||
request_start = datetime.now()
|
||||
|
||||
try:
|
||||
self.cfg.pre_request(self.worker, request)
|
||||
await self.app(scope, receive, send)
|
||||
|
||||
if exc_to_raise is not None:
|
||||
raise exc_to_raise
|
||||
|
||||
# Send response via HTTP/2
|
||||
if response_started:
|
||||
# Convert headers to list of tuples
|
||||
headers = []
|
||||
for name, value in response_headers:
|
||||
if isinstance(name, bytes):
|
||||
name = name.decode("latin-1")
|
||||
if isinstance(value, bytes):
|
||||
value = value.decode("latin-1")
|
||||
headers.append((name, value))
|
||||
|
||||
await h2_conn.send_response(
|
||||
stream_id, response_status, headers, response_body
|
||||
)
|
||||
else:
|
||||
await h2_conn.send_error(stream_id, 500, "Internal Server Error")
|
||||
response_status = 500
|
||||
|
||||
except Exception:
|
||||
self.log.exception("Error in ASGI application")
|
||||
if not response_started:
|
||||
await h2_conn.send_error(stream_id, 500, "Internal Server Error")
|
||||
response_status = 500
|
||||
finally:
|
||||
try:
|
||||
request_time = datetime.now() - request_start
|
||||
resp = ASGIResponseInfo(
|
||||
response_status, response_headers, len(response_body)
|
||||
)
|
||||
self.log.access(resp, request, environ, request_time)
|
||||
self.cfg.post_request(self.worker, request, environ, resp)
|
||||
except Exception:
|
||||
self.log.exception("Exception in post_request hook")
|
||||
|
||||
def _build_http2_scope(self, request, sockname, peername):
|
||||
"""Build ASGI HTTP scope from HTTP/2 request."""
|
||||
headers = []
|
||||
for name, value in request.headers:
|
||||
headers.append((
|
||||
name.lower().encode("latin-1"),
|
||||
value.encode("latin-1")
|
||||
))
|
||||
|
||||
scope = {
|
||||
"type": "http",
|
||||
"asgi": {"version": "3.0", "spec_version": "2.4"},
|
||||
"http_version": "2",
|
||||
"method": request.method,
|
||||
"scheme": request.scheme,
|
||||
"path": request.path,
|
||||
"raw_path": request.path.encode("latin-1") if request.path else b"",
|
||||
"query_string": request.query.encode("latin-1") if request.query else b"",
|
||||
"root_path": self.cfg.root_path or "",
|
||||
"headers": headers,
|
||||
"server": sockname if sockname else None,
|
||||
"client": peername if peername else None,
|
||||
}
|
||||
|
||||
if hasattr(self.worker, 'state'):
|
||||
scope["state"] = self.worker.state
|
||||
|
||||
return scope
|
||||
|
||||
def _build_http2_environ(self, request, sockname, peername):
|
||||
"""Build minimal environ dict for access logging."""
|
||||
environ = {
|
||||
"REQUEST_METHOD": request.method,
|
||||
"RAW_URI": request.uri,
|
||||
"PATH_INFO": request.path,
|
||||
"QUERY_STRING": request.query or "",
|
||||
"SERVER_PROTOCOL": "HTTP/2",
|
||||
"REMOTE_ADDR": peername[0] if peername else "-",
|
||||
}
|
||||
|
||||
for name, value in request.headers:
|
||||
key = "HTTP_" + name.replace("-", "_")
|
||||
environ[key] = value
|
||||
|
||||
return environ
|
||||
|
||||
@ -51,8 +51,36 @@ def get_h2_version():
|
||||
return _h2_version
|
||||
|
||||
|
||||
def get_http2_connection_class():
|
||||
"""Get the HTTP2ServerConnection class if h2 is available.
|
||||
|
||||
Returns:
|
||||
HTTP2ServerConnection class, or raises HTTP2NotAvailable
|
||||
"""
|
||||
if not is_http2_available():
|
||||
from .errors import HTTP2NotAvailable
|
||||
raise HTTP2NotAvailable()
|
||||
from .connection import HTTP2ServerConnection
|
||||
return HTTP2ServerConnection
|
||||
|
||||
|
||||
def get_async_http2_connection_class():
|
||||
"""Get the AsyncHTTP2Connection class if h2 is available.
|
||||
|
||||
Returns:
|
||||
AsyncHTTP2Connection class, or raises HTTP2NotAvailable
|
||||
"""
|
||||
if not is_http2_available():
|
||||
from .errors import HTTP2NotAvailable
|
||||
raise HTTP2NotAvailable()
|
||||
from .async_connection import AsyncHTTP2Connection
|
||||
return AsyncHTTP2Connection
|
||||
|
||||
|
||||
__all__ = [
|
||||
'is_http2_available',
|
||||
'get_h2_version',
|
||||
'get_http2_connection_class',
|
||||
'get_async_http2_connection_class',
|
||||
'H2_MIN_VERSION',
|
||||
]
|
||||
|
||||
412
gunicorn/http2/async_connection.py
Normal file
412
gunicorn/http2/async_connection.py
Normal file
@ -0,0 +1,412 @@
|
||||
# -*- coding: utf-8 -
|
||||
#
|
||||
# This file is part of gunicorn released under the MIT license.
|
||||
# See the NOTICE for more information.
|
||||
|
||||
"""
|
||||
Async HTTP/2 server connection implementation for ASGI workers.
|
||||
|
||||
Uses the hyper-h2 library for HTTP/2 protocol handling with
|
||||
asyncio for non-blocking I/O.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from io import BytesIO
|
||||
|
||||
from .errors import (
|
||||
HTTP2Error, HTTP2ProtocolError, HTTP2ConnectionError,
|
||||
HTTP2NotAvailable,
|
||||
)
|
||||
from .stream import HTTP2Stream
|
||||
from .request import HTTP2Request
|
||||
|
||||
|
||||
# Import h2 lazily to allow graceful fallback
|
||||
_h2 = None
|
||||
_h2_config = None
|
||||
_h2_events = None
|
||||
_h2_exceptions = None
|
||||
_h2_settings = None
|
||||
|
||||
|
||||
def _import_h2():
|
||||
"""Lazily import h2 library components."""
|
||||
global _h2, _h2_config, _h2_events, _h2_exceptions, _h2_settings
|
||||
|
||||
if _h2 is not None:
|
||||
return
|
||||
|
||||
try:
|
||||
import h2.connection as _h2
|
||||
import h2.config as _h2_config
|
||||
import h2.events as _h2_events
|
||||
import h2.exceptions as _h2_exceptions
|
||||
import h2.settings as _h2_settings
|
||||
except ImportError:
|
||||
raise HTTP2NotAvailable()
|
||||
|
||||
|
||||
class AsyncHTTP2Connection:
|
||||
"""Async HTTP/2 server-side connection handler for ASGI.
|
||||
|
||||
Manages the HTTP/2 connection state and multiplexed streams
|
||||
using asyncio for non-blocking I/O operations.
|
||||
"""
|
||||
|
||||
# Default buffer size for socket reads
|
||||
READ_BUFFER_SIZE = 65536
|
||||
|
||||
def __init__(self, cfg, reader, writer, client_addr):
|
||||
"""Initialize an async HTTP/2 server connection.
|
||||
|
||||
Args:
|
||||
cfg: Gunicorn configuration object
|
||||
reader: asyncio StreamReader
|
||||
writer: asyncio StreamWriter
|
||||
client_addr: Client address tuple (host, port)
|
||||
|
||||
Raises:
|
||||
HTTP2NotAvailable: If h2 library is not installed
|
||||
"""
|
||||
_import_h2()
|
||||
|
||||
self.cfg = cfg
|
||||
self.reader = reader
|
||||
self.writer = writer
|
||||
self.client_addr = client_addr
|
||||
|
||||
# Active streams indexed by stream ID
|
||||
self.streams = {}
|
||||
|
||||
# Queue of completed requests for the worker
|
||||
self._request_queue = asyncio.Queue()
|
||||
|
||||
# Connection settings from config
|
||||
self.initial_window_size = cfg.http2_initial_window_size
|
||||
self.max_concurrent_streams = cfg.http2_max_concurrent_streams
|
||||
self.max_frame_size = cfg.http2_max_frame_size
|
||||
self.max_header_list_size = cfg.http2_max_header_list_size
|
||||
|
||||
# Initialize h2 connection
|
||||
config = _h2_config.H2Configuration(
|
||||
client_side=False,
|
||||
header_encoding='utf-8',
|
||||
)
|
||||
self.h2_conn = _h2.H2Connection(config=config)
|
||||
|
||||
# Connection state
|
||||
self._closed = False
|
||||
self._initialized = False
|
||||
self._receive_task = None
|
||||
|
||||
async def initiate_connection(self):
|
||||
"""Send initial HTTP/2 settings to client.
|
||||
|
||||
Should be called after the SSL handshake completes and
|
||||
before processing any data.
|
||||
"""
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
# Update local settings before initiating
|
||||
self.h2_conn.update_settings({
|
||||
_h2_settings.SettingCodes.MAX_CONCURRENT_STREAMS: self.max_concurrent_streams,
|
||||
_h2_settings.SettingCodes.INITIAL_WINDOW_SIZE: self.initial_window_size,
|
||||
_h2_settings.SettingCodes.MAX_FRAME_SIZE: self.max_frame_size,
|
||||
_h2_settings.SettingCodes.MAX_HEADER_LIST_SIZE: self.max_header_list_size,
|
||||
})
|
||||
|
||||
self.h2_conn.initiate_connection()
|
||||
await self._send_pending_data()
|
||||
self._initialized = True
|
||||
|
||||
async def receive_data(self, timeout=None):
|
||||
"""Receive data and return completed requests.
|
||||
|
||||
Args:
|
||||
timeout: Optional timeout in seconds for read operation
|
||||
|
||||
Returns:
|
||||
list: List of HTTP2Request objects for completed requests
|
||||
|
||||
Raises:
|
||||
HTTP2ConnectionError: On protocol or connection errors
|
||||
asyncio.TimeoutError: If timeout expires
|
||||
"""
|
||||
try:
|
||||
if timeout is not None:
|
||||
data = await asyncio.wait_for(
|
||||
self.reader.read(self.READ_BUFFER_SIZE),
|
||||
timeout=timeout
|
||||
)
|
||||
else:
|
||||
data = await self.reader.read(self.READ_BUFFER_SIZE)
|
||||
except (OSError, IOError) as e:
|
||||
raise HTTP2ConnectionError(f"Socket read error: {e}")
|
||||
|
||||
if not data:
|
||||
# Connection closed by peer
|
||||
self._closed = True
|
||||
return []
|
||||
|
||||
# Feed data to h2
|
||||
try:
|
||||
events = self.h2_conn.receive_data(data)
|
||||
except _h2_exceptions.ProtocolError as e:
|
||||
raise HTTP2ProtocolError(str(e))
|
||||
|
||||
# Process events
|
||||
completed_requests = []
|
||||
for event in events:
|
||||
request = self._handle_event(event)
|
||||
if request is not None:
|
||||
completed_requests.append(request)
|
||||
|
||||
# Send any pending data (WINDOW_UPDATE, etc.)
|
||||
await self._send_pending_data()
|
||||
|
||||
return completed_requests
|
||||
|
||||
def _handle_event(self, event):
|
||||
"""Handle a single h2 event.
|
||||
|
||||
Args:
|
||||
event: h2 event object
|
||||
|
||||
Returns:
|
||||
HTTP2Request if a request is complete, None otherwise
|
||||
"""
|
||||
if isinstance(event, _h2_events.RequestReceived):
|
||||
return self._handle_request_received(event)
|
||||
|
||||
elif isinstance(event, _h2_events.DataReceived):
|
||||
return self._handle_data_received(event)
|
||||
|
||||
elif isinstance(event, _h2_events.StreamEnded):
|
||||
return self._handle_stream_ended(event)
|
||||
|
||||
elif isinstance(event, _h2_events.StreamReset):
|
||||
self._handle_stream_reset(event)
|
||||
|
||||
elif isinstance(event, _h2_events.WindowUpdated):
|
||||
pass # Flow control update, handled by h2
|
||||
|
||||
elif isinstance(event, _h2_events.PriorityUpdated):
|
||||
pass # Priority update, could be used for scheduling
|
||||
|
||||
elif isinstance(event, _h2_events.SettingsAcknowledged):
|
||||
pass # Settings ACK received
|
||||
|
||||
elif isinstance(event, _h2_events.ConnectionTerminated):
|
||||
self._handle_connection_terminated(event)
|
||||
|
||||
elif isinstance(event, _h2_events.TrailersReceived):
|
||||
return self._handle_trailers_received(event)
|
||||
|
||||
return None
|
||||
|
||||
def _handle_request_received(self, event):
|
||||
"""Handle RequestReceived event (HEADERS frame)."""
|
||||
stream_id = event.stream_id
|
||||
headers = event.headers
|
||||
|
||||
# Create new stream
|
||||
stream = HTTP2Stream(stream_id, self)
|
||||
self.streams[stream_id] = stream
|
||||
|
||||
# Process headers
|
||||
stream.receive_headers(headers, end_stream=False)
|
||||
return None
|
||||
|
||||
def _handle_data_received(self, event):
|
||||
"""Handle DataReceived event."""
|
||||
stream_id = event.stream_id
|
||||
data = event.data
|
||||
|
||||
stream = self.streams.get(stream_id)
|
||||
if stream is None:
|
||||
return None
|
||||
|
||||
stream.receive_data(data, end_stream=False)
|
||||
|
||||
# Increment flow control windows (only if data received)
|
||||
if len(data) > 0:
|
||||
# Update stream-level window
|
||||
self.h2_conn.increment_flow_control_window(len(data), stream_id=stream_id)
|
||||
# Update connection-level window
|
||||
self.h2_conn.increment_flow_control_window(len(data), stream_id=None)
|
||||
|
||||
return None
|
||||
|
||||
def _handle_stream_ended(self, event):
|
||||
"""Handle StreamEnded event."""
|
||||
stream_id = event.stream_id
|
||||
stream = self.streams.get(stream_id)
|
||||
|
||||
if stream is None:
|
||||
return None
|
||||
|
||||
stream.request_complete = True
|
||||
return HTTP2Request(stream, self.cfg, self.client_addr)
|
||||
|
||||
def _handle_stream_reset(self, event):
|
||||
"""Handle StreamReset event."""
|
||||
stream_id = event.stream_id
|
||||
stream = self.streams.get(stream_id)
|
||||
|
||||
if stream is not None:
|
||||
stream.reset(event.error_code)
|
||||
|
||||
def _handle_connection_terminated(self, event):
|
||||
"""Handle ConnectionTerminated event."""
|
||||
self._closed = True
|
||||
|
||||
def _handle_trailers_received(self, event):
|
||||
"""Handle TrailersReceived event."""
|
||||
stream_id = event.stream_id
|
||||
stream = self.streams.get(stream_id)
|
||||
|
||||
if stream is None:
|
||||
return None
|
||||
|
||||
stream.receive_trailers(event.headers)
|
||||
return HTTP2Request(stream, self.cfg, self.client_addr)
|
||||
|
||||
async def send_response(self, stream_id, status, headers, body=None):
|
||||
"""Send a response on a stream.
|
||||
|
||||
Args:
|
||||
stream_id: The stream ID to respond on
|
||||
status: HTTP status code (int)
|
||||
headers: List of (name, value) header tuples
|
||||
body: Optional response body bytes
|
||||
"""
|
||||
stream = self.streams.get(stream_id)
|
||||
if stream is None:
|
||||
raise HTTP2Error(f"Stream {stream_id} not found")
|
||||
|
||||
# Build response headers with :status pseudo-header
|
||||
response_headers = [(':status', str(status))]
|
||||
for name, value in headers:
|
||||
response_headers.append((name.lower(), str(value)))
|
||||
|
||||
end_stream = body is None or len(body) == 0
|
||||
|
||||
# Send headers
|
||||
self.h2_conn.send_headers(stream_id, response_headers, end_stream=end_stream)
|
||||
stream.send_headers(response_headers, end_stream=end_stream)
|
||||
await self._send_pending_data()
|
||||
|
||||
# Send body if present
|
||||
if body and len(body) > 0:
|
||||
await self.send_data(stream_id, body, end_stream=True)
|
||||
|
||||
async def send_data(self, stream_id, data, end_stream=False):
|
||||
"""Send data on a stream.
|
||||
|
||||
Args:
|
||||
stream_id: The stream ID
|
||||
data: Body data bytes
|
||||
end_stream: Whether this ends the stream
|
||||
"""
|
||||
stream = self.streams.get(stream_id)
|
||||
if stream is None:
|
||||
raise HTTP2Error(f"Stream {stream_id} not found")
|
||||
|
||||
# Send data in chunks respecting flow control and max frame size
|
||||
data_to_send = data
|
||||
while data_to_send:
|
||||
# Get available window size for this stream
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
# Also respect max frame size
|
||||
chunk_size = min(available, self.max_frame_size, len(data_to_send))
|
||||
|
||||
if chunk_size <= 0:
|
||||
# No window available, send what we have and wait
|
||||
await self._send_pending_data()
|
||||
# Try again - the window might open after ACKs
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
if available <= 0:
|
||||
# Still no window, just try to send a small chunk
|
||||
chunk_size = min(self.max_frame_size, len(data_to_send))
|
||||
|
||||
chunk = data_to_send[:chunk_size]
|
||||
data_to_send = data_to_send[chunk_size:]
|
||||
|
||||
# Only set end_stream on the final chunk
|
||||
is_final = end_stream and len(data_to_send) == 0
|
||||
|
||||
self.h2_conn.send_data(stream_id, chunk, end_stream=is_final)
|
||||
await self._send_pending_data()
|
||||
|
||||
stream.send_data(data, end_stream=end_stream)
|
||||
|
||||
async def send_error(self, stream_id, status_code, message=None):
|
||||
"""Send an error response on a stream."""
|
||||
body = message.encode() if message else b''
|
||||
headers = [('content-length', str(len(body)))]
|
||||
if body:
|
||||
headers.append(('content-type', 'text/plain; charset=utf-8'))
|
||||
|
||||
await self.send_response(stream_id, status_code, headers, body)
|
||||
|
||||
async def reset_stream(self, stream_id, error_code=0x8):
|
||||
"""Reset a stream with RST_STREAM."""
|
||||
stream = self.streams.get(stream_id)
|
||||
if stream is not None:
|
||||
stream.reset(error_code)
|
||||
|
||||
self.h2_conn.reset_stream(stream_id, error_code=error_code)
|
||||
await self._send_pending_data()
|
||||
|
||||
async def close(self, error_code=0x0, last_stream_id=None):
|
||||
"""Close the connection gracefully with GOAWAY."""
|
||||
if self._closed:
|
||||
return
|
||||
|
||||
self._closed = True
|
||||
|
||||
if last_stream_id is None:
|
||||
last_stream_id = max(self.streams.keys()) if self.streams else 0
|
||||
|
||||
try:
|
||||
self.h2_conn.close_connection(error_code=error_code)
|
||||
await self._send_pending_data()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def _send_pending_data(self):
|
||||
"""Send any pending data from h2 to the socket."""
|
||||
data = self.h2_conn.data_to_send()
|
||||
if data:
|
||||
try:
|
||||
self.writer.write(data)
|
||||
await self.writer.drain()
|
||||
except (OSError, IOError) as e:
|
||||
self._closed = True
|
||||
raise HTTP2ConnectionError(f"Socket write error: {e}")
|
||||
|
||||
@property
|
||||
def is_closed(self):
|
||||
"""Check if connection is closed."""
|
||||
return self._closed
|
||||
|
||||
def cleanup_stream(self, stream_id):
|
||||
"""Remove a stream after processing is complete."""
|
||||
self.streams.pop(stream_id, None)
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
f"<AsyncHTTP2Connection "
|
||||
f"streams={len(self.streams)} "
|
||||
f"closed={self._closed}>"
|
||||
)
|
||||
|
||||
|
||||
__all__ = ['AsyncHTTP2Connection']
|
||||
@ -11,6 +11,7 @@ import sys
|
||||
from gunicorn import http
|
||||
from gunicorn.http import wsgi
|
||||
from gunicorn import util
|
||||
from gunicorn import sock as gunicorn_sock
|
||||
from gunicorn.workers import base
|
||||
|
||||
ALREADY_HANDLED = object()
|
||||
@ -32,6 +33,14 @@ class AsyncWorker(base.Worker):
|
||||
def handle(self, listener, client, addr):
|
||||
req = None
|
||||
try:
|
||||
# Check if HTTP/2 was negotiated (for SSL connections)
|
||||
is_http2 = gunicorn_sock.is_http2_negotiated(client)
|
||||
|
||||
if is_http2:
|
||||
# Handle HTTP/2 connection
|
||||
self.handle_http2(listener, client, addr)
|
||||
return
|
||||
|
||||
parser = http.get_parser(self.cfg, client, addr)
|
||||
try:
|
||||
listener_name = listener.getsockname()
|
||||
@ -86,6 +95,102 @@ class AsyncWorker(base.Worker):
|
||||
finally:
|
||||
util.close(client)
|
||||
|
||||
def handle_http2(self, listener, client, addr):
|
||||
"""Handle an HTTP/2 connection.
|
||||
|
||||
Processes multiplexed HTTP/2 streams until the connection closes.
|
||||
"""
|
||||
listener_name = listener.getsockname()
|
||||
|
||||
try:
|
||||
h2_conn = http.get_parser(self.cfg, client, addr, http2_connection=True)
|
||||
h2_conn.initiate_connection()
|
||||
|
||||
while not h2_conn.is_closed and self.alive:
|
||||
try:
|
||||
requests = h2_conn.receive_data()
|
||||
except http.errors.NoMoreData:
|
||||
self.log.debug("HTTP/2 connection closed by client")
|
||||
break
|
||||
|
||||
for req in requests:
|
||||
try:
|
||||
self.handle_http2_request(listener_name, req, client, addr, h2_conn)
|
||||
except Exception as e:
|
||||
self.log.exception("Error handling HTTP/2 request")
|
||||
try:
|
||||
h2_conn.send_error(req.stream.stream_id, 500, str(e))
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
h2_conn.cleanup_stream(req.stream.stream_id)
|
||||
|
||||
except ssl.SSLError as e:
|
||||
if e.args[0] == ssl.SSL_ERROR_EOF:
|
||||
self.log.debug("HTTP/2 SSL connection closed")
|
||||
else:
|
||||
self.log.debug("HTTP/2 SSL error: %s", e)
|
||||
except OSError as e:
|
||||
if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
|
||||
self.log.exception("HTTP/2 socket error")
|
||||
except Exception as e:
|
||||
self.log.exception("HTTP/2 connection error: %s", e)
|
||||
|
||||
def handle_http2_request(self, listener_name, req, sock, addr, h2_conn):
|
||||
"""Handle a single HTTP/2 request."""
|
||||
stream_id = req.stream.stream_id
|
||||
request_start = datetime.now()
|
||||
environ = {}
|
||||
resp = None
|
||||
|
||||
try:
|
||||
self.cfg.pre_request(self, req)
|
||||
resp, environ = wsgi.create(req, sock, addr, listener_name, self.cfg)
|
||||
environ["wsgi.multithread"] = True
|
||||
environ["HTTP_VERSION"] = "2"
|
||||
|
||||
self.nr += 1
|
||||
if self.nr >= self.max_requests:
|
||||
if self.alive:
|
||||
self.log.info("Autorestarting worker after current request.")
|
||||
self.alive = False
|
||||
|
||||
# Run WSGI app
|
||||
respiter = self.wsgi(environ, resp.start_response)
|
||||
if self.is_already_handled(respiter):
|
||||
return
|
||||
|
||||
# Collect response body
|
||||
response_body = b''
|
||||
try:
|
||||
if hasattr(respiter, '__iter__'):
|
||||
for item in respiter:
|
||||
if item:
|
||||
response_body += item
|
||||
finally:
|
||||
if hasattr(respiter, "close"):
|
||||
respiter.close()
|
||||
|
||||
# Send response via HTTP/2
|
||||
h2_conn.send_response(
|
||||
stream_id,
|
||||
resp.status_code,
|
||||
resp.headers,
|
||||
response_body
|
||||
)
|
||||
|
||||
request_time = datetime.now() - request_start
|
||||
self.log.access(resp, req, environ, request_time)
|
||||
|
||||
except Exception:
|
||||
self.log.exception("Error handling HTTP/2 request")
|
||||
raise
|
||||
finally:
|
||||
try:
|
||||
self.cfg.post_request(self, req, environ, resp)
|
||||
except Exception:
|
||||
self.log.exception("Exception in post_request hook")
|
||||
|
||||
def handle_request(self, listener_name, req, sock, addr):
|
||||
request_start = datetime.now()
|
||||
environ = {}
|
||||
|
||||
@ -41,6 +41,7 @@ class TConn:
|
||||
self.timeout = None
|
||||
self.parser = None
|
||||
self.initialized = False
|
||||
self.is_http2 = False
|
||||
|
||||
# set the socket to non blocking
|
||||
self.sock.setblocking(False)
|
||||
@ -57,7 +58,21 @@ class TConn:
|
||||
if self.cfg.is_ssl:
|
||||
self.sock = sock.ssl_wrap_socket(self.sock, self.cfg)
|
||||
|
||||
# initialize the parser
|
||||
# Complete the handshake to ensure ALPN negotiation is done
|
||||
# (needed if do_handshake_on_connect is False)
|
||||
if not self.cfg.do_handshake_on_connect:
|
||||
self.sock.do_handshake()
|
||||
|
||||
# Check if HTTP/2 was negotiated via ALPN
|
||||
if sock.is_http2_negotiated(self.sock):
|
||||
self.is_http2 = True
|
||||
self.parser = http.get_parser(
|
||||
self.cfg, self.sock, self.client, http2_connection=True
|
||||
)
|
||||
self.parser.initiate_connection()
|
||||
return
|
||||
|
||||
# initialize the HTTP/1.x parser
|
||||
self.parser = http.get_parser(self.cfg, self.sock, self.client)
|
||||
|
||||
def set_timeout(self):
|
||||
@ -354,6 +369,10 @@ class ThreadWorker(base.Worker):
|
||||
# (ENOTCONN from ssl_wrap_socket would crash main thread otherwise)
|
||||
conn.init()
|
||||
|
||||
# HTTP/2 connections require special handling
|
||||
if conn.is_http2:
|
||||
return self.handle_http2(conn)
|
||||
|
||||
req = next(conn.parser)
|
||||
if not req:
|
||||
return False
|
||||
@ -391,6 +410,108 @@ class ThreadWorker(base.Worker):
|
||||
|
||||
return False
|
||||
|
||||
def handle_http2(self, conn):
|
||||
"""Handle an HTTP/2 connection. Runs in a worker thread.
|
||||
|
||||
HTTP/2 connections are persistent and multiplex multiple streams.
|
||||
We handle all streams until the connection is closed.
|
||||
|
||||
Returns:
|
||||
False (HTTP/2 connections don't use keepalive polling)
|
||||
"""
|
||||
h2_conn = conn.parser # HTTP2ServerConnection
|
||||
|
||||
try:
|
||||
while not h2_conn.is_closed and self.alive:
|
||||
# Receive data and get completed requests
|
||||
requests = h2_conn.receive_data()
|
||||
|
||||
for req in requests:
|
||||
try:
|
||||
self.handle_http2_request(req, conn, h2_conn)
|
||||
except Exception as e:
|
||||
self.log.exception("Error handling HTTP/2 request")
|
||||
try:
|
||||
h2_conn.send_error(req.stream.stream_id, 500, str(e))
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
# Cleanup stream after processing
|
||||
h2_conn.cleanup_stream(req.stream.stream_id)
|
||||
|
||||
# Check if we need to close
|
||||
if not self.alive:
|
||||
h2_conn.close()
|
||||
break
|
||||
|
||||
except http.errors.NoMoreData:
|
||||
self.log.debug("HTTP/2 connection closed by client")
|
||||
except ssl.SSLError as e:
|
||||
if e.args[0] == ssl.SSL_ERROR_EOF:
|
||||
self.log.debug("HTTP/2 SSL connection closed")
|
||||
else:
|
||||
self.log.debug("HTTP/2 SSL error: %s", e)
|
||||
except OSError as e:
|
||||
if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
|
||||
self.log.exception("HTTP/2 socket error")
|
||||
except Exception as e:
|
||||
self.log.exception("HTTP/2 connection error")
|
||||
|
||||
return False
|
||||
|
||||
def handle_http2_request(self, req, conn, h2_conn):
|
||||
"""Handle a single HTTP/2 request/stream."""
|
||||
environ = {}
|
||||
resp = None
|
||||
stream_id = req.stream.stream_id
|
||||
|
||||
try:
|
||||
self.cfg.pre_request(self, req)
|
||||
request_start = datetime.now()
|
||||
|
||||
# Create WSGI environ
|
||||
resp, environ = wsgi.create(req, conn.sock, conn.client,
|
||||
conn.server, self.cfg)
|
||||
environ["wsgi.multithread"] = True
|
||||
environ["HTTP_VERSION"] = "2" # Indicate HTTP/2
|
||||
|
||||
self.nr += 1
|
||||
if self.nr >= self.max_requests:
|
||||
if self.alive:
|
||||
self.log.info("Autorestarting worker after current request.")
|
||||
self.alive = False
|
||||
|
||||
# Run WSGI app
|
||||
respiter = self.wsgi(environ, resp.start_response)
|
||||
|
||||
# Collect response body
|
||||
response_body = b''
|
||||
try:
|
||||
if hasattr(respiter, '__iter__'):
|
||||
for item in respiter:
|
||||
if item:
|
||||
response_body += item
|
||||
finally:
|
||||
if hasattr(respiter, "close"):
|
||||
respiter.close()
|
||||
|
||||
# Send response via HTTP/2
|
||||
h2_conn.send_response(
|
||||
stream_id,
|
||||
resp.status_code,
|
||||
resp.headers,
|
||||
response_body
|
||||
)
|
||||
|
||||
request_time = datetime.now() - request_start
|
||||
self.log.access(resp, req, environ, request_time)
|
||||
|
||||
finally:
|
||||
try:
|
||||
self.cfg.post_request(self, req, environ, resp)
|
||||
except Exception:
|
||||
self.log.exception("Exception in post_request hook")
|
||||
|
||||
def handle_request(self, req, conn):
|
||||
environ = {}
|
||||
resp = None
|
||||
|
||||
@ -77,6 +77,14 @@ class TornadoWorker(Worker):
|
||||
self.alive = True
|
||||
self.server_alive = False
|
||||
|
||||
# Warn if HTTP/2 is requested - tornado worker doesn't support it
|
||||
if 'h2' in self.cfg.http_protocols:
|
||||
self.log.warning(
|
||||
"HTTP/2 is not supported by the tornado worker. "
|
||||
"Use gthread, gevent, eventlet, or asgi workers for HTTP/2 support. "
|
||||
"Falling back to HTTP/1.1 only."
|
||||
)
|
||||
|
||||
self.callbacks = []
|
||||
self.callbacks.append(PeriodicCallback(self.watchdog, 1000))
|
||||
self.callbacks.append(PeriodicCallback(self.heartbeat, 1000))
|
||||
|
||||
@ -114,6 +114,14 @@ class SyncWorker(base.Worker):
|
||||
# use the CPU for nothing. This minimal timeout prevent it.
|
||||
timeout = self.timeout or 0.5
|
||||
|
||||
# Warn if HTTP/2 is requested - sync worker doesn't support it
|
||||
if 'h2' in self.cfg.http_protocols:
|
||||
self.log.warning(
|
||||
"HTTP/2 is not supported by the sync worker. "
|
||||
"Use gthread, gevent, eventlet, or asgi workers for HTTP/2 support. "
|
||||
"Falling back to HTTP/1.1 only."
|
||||
)
|
||||
|
||||
# self.socket appears to lose its blocking status after
|
||||
# we fork in the arbiter. Reset it here.
|
||||
for s in self.sockets:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user