From 0e0dc669c81c25c15ddc0487efa51e74826d8bed Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Wed, 11 Feb 2026 22:55:03 +0100 Subject: [PATCH] feat(dirty): add TLV binary encoder/decoder Implement TLV (Type-Length-Value) serialization layer for the binary dirty worker protocol. This enables efficient binary data transfer without base64 encoding overhead. Supported types: - None, bool, int64, float64 - bytes (raw binary, no encoding needed) - string (UTF-8) - list, dict (nested structures) Inspired by OpenBSD msgctl/msgsnd message format. --- gunicorn/dirty/tlv.py | 305 ++++++++++++++++++++++ tests/test_dirty_tlv.py | 553 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 858 insertions(+) create mode 100644 gunicorn/dirty/tlv.py create mode 100644 tests/test_dirty_tlv.py diff --git a/gunicorn/dirty/tlv.py b/gunicorn/dirty/tlv.py new file mode 100644 index 00000000..5682b0c6 --- /dev/null +++ b/gunicorn/dirty/tlv.py @@ -0,0 +1,305 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +TLV (Type-Length-Value) Binary Encoder/Decoder + +Provides efficient binary serialization for dirty worker protocol messages. +Inspired by OpenBSD msgctl/msgsnd message format. + +Type Codes: + 0x00: None (no value bytes) + 0x01: bool (1 byte: 0x00 or 0x01) + 0x05: int64 (8 bytes big-endian signed) + 0x06: float64 (8 bytes IEEE 754) + 0x10: bytes (4-byte length + raw bytes) + 0x11: string (4-byte length + UTF-8 encoded) + 0x20: list (4-byte count + encoded elements) + 0x21: dict (4-byte count + encoded key-value pairs) +""" + +import struct + +from .errors import DirtyProtocolError + + +# Type codes +TYPE_NONE = 0x00 +TYPE_BOOL = 0x01 +TYPE_INT64 = 0x05 +TYPE_FLOAT64 = 0x06 +TYPE_BYTES = 0x10 +TYPE_STRING = 0x11 +TYPE_LIST = 0x20 +TYPE_DICT = 0x21 + +# Maximum sizes for safety +MAX_STRING_SIZE = 64 * 1024 * 1024 # 64 MB +MAX_BYTES_SIZE = 64 * 1024 * 1024 # 64 MB +MAX_LIST_SIZE = 1024 * 1024 # 1 million items +MAX_DICT_SIZE = 1024 * 1024 # 1 million items + + +class TLVEncoder: + """ + TLV binary encoder/decoder. + + Encodes Python values to binary TLV format and decodes back. + Supports: None, bool, int, float, bytes, str, list, dict. + """ + + @staticmethod + def encode(value) -> bytes: + """ + Encode a Python value to TLV binary format. + + Args: + value: Python value to encode (None, bool, int, float, + bytes, str, list, or dict) + + Returns: + bytes: TLV-encoded binary data + + Raises: + DirtyProtocolError: If value type is not supported + """ + if value is None: + return bytes([TYPE_NONE]) + + if isinstance(value, bool): + # bool must come before int since bool is a subclass of int + return bytes([TYPE_BOOL, 0x01 if value else 0x00]) + + if isinstance(value, int): + return bytes([TYPE_INT64]) + struct.pack(">q", value) + + if isinstance(value, float): + return bytes([TYPE_FLOAT64]) + struct.pack(">d", value) + + if isinstance(value, bytes): + if len(value) > MAX_BYTES_SIZE: + raise DirtyProtocolError( + f"Bytes too large: {len(value)} bytes " + f"(max: {MAX_BYTES_SIZE})" + ) + return bytes([TYPE_BYTES]) + struct.pack(">I", len(value)) + value + + if isinstance(value, str): + encoded = value.encode("utf-8") + if len(encoded) > MAX_STRING_SIZE: + raise DirtyProtocolError( + f"String too large: {len(encoded)} bytes " + f"(max: {MAX_STRING_SIZE})" + ) + return bytes([TYPE_STRING]) + struct.pack(">I", len(encoded)) + encoded + + if isinstance(value, (list, tuple)): + if len(value) > MAX_LIST_SIZE: + raise DirtyProtocolError( + f"List too large: {len(value)} items " + f"(max: {MAX_LIST_SIZE})" + ) + parts = [bytes([TYPE_LIST]), struct.pack(">I", len(value))] + for item in value: + parts.append(TLVEncoder.encode(item)) + return b"".join(parts) + + if isinstance(value, dict): + if len(value) > MAX_DICT_SIZE: + raise DirtyProtocolError( + f"Dict too large: {len(value)} items " + f"(max: {MAX_DICT_SIZE})" + ) + parts = [bytes([TYPE_DICT]), struct.pack(">I", len(value))] + for k, v in value.items(): + # Keys must be strings + if not isinstance(k, str): + raise DirtyProtocolError( + f"Dict keys must be strings, got {type(k).__name__}" + ) + parts.append(TLVEncoder.encode(k)) + parts.append(TLVEncoder.encode(v)) + return b"".join(parts) + + raise DirtyProtocolError( + f"Unsupported type for TLV encoding: {type(value).__name__}" + ) + + @staticmethod + def decode(data: bytes, offset: int = 0) -> tuple: + """ + Decode a TLV-encoded value from binary data. + + Args: + data: Binary data to decode + offset: Starting offset in the data + + Returns: + tuple: (decoded_value, new_offset) + + Raises: + DirtyProtocolError: If data is malformed or truncated + """ + if offset >= len(data): + raise DirtyProtocolError( + "Truncated TLV data: no type byte", + raw_data=data[offset:offset + 20] + ) + + type_code = data[offset] + offset += 1 + + if type_code == TYPE_NONE: + return None, offset + + if type_code == TYPE_BOOL: + if offset >= len(data): + raise DirtyProtocolError( + "Truncated TLV data: missing bool value", + raw_data=data[offset - 1:offset + 20] + ) + value = data[offset] != 0x00 + return value, offset + 1 + + if type_code == TYPE_INT64: + if offset + 8 > len(data): + raise DirtyProtocolError( + "Truncated TLV data: incomplete int64", + raw_data=data[offset - 1:offset + 20] + ) + value = struct.unpack(">q", data[offset:offset + 8])[0] + return value, offset + 8 + + if type_code == TYPE_FLOAT64: + if offset + 8 > len(data): + raise DirtyProtocolError( + "Truncated TLV data: incomplete float64", + raw_data=data[offset - 1:offset + 20] + ) + value = struct.unpack(">d", data[offset:offset + 8])[0] + return value, offset + 8 + + if type_code == TYPE_BYTES: + if offset + 4 > len(data): + raise DirtyProtocolError( + "Truncated TLV data: incomplete bytes length", + raw_data=data[offset - 1:offset + 20] + ) + length = struct.unpack(">I", data[offset:offset + 4])[0] + offset += 4 + + if length > MAX_BYTES_SIZE: + raise DirtyProtocolError( + f"Bytes too large: {length} bytes (max: {MAX_BYTES_SIZE})" + ) + + if offset + length > len(data): + raise DirtyProtocolError( + f"Truncated TLV data: expected {length} bytes, " + f"got {len(data) - offset}", + raw_data=data[offset - 5:offset + 20] + ) + value = data[offset:offset + length] + return value, offset + length + + if type_code == TYPE_STRING: + if offset + 4 > len(data): + raise DirtyProtocolError( + "Truncated TLV data: incomplete string length", + raw_data=data[offset - 1:offset + 20] + ) + length = struct.unpack(">I", data[offset:offset + 4])[0] + offset += 4 + + if length > MAX_STRING_SIZE: + raise DirtyProtocolError( + f"String too large: {length} bytes (max: {MAX_STRING_SIZE})" + ) + + if offset + length > len(data): + raise DirtyProtocolError( + f"Truncated TLV data: expected {length} bytes for string, " + f"got {len(data) - offset}", + raw_data=data[offset - 5:offset + 20] + ) + try: + value = data[offset:offset + length].decode("utf-8") + except UnicodeDecodeError as e: + raise DirtyProtocolError( + f"Invalid UTF-8 in string: {e}", + raw_data=data[offset:offset + min(length, 20)] + ) + return value, offset + length + + if type_code == TYPE_LIST: + if offset + 4 > len(data): + raise DirtyProtocolError( + "Truncated TLV data: incomplete list count", + raw_data=data[offset - 1:offset + 20] + ) + count = struct.unpack(">I", data[offset:offset + 4])[0] + offset += 4 + + if count > MAX_LIST_SIZE: + raise DirtyProtocolError( + f"List too large: {count} items (max: {MAX_LIST_SIZE})" + ) + + items = [] + for _ in range(count): + item, offset = TLVEncoder.decode(data, offset) + items.append(item) + return items, offset + + if type_code == TYPE_DICT: + if offset + 4 > len(data): + raise DirtyProtocolError( + "Truncated TLV data: incomplete dict count", + raw_data=data[offset - 1:offset + 20] + ) + count = struct.unpack(">I", data[offset:offset + 4])[0] + offset += 4 + + if count > MAX_DICT_SIZE: + raise DirtyProtocolError( + f"Dict too large: {count} items (max: {MAX_DICT_SIZE})" + ) + + result = {} + for _ in range(count): + key, offset = TLVEncoder.decode(data, offset) + if not isinstance(key, str): + raise DirtyProtocolError( + f"Dict key must be string, got {type(key).__name__}" + ) + value, offset = TLVEncoder.decode(data, offset) + result[key] = value + return result, offset + + raise DirtyProtocolError( + f"Unknown TLV type code: 0x{type_code:02x}", + raw_data=data[offset - 1:offset + 20] + ) + + @staticmethod + def decode_full(data: bytes): + """ + Decode a complete TLV-encoded value, ensuring all data is consumed. + + Args: + data: Binary data to decode + + Returns: + Decoded Python value + + Raises: + DirtyProtocolError: If data is malformed or has trailing bytes + """ + value, offset = TLVEncoder.decode(data, 0) + if offset != len(data): + raise DirtyProtocolError( + f"Trailing data after TLV: {len(data) - offset} bytes", + raw_data=data[offset:offset + 20] + ) + return value diff --git a/tests/test_dirty_tlv.py b/tests/test_dirty_tlv.py new file mode 100644 index 00000000..87727203 --- /dev/null +++ b/tests/test_dirty_tlv.py @@ -0,0 +1,553 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +"""Tests for dirty TLV binary encoder/decoder.""" + +import math +import struct +import pytest + +from gunicorn.dirty.tlv import ( + TLVEncoder, + TYPE_NONE, + TYPE_BOOL, + TYPE_INT64, + TYPE_FLOAT64, + TYPE_BYTES, + TYPE_STRING, + TYPE_LIST, + TYPE_DICT, + MAX_STRING_SIZE, + MAX_BYTES_SIZE, + MAX_LIST_SIZE, + MAX_DICT_SIZE, +) +from gunicorn.dirty.errors import DirtyProtocolError + + +class TestTLVEncoderBasicTypes: + """Tests for basic type encoding/decoding.""" + + def test_encode_decode_none(self): + """Test None encoding/decoding.""" + encoded = TLVEncoder.encode(None) + assert encoded == bytes([TYPE_NONE]) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value is None + assert offset == 1 + + def test_encode_decode_true(self): + """Test True encoding/decoding.""" + encoded = TLVEncoder.encode(True) + assert encoded == bytes([TYPE_BOOL, 0x01]) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value is True + assert offset == 2 + + def test_encode_decode_false(self): + """Test False encoding/decoding.""" + encoded = TLVEncoder.encode(False) + assert encoded == bytes([TYPE_BOOL, 0x00]) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value is False + assert offset == 2 + + def test_encode_decode_positive_int(self): + """Test positive integer encoding/decoding.""" + encoded = TLVEncoder.encode(42) + assert encoded[0] == TYPE_INT64 + assert len(encoded) == 9 # 1 type + 8 value + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == 42 + assert offset == 9 + + def test_encode_decode_negative_int(self): + """Test negative integer encoding/decoding.""" + encoded = TLVEncoder.encode(-12345) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == -12345 + + def test_encode_decode_large_int(self): + """Test large integer encoding/decoding.""" + large_val = 2**62 + encoded = TLVEncoder.encode(large_val) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == large_val + + def test_encode_decode_zero(self): + """Test zero encoding/decoding.""" + encoded = TLVEncoder.encode(0) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == 0 + + def test_encode_decode_float(self): + """Test float encoding/decoding.""" + encoded = TLVEncoder.encode(3.14159) + assert encoded[0] == TYPE_FLOAT64 + assert len(encoded) == 9 # 1 type + 8 value + + value, offset = TLVEncoder.decode(encoded, 0) + assert abs(value - 3.14159) < 1e-10 + + def test_encode_decode_negative_float(self): + """Test negative float encoding/decoding.""" + encoded = TLVEncoder.encode(-273.15) + + value, offset = TLVEncoder.decode(encoded, 0) + assert abs(value - (-273.15)) < 1e-10 + + def test_encode_decode_float_infinity(self): + """Test infinity encoding/decoding.""" + encoded = TLVEncoder.encode(float('inf')) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == float('inf') + + def test_encode_decode_float_nan(self): + """Test NaN encoding/decoding.""" + encoded = TLVEncoder.encode(float('nan')) + + value, offset = TLVEncoder.decode(encoded, 0) + assert math.isnan(value) + + +class TestTLVEncoderBytes: + """Tests for bytes encoding/decoding.""" + + def test_encode_decode_empty_bytes(self): + """Test empty bytes encoding/decoding.""" + encoded = TLVEncoder.encode(b"") + assert encoded[0] == TYPE_BYTES + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == b"" + + def test_encode_decode_bytes(self): + """Test bytes encoding/decoding.""" + data = b"\x00\x01\x02\xff\xfe\xfd" + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + def test_encode_decode_large_bytes(self): + """Test large bytes encoding/decoding.""" + data = b"x" * 10000 + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + def test_bytes_too_large(self): + """Test that bytes exceeding max size raises error.""" + # We won't actually allocate MAX_BYTES_SIZE, just check the encoding + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.encode(b"x" * (MAX_BYTES_SIZE + 1)) + assert "too large" in str(exc_info.value).lower() + + +class TestTLVEncoderString: + """Tests for string encoding/decoding.""" + + def test_encode_decode_empty_string(self): + """Test empty string encoding/decoding.""" + encoded = TLVEncoder.encode("") + assert encoded[0] == TYPE_STRING + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == "" + + def test_encode_decode_ascii_string(self): + """Test ASCII string encoding/decoding.""" + encoded = TLVEncoder.encode("hello world") + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == "hello world" + + def test_encode_decode_unicode_string(self): + """Test Unicode string encoding/decoding.""" + text = "Hello, world! \u00a9 \u2603 \U0001F600" + encoded = TLVEncoder.encode(text) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == text + + def test_encode_decode_chinese(self): + """Test Chinese characters encoding/decoding.""" + text = "Hello, world!" + encoded = TLVEncoder.encode(text) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == text + + def test_encode_decode_emoji(self): + """Test emoji encoding/decoding.""" + text = "Test emoji" + encoded = TLVEncoder.encode(text) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == text + + def test_encode_decode_large_string(self): + """Test large string encoding/decoding.""" + text = "x" * 10000 + encoded = TLVEncoder.encode(text) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == text + + +class TestTLVEncoderList: + """Tests for list encoding/decoding.""" + + def test_encode_decode_empty_list(self): + """Test empty list encoding/decoding.""" + encoded = TLVEncoder.encode([]) + assert encoded[0] == TYPE_LIST + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == [] + + def test_encode_decode_simple_list(self): + """Test simple list encoding/decoding.""" + data = [1, 2, 3] + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + def test_encode_decode_mixed_list(self): + """Test mixed type list encoding/decoding.""" + data = [1, "hello", 3.14, True, None, b"bytes"] + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + def test_encode_decode_nested_list(self): + """Test nested list encoding/decoding.""" + data = [[1, 2], [3, [4, 5]], ["a", "b"]] + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + def test_encode_decode_tuple_as_list(self): + """Test that tuples are encoded as lists.""" + data = (1, 2, 3) + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == [1, 2, 3] # Decoded as list + + def test_encode_decode_large_list(self): + """Test large list encoding/decoding.""" + data = list(range(1000)) + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + +class TestTLVEncoderDict: + """Tests for dict encoding/decoding.""" + + def test_encode_decode_empty_dict(self): + """Test empty dict encoding/decoding.""" + encoded = TLVEncoder.encode({}) + assert encoded[0] == TYPE_DICT + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == {} + + def test_encode_decode_simple_dict(self): + """Test simple dict encoding/decoding.""" + data = {"a": 1, "b": 2, "c": 3} + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + def test_encode_decode_mixed_values_dict(self): + """Test dict with mixed value types.""" + data = { + "int": 42, + "float": 3.14, + "string": "hello", + "bool": True, + "none": None, + "bytes": b"data", + "list": [1, 2, 3], + } + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + def test_encode_decode_nested_dict(self): + """Test nested dict encoding/decoding.""" + data = { + "outer": { + "inner": { + "value": 42 + }, + "list": [{"a": 1}, {"b": 2}] + } + } + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + def test_encode_dict_non_string_key(self): + """Test that non-string keys raise error.""" + data = {1: "value"} + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.encode(data) + assert "keys must be strings" in str(exc_info.value).lower() + + +class TestTLVEncoderComplexStructures: + """Tests for complex nested structures.""" + + def test_encode_decode_request_like(self): + """Test encoding/decoding a request-like structure.""" + data = { + "id": 12345, + "app_path": "myapp.ml:MLApp", + "action": "predict", + "args": [b"input_data", 0.7], + "kwargs": {"temperature": 0.7, "max_tokens": 1000}, + } + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + def test_encode_decode_response_like(self): + """Test encoding/decoding a response-like structure.""" + data = { + "id": 12345, + "result": { + "predictions": [0.1, 0.2, 0.7], + "metadata": {"model": "v1.0", "latency_ms": 42}, + } + } + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + def test_encode_decode_deeply_nested(self): + """Test deeply nested structures.""" + data = {"a": {"b": {"c": {"d": {"e": {"f": "deep"}}}}}} + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data + + +class TestTLVEncoderRoundtrip: + """Tests for complete roundtrip using decode_full.""" + + def test_decode_full_simple(self): + """Test decode_full with simple value.""" + data = {"key": "value"} + encoded = TLVEncoder.encode(data) + + value = TLVEncoder.decode_full(encoded) + assert value == data + + def test_decode_full_trailing_data(self): + """Test decode_full raises on trailing data.""" + encoded = TLVEncoder.encode(42) + b"extra" + + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode_full(encoded) + assert "trailing" in str(exc_info.value).lower() + + +class TestTLVEncoderErrors: + """Tests for error handling.""" + + def test_decode_empty_data(self): + """Test decoding empty data raises error.""" + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(b"", 0) + assert "truncated" in str(exc_info.value).lower() + + def test_decode_truncated_int(self): + """Test decoding truncated int raises error.""" + # TYPE_INT64 followed by only 4 bytes instead of 8 + data = bytes([TYPE_INT64, 0, 0, 0, 0]) + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(data, 0) + assert "truncated" in str(exc_info.value).lower() + + def test_decode_truncated_float(self): + """Test decoding truncated float raises error.""" + data = bytes([TYPE_FLOAT64, 0, 0, 0, 0]) + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(data, 0) + assert "truncated" in str(exc_info.value).lower() + + def test_decode_truncated_bytes_length(self): + """Test decoding truncated bytes length raises error.""" + data = bytes([TYPE_BYTES, 0, 0]) # Only 2 bytes of length + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(data, 0) + assert "truncated" in str(exc_info.value).lower() + + def test_decode_truncated_bytes_data(self): + """Test decoding truncated bytes data raises error.""" + # Says 10 bytes but only provides 5 + data = bytes([TYPE_BYTES]) + struct.pack(">I", 10) + b"12345" + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(data, 0) + assert "truncated" in str(exc_info.value).lower() + + def test_decode_truncated_string_length(self): + """Test decoding truncated string length raises error.""" + data = bytes([TYPE_STRING, 0]) + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(data, 0) + assert "truncated" in str(exc_info.value).lower() + + def test_decode_truncated_string_data(self): + """Test decoding truncated string data raises error.""" + data = bytes([TYPE_STRING]) + struct.pack(">I", 10) + b"hello" + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(data, 0) + assert "truncated" in str(exc_info.value).lower() + + def test_decode_invalid_utf8(self): + """Test decoding invalid UTF-8 raises error.""" + # Valid length, but invalid UTF-8 bytes + data = bytes([TYPE_STRING]) + struct.pack(">I", 3) + b"\x80\x81\x82" + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(data, 0) + assert "utf-8" in str(exc_info.value).lower() + + def test_decode_truncated_list_count(self): + """Test decoding truncated list count raises error.""" + data = bytes([TYPE_LIST, 0]) + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(data, 0) + assert "truncated" in str(exc_info.value).lower() + + def test_decode_truncated_dict_count(self): + """Test decoding truncated dict count raises error.""" + data = bytes([TYPE_DICT, 0]) + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(data, 0) + assert "truncated" in str(exc_info.value).lower() + + def test_decode_unknown_type(self): + """Test decoding unknown type raises error.""" + data = bytes([0xFF]) # Unknown type + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(data, 0) + assert "unknown" in str(exc_info.value).lower() + + def test_encode_unsupported_type(self): + """Test encoding unsupported type raises error.""" + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.encode(object()) + assert "unsupported type" in str(exc_info.value).lower() + + def test_encode_function_raises_error(self): + """Test encoding a function raises error.""" + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.encode(lambda x: x) + assert "unsupported type" in str(exc_info.value).lower() + + def test_decode_dict_non_string_key_in_data(self): + """Test decoding dict with non-string key raises error.""" + # Manually construct a dict with int key + # TYPE_DICT, count=1, TYPE_INT64 key, TYPE_INT64 value + data = ( + bytes([TYPE_DICT]) + + struct.pack(">I", 1) + + bytes([TYPE_INT64]) + + struct.pack(">q", 1) # Key (int, not string) + + bytes([TYPE_INT64]) + + struct.pack(">q", 2) # Value + ) + with pytest.raises(DirtyProtocolError) as exc_info: + TLVEncoder.decode(data, 0) + assert "string" in str(exc_info.value).lower() + + +class TestTLVEncoderOffset: + """Tests for offset handling.""" + + def test_decode_with_offset(self): + """Test decoding from specific offset.""" + # Create data with prefix + prefix = b"garbage" + encoded = TLVEncoder.encode(42) + data = prefix + encoded + + value, offset = TLVEncoder.decode(data, len(prefix)) + assert value == 42 + assert offset == len(prefix) + len(encoded) + + def test_decode_multiple_values(self): + """Test decoding multiple consecutive values.""" + v1 = TLVEncoder.encode("hello") + v2 = TLVEncoder.encode(42) + v3 = TLVEncoder.encode([1, 2, 3]) + data = v1 + v2 + v3 + + offset = 0 + val1, offset = TLVEncoder.decode(data, offset) + assert val1 == "hello" + + val2, offset = TLVEncoder.decode(data, offset) + assert val2 == 42 + + val3, offset = TLVEncoder.decode(data, offset) + assert val3 == [1, 2, 3] + + assert offset == len(data) + + +class TestTLVEncoderBinaryData: + """Tests for binary data handling (the main motivation for this protocol).""" + + def test_binary_data_no_encoding(self): + """Test that binary data is passed through without encoding.""" + # This is the key advantage over JSON - binary data doesn't need base64 + binary_data = bytes(range(256)) # All byte values + encoded = TLVEncoder.encode(binary_data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == binary_data + + def test_binary_with_null_bytes(self): + """Test binary data with embedded null bytes.""" + binary_data = b"\x00\x00\xff\x00\x00" + encoded = TLVEncoder.encode(binary_data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == binary_data + + def test_binary_in_nested_structure(self): + """Test binary data inside nested structures.""" + data = { + "image": b"\x89PNG\r\n\x1a\n" + b"\x00" * 100, + "metadata": {"width": 640, "height": 480}, + "chunks": [b"chunk1", b"chunk2", b"chunk3"], + } + encoded = TLVEncoder.encode(data) + + value, offset = TLVEncoder.decode(encoded, 0) + assert value == data