Merge pull request #3473 from benoitc/feature/per-app-worker-allocation

feat(dirty): add per-app worker allocation for memory optimization
This commit is contained in:
Benoit Chesneau 2026-02-01 09:06:05 +01:00 committed by GitHub
commit d5ab5dc6c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 2202 additions and 33 deletions

View File

@ -16,6 +16,15 @@
- Lifecycle hooks: `on_dirty_starting`, `dirty_post_fork`,
`dirty_worker_init`, `dirty_worker_exit`
- **Per-App Worker Allocation for Dirty Arbiters**: Control how many dirty workers
load each app for memory optimization with heavy models
([PR #3473](https://github.com/benoitc/gunicorn/pull/3473))
- Set `workers` class attribute on DirtyApp (e.g., `workers = 2`)
- Or use config format `module:class:N` (e.g., `myapp:HeavyModel:2`)
- Requests automatically routed to workers with the target app
- New exception `DirtyNoWorkersAvailableError` for graceful error handling
- Example: 8 workers × 10GB model = 80GB → with `workers=2`: 20GB (75% savings)
- **HTTP/2 Support (Beta)**: Native HTTP/2 (RFC 7540) support for improved performance
with modern clients ([PR #3468](https://github.com/benoitc/gunicorn/pull/3468))
- Multiplexed streams over a single connection

View File

@ -89,8 +89,10 @@ This makes dirty apps ideal for ML inference, where loading a model once and reu
| | | | | |
+---+--------+---+-------+---+
|
All workers load all dirty apps
[MLApp, ImageApp, ...]
Workers load apps based on allocation
Worker 1: [MLApp, ImageApp, HeavyApp]
Worker 2: [MLApp, ImageApp, HeavyApp]
Worker 3: [MLApp, ImageApp] (HeavyApp workers=2)
```
### Process Relationships
@ -138,6 +140,133 @@ gunicorn myapp:app \
| `dirty_threads` | `1` | Threads per dirty worker |
| `dirty_graceful_timeout` | `30` | Graceful shutdown timeout |
## Per-App Worker Allocation
By default, all dirty workers load all configured apps. For apps that consume significant memory (like large ML models), you can limit how many workers load a specific app.
### Why Per-App Allocation?
Consider a scenario with a 10GB ML model and 8 dirty workers:
- **Default behavior**: 8 workers × 10GB = 80GB RAM
- **With `workers=2`**: 2 workers × 10GB = 20GB RAM (75% savings)
Requests for the limited app are routed only to workers that have it loaded.
### Configuration Methods
**Method 1: Class Attribute**
Set the `workers` attribute on your DirtyApp class:
```python
from gunicorn.dirty import DirtyApp
class HeavyModelApp(DirtyApp):
workers = 2 # Only 2 workers will load this app
def init(self):
self.model = load_10gb_model()
def predict(self, data):
return self.model.predict(data)
def close(self):
pass
```
**Method 2: Config Override**
Use the `module:class:N` format in your config:
```python
# gunicorn.conf.py
dirty_apps = [
"myapp.light:LightApp", # All workers (default)
"myapp.heavy:HeavyModelApp:2", # Only 2 workers
"myapp.single:SingletonApp:1", # Only 1 worker
]
dirty_workers = 4
```
Config overrides take precedence over class attributes.
### Worker Distribution
When workers spawn, apps are assigned based on their limits:
```
Example with dirty_workers=4:
- LightApp (workers=None): Loaded on workers 1, 2, 3, 4
- HeavyModelApp (workers=2): Loaded on workers 1, 2
- SingletonApp (workers=1): Loaded on worker 1
Worker 1: [LightApp, HeavyModelApp, SingletonApp]
Worker 2: [LightApp, HeavyModelApp]
Worker 3: [LightApp]
Worker 4: [LightApp]
```
### Request Routing
Requests are automatically routed to workers that have the target app:
```python
client = get_dirty_client()
# Goes to any of 4 workers (round-robin)
client.execute("myapp.light:LightApp", "action")
# Goes to worker 1 or 2 only (round-robin between those)
client.execute("myapp.heavy:HeavyModelApp", "predict", data)
# Always goes to worker 1
client.execute("myapp.single:SingletonApp", "process")
```
### Error Handling
If no workers have the requested app loaded, a `DirtyNoWorkersAvailableError` is raised:
```python
from gunicorn.dirty import get_dirty_client
from gunicorn.dirty.errors import DirtyNoWorkersAvailableError
def my_view(request):
client = get_dirty_client()
try:
result = client.execute("myapp.heavy:HeavyModelApp", "predict", data)
except DirtyNoWorkersAvailableError as e:
# All workers with this app are down or app not configured
return {"error": "Service temporarily unavailable", "app": e.app_path}
```
### Worker Crash Recovery
When a worker crashes, its replacement gets the **same apps** as the dead worker:
```
Timeline:
t=0: Worker 1 crashes (had HeavyModelApp)
t=1: Arbiter detects crash, queues respawn
t=2: New Worker 5 spawns with same apps as Worker 1
t=3: HeavyModelApp still available on Worker 2 during gap
```
This ensures:
- No memory redistribution on existing workers
- Predictable replacement behavior
- The heavy model is only loaded on the new worker
### Best Practices
1. **Set realistic limits** - Don't set `workers=1` unless truly necessary (single point of failure)
2. **Monitor memory** - Track per-worker memory to tune allocation
3. **Handle unavailability** - Catch `DirtyNoWorkersAvailableError` gracefully
4. **Use class attributes for app-specific limits** - Makes the limit part of the app definition
5. **Use config for deployment-specific overrides** - Different limits for dev vs prod
## Creating a Dirty App
Dirty apps inherit from `DirtyApp` and implement three methods:
@ -190,8 +319,9 @@ class MLApp(DirtyApp):
### DirtyApp Interface
| Method | Description |
|--------|-------------|
| Method/Attribute | Description |
|------------------|-------------|
| `workers` | Class attribute. Number of workers to load this app (`None` = all workers). |
| `init()` | Called once when dirty worker starts, after instantiation. Load resources here. |
| `__call__(action, *args, **kwargs)` | Handle requests from HTTP workers. |
| `close()` | Called when dirty worker shuts down. Cleanup resources. |
@ -604,12 +734,13 @@ watch -n 1 'pstree -p $(cat gunicorn.pid)'
The dirty client raises specific exceptions:
```python
from gunicorn.dirty import (
from gunicorn.dirty.errors import (
DirtyError,
DirtyTimeoutError,
DirtyConnectionError,
DirtyAppError,
DirtyAppNotFoundError,
DirtyNoWorkersAvailableError,
)
try:
@ -620,6 +751,9 @@ except DirtyTimeoutError:
except DirtyAppNotFoundError:
# App not loaded in dirty workers
pass
except DirtyNoWorkersAvailableError as e:
# No workers have this app (all crashed or app limited to 0 workers)
print(f"No workers for app: {e.app_path}")
except DirtyAppError as e:
# Error during app execution
print(f"App error: {e.message}, traceback: {e.traceback}")

View File

@ -208,17 +208,38 @@ DirtyArbiter and the exiting DirtyWorker.
Dirty applications to load in the dirty worker pool.
A list of application paths in pattern ``$(MODULE_NAME):$(CLASS_NAME)``.
A list of application paths in one of these formats:
- ``$(MODULE_NAME):$(CLASS_NAME)`` - all workers load this app
- ``$(MODULE_NAME):$(CLASS_NAME):$(N)`` - only N workers load this app
Each dirty app must be a class that inherits from ``DirtyApp`` base class
and implements the ``init()``, ``__call__()``, and ``close()`` methods.
Example::
dirty_apps = [
"myapp.ml:MLApp",
"myapp.images:ImageApp",
"myapp.ml:MLApp", # All workers load this
"myapp.images:ImageApp", # All workers load this
"myapp.heavy:HugeModel:2", # Only 2 workers load this
]
The per-app worker limit is useful for memory-intensive applications
like large ML models. Instead of all 8 workers loading a 10GB model
(80GB total), you can limit it to 2 workers (20GB total).
Alternatively, you can set the ``workers`` class attribute on your
DirtyApp subclass::
class HugeModelApp(DirtyApp):
workers = 2 # Only 2 workers load this app
def init(self):
self.model = load_10gb_model()
Note: The config format (``module:Class:N``) takes precedence over
the class attribute if both are specified.
Dirty apps are loaded once when the dirty worker starts and persist
in memory for the lifetime of the worker. This is ideal for loading
ML models, database connection pools, or other stateful resources
@ -226,6 +247,9 @@ that are expensive to initialize.
!!! info "Added in 25.0.0"
!!! info "Changed in 25.1.0"
Added per-app worker allocation via ``:N`` format suffix.
### `dirty_workers`
**Command line:** `--dirty-workers INT`

View File

@ -2,7 +2,7 @@
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
version_info = (24, 1, 1)
version_info = (25, 0, 0)
__version__ = ".".join([str(v) for v in version_info])
SERVER = "gunicorn"
SERVER_SOFTWARE = "%s/%s" % (SERVER, __version__)

View File

@ -2885,23 +2885,47 @@ class DirtyApps(Setting):
desc = """\
Dirty applications to load in the dirty worker pool.
A list of application paths in pattern ``$(MODULE_NAME):$(CLASS_NAME)``.
A list of application paths in one of these formats:
- ``$(MODULE_NAME):$(CLASS_NAME)`` - all workers load this app
- ``$(MODULE_NAME):$(CLASS_NAME):$(N)`` - only N workers load this app
Each dirty app must be a class that inherits from ``DirtyApp`` base class
and implements the ``init()``, ``__call__()``, and ``close()`` methods.
Example::
dirty_apps = [
"myapp.ml:MLApp",
"myapp.images:ImageApp",
"myapp.ml:MLApp", # All workers load this
"myapp.images:ImageApp", # All workers load this
"myapp.heavy:HugeModel:2", # Only 2 workers load this
]
The per-app worker limit is useful for memory-intensive applications
like large ML models. Instead of all 8 workers loading a 10GB model
(80GB total), you can limit it to 2 workers (20GB total).
Alternatively, you can set the ``workers`` class attribute on your
DirtyApp subclass::
class HugeModelApp(DirtyApp):
workers = 2 # Only 2 workers load this app
def init(self):
self.model = load_10gb_model()
Note: The config format (``module:Class:N``) takes precedence over
the class attribute if both are specified.
Dirty apps are loaded once when the dirty worker starts and persist
in memory for the lifetime of the worker. This is ideal for loading
ML models, database connection pools, or other stateful resources
that are expensive to initialize.
.. versionadded:: 25.0.0
.. versionchanged:: 25.1.0
Added per-app worker allocation via ``:N`` format suffix.
"""

View File

@ -72,12 +72,29 @@ class DirtyApp:
self.models[name] = load_model(name)
return {"loaded": True, "name": name}
Worker Allocation
-----------------
By default, all dirty workers load all apps. For apps that consume
significant memory (like large ML models), you can limit how many
workers load the app by setting the ``workers`` class attribute::
class HeavyModelApp(DirtyApp):
workers = 2 # Only 2 workers will load this app
def init(self):
self.model = load_10gb_model()
Subclasses should implement:
- init(): Called once at worker startup to initialize resources
- __call__(action, *args, **kwargs): Handle requests from HTTP workers
- close(): Called at worker shutdown to cleanup resources
"""
# Number of workers that should load this app.
# None means all workers (default, backward compatible).
# Set to an integer to limit how many workers load this app.
workers = None
def init(self):
"""
Initialize the application.
@ -120,6 +137,81 @@ class DirtyApp:
"""
def parse_dirty_app_spec(spec):
"""
Parse a dirty app specification.
Supports two formats:
- ``"module:Class"`` - standard format, all workers load the app
- ``"module:Class:N"`` - worker-limited format, only N workers load the app
Args:
spec: The app specification string
Returns:
tuple: (import_path, worker_count)
- import_path: The "module:Class" part for importing
- worker_count: Integer limit or None for all workers
Raises:
DirtyAppError: If the spec format is invalid or worker_count is < 1
Examples::
>>> parse_dirty_app_spec("myapp:App")
("myapp:App", None)
>>> parse_dirty_app_spec("myapp:App:2")
("myapp:App", 2)
>>> parse_dirty_app_spec("myapp.sub:App:1")
("myapp.sub:App", 1)
"""
if ':' not in spec:
raise DirtyAppError(
f"Invalid import path format: {spec}. "
f"Expected 'module.path:ClassName' or 'module.path:ClassName:N'",
app_path=spec
)
parts = spec.split(':')
# Standard format: "module:Class" or "module.sub:Class"
if len(parts) == 2:
return (spec, None)
# Worker-limited format: "module:Class:N"
if len(parts) == 3:
module_path, class_name, count_str = parts
import_path = f"{module_path}:{class_name}"
# Validate the worker count
try:
worker_count = int(count_str)
except ValueError:
raise DirtyAppError(
f"Invalid worker count in spec: {spec}. "
f"Expected integer, got '{count_str}'",
app_path=spec
)
if worker_count < 1:
raise DirtyAppError(
f"Invalid worker count in spec: {spec}. "
f"Worker count must be >= 1, got {worker_count}",
app_path=spec
)
return (import_path, worker_count)
# Too many colons
raise DirtyAppError(
f"Invalid import path format: {spec}. "
f"Expected 'module.path:ClassName' or 'module.path:ClassName:N'",
app_path=spec
)
def load_dirty_app(import_path):
"""
Load a dirty app class from an import path.
@ -203,3 +295,56 @@ def load_dirty_apps(import_paths):
for import_path in import_paths:
apps[import_path] = load_dirty_app(import_path)
return apps
def get_app_workers_attribute(import_path):
"""
Get the workers class attribute from a dirty app without instantiating it.
This is used by the arbiter to determine how many workers should load
an app based on the class attribute, without needing to actually load
the app.
Args:
import_path: String in format 'module.path:ClassName'
Returns:
The workers class attribute value (int or None)
Raises:
DirtyAppNotFoundError: If the module or class cannot be found
DirtyAppError: If the import path format is invalid
"""
if ':' not in import_path:
raise DirtyAppError(
f"Invalid import path format: {import_path}. "
f"Expected 'module.path:ClassName'",
app_path=import_path
)
module_path, class_name = import_path.rsplit(':', 1)
try:
# Import the module
if module_path in sys.modules:
module = sys.modules[module_path]
else:
module = importlib.import_module(module_path)
except ImportError as e:
raise DirtyAppNotFoundError(import_path) from e
# Get the class from the module
try:
app_class = getattr(module, class_name)
except AttributeError:
raise DirtyAppNotFoundError(import_path) from None
# Validate it's a class
if not isinstance(app_class, type):
raise DirtyAppError(
f"{import_path} is not a class",
app_path=import_path
)
# Return the workers attribute (defaults to None if not set)
return getattr(app_class, 'workers', None)

View File

@ -19,7 +19,13 @@ import time
from gunicorn import util
from .errors import DirtyError, DirtyTimeoutError, DirtyWorkerError
from .app import get_app_workers_attribute, parse_dirty_app_spec
from .errors import (
DirtyError,
DirtyNoWorkersAvailableError,
DirtyTimeoutError,
DirtyWorkerError,
)
from .protocol import (
DirtyProtocol,
make_error_response,
@ -79,6 +85,117 @@ class DirtyArbiter:
self._loop = None
self._pending_requests = {} # request_id -> Future
# Per-app worker allocation tracking
# Maps import_path -> {import_path, worker_count, original_spec}
self.app_specs = {}
# Maps import_path -> set of worker PIDs that have loaded the app
self.app_worker_map = {}
# Maps worker_pid -> list of import_paths loaded by this worker
self.worker_app_map = {}
# Per-app round-robin indices for routing
self._app_rr_indices = {}
# Queue of app lists from dead workers to respawn with same apps
self._pending_respawns = []
# Parse app specs on init
self._parse_app_specs()
def _parse_app_specs(self):
"""
Parse all app specifications from config.
Populates self.app_specs with parsed information about each app,
including the import path and worker count limits.
Worker count priority:
1. Config override (e.g., "module:Class:2") - highest priority
2. Class attribute (e.g., workers = 2 on the class)
3. None (all workers) - default
"""
for spec in self.cfg.dirty_apps:
import_path, worker_count = parse_dirty_app_spec(spec)
# If no config override, check class attribute
if worker_count is None:
try:
worker_count = get_app_workers_attribute(import_path)
except Exception as e:
# Log but don't fail - we'll discover the error when loading
self.log.warning(
"Could not read workers attribute from %s: %s",
import_path, e
)
self.app_specs[import_path] = {
'import_path': import_path,
'worker_count': worker_count,
'original_spec': spec,
}
# Initialize the app_worker_map for this app
self.app_worker_map[import_path] = set()
def _get_apps_for_new_worker(self):
"""
Determine which apps a new worker should load.
Returns a list of import paths for apps that need more workers.
Apps with workers=None (all workers) are always included.
Apps with worker limits are included only if they haven't
reached their limit yet.
Returns:
List of import paths to load, or empty list if no apps need workers
"""
app_paths = []
for import_path, spec in self.app_specs.items():
worker_count = spec['worker_count']
current_workers = len(self.app_worker_map.get(import_path, set()))
# None means all workers should load this app
if worker_count is None:
app_paths.append(import_path)
# Otherwise check if we've reached the limit
elif current_workers < worker_count:
app_paths.append(import_path)
return app_paths
def _register_worker_apps(self, worker_pid, app_paths):
"""
Register which apps a worker has loaded.
Updates both app_worker_map and worker_app_map to track the
bidirectional relationship between workers and apps.
Args:
worker_pid: The PID of the worker
app_paths: List of app import paths loaded by this worker
"""
self.worker_app_map[worker_pid] = list(app_paths)
for app_path in app_paths:
if app_path not in self.app_worker_map:
self.app_worker_map[app_path] = set()
self.app_worker_map[app_path].add(worker_pid)
def _unregister_worker(self, worker_pid):
"""
Unregister a worker's apps when it exits.
Removes the worker from all tracking maps.
Args:
worker_pid: The PID of the worker to unregister
"""
# Get the apps this worker had
app_paths = self.worker_app_map.pop(worker_pid, [])
# Remove worker from each app's worker set
for app_path in app_paths:
if app_path in self.app_worker_map:
self.app_worker_map[app_path].discard(worker_pid)
def run(self):
"""Run the dirty arbiter (blocking call)."""
self.pid = os.getpid()
@ -256,14 +373,20 @@ class DirtyArbiter:
client_writer: StreamWriter to send responses to client
"""
request_id = request.get("id", "unknown")
app_path = request.get("app_path")
# Find an available worker
worker_pid = await self._get_available_worker()
# Find an available worker (filtered by app if specified)
worker_pid = await self._get_available_worker(app_path)
if worker_pid is None:
response = make_error_response(
request_id,
DirtyError("No dirty workers available")
)
# Distinguish between no workers at all vs. no workers for this app
if not self.workers:
error = DirtyError("No dirty workers available")
elif app_path and self.app_specs:
# Per-app allocation is configured and no workers have this app
error = DirtyNoWorkersAvailableError(app_path)
else:
error = DirtyError("No dirty workers available")
response = make_error_response(request_id, error)
await DirtyProtocol.write_message_async(client_writer, response)
return
@ -373,20 +496,47 @@ class DirtyArbiter:
)
await DirtyProtocol.write_message_async(client_writer, response)
async def _get_available_worker(self):
async def _get_available_worker(self, app_path=None):
"""
Get an available worker PID using round-robin selection.
Distributes requests across all available workers evenly to
maximize throughput when multiple workers are configured.
If app_path is provided, only returns workers that have loaded
that specific app. Uses per-app round-robin to ensure fair
distribution among eligible workers.
Args:
app_path: Optional import path of the target app. If None,
returns any worker using global round-robin.
Returns:
Worker PID or None if no eligible workers are available.
"""
worker_pids = list(self.workers.keys())
if not worker_pids:
# Determine eligible workers
if app_path and self.app_specs:
# Per-app allocation is configured - must return a worker
# that has this specific app
if app_path in self.app_worker_map:
eligible_pids = list(self.app_worker_map[app_path])
else:
# App not known or no workers have it
return None
else:
# No specific app requested, or no app specs configured
# (backward compatible) - any worker will do
eligible_pids = list(self.workers.keys())
if not eligible_pids:
return None
# Round-robin selection
self._worker_rr_index = (self._worker_rr_index + 1) % len(worker_pids)
return worker_pids[self._worker_rr_index]
# Per-app round-robin for fairness
if app_path and self.app_specs:
idx = self._app_rr_indices.get(app_path, 0)
self._app_rr_indices[app_path] = (idx + 1) % len(eligible_pids)
else:
idx = self._worker_rr_index
self._worker_rr_index = (idx + 1) % len(eligible_pids)
return eligible_pids[idx % len(eligible_pids)]
async def _get_worker_connection(self, worker_pid):
"""Get or create connection to a worker."""
@ -424,7 +574,10 @@ class DirtyArbiter:
# Spawn workers if needed
while self.alive and len(self.workers) < num_workers:
self.spawn_worker()
result = self.spawn_worker()
if result is None:
# No apps need more workers - stop spawning
break
await asyncio.sleep(0.1)
# Kill excess workers
@ -436,7 +589,27 @@ class DirtyArbiter:
await asyncio.sleep(0.1)
def spawn_worker(self):
"""Spawn a new dirty worker."""
"""
Spawn a new dirty worker.
Worker app assignment follows these priorities:
1. If there are pending respawns (from dead workers), use those apps
2. Otherwise, determine apps for a new worker based on allocation
Returns:
Worker PID in parent process, or None if no apps need workers
"""
# Priority 1: Respawn dead worker with same apps
if self._pending_respawns:
app_paths = self._pending_respawns.pop(0)
else:
# Priority 2: New worker for initial pool
app_paths = self._get_apps_for_new_worker()
if not app_paths:
self.log.warning("No apps need more workers, skipping spawn")
return None
self.worker_age += 1
socket_path = os.path.join(
self.tmpdir, f"worker-{self.worker_age}.sock"
@ -445,7 +618,7 @@ class DirtyArbiter:
worker = DirtyWorker(
age=self.worker_age,
ppid=self.pid,
app_paths=self.cfg.dirty_apps,
app_paths=app_paths, # Only assigned apps, not all apps
cfg=self.cfg,
log=self.log,
socket_path=socket_path
@ -457,8 +630,13 @@ class DirtyArbiter:
worker.pid = pid
self.workers[pid] = worker
self.worker_sockets[pid] = socket_path
# Register which apps this worker has
self._register_worker_apps(pid, app_paths)
self.cfg.dirty_post_fork(self, worker)
self.log.info("Spawned dirty worker (pid: %s)", pid)
self.log.info("Spawned dirty worker (pid: %s) with apps: %s",
pid, app_paths)
return pid
# Child process
@ -484,7 +662,12 @@ class DirtyArbiter:
self._cleanup_worker(pid)
def _cleanup_worker(self, pid):
"""Clean up after a worker exits."""
"""
Clean up after a worker exits.
Saves the dead worker's app list to pending respawns so the
replacement worker gets the same apps.
"""
self._close_worker_connection(pid)
# Cancel consumer task
@ -495,6 +678,15 @@ class DirtyArbiter:
# Remove queue
self.worker_queues.pop(pid, None)
# Save dead worker's apps for respawn BEFORE unregistering
if pid in self.worker_app_map:
dead_apps = list(self.worker_app_map[pid])
if dead_apps:
self._pending_respawns.append(dead_apps)
# Now safe to unregister the worker's apps
self._unregister_worker(pid)
worker = self.workers.pop(pid, None)
if worker:
self.cfg.dirty_worker_exit(self, worker)

View File

@ -46,6 +46,7 @@ class DirtyError(Exception):
"DirtyWorkerError": DirtyWorkerError,
"DirtyAppError": DirtyAppError,
"DirtyAppNotFoundError": DirtyAppNotFoundError,
"DirtyNoWorkersAvailableError": DirtyNoWorkersAvailableError,
"DirtyProtocolError": DirtyProtocolError,
}
error_type = data.get("error_type", "DirtyError")
@ -70,6 +71,8 @@ class DirtyError(Exception):
error.app_path = error.details.get("app_path")
error.action = error.details.get("action")
error.traceback = error.details.get("traceback")
elif error_class == DirtyNoWorkersAvailableError:
error.app_path = error.details.get("app_path")
return error
@ -130,6 +133,40 @@ class DirtyAppNotFoundError(DirtyAppError):
super().__init__(f"Dirty app not found: {app_path}", app_path=app_path)
class DirtyNoWorkersAvailableError(DirtyError):
"""
Raised when no workers are available for the requested app.
This exception is raised when a request targets an app that has
worker limits configured, and no workers with that app are currently
available (e.g., all workers for that app crashed and haven't been
respawned yet).
Web applications can catch this exception to provide graceful
degradation, such as queuing requests for retry or showing a
maintenance page.
Example::
from gunicorn.dirty import get_dirty_client
from gunicorn.dirty.errors import DirtyNoWorkersAvailableError
def my_view(request):
client = get_dirty_client()
try:
result = client.execute("myapp.ml:HeavyModel", "predict", data)
except DirtyNoWorkersAvailableError as e:
return {"error": "Service temporarily unavailable",
"app": e.app_path}
"""
def __init__(self, app_path, message=None):
if message is None:
message = f"No workers available for app: {app_path}"
super().__init__(message, details={"app_path": app_path})
self.app_path = app_path
class DirtyProtocolError(DirtyError):
"""Raised when there is a protocol-level error."""

View File

@ -0,0 +1,324 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Integration tests for per-app worker allocation."""
import pytest
from gunicorn.config import Config
from gunicorn.dirty.arbiter import DirtyArbiter
class MockLog:
"""Mock logger for testing."""
def __init__(self):
self.messages = []
def debug(self, msg, *args):
self.messages.append(("debug", msg % args if args else msg))
def info(self, msg, *args):
self.messages.append(("info", msg % args if args else msg))
def warning(self, msg, *args):
self.messages.append(("warning", msg % args if args else msg))
def error(self, msg, *args):
self.messages.append(("error", msg % args if args else msg))
def critical(self, msg, *args):
self.messages.append(("critical", msg % args if args else msg))
def exception(self, msg, *args):
self.messages.append(("exception", msg % args if args else msg))
def close_on_exec(self):
pass
def reopen_files(self):
pass
class TestPerAppWorkerAllocation:
"""Integration tests for per-app worker allocation."""
def test_heavy_app_loaded_on_limited_workers(self):
"""App with workers=2 only loaded on 2 of 4 workers."""
cfg = Config()
cfg.set("dirty_workers", 4)
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp", # unlimited
"tests.support_dirty_app:SlowDirtyApp:2", # limited to 2
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Simulate spawning 4 workers
for i in range(4):
apps = arbiter._get_apps_for_new_worker()
arbiter._register_worker_apps(1000 + i, apps)
# Check distribution
unlimited_app = "tests.support_dirty_app:TestDirtyApp"
limited_app = "tests.support_dirty_app:SlowDirtyApp"
# Unlimited app should be on all 4 workers
assert len(arbiter.app_worker_map[unlimited_app]) == 4
# Limited app should only be on 2 workers
assert len(arbiter.app_worker_map[limited_app]) == 2
arbiter._cleanup_sync()
def test_light_app_loaded_on_all_workers(self):
"""App with workers=None loaded on all workers."""
cfg = Config()
cfg.set("dirty_workers", 4)
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Simulate spawning 4 workers
for i in range(4):
apps = arbiter._get_apps_for_new_worker()
arbiter._register_worker_apps(1000 + i, apps)
# App should be on all 4 workers
app_path = "tests.support_dirty_app:TestDirtyApp"
assert len(arbiter.app_worker_map[app_path]) == 4
arbiter._cleanup_sync()
def test_mixed_apps_correct_distribution(self):
"""Mix of limited and unlimited apps distributed correctly."""
cfg = Config()
cfg.set("dirty_workers", 4)
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp", # unlimited
"tests.support_dirty_app:SlowDirtyApp:1", # limited to 1
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Simulate spawning 4 workers
for i in range(4):
apps = arbiter._get_apps_for_new_worker()
arbiter._register_worker_apps(1000 + i, apps)
unlimited_app = "tests.support_dirty_app:TestDirtyApp"
limited_app = "tests.support_dirty_app:SlowDirtyApp"
# Unlimited app on all workers
assert len(arbiter.app_worker_map[unlimited_app]) == 4
# Limited app on only 1 worker
assert len(arbiter.app_worker_map[limited_app]) == 1
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_request_routing_respects_allocation(self):
"""Requests only routed to workers with the target app."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp:1",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Set up workers
arbiter.workers[1001] = "worker1"
arbiter.workers[1002] = "worker2"
# Worker 1001 has both apps, worker 1002 has only TestDirtyApp
arbiter._register_worker_apps(1001, [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp",
])
arbiter._register_worker_apps(1002, [
"tests.support_dirty_app:TestDirtyApp",
])
# Request for SlowDirtyApp should only go to worker 1001
worker = await arbiter._get_available_worker("tests.support_dirty_app:SlowDirtyApp")
assert worker == 1001
# Request for TestDirtyApp should go to either
worker = await arbiter._get_available_worker("tests.support_dirty_app:TestDirtyApp")
assert worker in [1001, 1002]
arbiter._cleanup_sync()
def test_worker_crash_app_reassigned_to_new_worker(self):
"""When worker dies, new worker gets the app it had."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp:1",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
# Set up initial workers
arbiter.workers[1001] = "worker1"
arbiter.worker_sockets[1001] = "/tmp/fake1.sock"
# Worker 1001 has both apps
arbiter._register_worker_apps(1001, [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp",
])
# Simulate worker crash
arbiter._cleanup_worker(1001)
# Apps should be queued for respawn
assert len(arbiter._pending_respawns) == 1
pending_apps = arbiter._pending_respawns[0]
assert "tests.support_dirty_app:TestDirtyApp" in pending_apps
assert "tests.support_dirty_app:SlowDirtyApp" in pending_apps
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_worker_crash_other_workers_still_serve_app(self):
"""When one of two workers dies, other still serves requests."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
# Set up two workers for the same app
arbiter.workers[1001] = "worker1"
arbiter.worker_sockets[1001] = "/tmp/fake1.sock"
arbiter.workers[1002] = "worker2"
arbiter.worker_sockets[1002] = "/tmp/fake2.sock"
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter._register_worker_apps(1001, [app_path])
arbiter._register_worker_apps(1002, [app_path])
# Both workers serve the app
assert len(arbiter.app_worker_map[app_path]) == 2
# Worker 1001 crashes
arbiter._cleanup_worker(1001)
# Worker 1002 still serves requests
assert len(arbiter.app_worker_map[app_path]) == 1
assert 1002 in arbiter.app_worker_map[app_path]
worker = await arbiter._get_available_worker(app_path)
assert worker == 1002
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_worker_crash_sole_worker_app_unavailable_until_respawn(self):
"""When sole worker for app dies, requests fail until respawn."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:SlowDirtyApp:1",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
# Only one worker for this app
arbiter.workers[1001] = "worker1"
arbiter.worker_sockets[1001] = "/tmp/fake1.sock"
app_path = "tests.support_dirty_app:SlowDirtyApp"
arbiter._register_worker_apps(1001, [app_path])
# Worker crashes
arbiter._cleanup_worker(1001)
# No workers available for the app
worker = await arbiter._get_available_worker(app_path)
assert worker is None
arbiter._cleanup_sync()
def test_config_format_module_class_n(self):
"""Config 'mod:Class:2' correctly limits to 2 workers."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp:2",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Check parsed spec
app_path = "tests.support_dirty_app:TestDirtyApp"
assert arbiter.app_specs[app_path]["worker_count"] == 2
arbiter._cleanup_sync()
def test_class_attribute_workers_detected(self):
"""App with workers=2 class attribute is detected by arbiter."""
cfg = Config()
cfg.set("dirty_workers", 4)
cfg.set("dirty_apps", [
"tests.support_dirty_app:HeavyModelApp", # Has workers=2 class attr
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Check parsed spec - should read workers=2 from class
app_path = "tests.support_dirty_app:HeavyModelApp"
assert arbiter.app_specs[app_path]["worker_count"] == 2
# Simulate spawning 4 workers
for i in range(4):
apps = arbiter._get_apps_for_new_worker()
arbiter._register_worker_apps(1000 + i, apps)
# HeavyModelApp should only be on 2 workers
assert len(arbiter.app_worker_map[app_path]) == 2
arbiter._cleanup_sync()
def test_config_override_takes_precedence_over_class_attribute(self):
"""Config :N takes precedence over class workers attribute."""
cfg = Config()
cfg.set("dirty_workers", 4)
cfg.set("dirty_apps", [
# HeavyModelApp has workers=2, but config says 1
"tests.support_dirty_app:HeavyModelApp:1",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Config override (1) should take precedence
app_path = "tests.support_dirty_app:HeavyModelApp"
assert arbiter.app_specs[app_path]["worker_count"] == 1
# Simulate spawning 4 workers
for i in range(4):
apps = arbiter._get_apps_for_new_worker()
arbiter._register_worker_apps(1000 + i, apps)
# Should only be on 1 worker (config override)
assert len(arbiter.app_worker_map[app_path]) == 1
arbiter._cleanup_sync()

View File

@ -0,0 +1,20 @@
FROM python:3.12-slim
WORKDIR /app
# Copy gunicorn source
COPY . /app/gunicorn-src
# Install gunicorn and test dependencies
# setproctitle is needed for process title changes (master, dirty-arbiter, etc.)
RUN pip install --no-cache-dir /app/gunicorn-src pytest requests setproctitle
# Copy test app files
COPY tests/docker/per_app_allocation/app.py /app/
COPY tests/docker/per_app_allocation/gunicorn_conf.py /app/
# Install procps for process inspection and curl for healthcheck
RUN apt-get update && apt-get install -y procps curl && rm -rf /var/lib/apt/lists/*
# Default command - run gunicorn
CMD ["gunicorn", "app:application", "-c", "gunicorn_conf.py"]

View File

@ -0,0 +1,62 @@
# Per-App Worker Allocation E2E Tests
End-to-end Docker-based tests for the per-app worker allocation feature.
## Overview
These tests verify that:
- Apps with worker limits are only loaded on the specified number of workers
- Requests are routed only to workers that have the target app loaded
- Round-robin distribution works correctly within limited worker sets
- Worker crash scenarios maintain correct app allocation
- Class attribute `workers=N` is respected
- Config-based `:N` overrides class attributes
## Configuration
The tests use 4 dirty workers with 3 apps:
- **LightweightApp**: No limit (loads on all 4 workers)
- **HeavyApp**: `workers=2` class attribute (loads on 2 workers)
- **ConfigLimitedApp**: `:1` config (loads on 1 worker)
## Running Tests
```bash
# From this directory
cd tests/docker/per_app_allocation
# Build the Docker image
docker compose build
# Run all tests
pytest test_per_app_e2e.py -v
# Run specific test
pytest test_per_app_e2e.py::TestPerAppAllocation::test_config_limited_app_uses_one_worker -v
```
## Test Categories
### TestPerAppAllocation
- Tests basic functionality of per-app worker allocation
- Verifies round-robin distribution
- Tests app accessibility
### TestPerAppWorkerCrash
- Tests behavior when workers crash
- Verifies app recovery after worker respawn
### TestPerAppLogs
- Verifies logging output contains expected information
## Requirements
- Docker and Docker Compose
- Python 3.8+
- pytest
- requests
## Notes
- Tests run on port 8001 to avoid conflicts with the existing dirty_arbiter tests on 8000
- The container uses a keep-alive wrapper to allow testing worker crash scenarios

View File

@ -0,0 +1,184 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
WSGI and Dirty applications for per-app worker allocation testing.
Contains:
- A WSGI app that can make dirty client requests
- A lightweight dirty app (loads on all workers)
- A heavy dirty app (limited to 2 workers via class attribute)
- A config-limited app (limited to 1 worker via config)
"""
import json
import os
from gunicorn.dirty.app import DirtyApp
def application(environ, start_response):
"""
WSGI application that invokes dirty apps and returns worker info.
Routes:
- GET /lightweight/ping - Call LightweightApp.ping()
- GET /heavy/predict/<data> - Call HeavyApp.predict(data)
- GET /config_limited/info - Call ConfigLimitedApp.get_info()
- GET /status - Get overall status
"""
path = environ.get('PATH_INFO', '/')
method = environ.get('REQUEST_METHOD', 'GET')
if method != 'GET':
start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
return [b'Method not allowed']
# Import dirty client here to avoid import at module load
from gunicorn.dirty import get_dirty_client
try:
client = get_dirty_client()
if path == '/status':
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps({"status": "ok"}).encode()]
elif path == '/lightweight/ping':
result = client.execute("app:LightweightApp", "ping")
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps(result).encode()]
elif path.startswith('/heavy/predict/'):
data = path.split('/')[-1]
result = client.execute("app:HeavyApp", "predict", data)
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps(result).encode()]
elif path == '/heavy/get_worker_id':
result = client.execute("app:HeavyApp", "get_worker_id")
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps({"worker_id": result}).encode()]
elif path == '/config_limited/info':
result = client.execute("app:ConfigLimitedApp", "get_info")
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps(result).encode()]
elif path == '/config_limited/get_worker_id':
result = client.execute("app:ConfigLimitedApp", "get_worker_id")
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps({"worker_id": result}).encode()]
elif path == '/lightweight/get_worker_id':
result = client.execute("app:LightweightApp", "get_worker_id")
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps({"worker_id": result}).encode()]
else:
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return [b'Not found']
except Exception as e:
start_response('500 Internal Server Error', [('Content-Type', 'application/json')])
return [json.dumps({"error": str(e), "type": type(e).__name__}).encode()]
class LightweightApp(DirtyApp):
"""
A lightweight app that should load on ALL dirty workers.
workers=None (default) means all workers load this app.
"""
def __init__(self):
self.initialized = False
self.worker_id = None
self.call_count = 0
def init(self):
self.initialized = True
self.worker_id = os.getpid()
def ping(self):
"""Simple ping action."""
self.call_count += 1
return {
"pong": True,
"worker_id": self.worker_id,
"call_count": self.call_count,
}
def get_worker_id(self):
"""Return the worker ID that loaded this app."""
return self.worker_id
def close(self):
pass
class HeavyApp(DirtyApp):
"""
A heavy app that uses the workers class attribute to limit allocation.
workers=2 means only 2 dirty workers will load this app.
This simulates a large ML model that shouldn't be replicated everywhere.
"""
workers = 2 # Only 2 workers should load this app
def __init__(self):
self.initialized = False
self.worker_id = None
self.model_data = None
def init(self):
self.initialized = True
self.worker_id = os.getpid()
# Simulate loading a heavy model
self.model_data = {"loaded": True, "worker": self.worker_id}
def predict(self, data):
"""Simulate model prediction."""
return {
"prediction": f"result_for_{data}",
"worker_id": self.worker_id,
}
def get_worker_id(self):
"""Return the worker ID that loaded this app."""
return self.worker_id
def close(self):
self.model_data = None
class ConfigLimitedApp(DirtyApp):
"""
An app whose worker limit is specified in config (not class attribute).
The config will specify this app as "app:ConfigLimitedApp:1" to limit
it to a single worker.
"""
def __init__(self):
self.initialized = False
self.worker_id = None
def init(self):
self.initialized = True
self.worker_id = os.getpid()
def get_info(self):
"""Get app info."""
return {
"app": "ConfigLimitedApp",
"worker_id": self.worker_id,
}
def get_worker_id(self):
"""Return the worker ID that loaded this app."""
return self.worker_id
def close(self):
pass

