mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-01 10:11:30 +08:00
- WSGI fast parser now applies the same per-header policy as the Python parser (Expect, secure_scheme_headers, forwarded_allow_ips trust gate, forwarder_headers / header_map). Shared helpers extracted on Message. - ASGI keepalive no longer resets the parser when the previous request body was not fully framed; the connection closes instead, preventing request smuggling on pipelined connections. - BodyReceiver._wait_for_data timeout flips _closed and yields http.disconnect rather than synthesizing more_body=False. Timeout honors cfg.timeout. - ASGI chunked encoding now skips HEAD, 204, and 304 (matches Response.is_chunked in the WSGI path) via a small helper. - _setup_callback_parser passes proxy_protocol to PythonProtocol; auto falls back to the Python parser when proxy_protocol != off (the C parser does not implement PROXY framing). _effective_peername swaps the transport peer with the PROXY-supplied client address. - Parser.finish_body accepts a deadline and a 64KiB byte cap; gthread passes a deadline and abandons keepalive on incomplete drain so a stalled client cannot tie up a worker thread.
336 lines
9.9 KiB
Python
336 lines
9.9 KiB
Python
#
|
|
# This file is part of gunicorn released under the MIT license.
|
|
# See the NOTICE for more information.
|
|
|
|
import io
|
|
import t
|
|
import pytest
|
|
from unittest import mock
|
|
|
|
from gunicorn import util
|
|
from gunicorn.http.body import Body, LengthReader, EOFReader
|
|
from gunicorn.http.wsgi import FileWrapper, Response
|
|
from gunicorn.http.unreader import Unreader, IterUnreader, SocketUnreader
|
|
from gunicorn.http.errors import InvalidHeader, InvalidHeaderName, InvalidHTTPVersion
|
|
from gunicorn.http.message import TOKEN_RE
|
|
|
|
|
|
def test_method_pattern():
|
|
assert TOKEN_RE.fullmatch("GET")
|
|
assert TOKEN_RE.fullmatch("MKCALENDAR")
|
|
assert not TOKEN_RE.fullmatch("GET:")
|
|
assert not TOKEN_RE.fullmatch("GET;")
|
|
RFC9110_5_6_2_TOKEN_DELIM = r'"(),/:;<=>?@[\]{}'
|
|
for bad_char in RFC9110_5_6_2_TOKEN_DELIM:
|
|
assert not TOKEN_RE.match(bad_char)
|
|
|
|
|
|
def assert_readline(payload, size, expected):
|
|
body = Body(io.BytesIO(payload))
|
|
assert body.readline(size) == expected
|
|
|
|
|
|
def test_readline_empty_body():
|
|
assert_readline(b"", None, b"")
|
|
assert_readline(b"", 1, b"")
|
|
|
|
|
|
def test_readline_zero_size():
|
|
assert_readline(b"abc", 0, b"")
|
|
assert_readline(b"\n", 0, b"")
|
|
|
|
|
|
def test_readline_new_line_before_size():
|
|
body = Body(io.BytesIO(b"abc\ndef"))
|
|
assert body.readline(4) == b"abc\n"
|
|
assert body.readline() == b"def"
|
|
|
|
|
|
def test_readline_new_line_after_size():
|
|
body = Body(io.BytesIO(b"abc\ndef"))
|
|
assert body.readline(2) == b"ab"
|
|
assert body.readline() == b"c\n"
|
|
|
|
|
|
def test_readline_no_new_line():
|
|
body = Body(io.BytesIO(b"abcdef"))
|
|
assert body.readline() == b"abcdef"
|
|
body = Body(io.BytesIO(b"abcdef"))
|
|
assert body.readline(2) == b"ab"
|
|
assert body.readline(2) == b"cd"
|
|
assert body.readline(2) == b"ef"
|
|
|
|
|
|
def test_readline_buffer_loaded():
|
|
reader = io.BytesIO(b"abc\ndef")
|
|
body = Body(reader)
|
|
body.read(1) # load internal buffer
|
|
reader.write(b"g\nhi")
|
|
reader.seek(7)
|
|
assert body.readline() == b"bc\n"
|
|
assert body.readline() == b"defg\n"
|
|
assert body.readline() == b"hi"
|
|
|
|
|
|
def test_readline_buffer_loaded_with_size():
|
|
body = Body(io.BytesIO(b"abc\ndef"))
|
|
body.read(1) # load internal buffer
|
|
assert body.readline(2) == b"bc"
|
|
assert body.readline(2) == b"\n"
|
|
assert body.readline(2) == b"de"
|
|
assert body.readline(2) == b"f"
|
|
|
|
|
|
def test_http_header_encoding():
|
|
""" tests whether http response headers are USASCII encoded """
|
|
|
|
mocked_socket = mock.MagicMock()
|
|
mocked_socket.sendall = mock.MagicMock()
|
|
|
|
mocked_request = mock.MagicMock()
|
|
response = Response(mocked_request, mocked_socket, None)
|
|
|
|
# set umlaut header value - latin-1 is OK
|
|
response.headers.append(('foo', 'häder'))
|
|
response.send_headers()
|
|
|
|
# set a-breve header value - unicode, non-latin-1 fails
|
|
response = Response(mocked_request, mocked_socket, None)
|
|
response.headers.append(('apple', 'măr'))
|
|
with pytest.raises(UnicodeEncodeError):
|
|
response.send_headers()
|
|
|
|
# build our own header_str to compare against
|
|
tosend = response.default_headers()
|
|
tosend.extend(["%s: %s\r\n" % (k, v) for k, v in response.headers])
|
|
header_str = "%s\r\n" % "".join(tosend)
|
|
|
|
with pytest.raises(UnicodeEncodeError):
|
|
mocked_socket.sendall(util.to_bytestring(header_str, "ascii"))
|
|
|
|
|
|
def test_http_invalid_response_header():
|
|
""" tests whether http response headers are contains control chars """
|
|
|
|
mocked_socket = mock.MagicMock()
|
|
mocked_socket.sendall = mock.MagicMock()
|
|
|
|
mocked_request = mock.MagicMock()
|
|
response = Response(mocked_request, mocked_socket, None)
|
|
|
|
with pytest.raises(InvalidHeader):
|
|
response.start_response("200 OK", [('foo', 'essai\r\n')])
|
|
|
|
response = Response(mocked_request, mocked_socket, None)
|
|
with pytest.raises(InvalidHeaderName):
|
|
response.start_response("200 OK", [('foo\r\n', 'essai')])
|
|
|
|
|
|
def test_unreader_read_when_size_is_none():
|
|
unreader = Unreader()
|
|
unreader.chunk = mock.MagicMock(side_effect=[b'qwerty', b'123456', b''])
|
|
|
|
assert unreader.read(size=None) == b'qwerty'
|
|
assert unreader.read(size=None) == b'123456'
|
|
assert unreader.read(size=None) == b''
|
|
|
|
|
|
def test_unreader_unread():
|
|
unreader = Unreader()
|
|
unreader.unread(b'hi there')
|
|
assert b'hi there' in unreader.read()
|
|
|
|
|
|
def test_unreader_unread_should_place_data_at_the_beginning_of_the_buffer():
|
|
unreader = IterUnreader([b"abc", b"def"])
|
|
ab = unreader.read(2)
|
|
unreader.unread(ab)
|
|
|
|
assert unreader.read(None) == b"abc"
|
|
|
|
|
|
def test_unreader_read_zero_size():
|
|
unreader = Unreader()
|
|
unreader.chunk = mock.MagicMock(side_effect=[b'qwerty', b'asdfgh'])
|
|
|
|
assert unreader.read(size=0) == b''
|
|
|
|
|
|
def test_unreader_read_with_nonzero_size():
|
|
unreader = Unreader()
|
|
unreader.chunk = mock.MagicMock(side_effect=[
|
|
b'qwerty', b'asdfgh', b'zxcvbn', b'123456', b'', b''
|
|
])
|
|
|
|
assert unreader.read(size=5) == b'qwert'
|
|
assert unreader.read(size=5) == b'yasdf'
|
|
assert unreader.read(size=5) == b'ghzxc'
|
|
assert unreader.read(size=5) == b'vbn12'
|
|
assert unreader.read(size=5) == b'3456'
|
|
assert unreader.read(size=5) == b''
|
|
|
|
|
|
def test_unreader_raises_excpetion_on_invalid_size():
|
|
unreader = Unreader()
|
|
with pytest.raises(TypeError):
|
|
unreader.read(size='foobar')
|
|
with pytest.raises(TypeError):
|
|
unreader.read(size=3.14)
|
|
with pytest.raises(TypeError):
|
|
unreader.read(size=[])
|
|
|
|
|
|
def test_iter_unreader_chunk():
|
|
iter_unreader = IterUnreader((b'ab', b'cd', b'ef'))
|
|
|
|
assert iter_unreader.chunk() == b'ab'
|
|
assert iter_unreader.chunk() == b'cd'
|
|
assert iter_unreader.chunk() == b'ef'
|
|
assert iter_unreader.chunk() == b''
|
|
assert iter_unreader.chunk() == b''
|
|
|
|
|
|
def test_socket_unreader_chunk():
|
|
fake_sock = t.FakeSocket(io.BytesIO(b'Lorem ipsum dolor'))
|
|
sock_unreader = SocketUnreader(fake_sock, max_chunk=5)
|
|
|
|
assert sock_unreader.chunk() == b'Lorem'
|
|
assert sock_unreader.chunk() == b' ipsu'
|
|
assert sock_unreader.chunk() == b'm dol'
|
|
assert sock_unreader.chunk() == b'or'
|
|
assert sock_unreader.chunk() == b''
|
|
|
|
|
|
def test_length_reader_read():
|
|
unreader = IterUnreader((b'Lorem', b'ipsum', b'dolor', b'sit', b'amet'))
|
|
reader = LengthReader(unreader, 13)
|
|
assert reader.read(0) == b''
|
|
assert reader.read(5) == b'Lorem'
|
|
assert reader.read(6) == b'ipsumd'
|
|
assert reader.read(4) == b'ol'
|
|
assert reader.read(100) == b''
|
|
|
|
reader = LengthReader(unreader, 10)
|
|
assert reader.read(0) == b''
|
|
assert reader.read(5) == b'orsit'
|
|
assert reader.read(5) == b'amet'
|
|
assert reader.read(100) == b''
|
|
|
|
|
|
def test_length_reader_read_invalid_size():
|
|
reader = LengthReader(None, 5)
|
|
with pytest.raises(TypeError):
|
|
reader.read('100')
|
|
with pytest.raises(TypeError):
|
|
reader.read([100])
|
|
with pytest.raises(ValueError):
|
|
reader.read(-100)
|
|
|
|
|
|
def test_eof_reader_read():
|
|
unreader = IterUnreader((b'Lorem', b'ipsum', b'dolor', b'sit', b'amet'))
|
|
reader = EOFReader(unreader)
|
|
|
|
assert reader.read(0) == b''
|
|
assert reader.read(5) == b'Lorem'
|
|
assert reader.read(5) == b'ipsum'
|
|
assert reader.read(3) == b'dol'
|
|
assert reader.read(3) == b'ors'
|
|
assert reader.read(100) == b'itamet'
|
|
assert reader.read(100) == b''
|
|
|
|
|
|
def test_eof_reader_read_invalid_size():
|
|
reader = EOFReader(None)
|
|
with pytest.raises(TypeError):
|
|
reader.read('100')
|
|
with pytest.raises(TypeError):
|
|
reader.read([100])
|
|
with pytest.raises(ValueError):
|
|
reader.read(-100)
|
|
|
|
|
|
def test_invalid_http_version_error():
|
|
assert str(InvalidHTTPVersion('foo')) == "Invalid HTTP Version: 'foo'"
|
|
assert str(InvalidHTTPVersion((2, 1))) == 'Invalid HTTP Version: (2, 1)'
|
|
|
|
|
|
def _build_request_parser(payload):
|
|
"""Construct a RequestParser that drains the given bytes."""
|
|
from gunicorn.config import Config
|
|
from gunicorn.http.parser import RequestParser
|
|
|
|
cfg = Config()
|
|
parser = RequestParser(cfg, iter([payload]), None)
|
|
next(iter(parser))
|
|
return parser
|
|
|
|
|
|
def test_finish_body_drains_remainder():
|
|
payload = (
|
|
b"POST / HTTP/1.1\r\n"
|
|
b"Host: example.com\r\n"
|
|
b"Content-Length: 5\r\n"
|
|
b"\r\n"
|
|
b"hello"
|
|
)
|
|
parser = _build_request_parser(payload)
|
|
assert parser.finish_body() is True
|
|
|
|
|
|
def test_finish_body_returns_false_when_byte_cap_exceeded():
|
|
body = b"x" * (4096)
|
|
payload = (
|
|
b"POST / HTTP/1.1\r\n"
|
|
b"Host: example.com\r\n"
|
|
b"Content-Length: %d\r\n\r\n%s" % (len(body), body)
|
|
)
|
|
parser = _build_request_parser(payload)
|
|
assert parser.finish_body(max_bytes=512) is False
|
|
|
|
|
|
def test_finish_body_returns_false_on_expired_deadline():
|
|
payload = (
|
|
b"POST / HTTP/1.1\r\n"
|
|
b"Host: example.com\r\n"
|
|
b"Content-Length: 100\r\n"
|
|
b"\r\n"
|
|
b"only-partial"
|
|
)
|
|
parser = _build_request_parser(payload)
|
|
# Force an already-elapsed deadline; the drain must abandon immediately.
|
|
import time as _time
|
|
expired = _time.monotonic() - 1.0
|
|
# IterUnreader has no socket; deadline path is exercised only when sock
|
|
# is present. Stub a sock with gettimeout/settimeout to drive the branch.
|
|
from unittest import mock
|
|
sock = mock.Mock()
|
|
sock.gettimeout.return_value = None
|
|
parser.unreader.sock = sock
|
|
assert parser.finish_body(deadline=expired) is False
|
|
sock.settimeout.assert_called_with(None)
|
|
|
|
|
|
def test_file_wrapper_iterable():
|
|
"""FileWrapper should support the iterator protocol per PEP 3333."""
|
|
filelike = io.BytesIO(b"hello world")
|
|
wrapper = FileWrapper(filelike, blksize=5)
|
|
|
|
# Should be iterable
|
|
assert hasattr(wrapper, '__iter__')
|
|
assert hasattr(wrapper, '__next__')
|
|
assert iter(wrapper) is wrapper
|
|
|
|
# Should yield chunks via next()
|
|
assert next(wrapper) == b"hello"
|
|
assert next(wrapper) == b" worl"
|
|
assert next(wrapper) == b"d"
|
|
with pytest.raises(StopIteration):
|
|
next(wrapper)
|
|
|
|
# Also works with for loop
|
|
filelike2 = io.BytesIO(b"abc")
|
|
wrapper2 = FileWrapper(filelike2, blksize=2)
|
|
chunks = list(wrapper2)
|
|
assert chunks == [b"ab", b"c"]
|