diff --git a/docs/content/dirty.md b/docs/content/dirty.md index 8afede06..bb98f6ee 100644 --- a/docs/content/dirty.md +++ b/docs/content/dirty.md @@ -17,7 +17,7 @@ Dirty Arbiters solve this by providing: - **Separate worker pool** - Completely separate from HTTP workers, can be killed/restarted independently - **Stateful workers** - Loaded resources persist in dirty worker memory -- **Message-passing IPC** - Communication via Unix sockets with JSON serialization +- **Message-passing IPC** - Communication via Unix sockets with binary TLV protocol - **Explicit API** - Clear `execute()` calls (no hidden IPC) - **Asyncio-based** - Clean concurrent handling with streaming support @@ -476,6 +476,64 @@ Worker -> Arbiter -> Client: chunk (data: "World") Worker -> Arbiter -> Client: end ``` +## Binary Protocol + +The dirty worker IPC uses a binary protocol inspired by OpenBSD msgctl/msgsnd for efficient data transfer. This eliminates base64 encoding overhead for binary data like images, audio, or model weights. 
+ +### Header Format (16 bytes) + +``` ++--------+--------+--------+--------+--------+--------+--------+--------+ +| Magic (2B) | Ver(1) | MType | Payload Length (4B) | ++--------+--------+--------+--------+--------+--------+--------+--------+ +| Request ID (8 bytes) | ++--------+--------+--------+--------+--------+--------+--------+--------+ +``` + +- **Magic**: `0x47 0x44` ("GD" for Gunicorn Dirty) +- **Version**: `0x01` +- **MType**: Message type (`0x01`=REQUEST, `0x02`=RESPONSE, `0x03`=ERROR, `0x04`=CHUNK, `0x05`=END) +- **Length**: Payload size (big-endian uint32, max 64MB) +- **Request ID**: uint64 identifier + +### TLV Payload Encoding + +Payloads use Type-Length-Value encoding: + +| Type | Code | Description | +|------|------|-------------| +| None | `0x00` | No value bytes | +| Bool | `0x01` | 1 byte (0x00/0x01) | +| Int64 | `0x05` | 8 bytes big-endian signed | +| Float64 | `0x06` | 8 bytes IEEE 754 | +| Bytes | `0x10` | 4-byte length + raw bytes | +| String | `0x11` | 4-byte length + UTF-8 | +| List | `0x20` | 4-byte count + elements | +| Dict | `0x21` | 4-byte count + key-value pairs | + +### Binary Data Benefits + +The binary protocol allows passing raw bytes directly without encoding: + +```python +# Image processing with binary data +def resize(self, image_data, width, height): + """Resize an image - image_data is raw bytes.""" + img = Image.open(io.BytesIO(image_data)) + resized = img.resize((width, height)) + buffer = io.BytesIO() + resized.save(buffer, format='PNG') + return buffer.getvalue() # Returns raw bytes + +# Called from HTTP worker +thumbnail = client.execute( + "myapp.images:ImageApp", + "resize", + raw_image_bytes, # No base64 encoding needed + width=256, height=256 +) +``` + ### Error Handling in Streams Errors during streaming are delivered as error messages: @@ -768,7 +826,7 @@ except DirtyConnectionError: 2. **Set appropriate timeouts** based on your workload 3. **Handle errors gracefully** - dirty workers may restart 4. 
**Use meaningful action names** for easier debugging -5. **Keep responses JSON-serializable** - results are passed via IPC +5. **Keep responses serializable** - results are passed via binary IPC (supports bytes directly) ## Monitoring diff --git a/examples/dirty_example/test_protocol.py b/examples/dirty_example/test_protocol.py index 31c45b92..8a677d03 100644 --- a/examples/dirty_example/test_protocol.py +++ b/examples/dirty_example/test_protocol.py @@ -4,7 +4,10 @@ #!/usr/bin/env python """ -Test script to demonstrate the Dirty Protocol layer. +Test script to demonstrate the Dirty Binary Protocol layer. + +The binary protocol uses a 16-byte header + TLV-encoded payloads for efficient +binary data transfer without base64 encoding overhead. Run with: python examples/dirty_example/test_protocol.py @@ -18,10 +21,14 @@ import socket sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from gunicorn.dirty.protocol import ( + BinaryProtocol, DirtyProtocol, make_request, make_response, make_error_response, + HEADER_SIZE, + MAGIC, + VERSION, ) from gunicorn.dirty.errors import DirtyError, DirtyTimeoutError @@ -29,13 +36,13 @@ from gunicorn.dirty.errors import DirtyError, DirtyTimeoutError def test_protocol_encode_decode(): """Test protocol encoding and decoding.""" print("=" * 60) - print("Testing Protocol Encode/Decode") + print("Testing Binary Protocol Encode/Decode") print("=" * 60) - # Test request + # Test request with integer ID (recommended for binary protocol) print("\n1. Creating a request message...") request = make_request( - request_id="req-001", + request_id=12345, # Integer IDs are efficient app_path="myapp.ml:MLApp", action="inference", args=("model1",), @@ -43,18 +50,53 @@ def test_protocol_encode_decode(): ) print(f" Request: {request}") - # Encode - print("\n2. Encoding message...") - encoded = DirtyProtocol.encode(request) + # Encode using binary protocol + print("\n2. 
Encoding message with binary protocol...") + encoded = BinaryProtocol._encode_from_dict(request) print(f" Encoded length: {len(encoded)} bytes") - print(f" Header (4 bytes): {encoded[:4].hex()}") + print(f" Header ({HEADER_SIZE} bytes): {encoded[:HEADER_SIZE].hex()}") + print(f" Magic: {MAGIC!r}") + print(f" Version: {VERSION}") - # Decode - print("\n3. Decoding payload...") - payload = encoded[DirtyProtocol.HEADER_SIZE:] - decoded = DirtyProtocol.decode(payload) - print(f" Decoded: {decoded}") - print(f" Match: {decoded == request}") + # Decode header + print("\n3. Decoding header...") + msg_type, request_id, payload_len = BinaryProtocol.decode_header(encoded[:HEADER_SIZE]) + print(f" Message type: {msg_type} (0x{msg_type:02x})") + print(f" Request ID: {request_id}") + print(f" Payload length: {payload_len} bytes") + + # Decode full message + print("\n4. Decoding full message...") + msg_type_str, req_id, payload = BinaryProtocol.decode_message(encoded) + print(f" Type: {msg_type_str}") + print(f" Request ID: {req_id}") + print(f" Payload: {payload}") + + +def test_binary_data_handling(): + """Test binary data handling - the main advantage of binary protocol.""" + print("\n" + "=" * 60) + print("Testing Binary Data Handling") + print("=" * 60) + + # Create binary data (e.g., image, audio, model weights) + binary_data = bytes(range(256)) # All byte values + print(f"\n1. Original binary data: {len(binary_data)} bytes") + print(f" First 16 bytes: {binary_data[:16].hex()}") + + # Create response with binary data (no base64 needed!) + print("\n2. Encoding binary data in response...") + response = make_response(67890, {"image_data": binary_data, "format": "raw"}) + encoded = BinaryProtocol._encode_from_dict(response) + print(f" Encoded total size: {len(encoded)} bytes") + + # Decode and verify + print("\n3. 
Decoding binary data...") + msg_type_str, req_id, payload = BinaryProtocol.decode_message(encoded) + recovered_data = payload["result"]["image_data"] + print(f" Recovered data size: {len(recovered_data)} bytes") + print(f" Data matches: {recovered_data == binary_data}") + print(f" First 16 bytes: {recovered_data[:16].hex()}") def test_protocol_response(): @@ -65,13 +107,13 @@ def test_protocol_response(): # Success response print("\n1. Creating success response...") - response = make_response("req-001", {"result": "Hello, World!", "confidence": 0.95}) + response = make_response(12345, {"result": "Hello, World!", "confidence": 0.95}) print(f" Response: {response}") # Error response print("\n2. Creating error response...") error = DirtyTimeoutError("Operation timed out", timeout=30) - error_response = make_error_response("req-001", error) + error_response = make_error_response(12345, error) print(f" Error response: {error_response}") @@ -88,7 +130,7 @@ def test_socket_communication(): # Send a request print("\n1. Sending request over socket...") request = make_request( - request_id="socket-req-001", + request_id=100001, app_path="test:App", action="compute", args=(1, 2, 3), @@ -101,19 +143,20 @@ def test_socket_communication(): print("\n2. Receiving request...") received = DirtyProtocol.read_message(server_sock) print(f" Received: {received}") - print(f" Match: {received == request}") + print(f" Request ID: {received['id']}") - # Send a response - print("\n3. Sending response...") - response = make_response("socket-req-001", {"sum": 6}) + # Send a response with binary data + print("\n3. Sending response with binary data...") + binary_result = b"\x00\x01\x02\x03\xff\xfe\xfd\xfc" + response = make_response(100001, {"data": binary_result, "sum": 6}) DirtyProtocol.write_message(server_sock, response) - print(f" Sent: {response}") + print(f" Sent binary data: {binary_result.hex()}") # Receive the response print("\n4. 
Receiving response...") received = DirtyProtocol.read_message(client_sock) - print(f" Received: {received}") - print(f" Match: {received == response}") + print(f" Received binary data: {received['result']['data'].hex()}") + print(f" Sum: {received['result']['sum']}") finally: server_sock.close() @@ -132,7 +175,7 @@ async def test_async_communication(): try: # Create message request = make_request( - request_id="async-req-001", + request_id=200001, app_path="async:App", action="process", args=("data",), @@ -141,7 +184,7 @@ async def test_async_communication(): # Write to pipe print("\n1. Writing async message...") - encoded = DirtyProtocol.encode(request) + encoded = BinaryProtocol._encode_from_dict(request) os.write(write_fd, encoded) os.close(write_fd) write_fd = None @@ -156,7 +199,7 @@ async def test_async_communication(): received = await DirtyProtocol.read_message_async(reader) print(f" Received: {received}") - print(f" Match: {received == request}") + print(f" Request ID: {received['id']}") finally: if write_fd is not None: @@ -193,10 +236,11 @@ def test_error_serialization(): if __name__ == "__main__": print("\n" + "#" * 60) - print("# Dirty Protocol Demonstration") + print("# Dirty Binary Protocol Demonstration") print("#" * 60) test_protocol_encode_decode() + test_binary_data_handling() test_protocol_response() test_socket_communication() asyncio.run(test_async_communication())