View File

@ -0,0 +1,13 @@
services:
gunicorn:
build:
context: ../../..
dockerfile: tests/docker/per_app_allocation/Dockerfile
ports:
- "8001:8000"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/status"]
interval: 1s
timeout: 1s
retries: 30
stop_grace_period: 10s

View File

@ -0,0 +1,38 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
Gunicorn configuration for per-app worker allocation e2e tests.
Configuration:
- 4 dirty workers total
- LightweightApp: loads on ALL 4 workers (workers=None)
- HeavyApp: loads on 2 workers (via class attribute workers=2)
- ConfigLimitedApp: loads on 1 worker (via config :1 suffix)
"""
bind = "0.0.0.0:8000"
workers = 1 # HTTP workers
worker_class = "sync"
# 4 dirty workers - enough to test distribution
dirty_workers = 4
# App configuration:
# - LightweightApp: no limit, loads on all 4
# - HeavyApp: workers=2 class attribute, loads on 2
# - ConfigLimitedApp: config override :1, loads on 1
dirty_apps = [
"app:LightweightApp",
"app:HeavyApp",
"app:ConfigLimitedApp:1",
]
dirty_timeout = 30
dirty_graceful_timeout = 5
timeout = 30
graceful_timeout = 5
loglevel = "debug"
accesslog = "-"
errorlog = "-"

View File

@ -0,0 +1,393 @@
#!/usr/bin/env python
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
Docker-based end-to-end tests for per-app worker allocation.
These tests verify:
1. Apps with worker limits are only loaded on limited workers
2. Requests are routed to workers that have the target app
3. Round-robin distribution works within limited worker sets
4. Worker crash scenarios maintain correct app allocation
Usage:
# Build the container first
docker compose build
# Run all tests
pytest test_per_app_e2e.py -v
# Run specific test
pytest test_per_app_e2e.py::TestPerAppAllocation::test_lightweight_app_round_robins -v
"""
import os
import re
import subprocess
import time
import pytest
import requests
class DockerContainer:
"""Context manager for managing a Docker container for per-app tests."""
def __init__(self, name="gunicorn-per-app-test", build=True):
self.name = name
self.build = build
self.container_id = None
self.base_url = "http://127.0.0.1:8001"
def __enter__(self):
# Build if requested
if self.build:
result = subprocess.run(
["docker", "compose", "build"],
cwd=os.path.dirname(__file__),
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"Docker build failed: {result.stderr}")
# Remove any existing container with same name
subprocess.run(
["docker", "rm", "-f", self.name],
capture_output=True,
)
# Start container with a keep-alive wrapper
result = subprocess.run(
[
"docker", "run", "-d",
"--name", self.name,
"-p", "8001:8000",
"per_app_allocation-gunicorn",
"sh", "-c",
"gunicorn app:application -c gunicorn_conf.py & "
"GUNICORN_PID=$!; "
"trap 'kill $GUNICORN_PID 2>/dev/null' TERM; "
"while true; do sleep 1; done"
],
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"Docker run failed: {result.stderr}")
self.container_id = result.stdout.strip()
# Wait for gunicorn to be ready
self._wait_for_ready()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.container_id:
# Get logs before cleanup
logs = self.get_logs()
if exc_val:
print(f"\n=== Container logs ===\n{logs}\n=== End logs ===\n")
# Stop and remove container
subprocess.run(
["docker", "rm", "-f", self.name],
capture_output=True,
)
def _wait_for_ready(self, timeout=60):
"""Wait for gunicorn to be ready and serving requests."""
start = time.time()
while time.time() - start < timeout:
try:
resp = requests.get(f"{self.base_url}/status", timeout=1)
if resp.status_code == 200:
# Also verify dirty workers are up by testing an app
resp = requests.get(f"{self.base_url}/lightweight/ping", timeout=2)
if resp.status_code == 200:
return
except requests.exceptions.RequestException:
pass
time.sleep(0.5)
raise TimeoutError("Gunicorn did not start within timeout")
def exec(self, cmd, check=True):
"""Execute a command in the container."""
result = subprocess.run(
["docker", "exec", self.name] + cmd,
capture_output=True,
text=True,
)
if check and result.returncode != 0:
raise RuntimeError(f"Command failed: {cmd}\n{result.stderr}")
return result
def get_logs(self):
"""Get container logs."""
result = subprocess.run(
["docker", "logs", self.name],
capture_output=True,
text=True,
)
return result.stdout + result.stderr
def get_gunicorn_pids(self):
"""Get PIDs of gunicorn processes."""
pids = {
"master": None,
"dirty-arbiter": None,
"workers": [],
"dirty-workers": [],
}
result = self.exec(["ps", "aux"], check=False)
for line in result.stdout.split("\n"):
if "gunicorn:" not in line:
continue
parts = line.split()
if len(parts) < 2:
continue
pid = int(parts[1])
if "gunicorn: master" in line:
pids["master"] = pid
elif "gunicorn: dirty-arbiter" in line:
pids["dirty-arbiter"] = pid
elif "gunicorn: dirty-worker" in line:
pids["dirty-workers"].append(pid)
elif "gunicorn: worker" in line:
pids["workers"].append(pid)
return pids
def kill_process(self, pid, signal=9):
"""Send a signal to a process in the container."""
self.exec(
["kill", f"-{signal}", str(pid)],
check=False,
)
def wait_for_dirty_worker_count(self, expected_count, timeout=10):
"""Wait for specific number of dirty workers."""
start = time.time()
while time.time() - start < timeout:
pids = self.get_gunicorn_pids()
if len(pids["dirty-workers"]) == expected_count:
return True
time.sleep(0.5)
return False
def http_get(self, path, timeout=5):
"""Make HTTP GET request to the container."""
return requests.get(f"{self.base_url}{path}", timeout=timeout)
class TestPerAppAllocation:
"""Test per-app worker allocation functionality."""
@pytest.fixture(autouse=True)
def setup(self):
"""Check Docker is available."""
result = subprocess.run(
["docker", "info"],
capture_output=True,
)
if result.returncode != 0:
pytest.skip("Docker is not available")
def test_lightweight_app_responds(self):
"""LightweightApp should be accessible and respond correctly."""
with DockerContainer() as container:
resp = container.http_get("/lightweight/ping")
assert resp.status_code == 200
data = resp.json()
assert data["pong"] is True
assert "worker_id" in data
def test_lightweight_app_round_robins(self):
"""LightweightApp requests should round-robin across all 4 workers."""
with DockerContainer() as container:
# Make multiple requests to collect worker IDs
worker_ids = set()
for _ in range(20): # More than 4 to ensure round-robin
resp = container.http_get("/lightweight/get_worker_id")
assert resp.status_code == 200
data = resp.json()
worker_ids.add(data["worker_id"])
# Should see all 4 workers (or at least more than 1)
# Note: Due to timing, we might not hit all 4 in exactly 20 requests
assert len(worker_ids) >= 2, (
f"Expected requests to go to multiple workers, got {len(worker_ids)}"
)
def test_config_limited_app_uses_one_worker(self):
"""ConfigLimitedApp (limited to 1 via config) should use only one worker."""
with DockerContainer() as container:
# Make multiple requests
worker_ids = set()
for _ in range(10):
resp = container.http_get("/config_limited/get_worker_id")
assert resp.status_code == 200
data = resp.json()
worker_ids.add(data["worker_id"])
# Should only see 1 worker (the app is limited to 1)
assert len(worker_ids) == 1, (
f"Expected ConfigLimitedApp to use only 1 worker, got {len(worker_ids)}"
)
def test_heavy_app_uses_limited_workers(self):
"""HeavyApp (workers=2) should use only 2 workers."""
with DockerContainer() as container:
# Make multiple requests
worker_ids = set()
for _ in range(20):
resp = container.http_get("/heavy/get_worker_id")
# HeavyApp uses class attribute workers=2
# But currently the arbiter only reads config :N format
# This test documents expected behavior
if resp.status_code == 200:
data = resp.json()
worker_ids.add(data["worker_id"])
else:
# If class attribute isn't supported yet, skip
pytest.skip("HeavyApp class attribute workers=2 not implemented")
return
# Should see at most 2 workers
assert len(worker_ids) <= 2, (
f"Expected HeavyApp to use at most 2 workers, got {len(worker_ids)}"
)
def test_heavy_app_prediction_works(self):
"""HeavyApp.predict() should return correct results."""
with DockerContainer() as container:
resp = container.http_get("/heavy/predict/test_input")
if resp.status_code == 200:
data = resp.json()
assert data["prediction"] == "result_for_test_input"
assert "worker_id" in data
else:
# If class attribute isn't supported, document the error
data = resp.json()
print(f"HeavyApp error: {data}")
def test_all_apps_accessible(self):
"""All configured apps should be accessible."""
with DockerContainer() as container:
# LightweightApp
resp = container.http_get("/lightweight/ping")
assert resp.status_code == 200
# ConfigLimitedApp
resp = container.http_get("/config_limited/info")
assert resp.status_code == 200
data = resp.json()
assert data["app"] == "ConfigLimitedApp"
def test_four_dirty_workers_running(self):
"""Should have 4 dirty workers as configured."""
with DockerContainer() as container:
pids = container.get_gunicorn_pids()
assert len(pids["dirty-workers"]) == 4, (
f"Expected 4 dirty workers, got {len(pids['dirty-workers'])}"
)
class TestPerAppWorkerCrash:
"""Test per-app allocation behavior when workers crash."""
@pytest.fixture(autouse=True)
def setup(self):
"""Check Docker is available."""
result = subprocess.run(
["docker", "info"],
capture_output=True,
)
if result.returncode != 0:
pytest.skip("Docker is not available")
def test_worker_crash_app_still_accessible(self):
"""When a dirty worker crashes, apps should still be accessible."""
with DockerContainer() as container:
pids = container.get_gunicorn_pids()
assert len(pids["dirty-workers"]) == 4
# Kill one dirty worker
container.kill_process(pids["dirty-workers"][0], signal=9)
# Wait for respawn (dirty arbiter should respawn it)
assert container.wait_for_dirty_worker_count(4, timeout=15), (
"Dirty arbiter should respawn killed worker"
)
# Apps should still work
resp = container.http_get("/lightweight/ping")
assert resp.status_code == 200
resp = container.http_get("/config_limited/info")
assert resp.status_code == 200
def test_config_limited_worker_crash_recovery(self):
"""When the sole worker for ConfigLimitedApp crashes, it should recover."""
with DockerContainer() as container:
# Get the worker ID that handles ConfigLimitedApp
resp = container.http_get("/config_limited/get_worker_id")
assert resp.status_code == 200
original_worker_id = resp.json()["worker_id"]
# Kill that specific worker
container.kill_process(original_worker_id, signal=9)
# Wait for respawn
time.sleep(3)
# The new worker should handle ConfigLimitedApp
resp = container.http_get("/config_limited/get_worker_id")
# Note: There might be a brief period where no worker has the app
# In production, this would return an error until respawn
if resp.status_code == 200:
new_worker_id = resp.json()["worker_id"]
# Worker ID should be different (new process)
assert new_worker_id != original_worker_id, (
"New worker should have different PID"
)
class TestPerAppLogs:
"""Test that per-app allocation is logged correctly."""
@pytest.fixture(autouse=True)
def setup(self):
"""Check Docker is available."""
result = subprocess.run(
["docker", "info"],
capture_output=True,
)
if result.returncode != 0:
pytest.skip("Docker is not available")
def test_logs_show_app_allocation(self):
"""Logs should indicate which apps are loaded on which workers."""
with DockerContainer() as container:
logs = container.get_logs()
# Should see dirty arbiter starting
assert "Dirty arbiter" in logs or "dirty arbiter" in logs.lower()
# Should see dirty workers spawning
assert "dirty" in logs.lower() and "worker" in logs.lower()
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -93,3 +93,65 @@ class SlowDirtyApp(DirtyApp):
def close(self):
self.closed = True
class HeavyModelApp(DirtyApp):
"""A dirty app that simulates a heavy model requiring limited workers.
Uses the workers class attribute to limit how many workers load this app.
"""
workers = 2 # Only 2 workers should load this app
def __init__(self):
self.initialized = False
self.closed = False
self.model_data = None
self.worker_id = None
def init(self):
import os
self.initialized = True
# Store the worker PID to verify which worker handled the request
self.worker_id = os.getpid()
# Simulate loading a heavy model
self.model_data = {"loaded": True, "worker": self.worker_id}
def predict(self, data):
"""Simulate model prediction."""
return {
"prediction": f"result_for_{data}",
"worker_id": self.worker_id,
}
def get_worker_id(self):
"""Return the worker ID that loaded this app."""
return self.worker_id
def close(self):
self.closed = True
self.model_data = None
class LightweightApp(DirtyApp):
"""A lightweight app that should load on all workers."""
def __init__(self):
self.initialized = False
self.closed = False
self.worker_id = None
def init(self):
import os
self.initialized = True
self.worker_id = os.getpid()
def ping(self):
"""Simple ping action."""
return {"pong": True, "worker_id": self.worker_id}
def get_worker_id(self):
"""Return the worker ID that loaded this app."""
return self.worker_id
def close(self):
self.closed = True

