gunicorn/tests/test_dirty_tlv.py
Benoit Chesneau 68ce658f5d fix(dirty): convert dict int keys to strings in TLV encoder
JSON serializes all dict keys as strings, so for compatibility the TLV
encoder should do the same. This fixes an error when tasks return dicts
with integer keys (e.g., aggregation results grouped by numeric ID).
2026-02-11 23:39:53 +01:00

555 lines
18 KiB
Python

#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for dirty TLV binary encoder/decoder."""
import math
import struct
import pytest
from gunicorn.dirty.tlv import (
TLVEncoder,
TYPE_NONE,
TYPE_BOOL,
TYPE_INT64,
TYPE_FLOAT64,
TYPE_BYTES,
TYPE_STRING,
TYPE_LIST,
TYPE_DICT,
MAX_STRING_SIZE,
MAX_BYTES_SIZE,
MAX_LIST_SIZE,
MAX_DICT_SIZE,
)
from gunicorn.dirty.errors import DirtyProtocolError
class TestTLVEncoderBasicTypes:
    """Round-trip checks for scalar values: None, bool, int and float."""

    def test_encode_decode_none(self):
        """None becomes the lone TYPE_NONE byte and decodes back to None."""
        wire = TLVEncoder.encode(None)
        assert wire == bytes([TYPE_NONE])

        decoded, end = TLVEncoder.decode(wire, 0)
        assert decoded is None
        assert end == 1

    def test_encode_decode_true(self):
        """True becomes TYPE_BOOL followed by 0x01 and decodes back."""
        wire = TLVEncoder.encode(True)
        assert wire == bytes([TYPE_BOOL, 0x01])

        decoded, end = TLVEncoder.decode(wire, 0)
        assert decoded is True
        assert end == 2

    def test_encode_decode_false(self):
        """False becomes TYPE_BOOL followed by 0x00 and decodes back."""
        wire = TLVEncoder.encode(False)
        assert wire == bytes([TYPE_BOOL, 0x00])

        decoded, end = TLVEncoder.decode(wire, 0)
        assert decoded is False
        assert end == 2

    def test_encode_decode_positive_int(self):
        """A positive int is tagged TYPE_INT64 and occupies nine bytes."""
        wire = TLVEncoder.encode(42)
        assert wire[0] == TYPE_INT64
        assert len(wire) == 9  # 1 type + 8 value

        decoded, end = TLVEncoder.decode(wire, 0)
        assert decoded == 42
        assert end == 9

    def test_encode_decode_negative_int(self):
        """A negative int survives the round trip."""
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(-12345), 0)
        assert decoded == -12345

    def test_encode_decode_large_int(self):
        """An int of magnitude 2**62 survives the round trip."""
        big = 2**62
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(big), 0)
        assert decoded == big

    def test_encode_decode_zero(self):
        """Zero survives the round trip."""
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(0), 0)
        assert decoded == 0

    def test_encode_decode_float(self):
        """A float is tagged TYPE_FLOAT64 and occupies nine bytes."""
        wire = TLVEncoder.encode(3.14159)
        assert wire[0] == TYPE_FLOAT64
        assert len(wire) == 9  # 1 type + 8 value

        decoded, _ = TLVEncoder.decode(wire, 0)
        assert abs(decoded - 3.14159) < 1e-10

    def test_encode_decode_negative_float(self):
        """A negative float survives the round trip within 1e-10."""
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(-273.15), 0)
        assert abs(decoded - (-273.15)) < 1e-10

    def test_encode_decode_float_infinity(self):
        """Positive infinity survives the round trip."""
        infinity = float('inf')
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(infinity), 0)
        assert decoded == infinity

    def test_encode_decode_float_nan(self):
        """NaN decodes to NaN (checked with isnan, since NaN != NaN)."""
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(float('nan')), 0)
        assert math.isnan(decoded)
class TestTLVEncoderBytes:
    """Round-trip and size-limit checks for bytes values."""

    def test_encode_decode_empty_bytes(self):
        """Empty bytes are tagged TYPE_BYTES and decode back to b""."""
        wire = TLVEncoder.encode(b"")
        assert wire[0] == TYPE_BYTES

        decoded, _ = TLVEncoder.decode(wire, 0)
        assert decoded == b""

    def test_encode_decode_bytes(self):
        """A short payload with low and high byte values round-trips."""
        payload = b"\x00\x01\x02\xff\xfe\xfd"
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(payload), 0)
        assert decoded == payload

    def test_encode_decode_large_bytes(self):
        """A 10000-byte payload round-trips unchanged."""
        payload = b"x" * 10000
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(payload), 0)
        assert decoded == payload

    def test_bytes_too_large(self):
        """Encoding one byte more than MAX_BYTES_SIZE raises an error."""
        # Note: building this payload really does allocate
        # MAX_BYTES_SIZE + 1 bytes before encode() is called.
        oversized = b"x" * (MAX_BYTES_SIZE + 1)
        with pytest.raises(DirtyProtocolError) as caught:
            TLVEncoder.encode(oversized)
        message = str(caught.value).lower()
        assert "too large" in message
