import json from fastapi import FastAPI from fastapi.responses import StreamingResponse, HTMLResponse from pydantic import BaseModel from gunicorn.dirty.client import get_dirty_client_async app = FastAPI( title="Streaming Chat Demo", description="Demonstrates dirty worker streaming with simulated LLM responses", ) class ChatRequest(BaseModel): prompt: str thinking: bool = False class ChatResponse(BaseModel): response: str @app.post("/chat") async def chat(request: ChatRequest): """Stream a chat response using Server-Sent Events. The response is streamed token-by-token, simulating LLM inference. Each token is sent as an SSE event with JSON data. Args: request: Chat request with prompt and optional thinking mode Returns: StreamingResponse with text/event-stream content type """ client = await get_dirty_client_async() action = "generate_with_thinking" if request.thinking else "generate" async def stream(): async for token in client.stream_async( "streaming_chat.chat_app:ChatApp", action, request.prompt ): data = json.dumps({"token": token}) yield f"data: {data}\n\n" yield "data: [DONE]\n\n" return StreamingResponse( stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", # Disable nginx buffering } ) @app.post("/chat/sync", response_model=ChatResponse) async def chat_sync(request: ChatRequest): """Non-streaming chat endpoint for comparison. Waits for the complete response before returning. Useful for testing or when streaming isn't needed. Args: request: Chat request with prompt Returns: Complete response as JSON """ client = await get_dirty_client_async() action = "generate_with_thinking" if request.thinking else "generate" tokens = [] async for token in client.stream_async( "streaming_chat.chat_app:ChatApp", action, request.prompt ): tokens.append(token) return ChatResponse(response="".join(tokens)) @app.get("/health") async def health(): """Health check endpoint.""" return {"status": "ok"} @app.get("/", response_class=HTMLResponse) async def index(): """Simple chat UI for testing streaming.""" return """
This demo shows token-by-token streaming using Gunicorn's dirty workers.