View File

@ -6,7 +6,12 @@
import pytest
from gunicorn.dirty.app import DirtyApp, load_dirty_app, load_dirty_apps
from gunicorn.dirty.app import (
DirtyApp,
load_dirty_app,
load_dirty_apps,
parse_dirty_app_spec,
)
from gunicorn.dirty.errors import DirtyAppError, DirtyAppNotFoundError
@ -185,3 +190,158 @@ class TestDirtyAppStateful:
with pytest.raises(ValueError) as exc_info:
app("compute", 1, 2, operation="invalid")
assert "Unknown operation" in str(exc_info.value)
class TestDirtyAppWorkersAttribute:
"""Tests for DirtyApp workers class attribute."""
def test_default_workers_is_none(self):
"""Base DirtyApp has workers=None (all workers)."""
assert DirtyApp.workers is None
def test_subclass_can_set_workers(self):
"""Subclass can override workers=2."""
class LimitedApp(DirtyApp):
workers = 2
assert LimitedApp.workers == 2
def test_workers_inherited_by_default(self):
"""Subclass without workers attr inherits None."""
class InheritedApp(DirtyApp):
pass
assert InheritedApp.workers is None
def test_instance_has_workers_attribute(self):
"""Instance should have access to workers attribute."""
app = DirtyApp()
assert app.workers is None
class LimitedApp(DirtyApp):
workers = 3
limited = LimitedApp()
assert limited.workers == 3
class TestParseDirtyAppSpec:
"""Tests for parse_dirty_app_spec function."""
def test_standard_format(self):
"""'mod:Class' returns ('mod:Class', None)."""
import_path, count = parse_dirty_app_spec("mod:Class")
assert import_path == "mod:Class"
assert count is None
def test_standard_format_with_dots(self):
"""'mod.sub.pkg:Class' returns ('mod.sub.pkg:Class', None)."""
import_path, count = parse_dirty_app_spec("mod.sub.pkg:Class")
assert import_path == "mod.sub.pkg:Class"
assert count is None
def test_with_worker_count(self):
"""'mod:Class:2' returns ('mod:Class', 2)."""
import_path, count = parse_dirty_app_spec("mod:Class:2")
assert import_path == "mod:Class"
assert count == 2
def test_worker_count_one(self):
"""'mod:Class:1' returns ('mod:Class', 1)."""
import_path, count = parse_dirty_app_spec("mod:Class:1")
assert import_path == "mod:Class"
assert count == 1
def test_worker_count_large(self):
"""'mod:Class:100' returns ('mod:Class', 100)."""
import_path, count = parse_dirty_app_spec("mod:Class:100")
assert import_path == "mod:Class"
assert count == 100
def test_worker_count_zero_raises(self):
"""'mod:Class:0' raises DirtyAppError."""
with pytest.raises(DirtyAppError) as exc_info:
parse_dirty_app_spec("mod:Class:0")
assert "must be >= 1" in str(exc_info.value)
def test_worker_count_negative_raises(self):
"""'mod:Class:-1' raises DirtyAppError."""
with pytest.raises(DirtyAppError) as exc_info:
parse_dirty_app_spec("mod:Class:-1")
assert "must be >= 1" in str(exc_info.value)
def test_non_numeric_raises(self):
"""'mod:Class:abc' raises DirtyAppError."""
with pytest.raises(DirtyAppError) as exc_info:
parse_dirty_app_spec("mod:Class:abc")
assert "Expected integer" in str(exc_info.value)
def test_no_colon_raises(self):
"""'mod.Class' (no colon) raises DirtyAppError."""
with pytest.raises(DirtyAppError) as exc_info:
parse_dirty_app_spec("mod.Class")
assert "Invalid import path format" in str(exc_info.value)
def test_too_many_colons_raises(self):
"""'mod:Class:2:extra' raises DirtyAppError."""
with pytest.raises(DirtyAppError) as exc_info:
parse_dirty_app_spec("mod:Class:2:extra")
assert "Invalid import path format" in str(exc_info.value)
def test_dotted_module_with_count(self):
"""'mod.sub:Class:2' handles dots correctly."""
import_path, count = parse_dirty_app_spec("mod.sub:Class:2")
assert import_path == "mod.sub:Class"
assert count == 2
class TestGetAppWorkersAttribute:
"""Tests for get_app_workers_attribute function."""
def test_get_workers_none_for_base_class(self):
"""Base DirtyApp returns workers=None."""
from gunicorn.dirty.app import get_app_workers_attribute
workers = get_app_workers_attribute("gunicorn.dirty.app:DirtyApp")
assert workers is None
def test_get_workers_from_class_attribute(self):
"""App with workers=2 class attribute returns 2."""
from gunicorn.dirty.app import get_app_workers_attribute
workers = get_app_workers_attribute("tests.support_dirty_app:HeavyModelApp")
assert workers == 2
def test_get_workers_none_for_inherited(self):
"""App without explicit workers attribute returns None."""
from gunicorn.dirty.app import get_app_workers_attribute
workers = get_app_workers_attribute("tests.support_dirty_app:TestDirtyApp")
assert workers is None
def test_get_workers_not_found_module(self):
"""Non-existent module raises DirtyAppNotFoundError."""
from gunicorn.dirty.app import get_app_workers_attribute
from gunicorn.dirty.errors import DirtyAppNotFoundError
with pytest.raises(DirtyAppNotFoundError):
get_app_workers_attribute("nonexistent.module:App")
def test_get_workers_not_found_class(self):
"""Non-existent class raises DirtyAppNotFoundError."""
from gunicorn.dirty.app import get_app_workers_attribute
from gunicorn.dirty.errors import DirtyAppNotFoundError
with pytest.raises(DirtyAppNotFoundError):
get_app_workers_attribute("tests.support_dirty_app:NonExistentApp")
def test_get_workers_invalid_format(self):
"""Invalid format raises DirtyAppError."""
from gunicorn.dirty.app import get_app_workers_attribute
from gunicorn.dirty.errors import DirtyAppError
with pytest.raises(DirtyAppError) as exc_info:
get_app_workers_attribute("invalid.format.no.colon")
assert "Invalid import path format" in str(exc_info.value)