class TestTLVEncoderString:
    """Tests for string encoding/decoding.

    Non-ASCII literals are written as escape sequences so the test inputs
    cannot be silently reduced to ASCII by an editor or transport that
    mishandles the file's encoding.
    """

    def test_encode_decode_empty_string(self):
        """Test empty string encoding/decoding."""
        encoded = TLVEncoder.encode("")
        assert encoded[0] == TYPE_STRING

        value, _ = TLVEncoder.decode(encoded, 0)
        assert value == ""

    def test_encode_decode_ascii_string(self):
        """Test ASCII string encoding/decoding."""
        encoded = TLVEncoder.encode("hello world")
        value, _ = TLVEncoder.decode(encoded, 0)
        assert value == "hello world"

    def test_encode_decode_unicode_string(self):
        """Test Unicode string encoding/decoding."""
        text = "Hello, world! \u00a9 \u2603 \U0001F600"
        encoded = TLVEncoder.encode(text)
        value, _ = TLVEncoder.decode(encoded, 0)
        assert value == text

    def test_encode_decode_chinese(self):
        """Test Chinese characters encoding/decoding."""
        # "Hello, world!" in Chinese, including full-width punctuation.
        # The previous literal was pure ASCII, so no CJK text was exercised.
        text = "\u4f60\u597d\uff0c\u4e16\u754c\uff01"
        encoded = TLVEncoder.encode(text)
        value, _ = TLVEncoder.decode(encoded, 0)
        assert value == text

    def test_encode_decode_emoji(self):
        """Test emoji encoding/decoding."""
        # Code points outside the Basic Multilingual Plane (grinning face,
        # rocket, party popper). The previous literal contained no emoji.
        text = "Test emoji \U0001F600 \U0001F680 \U0001F389"
        encoded = TLVEncoder.encode(text)
        value, _ = TLVEncoder.decode(encoded, 0)
        assert value == text

    def test_encode_decode_large_string(self):
        """Test large string encoding/decoding."""
        text = "x" * 10000
        encoded = TLVEncoder.encode(text)
        value, _ = TLVEncoder.decode(encoded, 0)
        assert value == text
class TestTLVEncoderList:
    """Round-trip checks for list (and tuple) values."""

    def test_encode_decode_empty_list(self):
        """An empty list is tagged TYPE_LIST and decodes back to []."""
        wire = TLVEncoder.encode([])
        assert wire[0] == TYPE_LIST

        decoded, _ = TLVEncoder.decode(wire, 0)
        assert decoded == []

    def test_encode_decode_simple_list(self):
        """A flat list of ints round-trips."""
        items = [1, 2, 3]
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(items), 0)
        assert decoded == items

    def test_encode_decode_mixed_list(self):
        """A list mixing int, str, float, bool, None and bytes round-trips."""
        items = [1, "hello", 3.14, True, None, b"bytes"]
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(items), 0)
        assert decoded == items

    def test_encode_decode_nested_list(self):
        """Lists nested inside lists round-trip."""
        items = [[1, 2], [3, [4, 5]], ["a", "b"]]
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(items), 0)
        assert decoded == items

    def test_encode_decode_tuple_as_list(self):
        """A tuple is accepted by the encoder and comes back as a list."""
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode((1, 2, 3)), 0)
        assert decoded == [1, 2, 3]  # Decoded as list

    def test_encode_decode_large_list(self):
        """A 1000-element list round-trips."""
        items = list(range(1000))
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(items), 0)
        assert decoded == items
