mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-01 18:21:30 +08:00
fix(http2): fix pylint errors in connection modules
- Reorder exception handlers: specific exceptions before ProtocolError - Extract _wait_for_flow_control_window() to reduce return statements - Refactor flow control waiting to avoid too-many-return-statements
This commit is contained in:
parent
49193223d5
commit
6cc20d3a71
@ -149,12 +149,9 @@ class AsyncHTTP2Connection:
|
||||
return []
|
||||
|
||||
# Feed data to h2
|
||||
# Note: Specific exceptions must come before ProtocolError (their parent class)
|
||||
try:
|
||||
events = self.h2_conn.receive_data(data)
|
||||
except _h2_exceptions.ProtocolError as e:
|
||||
# Send GOAWAY with PROTOCOL_ERROR before raising
|
||||
await self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR)
|
||||
raise HTTP2ProtocolError(str(e))
|
||||
except _h2_exceptions.FlowControlError as e:
|
||||
# Send GOAWAY with FLOW_CONTROL_ERROR
|
||||
await self.close(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR)
|
||||
@ -177,6 +174,10 @@ class AsyncHTTP2Connection:
|
||||
# Send GOAWAY with REFUSED_STREAM
|
||||
await self.close(error_code=HTTP2ErrorCode.REFUSED_STREAM)
|
||||
raise HTTP2ProtocolError(str(e))
|
||||
except _h2_exceptions.ProtocolError as e:
|
||||
# Send GOAWAY with PROTOCOL_ERROR before raising
|
||||
await self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR)
|
||||
raise HTTP2ProtocolError(str(e))
|
||||
|
||||
# Process events
|
||||
completed_requests = []
|
||||
@ -389,6 +390,46 @@ class AsyncHTTP2Connection:
|
||||
self.cleanup_stream(stream_id)
|
||||
return False
|
||||
|
||||
async def _wait_for_flow_control_window(self, stream_id):
|
||||
"""Wait for flow control window to become positive.
|
||||
|
||||
Returns:
|
||||
int: Available window size, or -1 if waiting failed
|
||||
"""
|
||||
max_wait_attempts = 50 # ~5 seconds at 100ms per attempt
|
||||
for _ in range(max_wait_attempts):
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
if available > 0:
|
||||
return available
|
||||
|
||||
# Read more data from connection (may receive WINDOW_UPDATE)
|
||||
try:
|
||||
incoming = await asyncio.wait_for(
|
||||
self.reader.read(self.READ_BUFFER_SIZE),
|
||||
timeout=0.1
|
||||
)
|
||||
if incoming:
|
||||
events = self.h2_conn.receive_data(incoming)
|
||||
# Process events but don't create new requests
|
||||
for event in events:
|
||||
if isinstance(event, _h2_events.StreamReset):
|
||||
if event.stream_id == stream_id:
|
||||
return -1
|
||||
elif isinstance(event, _h2_events.ConnectionTerminated):
|
||||
self._closed = True
|
||||
return -1
|
||||
await self._send_pending_data()
|
||||
else:
|
||||
# Connection closed
|
||||
self._closed = True
|
||||
return -1
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except _h2_exceptions.ProtocolError:
|
||||
return -1
|
||||
|
||||
return self.h2_conn.local_flow_control_window(stream_id)
|
||||
|
||||
async def send_data(self, stream_id, data, end_stream=False):
|
||||
"""Send data on a stream.
|
||||
|
||||
@ -402,69 +443,24 @@ class AsyncHTTP2Connection:
|
||||
"""
|
||||
stream = self.streams.get(stream_id)
|
||||
if stream is None:
|
||||
# Stream was already cleaned up (reset/closed) - return gracefully
|
||||
return False
|
||||
|
||||
# Send data in chunks respecting flow control and max frame size
|
||||
data_to_send = data
|
||||
try:
|
||||
while data_to_send:
|
||||
# Get available window size for this stream
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
# Also respect max frame size
|
||||
chunk_size = min(available, self.max_frame_size, len(data_to_send))
|
||||
|
||||
if chunk_size <= 0:
|
||||
# No window available - must wait for WINDOW_UPDATE
|
||||
# Per RFC 7540 Section 6.9.2, we must track negative windows
|
||||
# and wait for WINDOW_UPDATE to make the window positive
|
||||
# Wait for WINDOW_UPDATE per RFC 7540 Section 6.9.2
|
||||
await self._send_pending_data()
|
||||
|
||||
# Wait for incoming WINDOW_UPDATE by reading from connection
|
||||
# Use a reasonable timeout to avoid blocking forever
|
||||
max_wait_attempts = 50 # ~5 seconds at 100ms per attempt
|
||||
for _ in range(max_wait_attempts):
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
if available > 0:
|
||||
break
|
||||
|
||||
# Read more data from connection (may receive WINDOW_UPDATE)
|
||||
try:
|
||||
incoming = await asyncio.wait_for(
|
||||
self.reader.read(self.READ_BUFFER_SIZE),
|
||||
timeout=0.1
|
||||
)
|
||||
if incoming:
|
||||
events = self.h2_conn.receive_data(incoming)
|
||||
# Process events but don't create new requests
|
||||
for event in events:
|
||||
if isinstance(event, _h2_events.StreamReset):
|
||||
if event.stream_id == stream_id:
|
||||
return False
|
||||
elif isinstance(event, _h2_events.ConnectionTerminated):
|
||||
self._closed = True
|
||||
return False
|
||||
await self._send_pending_data()
|
||||
else:
|
||||
# Connection closed
|
||||
self._closed = True
|
||||
return False
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except _h2_exceptions.ProtocolError:
|
||||
return False
|
||||
|
||||
# Re-check window after waiting
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
available = await self._wait_for_flow_control_window(stream_id)
|
||||
if available <= 0:
|
||||
# Still no window after waiting - give up
|
||||
return False
|
||||
chunk_size = min(available, self.max_frame_size, len(data_to_send))
|
||||
|
||||
chunk = data_to_send[:chunk_size]
|
||||
data_to_send = data_to_send[chunk_size:]
|
||||
|
||||
# Only set end_stream on the final chunk
|
||||
is_final = end_stream and len(data_to_send) == 0
|
||||
|
||||
self.h2_conn.send_data(stream_id, chunk, end_stream=is_final)
|
||||
@ -473,7 +469,6 @@ class AsyncHTTP2Connection:
|
||||
stream.send_data(data, end_stream=end_stream)
|
||||
return True
|
||||
except (_h2_exceptions.StreamClosedError, _h2_exceptions.FlowControlError):
|
||||
# Stream was reset by client or flow control error - clean up gracefully
|
||||
stream.close()
|
||||
self.cleanup_stream(stream_id)
|
||||
return False
|
||||
|
||||
@ -143,12 +143,9 @@ class HTTP2ServerConnection:
|
||||
return []
|
||||
|
||||
# Feed data to h2
|
||||
# Note: Specific exceptions must come before ProtocolError (their parent class)
|
||||
try:
|
||||
events = self.h2_conn.receive_data(data)
|
||||
except _h2_exceptions.ProtocolError as e:
|
||||
# Send GOAWAY with PROTOCOL_ERROR before raising
|
||||
self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR)
|
||||
raise HTTP2ProtocolError(str(e))
|
||||
except _h2_exceptions.FlowControlError as e:
|
||||
# Send GOAWAY with FLOW_CONTROL_ERROR
|
||||
self.close(error_code=HTTP2ErrorCode.FLOW_CONTROL_ERROR)
|
||||
@ -171,6 +168,10 @@ class HTTP2ServerConnection:
|
||||
# Send GOAWAY with REFUSED_STREAM
|
||||
self.close(error_code=HTTP2ErrorCode.REFUSED_STREAM)
|
||||
raise HTTP2ProtocolError(str(e))
|
||||
except _h2_exceptions.ProtocolError as e:
|
||||
# Send GOAWAY with PROTOCOL_ERROR before raising
|
||||
self.close(error_code=HTTP2ErrorCode.PROTOCOL_ERROR)
|
||||
raise HTTP2ProtocolError(str(e))
|
||||
|
||||
# Process events
|
||||
completed_requests = []
|
||||
@ -426,6 +427,64 @@ class HTTP2ServerConnection:
|
||||
self.cleanup_stream(stream_id)
|
||||
return False
|
||||
|
||||
def _wait_for_flow_control_window(self, stream_id):
|
||||
"""Wait for flow control window to become positive.
|
||||
|
||||
Returns:
|
||||
int: Available window size, or -1 if waiting failed
|
||||
"""
|
||||
import selectors
|
||||
|
||||
max_wait_attempts = 50 # ~5 seconds at 100ms per attempt
|
||||
try:
|
||||
sel = selectors.DefaultSelector()
|
||||
sel.register(self.sock, selectors.EVENT_READ)
|
||||
except (TypeError, ValueError):
|
||||
# Socket doesn't support selectors (e.g., mock socket)
|
||||
return -1
|
||||
|
||||
result = -1
|
||||
try:
|
||||
for _ in range(max_wait_attempts):
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
if available > 0:
|
||||
result = available
|
||||
break
|
||||
|
||||
ready = sel.select(timeout=0.1)
|
||||
if ready:
|
||||
try:
|
||||
incoming = self.sock.recv(self.READ_BUFFER_SIZE)
|
||||
except (OSError, IOError, _h2_exceptions.ProtocolError):
|
||||
break
|
||||
if not incoming:
|
||||
self._closed = True
|
||||
break
|
||||
try:
|
||||
events = self.h2_conn.receive_data(incoming)
|
||||
except _h2_exceptions.ProtocolError:
|
||||
break
|
||||
for event in events:
|
||||
if isinstance(event, _h2_events.StreamReset):
|
||||
if event.stream_id == stream_id:
|
||||
result = -1
|
||||
break
|
||||
elif isinstance(event, _h2_events.ConnectionTerminated):
|
||||
self._closed = True
|
||||
result = -1
|
||||
break
|
||||
else:
|
||||
self._send_pending_data()
|
||||
continue
|
||||
break # Break outer loop if inner loop broke
|
||||
else:
|
||||
# Loop completed without break - check final window
|
||||
result = self.h2_conn.local_flow_control_window(stream_id)
|
||||
finally:
|
||||
sel.close()
|
||||
|
||||
return result
|
||||
|
||||
def send_data(self, stream_id, data, end_stream=False):
|
||||
"""Send data on a stream.
|
||||
|
||||
@ -437,83 +496,26 @@ class HTTP2ServerConnection:
|
||||
Returns:
|
||||
bool: True if data sent, False if stream was already closed
|
||||
"""
|
||||
import selectors
|
||||
|
||||
stream = self.streams.get(stream_id)
|
||||
if stream is None:
|
||||
# Stream was already cleaned up (reset/closed) - return gracefully
|
||||
return False
|
||||
|
||||
# Send data in chunks respecting flow control and max frame size
|
||||
data_to_send = data
|
||||
try:
|
||||
while data_to_send:
|
||||
# Get available window size for this stream
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
# Also respect max frame size
|
||||
chunk_size = min(available, self.max_frame_size, len(data_to_send))
|
||||
|
||||
if chunk_size <= 0:
|
||||
# No window available - must wait for WINDOW_UPDATE
|
||||
# Per RFC 7540 Section 6.9.2, we must track negative windows
|
||||
# and wait for WINDOW_UPDATE to make the window positive
|
||||
# Wait for WINDOW_UPDATE per RFC 7540 Section 6.9.2
|
||||
self._send_pending_data()
|
||||
|
||||
# Wait for incoming WINDOW_UPDATE by reading from connection
|
||||
# Use selectors for portable non-blocking wait
|
||||
max_wait_attempts = 50 # ~5 seconds at 100ms per attempt
|
||||
try:
|
||||
sel = selectors.DefaultSelector()
|
||||
sel.register(self.sock, selectors.EVENT_READ)
|
||||
except (TypeError, ValueError):
|
||||
# Socket doesn't support selectors (e.g., mock socket)
|
||||
# Fall back to returning False immediately
|
||||
return False
|
||||
|
||||
try:
|
||||
for _ in range(max_wait_attempts):
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
if available > 0:
|
||||
break
|
||||
|
||||
# Check if socket has data ready
|
||||
ready = sel.select(timeout=0.1)
|
||||
if ready:
|
||||
try:
|
||||
incoming = self.sock.recv(self.READ_BUFFER_SIZE)
|
||||
if incoming:
|
||||
events = self.h2_conn.receive_data(incoming)
|
||||
# Process events but don't create new requests
|
||||
for event in events:
|
||||
if isinstance(event, _h2_events.StreamReset):
|
||||
if event.stream_id == stream_id:
|
||||
return False
|
||||
elif isinstance(event, _h2_events.ConnectionTerminated):
|
||||
self._closed = True
|
||||
return False
|
||||
self._send_pending_data()
|
||||
else:
|
||||
# Connection closed
|
||||
self._closed = True
|
||||
return False
|
||||
except (OSError, IOError):
|
||||
return False
|
||||
except _h2_exceptions.ProtocolError:
|
||||
return False
|
||||
finally:
|
||||
sel.close()
|
||||
|
||||
# Re-check window after waiting
|
||||
available = self.h2_conn.local_flow_control_window(stream_id)
|
||||
available = self._wait_for_flow_control_window(stream_id)
|
||||
if available <= 0:
|
||||
# Still no window after waiting - give up
|
||||
return False
|
||||
chunk_size = min(available, self.max_frame_size, len(data_to_send))
|
||||
|
||||
chunk = data_to_send[:chunk_size]
|
||||
data_to_send = data_to_send[chunk_size:]
|
||||
|
||||
# Only set end_stream on the final chunk
|
||||
is_final = end_stream and len(data_to_send) == 0
|
||||
|
||||
self.h2_conn.send_data(stream_id, chunk, end_stream=is_final)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user