examples(dirty): add streaming chat demo with SSE

Add a lightweight chat simulator demonstrating dirty worker streaming:
- Token-by-token SSE streaming via async generators
- FastAPI endpoint with browser UI
- Multiple canned responses based on keywords
- Docker deployment with docker-compose
- Integration tests for SSE protocol

Update docs/content/dirty.md to link to both examples.
This commit is contained in:
Benoit Chesneau 2026-01-25 10:17:28 +01:00
parent ae8665c4d5
commit cc39ed922e
11 changed files with 1032 additions and 1 deletions

View File

@ -703,4 +703,7 @@ def upload_image(request):
## Complete Examples
For a full working example with Docker deployment, see the [Embedding Service Example](https://github.com/benoitc/gunicorn/tree/master/examples/embedding_service) - a FastAPI-based text embedding API using sentence-transformers with dirty workers for ML model management.
For full working examples with Docker deployment, see:
- [Embedding Service Example](https://github.com/benoitc/gunicorn/tree/master/examples/embedding_service) - FastAPI-based text embedding API using sentence-transformers with dirty workers for ML model management.
- [Streaming Chat Example](https://github.com/benoitc/gunicorn/tree/master/examples/streaming_chat) - Simulated LLM chat with token-by-token SSE streaming, demonstrating dirty worker generators and real-time response delivery.

View File

@ -0,0 +1,20 @@
FROM python:3.12-slim
WORKDIR /app
# Install dependencies
RUN pip install --no-cache-dir \
fastapi \
pydantic
# Copy gunicorn source
COPY . /app/gunicorn-src
RUN pip install /app/gunicorn-src
# Copy app
COPY examples/streaming_chat /app/streaming_chat
ENV PYTHONPATH=/app
EXPOSE 8000
CMD ["gunicorn", "streaming_chat.main:app", "-c", "streaming_chat/gunicorn_conf.py"]

View File

@ -0,0 +1,218 @@
# Streaming Chat Example
A FastAPI-based chat demo that simulates LLM token-by-token streaming, powered
by Gunicorn's dirty workers for efficient long-running operations.
## Overview
This example demonstrates how to build a streaming chat API that:
- Streams tokens word-by-word like ChatGPT (Server-Sent Events)
- Uses dirty workers for the "inference" workload
- Includes a browser-based chat UI for testing
- Requires no ML dependencies (simulated responses)
## Architecture
```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────────┐
│ Browser/curl │────►│ FastAPI (ASGI) │────►│ DirtyWorker │
│ SSE stream │ │ - /chat (SSE) │ │ - ChatApp │
│ │◄────│ - /chat/sync │◄────│ - Token generator │
└─────────────────┘ └──────────────────┘ └─────────────────────┘
text/event-stream
data: {"token": "Hello"}
data: {"token": " "}
data: {"token": "world"}
data: [DONE]
```
**Why streaming with dirty workers?**
- Real LLM inference is slow (seconds to minutes)
- Users expect to see responses appear gradually
- Dirty workers keep the "model" loaded between requests
- HTTP workers remain responsive during streaming
## Quick Start
### With Docker (recommended)
```bash
cd examples/streaming_chat
docker compose up --build
```
Then open http://localhost:8000 in your browser.
### Local Development
```bash
# Install dependencies
pip install fastapi pydantic
# Run with gunicorn
gunicorn examples.streaming_chat.main:app \
-c examples/streaming_chat/gunicorn_conf.py
```
## API Reference
### POST /chat
Stream a chat response using Server-Sent Events.
**Request:**
```json
{
"prompt": "hello",
"thinking": false
}
```
**Response:** `text/event-stream`
```
data: {"token": "Hello"}
data: {"token": "!"}
data: {"token": " "}
data: {"token": "I'm"}
...
data: [DONE]
```
**Example with curl:**
```bash
curl -N http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"prompt": "hello"}'
```
### POST /chat/sync
Non-streaming version that returns the complete response.
**Request:**
```json
{
"prompt": "hello"
}
```
**Response:**
```json
{
"response": "Hello! I'm a simulated AI assistant..."
}
```
### GET /health
Health check endpoint.
**Response:**
```json
{"status": "ok"}
```
### GET /
Browser-based chat UI for testing.
## Configuration
Edit `gunicorn_conf.py` to adjust:
| Setting | Default | Description |
|---------|---------|-------------|
| `workers` | 2 | Number of HTTP workers |
| `dirty_workers` | 1 | Number of dirty workers |
| `dirty_timeout` | 60 | Max seconds per request |
| `bind` | 0.0.0.0:8000 | Listen address |
## Prompts
The simulated chat app responds to these keywords:
| Keyword | Response |
|---------|----------|
| `hello`, `hi`, `hey` | Greeting message |
| `explain` | Explanation of dirty workers |
| `streaming` | How streaming works |
| `code` | Example code snippet |
| (default) | Generic thoughtful response |
## Features Demonstrated
1. **Token streaming** - Word-by-word output via generators
2. **SSE protocol** - Browser-compatible event streaming
3. **Async generators** - Using `stream_async()` from dirty client
4. **Thinking mode** - Multi-phase streaming with visible "thinking"
5. **Browser UI** - Interactive chat with cursor animation
## Testing
Run the integration tests:
```bash
# Start the service first
docker compose up -d
# Run tests
pip install requests
python test_streaming.py
```
## Adapting for Real LLMs
To use a real LLM instead of simulated responses:
```python
# chat_app.py
from gunicorn.dirty.app import DirtyApp
class ChatApp(DirtyApp):
def init(self):
from transformers import pipeline
self.generator = pipeline("text-generation", model="gpt2")
def generate(self, prompt):
for output in self.generator(prompt, max_new_tokens=100, do_sample=True):
# Yield tokens as they're generated
yield output["generated_text"]
def close(self):
del self.generator
```
Or with an API-based LLM:
```python
class ChatApp(DirtyApp):
def init(self):
import openai
self.client = openai.OpenAI()
async def generate(self, prompt):
stream = self.client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
```
## Production Considerations
1. **Real LLM**: Replace `ChatApp` with actual model inference
2. **GPU Support**: Add CUDA to Dockerfile for faster inference
3. **Rate Limiting**: Add FastAPI middleware for rate limiting
4. **Authentication**: Add API key validation
5. **Monitoring**: Add Prometheus metrics endpoint
6. **Timeouts**: Adjust `dirty_timeout` based on max response length

View File

@ -0,0 +1,2 @@
# Streaming Chat Example
# Demonstrates dirty worker streaming with simulated LLM token generation

View File

@ -0,0 +1,132 @@
import time
import random
from gunicorn.dirty.app import DirtyApp
class ChatApp(DirtyApp):
"""Simulated LLM chat application demonstrating streaming responses.
This app mimics LLM token-by-token generation without requiring
heavy ML dependencies. Each response is streamed word-by-word
with realistic timing delays.
"""
def init(self):
"""Initialize canned responses for different prompts."""
self.responses = {
"hello": (
"Hello! I'm a simulated AI assistant running on Gunicorn's "
"dirty workers. I can demonstrate streaming responses just "
"like a real LLM, but without the heavy ML dependencies. "
"How can I help you today?"
),
"explain": (
"Dirty workers are separate processes that handle long-running "
"tasks like ML inference. They keep models loaded in memory "
"across requests, avoiding expensive reload times. HTTP workers "
"remain lightweight and responsive while dirty workers handle "
"the heavy computation. This architecture is inspired by "
"Erlang's dirty schedulers."
),
"streaming": (
"Streaming works by yielding chunks from a generator function. "
"Each yield sends a chunk message through the IPC socket. The "
"client receives chunks as they're produced, enabling real-time "
"token-by-token display. This is perfect for LLM applications "
"where users expect to see responses appear gradually."
),
"code": (
"Here's a simple example:\n\n"
"```python\n"
"from gunicorn.dirty import get_dirty_client\n\n"
"client = get_dirty_client()\n"
"for token in client.stream('app:ChatApp', 'generate', prompt):\n"
" print(token, end='', flush=True)\n"
"```\n\n"
"This streams tokens directly to the console as they arrive."
),
"default": (
"I understand your question. Let me think about that for a "
"moment. The key insight here is that streaming responses "
"provide a much better user experience for long-running "
"operations. Instead of waiting for the complete response, "
"users see content appearing in real-time, which feels more "
"interactive and responsive."
),
}
self.min_delay = 0.03 # Minimum delay between tokens (30ms)
self.max_delay = 0.08 # Maximum delay between tokens (80ms)
def generate(self, prompt):
"""Generate a streaming response for the given prompt.
Yields tokens (words) one at a time with realistic delays
to simulate LLM inference.
Args:
prompt: User's input prompt
Yields:
str: Individual tokens (words with trailing space)
"""
response = self._get_response(prompt)
words = response.split()
for i, word in enumerate(words):
# Simulate variable inference time
delay = random.uniform(self.min_delay, self.max_delay)
time.sleep(delay)
# Add space after word (except last word)
if i < len(words) - 1:
yield word + " "
else:
yield word
def generate_with_thinking(self, prompt):
"""Generate response with visible 'thinking' phase.
First yields thinking indicators, then streams the response.
Demonstrates multi-phase streaming.
Args:
prompt: User's input prompt
Yields:
str: Thinking indicators followed by response tokens
"""
# Thinking phase
yield "[thinking"
for _ in range(3):
time.sleep(0.3)
yield "."
yield "]\n\n"
# Response phase
yield from self.generate(prompt)
def _get_response(self, prompt):
"""Match prompt to a canned response.
Args:
prompt: User's input prompt
Returns:
str: Matched response text
"""
prompt_lower = prompt.lower().strip()
# Check for keyword matches
for key, response in self.responses.items():
if key in prompt_lower:
return response
# Greeting patterns
if any(g in prompt_lower for g in ["hi", "hey", "greetings"]):
return self.responses["hello"]
return self.responses["default"]
def close(self):
"""Cleanup on shutdown."""
pass

View File

@ -0,0 +1,213 @@
================================================================================
STREAMING CHAT DEMO CAPTURE
Gunicorn Dirty Workers + FastAPI SSE
================================================================================
$ curl -s http://127.0.0.1:8000/health
{"status":"ok"}
================================================================================
TEST 1: Hello Prompt
================================================================================
$ curl -N http://127.0.0.1:8000/chat -d '{"prompt": "hello"}'
data: {"token": "Hello! "}
data: {"token": "I'm "}
data: {"token": "a "}
data: {"token": "simulated "}
data: {"token": "AI "}
data: {"token": "assistant "}
data: {"token": "running "}
data: {"token": "on "}
data: {"token": "Gunicorn's "}
data: {"token": "dirty "}
data: {"token": "workers. "}
data: {"token": "I "}
data: {"token": "can "}
data: {"token": "demonstrate "}
data: {"token": "streaming "}
data: {"token": "responses "}
data: {"token": "just "}
data: {"token": "like "}
data: {"token": "a "}
data: {"token": "real "}
data: {"token": "LLM, "}
data: {"token": "but "}
data: {"token": "without "}
data: {"token": "the "}
data: {"token": "heavy "}
data: {"token": "ML "}
data: {"token": "dependencies. "}
data: {"token": "How "}
data: {"token": "can "}
data: {"token": "I "}
data: {"token": "help "}
data: {"token": "you "}
data: {"token": "today?"}
data: [DONE]
================================================================================
TEST 2: Explain Dirty Workers
================================================================================
$ curl -N http://127.0.0.1:8000/chat -d '{"prompt": "explain dirty workers"}'
data: {"token": "Dirty "}
data: {"token": "workers "}
data: {"token": "are "}
data: {"token": "separate "}
data: {"token": "processes "}
data: {"token": "that "}
data: {"token": "handle "}
data: {"token": "long-running "}
data: {"token": "tasks "}
data: {"token": "like "}
data: {"token": "ML "}
data: {"token": "inference. "}
data: {"token": "They "}
data: {"token": "keep "}
data: {"token": "models "}
data: {"token": "loaded "}
data: {"token": "in "}
data: {"token": "memory "}
data: {"token": "across "}
data: {"token": "requests, "}
data: {"token": "avoiding "}
data: {"token": "expensive "}
data: {"token": "reload "}
data: {"token": "times. "}
data: {"token": "HTTP "}
data: {"token": "workers "}
data: {"token": "remain "}
data: {"token": "lightweight "}
data: {"token": "and "}
data: {"token": "responsive "}
data: {"token": "while "}
data: {"token": "dirty "}
data: {"token": "workers "}
data: {"token": "handle "}
data: {"token": "the "}
data: {"token": "heavy "}
data: {"token": "computation. "}
data: {"token": "This "}
data: {"token": "architecture "}
data: {"token": "is "}
data: {"token": "inspired "}
data: {"token": "by "}
data: {"token": "Erlang's "}
data: {"token": "dirty "}
data: {"token": "schedulers."}
data: [DONE]
================================================================================
TEST 3: Sync Endpoint
================================================================================
$ curl -s http://127.0.0.1:8000/chat/sync -d '{"prompt": "hello"}'
{"response":"Hello! I'm a simulated AI assistant running on Gunicorn's dirty workers. I can demonstrate streaming responses just like a real LLM, but without the heavy ML dependencies. How can I help you today?"}
================================================================================
DEMO COMPLETE
================================================================================
Browser UI available at: http://localhost:8000/
Features demonstrated:
- Token-by-token SSE streaming
- Async generators via dirty workers
- Different responses based on keywords
- Sync endpoint for comparison
- Health check endpoint
Server Logs:
[INFO] Starting gunicorn 24.1.0
[INFO] Listening at: http://0.0.0.0:8000 (1)
[INFO] Using worker: asgi
[INFO] Spawned dirty arbiter (pid: 7)
[INFO] Dirty arbiter starting (pid: 7)
[INFO] Booting worker with pid: 8
[INFO] Dirty arbiter listening on /tmp/gunicorn-dirty-.../arbiter.sock
[INFO] Spawned dirty worker (pid: 9)
[INFO] Initialized dirty app: streaming_chat.chat_app:ChatApp
[INFO] Dirty worker 9 listening on /tmp/gunicorn-dirty-.../worker-1.sock
[INFO] ASGI server listening on http://0.0.0.0:8000

View File

@ -0,0 +1,13 @@
services:
streaming-chat:
build:
context: ../..
dockerfile: examples/streaming_chat/Dockerfile
ports:
- "8000:8000"
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/health', timeout=5)"]
interval: 10s
timeout: 5s
retries: 5
start_period: 5s

View File

@ -0,0 +1,8 @@
bind = "0.0.0.0:8000"
workers = 2
worker_class = "asgi"
# Dirty worker config
dirty_apps = ["streaming_chat.chat_app:ChatApp"]
dirty_workers = 1
dirty_timeout = 60

View File

@ -0,0 +1,271 @@
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse, HTMLResponse
from pydantic import BaseModel
from gunicorn.dirty.client import get_dirty_client_async
app = FastAPI(
title="Streaming Chat Demo",
description="Demonstrates dirty worker streaming with simulated LLM responses",
)
class ChatRequest(BaseModel):
prompt: str
thinking: bool = False
class ChatResponse(BaseModel):
response: str
@app.post("/chat")
async def chat(request: ChatRequest):
"""Stream a chat response using Server-Sent Events.
The response is streamed token-by-token, simulating LLM inference.
Each token is sent as an SSE event with JSON data.
Args:
request: Chat request with prompt and optional thinking mode
Returns:
StreamingResponse with text/event-stream content type
"""
client = await get_dirty_client_async()
action = "generate_with_thinking" if request.thinking else "generate"
async def stream():
async for token in client.stream_async(
"streaming_chat.chat_app:ChatApp",
action,
request.prompt
):
data = json.dumps({"token": token})
yield f"data: {data}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
}
)
@app.post("/chat/sync", response_model=ChatResponse)
async def chat_sync(request: ChatRequest):
"""Non-streaming chat endpoint for comparison.
Waits for the complete response before returning.
Useful for testing or when streaming isn't needed.
Args:
request: Chat request with prompt
Returns:
Complete response as JSON
"""
client = await get_dirty_client_async()
action = "generate_with_thinking" if request.thinking else "generate"
tokens = []
async for token in client.stream_async(
"streaming_chat.chat_app:ChatApp",
action,
request.prompt
):
tokens.append(token)
return ChatResponse(response="".join(tokens))
@app.get("/health")
async def health():
"""Health check endpoint."""
return {"status": "ok"}
@app.get("/", response_class=HTMLResponse)
async def index():
"""Simple chat UI for testing streaming."""
return """
<!DOCTYPE html>
<html>
<head>
<title>Streaming Chat Demo</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
background: #1a1a2e;
color: #eee;
}
h1 { color: #00d9ff; }
.chat-container {
background: #16213e;
border-radius: 8px;
padding: 20px;
margin: 20px 0;
}
#response {
min-height: 100px;
padding: 15px;
background: #0f0f23;
border-radius: 4px;
white-space: pre-wrap;
font-family: 'Monaco', 'Menlo', monospace;
line-height: 1.6;
}
.input-group {
display: flex;
gap: 10px;
margin-top: 15px;
}
input[type="text"] {
flex: 1;
padding: 12px;
border: 1px solid #333;
border-radius: 4px;
background: #0f0f23;
color: #eee;
font-size: 16px;
}
button {
padding: 12px 24px;
background: #00d9ff;
color: #000;
border: none;
border-radius: 4px;
cursor: pointer;
font-weight: bold;
}
button:hover { background: #00b8d9; }
button:disabled { background: #555; cursor: not-allowed; }
.checkbox-group {
margin-top: 10px;
}
label { cursor: pointer; }
.suggestions {
margin-top: 15px;
display: flex;
flex-wrap: wrap;
gap: 8px;
}
.suggestion {
padding: 6px 12px;
background: #333;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
}
.suggestion:hover { background: #444; }
.cursor {
display: inline-block;
width: 8px;
height: 18px;
background: #00d9ff;
animation: blink 1s infinite;
vertical-align: text-bottom;
}
@keyframes blink {
0%, 50% { opacity: 1; }
51%, 100% { opacity: 0; }
}
</style>
</head>
<body>
<h1>Streaming Chat Demo</h1>
<p>This demo shows token-by-token streaming using Gunicorn's dirty workers.</p>
<div class="chat-container">
<div id="response"></div>
<div class="input-group">
<input type="text" id="prompt" placeholder="Type a message..."
onkeypress="if(event.key==='Enter') sendMessage()">
<button onclick="sendMessage()" id="sendBtn">Send</button>
</div>
<div class="checkbox-group">
<label>
<input type="checkbox" id="thinking"> Show thinking phase
</label>
</div>
<div class="suggestions">
<span class="suggestion" onclick="setPrompt('hello')">hello</span>
<span class="suggestion" onclick="setPrompt('explain dirty workers')">explain</span>
<span class="suggestion" onclick="setPrompt('how does streaming work?')">streaming</span>
<span class="suggestion" onclick="setPrompt('show me code')">code</span>
</div>
</div>
<script>
function setPrompt(text) {
document.getElementById('prompt').value = text;
sendMessage();
}
async function sendMessage() {
const promptEl = document.getElementById('prompt');
const responseEl = document.getElementById('response');
const sendBtn = document.getElementById('sendBtn');
const thinking = document.getElementById('thinking').checked;
const prompt = promptEl.value.trim();
if (!prompt) return;
sendBtn.disabled = true;
responseEl.innerHTML = '<span class="cursor"></span>';
try {
const response = await fetch('/chat', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({prompt, thinking})
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let text = '';
while (true) {
const {done, value} = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
responseEl.textContent = text;
} else {
try {
const parsed = JSON.parse(data);
text += parsed.token;
responseEl.innerHTML = text + '<span class="cursor"></span>';
} catch (e) {}
}
}
}
}
} catch (error) {
responseEl.textContent = 'Error: ' + error.message;
}
sendBtn.disabled = false;
promptEl.value = '';
promptEl.focus();
}
document.getElementById('prompt').focus();
</script>
</body>
</html>
"""

View File

@ -0,0 +1,2 @@
fastapi>=0.100.0
pydantic>=2.0.0

View File

@ -0,0 +1,149 @@
"""Integration tests for the streaming chat example."""
import json
import os
import requests
def test_health_endpoint():
"""Test the health check endpoint."""
base_url = os.environ.get("STREAMING_CHAT_URL", "http://127.0.0.1:8000")
response = requests.get(f"{base_url}/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
print("Health check: OK")
def test_streaming_chat():
"""Test that chat endpoint streams tokens via SSE."""
base_url = os.environ.get("STREAMING_CHAT_URL", "http://127.0.0.1:8000")
response = requests.post(
f"{base_url}/chat",
json={"prompt": "hello"},
stream=True,
headers={"Accept": "text/event-stream"}
)
assert response.status_code == 200
assert response.headers.get("content-type") == "text/event-stream; charset=utf-8"
tokens = []
for line in response.iter_lines(decode_unicode=True):
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
parsed = json.loads(data)
tokens.append(parsed["token"])
# Verify we got multiple tokens (streaming worked)
assert len(tokens) > 1, f"Expected multiple tokens, got {len(tokens)}"
# Verify tokens form a coherent response
full_response = "".join(tokens)
assert len(full_response) > 10, "Response too short"
assert "Hello" in full_response or "hello" in full_response.lower()
print(f"Streaming chat: OK (received {len(tokens)} tokens)")
def test_sync_chat():
"""Test the non-streaming chat endpoint."""
base_url = os.environ.get("STREAMING_CHAT_URL", "http://127.0.0.1:8000")
response = requests.post(
f"{base_url}/chat/sync",
json={"prompt": "hello"}
)
assert response.status_code == 200
data = response.json()
assert "response" in data
assert len(data["response"]) > 10
print("Sync chat: OK")
def test_thinking_mode():
"""Test streaming with thinking phase enabled."""
base_url = os.environ.get("STREAMING_CHAT_URL", "http://127.0.0.1:8000")
response = requests.post(
f"{base_url}/chat",
json={"prompt": "hello", "thinking": True},
stream=True
)
assert response.status_code == 200
tokens = []
for line in response.iter_lines(decode_unicode=True):
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
parsed = json.loads(data)
tokens.append(parsed["token"])
full_response = "".join(tokens)
assert "[thinking" in full_response, "Thinking phase not present"
assert "...]" in full_response or "..]\n" in full_response.replace(".", ""), \
"Thinking dots not present"
print("Thinking mode: OK")
def test_different_prompts():
"""Test that different prompts get different responses."""
base_url = os.environ.get("STREAMING_CHAT_URL", "http://127.0.0.1:8000")
prompts = ["hello", "explain dirty workers", "how does streaming work?"]
responses = []
for prompt in prompts:
response = requests.post(
f"{base_url}/chat/sync",
json={"prompt": prompt}
)
assert response.status_code == 200
responses.append(response.json()["response"])
# Verify responses are different
assert len(set(responses)) == len(responses), \
"Expected different responses for different prompts"
print("Different prompts: OK")
def test_sse_format():
"""Test that SSE format is correct."""
base_url = os.environ.get("STREAMING_CHAT_URL", "http://127.0.0.1:8000")
response = requests.post(
f"{base_url}/chat",
json={"prompt": "hello"},
stream=True
)
raw_lines = []
for line in response.iter_lines(decode_unicode=True):
raw_lines.append(line)
# Check SSE format: lines should be "data: ..." or empty
for line in raw_lines:
assert line == "" or line.startswith("data: "), \
f"Invalid SSE line: {line}"
# Should end with [DONE]
data_lines = [line for line in raw_lines if line.startswith("data: ")]
assert data_lines[-1] == "data: [DONE]", "Missing [DONE] terminator"
print("SSE format: OK")
if __name__ == "__main__":
test_health_endpoint()
test_streaming_chat()
test_sync_chat()
test_thinking_mode()
test_different_prompts()
test_sse_format()
print("\nAll tests passed!")