feat(examples): add FastAPI embedding service with Docker testing

Add a complete example demonstrating dirty workers with sentence-transformers
for text embeddings via FastAPI:

- EmbeddingApp DirtyApp that loads and manages the ML model
- FastAPI endpoints for /embed and /health
- Docker and docker-compose configuration
- Integration tests with numpy similarity checks
- GitHub Actions CI workflow
This commit is contained in:
Benoit Chesneau 2026-01-24 11:35:03 +01:00
parent ce2e06ceba
commit 0e05c824e9
10 changed files with 299 additions and 0 deletions

View File

@ -0,0 +1,42 @@
name: Embedding Service Integration Tests
on:
push:
paths:
- 'examples/embedding_service/**'
- 'gunicorn/dirty/**'
pull_request:
paths:
- 'examples/embedding_service/**'
- 'gunicorn/dirty/**'
jobs:
test:
runs-on: ubuntu-latest
timeout-minutes: 15
steps:
- uses: actions/checkout@v4
- name: Build and start service
run: |
cd examples/embedding_service
docker compose up -d --build
docker compose logs -f &
- name: Wait for healthy
run: |
for i in {1..30}; do
curl -s http://127.0.0.1:8000/health && break
sleep 2
done
- name: Run tests
run: |
pip install requests numpy
python examples/embedding_service/test_embedding.py
- name: Cleanup
if: always()
run: |
cd examples/embedding_service
docker compose down

View File

@ -0,0 +1,21 @@
FROM python:3.12-slim
WORKDIR /app
# Install dependencies
RUN pip install --no-cache-dir \
sentence-transformers \
fastapi \
pydantic
# Copy gunicorn source
COPY . /app/gunicorn-src
RUN pip install /app/gunicorn-src
# Copy app
COPY examples/embedding_service /app/embedding_service
ENV PYTHONPATH=/app
EXPOSE 8000
CMD ["gunicorn", "embedding_service.main:app", "-c", "embedding_service/gunicorn_conf.py"]

View File

@ -0,0 +1,133 @@
# Embedding Service Example
A FastAPI-based text embedding service using sentence-transformers, powered by
gunicorn's dirty workers for efficient ML model management.
## Overview
This example demonstrates how to build a production-ready embedding API that:
- Keeps ML models loaded in memory across requests (dirty workers)
- Handles HTTP efficiently with async FastAPI (ASGI workers)
- Provides batch embedding for multiple texts
- Includes Docker-based deployment and testing
## Architecture
```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────────┐
│ HTTP Clients │────►│ FastAPI (ASGI) │────►│ DirtyWorker │
│ │ │ - /embed │ │ - sentence- │
│ │◄────│ - /health │◄────│ transformers │
└─────────────────┘ └──────────────────┘ │ - Model in memory │
└─────────────────────┘
```
**Why dirty workers?**
- ML models are expensive to load (several seconds)
- Dirty workers load the model once at startup
- HTTP workers remain lightweight and responsive
- Model stays in memory, serving many requests
## Quick Start
### With Docker (recommended)
```bash
cd examples/embedding_service
docker compose up --build
```
### Local Development
```bash
# Install dependencies
pip install sentence-transformers fastapi pydantic
# Run with gunicorn
gunicorn examples.embedding_service.main:app \
-c examples/embedding_service/gunicorn_conf.py
```
## API Reference
### POST /embed
Generate embeddings for a list of texts.
**Request:**
```json
{
"texts": ["Hello world", "Another sentence"]
}
```
**Response:**
```json
{
"embeddings": [
[0.123, -0.456, ...],
[0.789, -0.012, ...]
]
}
```
**Example:**
```bash
curl -X POST http://localhost:8000/embed \
-H "Content-Type: application/json" \
-d '{"texts": ["Hello world"]}'
```
### GET /health
Health check endpoint.
**Response:**
```json
{"status": "ok"}
```
## Configuration
Edit `gunicorn_conf.py` to adjust:
| Setting | Default | Description |
|---------|---------|-------------|
| `workers` | 2 | Number of HTTP workers |
| `dirty_workers` | 1 | Number of ML model workers |
| `dirty_timeout` | 60 | Max seconds per inference |
| `bind` | 0.0.0.0:8000 | Listen address |
## Model
Uses [all-MiniLM-L6-v2](https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2):
- 384-dimensional embeddings
- Fast inference (~14K sentences/sec on GPU)
- Good quality for semantic search
- ~90MB download
To use a different model, edit `embedding_app.py`:
```python
self.model = SentenceTransformer('your-model-name')
```
## Testing
Run the integration tests:
```bash
# Start the service first
docker compose up -d
# Run tests
pip install requests numpy
python test_embedding.py
```
## Production Considerations
1. **GPU Support**: Add CUDA to the Dockerfile for faster inference
2. **Scaling**: Increase `dirty_workers` for more concurrent embeddings
3. **Caching**: Add Redis caching for repeated texts
4. **Rate Limiting**: Add FastAPI middleware for rate limiting
5. **Monitoring**: Add Prometheus metrics endpoint