View File

@ -1144,3 +1144,275 @@ class TestDirtyArbiterQueueBehavior:
assert task2.done()
arbiter._cleanup_sync()
class TestDirtyArbiterAppTracking:
"""Tests for per-app worker tracking."""
def test_parse_app_specs_standard_format(self):
"""All standard format apps have worker_count=None."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
assert len(arbiter.app_specs) == 2
assert arbiter.app_specs["tests.support_dirty_app:TestDirtyApp"]["worker_count"] is None
assert arbiter.app_specs["tests.support_dirty_app:SlowDirtyApp"]["worker_count"] is None
arbiter._cleanup_sync()
def test_parse_app_specs_with_worker_count(self):
"""Apps with :N have correct worker_count."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp:2",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
assert arbiter.app_specs["tests.support_dirty_app:TestDirtyApp"]["worker_count"] is None
assert arbiter.app_specs["tests.support_dirty_app:SlowDirtyApp"]["worker_count"] == 2
arbiter._cleanup_sync()
def test_get_apps_for_new_worker_all_standard(self):
"""All apps returned when all have workers=None."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
apps = arbiter._get_apps_for_new_worker()
assert len(apps) == 2
assert "tests.support_dirty_app:TestDirtyApp" in apps
assert "tests.support_dirty_app:SlowDirtyApp" in apps
arbiter._cleanup_sync()
def test_get_apps_for_new_worker_respects_limit(self):
"""App with workers=2 stops assigning after 2 workers."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp", # unlimited
"tests.support_dirty_app:SlowDirtyApp:2", # limited to 2
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# First worker should get both apps
apps1 = arbiter._get_apps_for_new_worker()
assert len(apps1) == 2
arbiter._register_worker_apps(1001, apps1)
# Second worker should get both apps
apps2 = arbiter._get_apps_for_new_worker()
assert len(apps2) == 2
arbiter._register_worker_apps(1002, apps2)
# Third worker should only get unlimited app
apps3 = arbiter._get_apps_for_new_worker()
assert len(apps3) == 1
assert "tests.support_dirty_app:TestDirtyApp" in apps3
assert "tests.support_dirty_app:SlowDirtyApp" not in apps3
arbiter._cleanup_sync()
def test_register_worker_apps_updates_both_maps(self):
"""Both app_worker_map and worker_app_map updated."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter._register_worker_apps(1001, [app_path])
# Check app_worker_map
assert 1001 in arbiter.app_worker_map[app_path]
# Check worker_app_map
assert app_path in arbiter.worker_app_map[1001]
arbiter._cleanup_sync()
def test_unregister_worker_cleans_both_maps(self):
"""Worker removal updates both maps correctly."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter._register_worker_apps(1001, [app_path])
# Verify registered
assert 1001 in arbiter.app_worker_map[app_path]
assert 1001 in arbiter.worker_app_map
# Unregister
arbiter._unregister_worker(1001)
# Verify cleaned up
assert 1001 not in arbiter.app_worker_map[app_path]
assert 1001 not in arbiter.worker_app_map
arbiter._cleanup_sync()
class TestDirtyArbiterSpawnWorkerPerApp:
"""Tests for spawn_worker with per-app allocation."""
def test_cleanup_worker_queues_apps_for_respawn(self):
"""Dead worker's apps added to _pending_respawns."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
# Simulate worker registration
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter.workers[1001] = "fake_worker"
arbiter.worker_sockets[1001] = "/tmp/fake.sock"
arbiter._register_worker_apps(1001, [app_path])
# Cleanup should queue apps for respawn
assert len(arbiter._pending_respawns) == 0
arbiter._cleanup_worker(1001)
assert len(arbiter._pending_respawns) == 1
assert app_path in arbiter._pending_respawns[0]
arbiter._cleanup_sync()
def test_pending_respawns_cleared_after_spawn(self):
"""Pending respawns consumed when spawning new worker."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
# Add pending respawn
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter._pending_respawns.append([app_path])
# Get apps for new worker should use pending first
# But since spawn_worker forks, we test the logic directly
assert len(arbiter._pending_respawns) == 1
# When spawn_worker pops from pending_respawns
apps = arbiter._pending_respawns.pop(0)
assert apps == [app_path]
assert len(arbiter._pending_respawns) == 0
arbiter._cleanup_sync()
class TestDirtyArbiterRoutingPerApp:
"""Tests for app-aware routing."""
@pytest.mark.asyncio
async def test_get_available_worker_no_filter(self):
"""Without app_path, returns any worker round-robin."""
cfg = Config()
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.workers[1001] = "worker1"
arbiter.workers[1002] = "worker2"
# Should return workers in round-robin
w1 = await arbiter._get_available_worker()
w2 = await arbiter._get_available_worker()
assert w1 in [1001, 1002]
assert w2 in [1001, 1002]
# They should be different (round-robin)
if len(arbiter.workers) >= 2:
assert w1 != w2 or len(arbiter.workers) == 1
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_get_available_worker_with_app_filter(self):
"""With app_path, returns only workers that have it."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.workers[1001] = "worker1"
arbiter.workers[1002] = "worker2"
# Only register 1001 for the app
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter._register_worker_apps(1001, [app_path])
# Should only return 1001
worker = await arbiter._get_available_worker(app_path)
assert worker == 1001
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_get_available_worker_app_no_workers_returns_none(self):
"""Returns None if no workers have the app."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.workers[1001] = "worker1"
# Worker 1001 has no apps registered - request for unknown app returns None
worker = await arbiter._get_available_worker("unknown:App")
assert worker is None
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_route_request_app_not_loaded_error(self):
"""Error response when no worker has the app."""
from gunicorn.dirty.protocol import DirtyProtocol
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
arbiter.workers[1001] = "worker1"
# No apps registered for this worker (worker exists but has no apps)
request = make_request(
request_id="test-123",
app_path="unknown:App",
action="test"
)
writer = MockStreamWriter()
await arbiter.route_request(request, writer)
assert len(writer.messages) == 1
response = writer.messages[0]
assert response["type"] == DirtyProtocol.MSG_TYPE_ERROR
assert "No workers available for app" in response["error"]["message"]
assert response["error"]["error_type"] == "DirtyNoWorkersAvailableError"
arbiter._cleanup_sync()