class TestTLVEncoderDict:
    """Round-trip checks for dict values."""

    def test_encode_decode_empty_dict(self):
        """An empty dict is tagged TYPE_DICT and decodes back to {}."""
        wire = TLVEncoder.encode({})
        assert wire[0] == TYPE_DICT

        decoded, _ = TLVEncoder.decode(wire, 0)
        assert decoded == {}

    def test_encode_decode_simple_dict(self):
        """A flat str -> int mapping round-trips."""
        mapping = {"a": 1, "b": 2, "c": 3}
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(mapping), 0)
        assert decoded == mapping

    def test_encode_decode_mixed_values_dict(self):
        """A dict holding one value of each supported kind round-trips."""
        mapping = {
            "int": 42,
            "float": 3.14,
            "string": "hello",
            "bool": True,
            "none": None,
            "bytes": b"data",
            "list": [1, 2, 3],
        }
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(mapping), 0)
        assert decoded == mapping

    def test_encode_decode_nested_dict(self):
        """Dicts nested in dicts, and dicts inside lists, round-trip."""
        mapping = {
            "outer": {
                "inner": {"value": 42},
                "list": [{"a": 1}, {"b": 2}],
            },
        }
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(mapping), 0)
        assert decoded == mapping

    def test_encode_dict_non_string_key_converted(self):
        """Integer keys come back as their string form, as with JSON."""
        wire = TLVEncoder.encode({1: "value", 2: "other"})
        result, _ = TLVEncoder.decode(wire, 0)
        # Keys should be converted to strings
        assert result == {"1": "value", "2": "other"}
class TestTLVEncoderComplexStructures:
    """Round-trip checks for realistic, multi-level payloads."""

    def test_encode_decode_request_like(self):
        """A request-shaped dict (id, path, action, args, kwargs) round-trips."""
        request = {
            "id": 12345,
            "app_path": "myapp.ml:MLApp",
            "action": "predict",
            "args": [b"input_data", 0.7],
            "kwargs": {"temperature": 0.7, "max_tokens": 1000},
        }
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(request), 0)
        assert decoded == request

    def test_encode_decode_response_like(self):
        """A response-shaped dict with a nested result round-trips."""
        response = {
            "id": 12345,
            "result": {
                "predictions": [0.1, 0.2, 0.7],
                "metadata": {"model": "v1.0", "latency_ms": 42},
            },
        }
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(response), 0)
        assert decoded == response

    def test_encode_decode_deeply_nested(self):
        """Six levels of dict nesting round-trip."""
        nested = {"a": {"b": {"c": {"d": {"e": {"f": "deep"}}}}}}
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(nested), 0)
        assert decoded == nested
class TestTLVEncoderRoundtrip:
    """Checks for decode_full, which consumes an entire buffer."""

    def test_decode_full_simple(self):
        """decode_full returns the value that was encoded."""
        original = {"key": "value"}
        wire = TLVEncoder.encode(original)
        assert TLVEncoder.decode_full(wire) == original

    def test_decode_full_trailing_data(self):
        """decode_full rejects bytes left over after the encoded value."""
        wire_with_junk = TLVEncoder.encode(42) + b"extra"
        with pytest.raises(DirtyProtocolError) as caught:
            TLVEncoder.decode_full(wire_with_junk)
        message = str(caught.value).lower()
        assert "trailing" in message