View File

@ -0,0 +1 @@
# Embedding service package

View File

@ -0,0 +1,13 @@
services:
embedding-service:
build:
context: ../..
dockerfile: examples/embedding_service/Dockerfile
ports:
- "8000:8000"
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/health', timeout=5)"]
interval: 10s
timeout: 5s
retries: 5
start_period: 30s # Model loading time

View File

@ -0,0 +1,14 @@
from gunicorn.dirty.app import DirtyApp
class EmbeddingApp(DirtyApp):
def init(self):
from sentence_transformers import SentenceTransformer
self.model = SentenceTransformer('all-MiniLM-L6-v2')
def embed(self, texts):
embeddings = self.model.encode(texts)
return embeddings.tolist()
def close(self):
del self.model

View File

@ -0,0 +1,8 @@
bind = "0.0.0.0:8000"
workers = 2
worker_class = "asgi"
# Dirty worker config
dirty_apps = ["embedding_service.embedding_app:EmbeddingApp"]
dirty_workers = 1
dirty_timeout = 60

View File

@ -0,0 +1,29 @@
from fastapi import FastAPI
from pydantic import BaseModel
from gunicorn.dirty.client import get_dirty_client
app = FastAPI()
class EmbedRequest(BaseModel):
texts: list[str]
class EmbedResponse(BaseModel):
embeddings: list[list[float]]
@app.post("/embed", response_model=EmbedResponse)
async def embed(request: EmbedRequest):
client = get_dirty_client()
result = client.execute(
"embedding_service.embedding_app:EmbeddingApp",
"embed",
request.texts
)
return EmbedResponse(embeddings=result)
@app.get("/health")
async def health():
return {"status": "ok"}

View File

@ -0,0 +1,5 @@
sentence-transformers
fastapi
pydantic
requests
numpy

View File

@ -0,0 +1,33 @@
import os
import requests
import numpy as np
def test_embedding_endpoint():
base_url = os.environ.get("EMBEDDING_SERVICE_URL", "http://127.0.0.1:8000")
url = f"{base_url}/embed"
# Test single text
response = requests.post(url, json={"texts": ["Hello world"]})
assert response.status_code == 200
data = response.json()
assert len(data["embeddings"]) == 1
assert len(data["embeddings"][0]) == 384 # MiniLM dimension
# Test batch
texts = ["First sentence", "Second sentence", "Third one"]
response = requests.post(url, json={"texts": texts})
assert response.status_code == 200
data = response.json()
assert len(data["embeddings"]) == 3
# Test similarity (same text = same embedding)
response = requests.post(url, json={"texts": ["test", "test"]})
emb1, emb2 = response.json()["embeddings"]
assert np.allclose(emb1, emb2, rtol=1e-5, atol=1e-6)
print("All tests passed!")
if __name__ == "__main__":
test_embedding_endpoint()