From 6cc20d3a71c86d44f7d1399900f760df7a46170e Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 27 Jan 2026 16:57:39 +0100 Subject: [PATCH] fix(http2): fix pylint errors in connection modules - Reorder exception handlers: specific exceptions before ProtocolError - Extract _wait_for_flow_control_window() to reduce return statements - Refactor flow control waiting to avoid too-many-return-statements --- gunicorn/http2/async_connection.py | 99 +++++++++++----------- gunicorn/http2/connection.py | 128 +++++++++++++++-------------- 2 files changed, 112 insertions(+), 115 deletions(-) diff --git a/gunicorn/http2/async_connection.py b/gunicorn/http2/async_connection.py index 799f3fee..e2d67988 100644 --- a/gunicorn/http2/async_connection.py +++ b/gunicorn/http2/async_connection.py @@ -149,12 +149,9 @@ class AsyncHTTP2Connection: return [] # Feed data to h2 + # Note: Specific exceptions must come before ProtocolError (their parent class) try: events = self.h2_conn.receive_data(data) - except _h2_exceptions.ProtocolError as e: - # Send GOAWAY with PROTOCOL_ERROR before raising - await self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR) - raise HTTP2ProtocolError(str(e)) except _h2_exceptions.FlowControlError as e: # Send GOAWAY with FLOW_CONTROL_ERROR await self.close(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR) @@ -177,6 +174,10 @@ class AsyncHTTP2Connection: # Send GOAWAY with REFUSED_STREAM await self.close(error_code=HTTP2ErrorCode.REFUSED_STREAM) raise HTTP2ProtocolError(str(e)) + except _h2_exceptions.ProtocolError as e: + # Send GOAWAY with PROTOCOL_ERROR before raising + await self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR) + raise HTTP2ProtocolError(str(e)) # Process events completed_requests = [] @@ -389,6 +390,46 @@ class AsyncHTTP2Connection: self.cleanup_stream(stream_id) return False + async def _wait_for_flow_control_window(self, stream_id): + """Wait for flow control window to become positive. + + Returns: + int: Available window size, or -1 if waiting failed + """ + max_wait_attempts = 50 # ~5 seconds at 100ms per attempt + for _ in range(max_wait_attempts): + available = self.h2_conn.local_flow_control_window(stream_id) + if available > 0: + return available + + # Read more data from connection (may receive WINDOW_UPDATE) + try: + incoming = await asyncio.wait_for( + self.reader.read(self.READ_BUFFER_SIZE), + timeout=0.1 + ) + if incoming: + events = self.h2_conn.receive_data(incoming) + # Process events but don't create new requests + for event in events: + if isinstance(event, _h2_events.StreamReset): + if event.stream_id == stream_id: + return -1 + elif isinstance(event, _h2_events.ConnectionTerminated): + self._closed = True + return -1 + await self._send_pending_data() + else: + # Connection closed + self._closed = True + return -1 + except asyncio.TimeoutError: + continue + except _h2_exceptions.ProtocolError: + return -1 + + return self.h2_conn.local_flow_control_window(stream_id) + async def send_data(self, stream_id, data, end_stream=False): """Send data on a stream. @@ -402,69 +443,24 @@ class AsyncHTTP2Connection: """ stream = self.streams.get(stream_id) if stream is None: - # Stream was already cleaned up (reset/closed) - return gracefully return False - # Send data in chunks respecting flow control and max frame size data_to_send = data try: while data_to_send: - # Get available window size for this stream available = self.h2_conn.local_flow_control_window(stream_id) - # Also respect max frame size chunk_size = min(available, self.max_frame_size, len(data_to_send)) if chunk_size <= 0: - # No window available - must wait for WINDOW_UPDATE - # Per RFC 7540 Section 6.9.2, we must track negative windows - # and wait for WINDOW_UPDATE to make the window positive + # Wait for WINDOW_UPDATE per RFC 7540 Section 6.9.2 await self._send_pending_data() - - # Wait for incoming WINDOW_UPDATE by reading from connection - # Use a reasonable timeout to avoid blocking forever - max_wait_attempts = 50 # ~5 seconds at 100ms per attempt - for _ in range(max_wait_attempts): - available = self.h2_conn.local_flow_control_window(stream_id) - if available > 0: - break - - # Read more data from connection (may receive WINDOW_UPDATE) - try: - incoming = await asyncio.wait_for( - self.reader.read(self.READ_BUFFER_SIZE), - timeout=0.1 - ) - if incoming: - events = self.h2_conn.receive_data(incoming) - # Process events but don't create new requests - for event in events: - if isinstance(event, _h2_events.StreamReset): - if event.stream_id == stream_id: - return False - elif isinstance(event, _h2_events.ConnectionTerminated): - self._closed = True - return False - await self._send_pending_data() - else: - # Connection closed - self._closed = True - return False - except asyncio.TimeoutError: - continue - except _h2_exceptions.ProtocolError: - return False - - # Re-check window after waiting - available = self.h2_conn.local_flow_control_window(stream_id) + available = await self._wait_for_flow_control_window(stream_id) if available <= 0: - # Still no window after waiting - give up return False chunk_size = min(available, self.max_frame_size, len(data_to_send)) chunk = data_to_send[:chunk_size] data_to_send = data_to_send[chunk_size:] - - # Only set end_stream on the final chunk is_final = end_stream and len(data_to_send) == 0 self.h2_conn.send_data(stream_id, chunk, end_stream=is_final) @@ -473,7 +469,6 @@ class AsyncHTTP2Connection: stream.send_data(data, end_stream=end_stream) return True except (_h2_exceptions.StreamClosedError, _h2_exceptions.FlowControlError): - # Stream was reset by client or flow control error - clean up gracefully stream.close() self.cleanup_stream(stream_id) return False diff --git a/gunicorn/http2/connection.py b/gunicorn/http2/connection.py index 8b8e39df..b332b2f1 100644 --- a/gunicorn/http2/connection.py +++ b/gunicorn/http2/connection.py @@ -143,12 +143,9 @@ class HTTP2ServerConnection: return [] # Feed data to h2 + # Note: Specific exceptions must come before ProtocolError (their parent class) try: events = self.h2_conn.receive_data(data) - except _h2_exceptions.ProtocolError as e: - # Send GOAWAY with PROTOCOL_ERROR before raising - self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR) - raise HTTP2ProtocolError(str(e)) except _h2_exceptions.FlowControlError as e: # Send GOAWAY with FLOW_CONTROL_ERROR self.close(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR) @@ -171,6 +168,10 @@ class HTTP2ServerConnection: # Send GOAWAY with REFUSED_STREAM self.close(error_code=HTTP2ErrorCode.REFUSED_STREAM) raise HTTP2ProtocolError(str(e)) + except _h2_exceptions.ProtocolError as e: + # Send GOAWAY with PROTOCOL_ERROR before raising + self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR) + raise HTTP2ProtocolError(str(e)) # Process events completed_requests = [] @@ -426,6 +427,64 @@ class HTTP2ServerConnection: self.cleanup_stream(stream_id) return False + def _wait_for_flow_control_window(self, stream_id): + """Wait for flow control window to become positive. + + Returns: + int: Available window size, or -1 if waiting failed + """ + import selectors + + max_wait_attempts = 50 # ~5 seconds at 100ms per attempt + try: + sel = selectors.DefaultSelector() + sel.register(self.sock, selectors.EVENT_READ) + except (TypeError, ValueError): + # Socket doesn't support selectors (e.g., mock socket) + return -1 + + result = -1 + try: + for _ in range(max_wait_attempts): + available = self.h2_conn.local_flow_control_window(stream_id) + if available > 0: + result = available + break + + ready = sel.select(timeout=0.1) + if ready: + try: + incoming = self.sock.recv(self.READ_BUFFER_SIZE) + except (OSError, IOError, _h2_exceptions.ProtocolError): + break + if not incoming: + self._closed = True + break + try: + events = self.h2_conn.receive_data(incoming) + except _h2_exceptions.ProtocolError: + break + for event in events: + if isinstance(event, _h2_events.StreamReset): + if event.stream_id == stream_id: + result = -1 + break + elif isinstance(event, _h2_events.ConnectionTerminated): + self._closed = True + result = -1 + break + else: + self._send_pending_data() + continue + break # Break outer loop if inner loop broke + else: + # Loop completed without break - check final window + result = self.h2_conn.local_flow_control_window(stream_id) + finally: + sel.close() + + return result + def send_data(self, stream_id, data, end_stream=False): """Send data on a stream. @@ -437,83 +496,26 @@ class HTTP2ServerConnection: Returns: bool: True if data sent, False if stream was already closed """ - import selectors - stream = self.streams.get(stream_id) if stream is None: - # Stream was already cleaned up (reset/closed) - return gracefully return False - # Send data in chunks respecting flow control and max frame size data_to_send = data try: while data_to_send: - # Get available window size for this stream available = self.h2_conn.local_flow_control_window(stream_id) - # Also respect max frame size chunk_size = min(available, self.max_frame_size, len(data_to_send)) if chunk_size <= 0: - # No window available - must wait for WINDOW_UPDATE - # Per RFC 7540 Section 6.9.2, we must track negative windows - # and wait for WINDOW_UPDATE to make the window positive + # Wait for WINDOW_UPDATE per RFC 7540 Section 6.9.2 self._send_pending_data() - - # Wait for incoming WINDOW_UPDATE by reading from connection - # Use selectors for portable non-blocking wait - max_wait_attempts = 50 # ~5 seconds at 100ms per attempt - try: - sel = selectors.DefaultSelector() - sel.register(self.sock, selectors.EVENT_READ) - except (TypeError, ValueError): - # Socket doesn't support selectors (e.g., mock socket) - # Fall back to returning False immediately - return False - - try: - for _ in range(max_wait_attempts): - available = self.h2_conn.local_flow_control_window(stream_id) - if available > 0: - break - - # Check if socket has data ready - ready = sel.select(timeout=0.1) - if ready: - try: - incoming = self.sock.recv(self.READ_BUFFER_SIZE) - if incoming: - events = self.h2_conn.receive_data(incoming) - # Process events but don't create new requests - for event in events: - if isinstance(event, _h2_events.StreamReset): - if event.stream_id == stream_id: - return False - elif isinstance(event, _h2_events.ConnectionTerminated): - self._closed = True - return False - self._send_pending_data() - else: - # Connection closed - self._closed = True - return False - except (OSError, IOError): - return False - except _h2_exceptions.ProtocolError: - return False - finally: - sel.close() - - # Re-check window after waiting - available = self.h2_conn.local_flow_control_window(stream_id) + available = self._wait_for_flow_control_window(stream_id) if available <= 0: - # Still no window after waiting - give up return False chunk_size = min(available, self.max_frame_size, len(data_to_send)) chunk = data_to_send[:chunk_size] data_to_send = data_to_send[chunk_size:] - - # Only set end_stream on the final chunk is_final = end_stream and len(data_to_send) == 0 self.h2_conn.send_data(stream_id, chunk, end_stream=is_final)