class TestTLVEncoderErrors:
    """Checks that malformed input and unsupported values are rejected."""

    @staticmethod
    def _decode_failure_text(payload):
        """Decode payload at offset 0, require DirtyProtocolError, return its lowercased text."""
        with pytest.raises(DirtyProtocolError) as caught:
            TLVEncoder.decode(payload, 0)
        return str(caught.value).lower()

    @staticmethod
    def _encode_failure_text(obj):
        """Encode obj, require DirtyProtocolError, return its lowercased text."""
        with pytest.raises(DirtyProtocolError) as caught:
            TLVEncoder.encode(obj)
        return str(caught.value).lower()

    def test_decode_empty_data(self):
        """An empty buffer is reported as truncated."""
        assert "truncated" in self._decode_failure_text(b"")

    def test_decode_truncated_int(self):
        """An int with too few value bytes is reported as truncated."""
        # TYPE_INT64 followed by only 4 bytes instead of 8
        payload = bytes([TYPE_INT64, 0, 0, 0, 0])
        assert "truncated" in self._decode_failure_text(payload)

    def test_decode_truncated_float(self):
        """A float with only 4 value bytes is reported as truncated."""
        payload = bytes([TYPE_FLOAT64, 0, 0, 0, 0])
        assert "truncated" in self._decode_failure_text(payload)

    def test_decode_truncated_bytes_length(self):
        """A bytes value with a cut-off length field is reported as truncated."""
        payload = bytes([TYPE_BYTES, 0, 0])  # Only 2 bytes of length
        assert "truncated" in self._decode_failure_text(payload)

    def test_decode_truncated_bytes_data(self):
        """A bytes value shorter than its declared length is reported as truncated."""
        # Says 10 bytes but only provides 5
        payload = bytes([TYPE_BYTES]) + struct.pack(">I", 10) + b"12345"
        assert "truncated" in self._decode_failure_text(payload)

    def test_decode_truncated_string_length(self):
        """A string with a cut-off length field is reported as truncated."""
        payload = bytes([TYPE_STRING, 0])
        assert "truncated" in self._decode_failure_text(payload)

    def test_decode_truncated_string_data(self):
        """A string shorter than its declared length is reported as truncated."""
        payload = bytes([TYPE_STRING]) + struct.pack(">I", 10) + b"hello"
        assert "truncated" in self._decode_failure_text(payload)

    def test_decode_invalid_utf8(self):
        """String bytes that are not valid UTF-8 produce a UTF-8 error."""
        # Valid length, but invalid UTF-8 bytes
        payload = bytes([TYPE_STRING]) + struct.pack(">I", 3) + b"\x80\x81\x82"
        assert "utf-8" in self._decode_failure_text(payload)

    def test_decode_truncated_list_count(self):
        """A list with a cut-off count field is reported as truncated."""
        payload = bytes([TYPE_LIST, 0])
        assert "truncated" in self._decode_failure_text(payload)

    def test_decode_truncated_dict_count(self):
        """A dict with a cut-off count field is reported as truncated."""
        payload = bytes([TYPE_DICT, 0])
        assert "truncated" in self._decode_failure_text(payload)

    def test_decode_unknown_type(self):
        """A type byte the decoder does not recognise is rejected."""
        payload = bytes([0xFF])  # Unknown type
        assert "unknown" in self._decode_failure_text(payload)

    def test_encode_unsupported_type(self):
        """A plain object() cannot be encoded."""
        assert "unsupported type" in self._encode_failure_text(object())

    def test_encode_function_raises_error(self):
        """A function object cannot be encoded."""
        assert "unsupported type" in self._encode_failure_text(lambda x: x)

    def test_decode_dict_non_string_key_in_data(self):
        """A dict on the wire whose key is not a string is rejected."""
        # Hand-built payload: TYPE_DICT, count=1, TYPE_INT64 key, TYPE_INT64 value
        payload = b"".join([
            bytes([TYPE_DICT]),
            struct.pack(">I", 1),
            bytes([TYPE_INT64]),
            struct.pack(">q", 1),  # Key (int, not string)
            bytes([TYPE_INT64]),
            struct.pack(">q", 2),  # Value
        ])
        assert "string" in self._decode_failure_text(payload)
class TestTLVEncoderOffset:
    """Checks that decode honours its start offset and reports the end offset."""

    def test_decode_with_offset(self):
        """Decoding can start past a prefix of unrelated bytes."""
        junk = b"garbage"
        wire = TLVEncoder.encode(42)

        decoded, end = TLVEncoder.decode(junk + wire, len(junk))
        assert decoded == 42
        assert end == len(junk) + len(wire)

    def test_decode_multiple_values(self):
        """Back-to-back values decode in order by chaining returned offsets."""
        expected = ["hello", 42, [1, 2, 3]]
        stream = b"".join(TLVEncoder.encode(item) for item in expected)

        cursor = 0
        for item in expected:
            decoded, cursor = TLVEncoder.decode(stream, cursor)
            assert decoded == item

        # The final offset must land exactly at the end of the buffer.
        assert cursor == len(stream)
class TestTLVEncoderBinaryData:
    """Checks for raw binary payloads (the main motivation for this protocol)."""

    def test_binary_data_no_encoding(self):
        """Every possible byte value round-trips unchanged."""
        # This is the key advantage over JSON - binary data doesn't need base64
        every_byte = bytes(range(256))  # All byte values
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(every_byte), 0)
        assert decoded == every_byte

    def test_binary_with_null_bytes(self):
        """Embedded null bytes round-trip unchanged."""
        blob = b"\x00\x00\xff\x00\x00"
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(blob), 0)
        assert decoded == blob

    def test_binary_in_nested_structure(self):
        """Bytes values inside dicts and lists round-trip unchanged."""
        payload = {
            "image": b"\x89PNG\r\n\x1a\n" + b"\x00" * 100,
            "metadata": {"width": 640, "height": 480},
            "chunks": [b"chunk1", b"chunk2", b"chunk3"],
        }
        decoded, _ = TLVEncoder.decode(TLVEncoder.encode(payload), 0)
        assert decoded == payload