# -*- encoding: utf-8 -*- import io import t import pytest from gunicorn import util from gunicorn.http.body import Body, LengthReader, EOFReader from gunicorn.http.wsgi import Response from gunicorn.http.unreader import Unreader, IterUnreader, SocketUnreader from gunicorn.http.errors import InvalidHeader, InvalidHeaderName try: import unittest.mock as mock except ImportError: import mock def assert_readline(payload, size, expected): body = Body(io.BytesIO(payload)) assert body.readline(size) == expected def test_readline_empty_body(): assert_readline(b"", None, b"") assert_readline(b"", 1, b"") def test_readline_zero_size(): assert_readline(b"abc", 0, b"") assert_readline(b"\n", 0, b"") def test_readline_new_line_before_size(): body = Body(io.BytesIO(b"abc\ndef")) assert body.readline(4) == b"abc\n" assert body.readline() == b"def" def test_readline_new_line_after_size(): body = Body(io.BytesIO(b"abc\ndef")) assert body.readline(2) == b"ab" assert body.readline() == b"c\n" def test_readline_no_new_line(): body = Body(io.BytesIO(b"abcdef")) assert body.readline() == b"abcdef" body = Body(io.BytesIO(b"abcdef")) assert body.readline(2) == b"ab" assert body.readline(2) == b"cd" assert body.readline(2) == b"ef" def test_readline_buffer_loaded(): reader = io.BytesIO(b"abc\ndef") body = Body(reader) body.read(1) # load internal buffer reader.write(b"g\nhi") reader.seek(7) assert body.readline() == b"bc\n" assert body.readline() == b"defg\n" assert body.readline() == b"hi" def test_readline_buffer_loaded_with_size(): body = Body(io.BytesIO(b"abc\ndef")) body.read(1) # load internal buffer assert body.readline(2) == b"bc" assert body.readline(2) == b"\n" assert body.readline(2) == b"de" assert body.readline(2) == b"f" def test_http_header_encoding(): """ tests whether http response headers are USASCII encoded """ mocked_socket = mock.MagicMock() mocked_socket.sendall = mock.MagicMock() mocked_request = mock.MagicMock() response = Response(mocked_request, mocked_socket, None) # set umlaut header response.headers.append(('foo', 'häder')) with pytest.raises(UnicodeEncodeError): response.send_headers() # build our own header_str to compare against tosend = response.default_headers() tosend.extend(["%s: %s\r\n" % (k, v) for k, v in response.headers]) header_str = "%s\r\n" % "".join(tosend) with pytest.raises(UnicodeEncodeError): mocked_socket.sendall(util.to_bytestring(header_str, "ascii")) def test_http_invalid_response_header(): """ tests whether http response headers are contains control chars """ mocked_socket = mock.MagicMock() mocked_socket.sendall = mock.MagicMock() mocked_request = mock.MagicMock() response = Response(mocked_request, mocked_socket, None) with pytest.raises(InvalidHeader): response.start_response("200 OK", [('foo', 'essai\r\n')]) response = Response(mocked_request, mocked_socket, None) with pytest.raises(InvalidHeaderName): response.start_response("200 OK", [('foo\r\n', 'essai')]) def test_unreader_read_when_size_is_none(): unreader = Unreader() unreader.chunk = mock.MagicMock(side_effect=[b'qwerty', b'123456', b'']) assert unreader.read(size=None) == b'qwerty' assert unreader.read(size=None) == b'123456' assert unreader.read(size=None) == b'' def test_unreader_unread(): unreader = Unreader() unreader.unread(b'hi there') assert b'hi there' in unreader.read() def test_unreader_read_zero_size(): unreader = Unreader() unreader.chunk = mock.MagicMock(side_effect=[b'qwerty', b'asdfgh']) assert unreader.read(size=0) == b'' def test_unreader_read_with_nonzero_size(): unreader = Unreader() unreader.chunk = mock.MagicMock(side_effect=[ b'qwerty', b'asdfgh', b'zxcvbn', b'123456', b'', b'' ]) assert unreader.read(size=5) == b'qwert' assert unreader.read(size=5) == b'yasdf' assert unreader.read(size=5) == b'ghzxc' assert unreader.read(size=5) == b'vbn12' assert unreader.read(size=5) == b'3456' assert unreader.read(size=5) == b'' def test_unreader_raises_excpetion_on_invalid_size(): unreader = Unreader() with pytest.raises(TypeError): unreader.read(size='foobar') with pytest.raises(TypeError): unreader.read(size=3.14) with pytest.raises(TypeError): unreader.read(size=[]) def test_iter_unreader_chunk(): iter_unreader = IterUnreader((b'ab', b'cd', b'ef')) assert iter_unreader.chunk() == b'ab' assert iter_unreader.chunk() == b'cd' assert iter_unreader.chunk() == b'ef' assert iter_unreader.chunk() == b'' assert iter_unreader.chunk() == b'' def test_socket_unreader_chunk(): fake_sock = t.FakeSocket(io.BytesIO(b'Lorem ipsum dolor')) sock_unreader = SocketUnreader(fake_sock, max_chunk=5) assert sock_unreader.chunk() == b'Lorem' assert sock_unreader.chunk() == b' ipsu' assert sock_unreader.chunk() == b'm dol' assert sock_unreader.chunk() == b'or' assert sock_unreader.chunk() == b'' def test_length_reader_read(): unreader = IterUnreader((b'Lorem', b'ipsum', b'dolor', b'sit', b'amet')) reader = LengthReader(unreader, 13) assert reader.read(0) == b'' assert reader.read(5) == b'Lorem' assert reader.read(6) == b'ipsumd' assert reader.read(4) == b'ol' assert reader.read(100) == b'' reader = LengthReader(unreader, 10) assert reader.read(0) == b'' assert reader.read(5) == b'orsit' assert reader.read(5) == b'amet' assert reader.read(100) == b'' def test_length_reader_read_invalid_size(): reader = LengthReader(None, 5) with pytest.raises(TypeError): reader.read('100') with pytest.raises(TypeError): reader.read([100]) with pytest.raises(ValueError): reader.read(-100) def test_eof_reader_read(): unreader = IterUnreader((b'Lorem', b'ipsum', b'dolor', b'sit', b'amet')) reader = EOFReader(unreader) assert reader.read(0) == b'' assert reader.read(5) == b'Lorem' assert reader.read(5) == b'ipsum' assert reader.read(3) == b'dol' assert reader.read(3) == b'ors' assert reader.read(100) == b'itamet' assert reader.read(100) == b'' def test_eof_reader_read_invalid_size(): reader = EOFReader(None) with pytest.raises(TypeError): reader.read('100') with pytest.raises(TypeError): reader.read([100]) with pytest.raises(ValueError): reader.read(-100)