View File

@ -0,0 +1,76 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for dirty errors module."""
import pytest
from gunicorn.dirty.errors import (
DirtyError,
DirtyNoWorkersAvailableError,
)
class TestDirtyNoWorkersAvailableError:
"""Tests for DirtyNoWorkersAvailableError exception."""
def test_error_contains_app_path(self):
"""Error includes the app_path."""
error = DirtyNoWorkersAvailableError("myapp:Model")
assert error.app_path == "myapp:Model"
assert "myapp:Model" in str(error)
assert "No workers available" in str(error)
def test_error_with_custom_message(self):
"""Error can have a custom message."""
error = DirtyNoWorkersAvailableError(
"myapp:Model",
message="Custom: no workers for heavy model"
)
assert error.app_path == "myapp:Model"
assert "Custom: no workers" in str(error)
def test_error_serialization_roundtrip(self):
"""Error survives to_dict/from_dict cycle."""
original = DirtyNoWorkersAvailableError("myapp.ml:HugeModel")
# Serialize
data = original.to_dict()
assert data["error_type"] == "DirtyNoWorkersAvailableError"
assert data["details"]["app_path"] == "myapp.ml:HugeModel"
# Deserialize
restored = DirtyError.from_dict(data)
assert isinstance(restored, DirtyNoWorkersAvailableError)
assert restored.app_path == "myapp.ml:HugeModel"
assert "No workers available" in str(restored)
def test_error_is_dirty_error_subclass(self):
"""DirtyNoWorkersAvailableError is a DirtyError subclass."""
error = DirtyNoWorkersAvailableError("app:Class")
assert isinstance(error, DirtyError)
def test_web_app_can_catch_specific_error(self):
"""Web app can catch DirtyNoWorkersAvailableError specifically."""
def simulate_execute():
raise DirtyNoWorkersAvailableError("myapp:HeavyModel")
# Catch specific error
try:
simulate_execute()
assert False, "Should have raised"
except DirtyNoWorkersAvailableError as e:
assert e.app_path == "myapp:HeavyModel"
def test_can_catch_as_base_error(self):
"""Can catch DirtyNoWorkersAvailableError as DirtyError."""
def simulate_execute():
raise DirtyNoWorkersAvailableError("myapp:Model")
try:
simulate_execute()
assert False, "Should have raised"
except DirtyError as e:
# Should catch it as the base class
assert hasattr(e, "app_path")