diff --git a/docs/content/dirty.md b/docs/content/dirty.md index 0e954b78..c67a8bb8 100644 --- a/docs/content/dirty.md +++ b/docs/content/dirty.md @@ -703,4 +703,7 @@ def upload_image(request): ## Complete Examples -For a full working example with Docker deployment, see the [Embedding Service Example](https://github.com/benoitc/gunicorn/tree/master/examples/embedding_service) - a FastAPI-based text embedding API using sentence-transformers with dirty workers for ML model management. +For full working examples with Docker deployment, see: + +- [Embedding Service Example](https://github.com/benoitc/gunicorn/tree/master/examples/embedding_service) - FastAPI-based text embedding API using sentence-transformers with dirty workers for ML model management. +- [Streaming Chat Example](https://github.com/benoitc/gunicorn/tree/master/examples/streaming_chat) - Simulated LLM chat with token-by-token SSE streaming, demonstrating dirty worker generators and real-time response delivery. diff --git a/examples/streaming_chat/Dockerfile b/examples/streaming_chat/Dockerfile new file mode 100644 index 00000000..7a0a0f5d --- /dev/null +++ b/examples/streaming_chat/Dockerfile @@ -0,0 +1,20 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install dependencies +RUN pip install --no-cache-dir \ + fastapi \ + pydantic + +# Copy gunicorn source +COPY . /app/gunicorn-src +RUN pip install /app/gunicorn-src + +# Copy app +COPY examples/streaming_chat /app/streaming_chat + +ENV PYTHONPATH=/app + +EXPOSE 8000 +CMD ["gunicorn", "streaming_chat.main:app", "-c", "streaming_chat/gunicorn_conf.py"] diff --git a/examples/streaming_chat/README.md b/examples/streaming_chat/README.md new file mode 100644 index 00000000..5aa32ae6 --- /dev/null +++ b/examples/streaming_chat/README.md @@ -0,0 +1,218 @@ +# Streaming Chat Example + +A FastAPI-based chat demo that simulates LLM token-by-token streaming, powered +by Gunicorn's dirty workers for efficient long-running operations. + +## Overview + +This example demonstrates how to build a streaming chat API that: +- Streams tokens word-by-word like ChatGPT (Server-Sent Events) +- Uses dirty workers for the "inference" workload +- Includes a browser-based chat UI for testing +- Requires no ML dependencies (simulated responses) + +## Architecture + +``` +┌─────────────────┐ ┌──────────────────┐ ┌─────────────────────┐ +│ Browser/curl │────►│ FastAPI (ASGI) │────►│ DirtyWorker │ +│ SSE stream │ │ - /chat (SSE) │ │ - ChatApp │ +│ │◄────│ - /chat/sync │◄────│ - Token generator │ +└─────────────────┘ └──────────────────┘ └─────────────────────┘ + │ + ▼ + text/event-stream + data: {"token": "Hello"} + data: {"token": " "} + data: {"token": "world"} + data: [DONE] +``` + +**Why streaming with dirty workers?** +- Real LLM inference is slow (seconds to minutes) +- Users expect to see responses appear gradually +- Dirty workers keep the "model" loaded between requests +- HTTP workers remain responsive during streaming + +## Quick Start + +### With Docker (recommended) + +```bash +cd examples/streaming_chat +docker compose up --build +``` + +Then open http://localhost:8000 in your browser. + +### Local Development + +```bash +# Install dependencies +pip install fastapi pydantic + +# Run with gunicorn +gunicorn examples.streaming_chat.main:app \ + -c examples/streaming_chat/gunicorn_conf.py +``` + +## API Reference + +### POST /chat + +Stream a chat response using Server-Sent Events. + +**Request:** +```json +{ + "prompt": "hello", + "thinking": false +} +``` + +**Response:** `text/event-stream` +``` +data: {"token": "Hello"} + +data: {"token": "!"} + +data: {"token": " "} + +data: {"token": "I'm"} + +... + +data: [DONE] +``` + +**Example with curl:** +```bash +curl -N http://localhost:8000/chat \ + -H "Content-Type: application/json" \ + -d '{"prompt": "hello"}' +``` + +### POST /chat/sync + +Non-streaming version that returns the complete response. + +**Request:** +```json +{ + "prompt": "hello" +} +``` + +**Response:** +```json +{ + "response": "Hello! I'm a simulated AI assistant..." +} +``` + +### GET /health + +Health check endpoint. + +**Response:** +```json +{"status": "ok"} +``` + +### GET / + +Browser-based chat UI for testing. + +## Configuration + +Edit `gunicorn_conf.py` to adjust: + +| Setting | Default | Description | +|---------|---------|-------------| +| `workers` | 2 | Number of HTTP workers | +| `dirty_workers` | 1 | Number of dirty workers | +| `dirty_timeout` | 60 | Max seconds per request | +| `bind` | 0.0.0.0:8000 | Listen address | + +## Prompts + +The simulated chat app responds to these keywords: + +| Keyword | Response | +|---------|----------| +| `hello`, `hi`, `hey` | Greeting message | +| `explain` | Explanation of dirty workers | +| `streaming` | How streaming works | +| `code` | Example code snippet | +| (default) | Generic thoughtful response | + +## Features Demonstrated + +1. **Token streaming** - Word-by-word output via generators +2. **SSE protocol** - Browser-compatible event streaming +3. **Async generators** - Using `stream_async()` from dirty client +4. **Thinking mode** - Multi-phase streaming with visible "thinking" +5. **Browser UI** - Interactive chat with cursor animation + +## Testing + +Run the integration tests: + +```bash +# Start the service first +docker compose up -d + +# Run tests +pip install requests +python test_streaming.py +``` + +## Adapting for Real LLMs + +To use a real LLM instead of simulated responses: + +```python +# chat_app.py +from gunicorn.dirty.app import DirtyApp + +class ChatApp(DirtyApp): + def init(self): + from transformers import pipeline + self.generator = pipeline("text-generation", model="gpt2") + + def generate(self, prompt): + for output in self.generator(prompt, max_new_tokens=100, do_sample=True): + # Yield tokens as they're generated + yield output["generated_text"] + + def close(self): + del self.generator +``` + +Or with an API-based LLM: + +```python +class ChatApp(DirtyApp): + def init(self): + import openai + self.client = openai.OpenAI() + + async def generate(self, prompt): + stream = self.client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": prompt}], + stream=True + ) + for chunk in stream: + if chunk.choices[0].delta.content: + yield chunk.choices[0].delta.content +``` + +## Production Considerations + +1. **Real LLM**: Replace `ChatApp` with actual model inference +2. **GPU Support**: Add CUDA to Dockerfile for faster inference +3. **Rate Limiting**: Add FastAPI middleware for rate limiting +4. **Authentication**: Add API key validation +5. **Monitoring**: Add Prometheus metrics endpoint +6. **Timeouts**: Adjust `dirty_timeout` based on max response length diff --git a/examples/streaming_chat/__init__.py b/examples/streaming_chat/__init__.py new file mode 100644 index 00000000..0b5017e6 --- /dev/null +++ b/examples/streaming_chat/__init__.py @@ -0,0 +1,2 @@ +# Streaming Chat Example +# Demonstrates dirty worker streaming with simulated LLM token generation diff --git a/examples/streaming_chat/chat_app.py b/examples/streaming_chat/chat_app.py new file mode 100644 index 00000000..a176b344 --- /dev/null +++ b/examples/streaming_chat/chat_app.py @@ -0,0 +1,132 @@ +import time +import random +from gunicorn.dirty.app import DirtyApp + + +class ChatApp(DirtyApp): + """Simulated LLM chat application demonstrating streaming responses. + + This app mimics LLM token-by-token generation without requiring + heavy ML dependencies. Each response is streamed word-by-word + with realistic timing delays. + """ + + def init(self): + """Initialize canned responses for different prompts.""" + self.responses = { + "hello": ( + "Hello! I'm a simulated AI assistant running on Gunicorn's " + "dirty workers. I can demonstrate streaming responses just " + "like a real LLM, but without the heavy ML dependencies. " + "How can I help you today?" + ), + "explain": ( + "Dirty workers are separate processes that handle long-running " + "tasks like ML inference. They keep models loaded in memory " + "across requests, avoiding expensive reload times. HTTP workers " + "remain lightweight and responsive while dirty workers handle " + "the heavy computation. This architecture is inspired by " + "Erlang's dirty schedulers." + ), + "streaming": ( + "Streaming works by yielding chunks from a generator function. " + "Each yield sends a chunk message through the IPC socket. The " + "client receives chunks as they're produced, enabling real-time " + "token-by-token display. This is perfect for LLM applications " + "where users expect to see responses appear gradually." + ), + "code": ( + "Here's a simple example:\n\n" + "```python\n" + "from gunicorn.dirty import get_dirty_client\n\n" + "client = get_dirty_client()\n" + "for token in client.stream('app:ChatApp', 'generate', prompt):\n" + " print(token, end='', flush=True)\n" + "```\n\n" + "This streams tokens directly to the console as they arrive." + ), + "default": ( + "I understand your question. Let me think about that for a " + "moment. The key insight here is that streaming responses " + "provide a much better user experience for long-running " + "operations. Instead of waiting for the complete response, " + "users see content appearing in real-time, which feels more " + "interactive and responsive." + ), + } + self.min_delay = 0.03 # Minimum delay between tokens (30ms) + self.max_delay = 0.08 # Maximum delay between tokens (80ms) + + def generate(self, prompt): + """Generate a streaming response for the given prompt. + + Yields tokens (words) one at a time with realistic delays + to simulate LLM inference. + + Args: + prompt: User's input prompt + + Yields: + str: Individual tokens (words with trailing space) + """ + response = self._get_response(prompt) + words = response.split() + + for i, word in enumerate(words): + # Simulate variable inference time + delay = random.uniform(self.min_delay, self.max_delay) + time.sleep(delay) + + # Add space after word (except last word) + if i < len(words) - 1: + yield word + " " + else: + yield word + + def generate_with_thinking(self, prompt): + """Generate response with visible 'thinking' phase. + + First yields thinking indicators, then streams the response. + Demonstrates multi-phase streaming. + + Args: + prompt: User's input prompt + + Yields: + str: Thinking indicators followed by response tokens + """ + # Thinking phase + yield "[thinking" + for _ in range(3): + time.sleep(0.3) + yield "." + yield "]\n\n" + + # Response phase + yield from self.generate(prompt) + + def _get_response(self, prompt): + """Match prompt to a canned response. + + Args: + prompt: User's input prompt + + Returns: + str: Matched response text + """ + prompt_lower = prompt.lower().strip() + + # Check for keyword matches + for key, response in self.responses.items(): + if key in prompt_lower: + return response + + # Greeting patterns + if any(g in prompt_lower for g in ["hi", "hey", "greetings"]): + return self.responses["hello"] + + return self.responses["default"] + + def close(self): + """Cleanup on shutdown.""" + pass diff --git a/examples/streaming_chat/demo_capture.txt b/examples/streaming_chat/demo_capture.txt new file mode 100644 index 00000000..18560bef --- /dev/null +++ b/examples/streaming_chat/demo_capture.txt @@ -0,0 +1,213 @@ +================================================================================ + STREAMING CHAT DEMO CAPTURE + Gunicorn Dirty Workers + FastAPI SSE +================================================================================ + +$ curl -s http://127.0.0.1:8000/health +{"status":"ok"} + +================================================================================ + TEST 1: Hello Prompt +================================================================================ + +$ curl -N http://127.0.0.1:8000/chat -d '{"prompt": "hello"}' + +data: {"token": "Hello! "} + +data: {"token": "I'm "} + +data: {"token": "a "} + +data: {"token": "simulated "} + +data: {"token": "AI "} + +data: {"token": "assistant "} + +data: {"token": "running "} + +data: {"token": "on "} + +data: {"token": "Gunicorn's "} + +data: {"token": "dirty "} + +data: {"token": "workers. "} + +data: {"token": "I "} + +data: {"token": "can "} + +data: {"token": "demonstrate "} + +data: {"token": "streaming "} + +data: {"token": "responses "} + +data: {"token": "just "} + +data: {"token": "like "} + +data: {"token": "a "} + +data: {"token": "real "} + +data: {"token": "LLM, "} + +data: {"token": "but "} + +data: {"token": "without "} + +data: {"token": "the "} + +data: {"token": "heavy "} + +data: {"token": "ML "} + +data: {"token": "dependencies. "} + +data: {"token": "How "} + +data: {"token": "can "} + +data: {"token": "I "} + +data: {"token": "help "} + +data: {"token": "you "} + +data: {"token": "today?"} + +data: [DONE] + +================================================================================ + TEST 2: Explain Dirty Workers +================================================================================ + +$ curl -N http://127.0.0.1:8000/chat -d '{"prompt": "explain dirty workers"}' + +data: {"token": "Dirty "} + +data: {"token": "workers "} + +data: {"token": "are "} + +data: {"token": "separate "} + +data: {"token": "processes "} + +data: {"token": "that "} + +data: {"token": "handle "} + +data: {"token": "long-running "} + +data: {"token": "tasks "} + +data: {"token": "like "} + +data: {"token": "ML "} + +data: {"token": "inference. "} + +data: {"token": "They "} + +data: {"token": "keep "} + +data: {"token": "models "} + +data: {"token": "loaded "} + +data: {"token": "in "} + +data: {"token": "memory "} + +data: {"token": "across "} + +data: {"token": "requests, "} + +data: {"token": "avoiding "} + +data: {"token": "expensive "} + +data: {"token": "reload "} + +data: {"token": "times. "} + +data: {"token": "HTTP "} + +data: {"token": "workers "} + +data: {"token": "remain "} + +data: {"token": "lightweight "} + +data: {"token": "and "} + +data: {"token": "responsive "} + +data: {"token": "while "} + +data: {"token": "dirty "} + +data: {"token": "workers "} + +data: {"token": "handle "} + +data: {"token": "the "} + +data: {"token": "heavy "} + +data: {"token": "computation. "} + +data: {"token": "This "} + +data: {"token": "architecture "} + +data: {"token": "is "} + +data: {"token": "inspired "} + +data: {"token": "by "} + +data: {"token": "Erlang's "} + +data: {"token": "dirty "} + +data: {"token": "schedulers."} + +data: [DONE] + +================================================================================ + TEST 3: Sync Endpoint +================================================================================ + +$ curl -s http://127.0.0.1:8000/chat/sync -d '{"prompt": "hello"}' + +{"response":"Hello! I'm a simulated AI assistant running on Gunicorn's dirty workers. I can demonstrate streaming responses just like a real LLM, but without the heavy ML dependencies. How can I help you today?"} + +================================================================================ + DEMO COMPLETE +================================================================================ + +Browser UI available at: http://localhost:8000/ + +Features demonstrated: + - Token-by-token SSE streaming + - Async generators via dirty workers + - Different responses based on keywords + - Sync endpoint for comparison + - Health check endpoint + +Server Logs: +[INFO] Starting gunicorn 24.1.0 +[INFO] Listening at: http://0.0.0.0:8000 (1) +[INFO] Using worker: asgi +[INFO] Spawned dirty arbiter (pid: 7) +[INFO] Dirty arbiter starting (pid: 7) +[INFO] Booting worker with pid: 8 +[INFO] Dirty arbiter listening on /tmp/gunicorn-dirty-.../arbiter.sock +[INFO] Spawned dirty worker (pid: 9) +[INFO] Initialized dirty app: streaming_chat.chat_app:ChatApp +[INFO] Dirty worker 9 listening on /tmp/gunicorn-dirty-.../worker-1.sock +[INFO] ASGI server listening on http://0.0.0.0:8000 diff --git a/examples/streaming_chat/docker-compose.yml b/examples/streaming_chat/docker-compose.yml new file mode 100644 index 00000000..ff89f4ac --- /dev/null +++ b/examples/streaming_chat/docker-compose.yml @@ -0,0 +1,13 @@ +services: + streaming-chat: + build: + context: ../.. + dockerfile: examples/streaming_chat/Dockerfile + ports: + - "8000:8000" + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/health', timeout=5)"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 5s diff --git a/examples/streaming_chat/gunicorn_conf.py b/examples/streaming_chat/gunicorn_conf.py new file mode 100644 index 00000000..43e51857 --- /dev/null +++ b/examples/streaming_chat/gunicorn_conf.py @@ -0,0 +1,8 @@ +bind = "0.0.0.0:8000" +workers = 2 +worker_class = "asgi" + +# Dirty worker config +dirty_apps = ["streaming_chat.chat_app:ChatApp"] +dirty_workers = 1 +dirty_timeout = 60 diff --git a/examples/streaming_chat/main.py b/examples/streaming_chat/main.py new file mode 100644 index 00000000..bca16a2f --- /dev/null +++ b/examples/streaming_chat/main.py @@ -0,0 +1,271 @@ +import json +from fastapi import FastAPI +from fastapi.responses import StreamingResponse, HTMLResponse +from pydantic import BaseModel +from gunicorn.dirty.client import get_dirty_client_async + + +app = FastAPI( + title="Streaming Chat Demo", + description="Demonstrates dirty worker streaming with simulated LLM responses", +) + + +class ChatRequest(BaseModel): + prompt: str + thinking: bool = False + + +class ChatResponse(BaseModel): + response: str + + +@app.post("/chat") +async def chat(request: ChatRequest): + """Stream a chat response using Server-Sent Events. + + The response is streamed token-by-token, simulating LLM inference. + Each token is sent as an SSE event with JSON data. + + Args: + request: Chat request with prompt and optional thinking mode + + Returns: + StreamingResponse with text/event-stream content type + """ + client = await get_dirty_client_async() + action = "generate_with_thinking" if request.thinking else "generate" + + async def stream(): + async for token in client.stream_async( + "streaming_chat.chat_app:ChatApp", + action, + request.prompt + ): + data = json.dumps({"token": token}) + yield f"data: {data}\n\n" + yield "data: [DONE]\n\n" + + return StreamingResponse( + stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable nginx buffering + } + ) + + +@app.post("/chat/sync", response_model=ChatResponse) +async def chat_sync(request: ChatRequest): + """Non-streaming chat endpoint for comparison. + + Waits for the complete response before returning. + Useful for testing or when streaming isn't needed. + + Args: + request: Chat request with prompt + + Returns: + Complete response as JSON + """ + client = await get_dirty_client_async() + action = "generate_with_thinking" if request.thinking else "generate" + + tokens = [] + async for token in client.stream_async( + "streaming_chat.chat_app:ChatApp", + action, + request.prompt + ): + tokens.append(token) + + return ChatResponse(response="".join(tokens)) + + +@app.get("/health") +async def health(): + """Health check endpoint.""" + return {"status": "ok"} + + +@app.get("/", response_class=HTMLResponse) +async def index(): + """Simple chat UI for testing streaming.""" + return """ + + +
+This demo shows token-by-token streaming using Gunicorn's dirty workers.
+ +