feat(dirty): add per-app worker allocation for memory optimization

Allow dirty apps to specify how many workers should load them, enabling
significant memory savings for heavy applications like ML models.

- Add `workers` class attribute to DirtyApp (None = all workers)
- Add `parse_dirty_app_spec()` to parse "module:Class:N" format
- Add `DirtyNoWorkersAvailableError` for app-specific error handling
- Update DirtyArbiter with per-app worker tracking and routing
- Maintain backward compatibility when no dirty_apps configured

Example: 8 workers x 10GB model = 80GB RAM needed
With workers=2: 2 x 10GB = 20GB RAM (75% savings)

Configuration formats:
- Class attribute: `workers = 2` on DirtyApp subclass
- Config format: `module:class:N` (e.g., `myapp.ml:HugeModel:2`)
This commit is contained in:
Benoit Chesneau 2026-02-01 02:40:09 +01:00
parent 1591a6c773
commit 8559854b4f
8 changed files with 1083 additions and 24 deletions

View File

@ -2885,23 +2885,47 @@ class DirtyApps(Setting):
desc = """\
Dirty applications to load in the dirty worker pool.
A list of application paths in pattern ``$(MODULE_NAME):$(CLASS_NAME)``.
A list of application paths in one of these formats:
- ``$(MODULE_NAME):$(CLASS_NAME)`` - all workers load this app
- ``$(MODULE_NAME):$(CLASS_NAME):$(N)`` - only N workers load this app
Each dirty app must be a class that inherits from ``DirtyApp`` base class
and implements the ``init()``, ``__call__()``, and ``close()`` methods.
Example::
dirty_apps = [
"myapp.ml:MLApp",
"myapp.images:ImageApp",
"myapp.ml:MLApp", # All workers load this
"myapp.images:ImageApp", # All workers load this
"myapp.heavy:HugeModel:2", # Only 2 workers load this
]
The per-app worker limit is useful for memory-intensive applications
like large ML models. Instead of all 8 workers loading a 10GB model
(80GB total), you can limit it to 2 workers (20GB total).
Alternatively, you can set the ``workers`` class attribute on your
DirtyApp subclass::
class HugeModelApp(DirtyApp):
workers = 2 # Only 2 workers load this app
def init(self):
self.model = load_10gb_model()
Note: The config format (``module:Class:N``) takes precedence over
the class attribute if both are specified.
Dirty apps are loaded once when the dirty worker starts and persist
in memory for the lifetime of the worker. This is ideal for loading
ML models, database connection pools, or other stateful resources
that are expensive to initialize.
.. versionadded:: 25.0.0
.. versionchanged:: 25.1.0
Added per-app worker allocation via ``:N`` format suffix.
"""

View File

@ -72,12 +72,29 @@ class DirtyApp:
self.models[name] = load_model(name)
return {"loaded": True, "name": name}
Worker Allocation
-----------------
By default, all dirty workers load all apps. For apps that consume
significant memory (like large ML models), you can limit how many
workers load the app by setting the ``workers`` class attribute::
class HeavyModelApp(DirtyApp):
workers = 2 # Only 2 workers will load this app
def init(self):
self.model = load_10gb_model()
Subclasses should implement:
- init(): Called once at worker startup to initialize resources
- __call__(action, *args, **kwargs): Handle requests from HTTP workers
- close(): Called at worker shutdown to cleanup resources
"""
# Number of workers that should load this app.
# None means all workers (default, backward compatible).
# Set to an integer to limit how many workers load this app.
workers = None
def init(self):
"""
Initialize the application.
@ -120,6 +137,81 @@ class DirtyApp:
"""
def parse_dirty_app_spec(spec):
"""
Parse a dirty app specification.
Supports two formats:
- ``"module:Class"`` - standard format, all workers load the app
- ``"module:Class:N"`` - worker-limited format, only N workers load the app
Args:
spec: The app specification string
Returns:
tuple: (import_path, worker_count)
- import_path: The "module:Class" part for importing
- worker_count: Integer limit or None for all workers
Raises:
DirtyAppError: If the spec format is invalid or worker_count is < 1
Examples::
>>> parse_dirty_app_spec("myapp:App")
("myapp:App", None)
>>> parse_dirty_app_spec("myapp:App:2")
("myapp:App", 2)
>>> parse_dirty_app_spec("myapp.sub:App:1")
("myapp.sub:App", 1)
"""
if ':' not in spec:
raise DirtyAppError(
f"Invalid import path format: {spec}. "
f"Expected 'module.path:ClassName' or 'module.path:ClassName:N'",
app_path=spec
)
parts = spec.split(':')
# Standard format: "module:Class" or "module.sub:Class"
if len(parts) == 2:
return (spec, None)
# Worker-limited format: "module:Class:N"
if len(parts) == 3:
module_path, class_name, count_str = parts
import_path = f"{module_path}:{class_name}"
# Validate the worker count
try:
worker_count = int(count_str)
except ValueError:
raise DirtyAppError(
f"Invalid worker count in spec: {spec}. "
f"Expected integer, got '{count_str}'",
app_path=spec
)
if worker_count < 1:
raise DirtyAppError(
f"Invalid worker count in spec: {spec}. "
f"Worker count must be >= 1, got {worker_count}",
app_path=spec
)
return (import_path, worker_count)
# Too many colons
raise DirtyAppError(
f"Invalid import path format: {spec}. "
f"Expected 'module.path:ClassName' or 'module.path:ClassName:N'",
app_path=spec
)
def load_dirty_app(import_path):
"""
Load a dirty app class from an import path.

View File

@ -19,7 +19,13 @@ import time
from gunicorn import util
from .errors import DirtyError, DirtyTimeoutError, DirtyWorkerError
from .app import parse_dirty_app_spec
from .errors import (
DirtyError,
DirtyNoWorkersAvailableError,
DirtyTimeoutError,
DirtyWorkerError,
)
from .protocol import (
DirtyProtocol,
make_error_response,
@ -79,6 +85,100 @@ class DirtyArbiter:
self._loop = None
self._pending_requests = {} # request_id -> Future
# Per-app worker allocation tracking
# Maps import_path -> {import_path, worker_count, original_spec}
self.app_specs = {}
# Maps import_path -> set of worker PIDs that have loaded the app
self.app_worker_map = {}
# Maps worker_pid -> list of import_paths loaded by this worker
self.worker_app_map = {}
# Per-app round-robin indices for routing
self._app_rr_indices = {}
# Queue of app lists from dead workers to respawn with same apps
self._pending_respawns = []
# Parse app specs on init
self._parse_app_specs()
def _parse_app_specs(self):
"""
Parse all app specifications from config.
Populates self.app_specs with parsed information about each app,
including the import path and worker count limits.
"""
for spec in self.cfg.dirty_apps:
import_path, worker_count = parse_dirty_app_spec(spec)
self.app_specs[import_path] = {
'import_path': import_path,
'worker_count': worker_count,
'original_spec': spec,
}
# Initialize the app_worker_map for this app
self.app_worker_map[import_path] = set()
def _get_apps_for_new_worker(self):
"""
Determine which apps a new worker should load.
Returns a list of import paths for apps that need more workers.
Apps with workers=None (all workers) are always included.
Apps with worker limits are included only if they haven't
reached their limit yet.
Returns:
List of import paths to load, or empty list if no apps need workers
"""
app_paths = []
for import_path, spec in self.app_specs.items():
worker_count = spec['worker_count']
current_workers = len(self.app_worker_map.get(import_path, set()))
# None means all workers should load this app
if worker_count is None:
app_paths.append(import_path)
# Otherwise check if we've reached the limit
elif current_workers < worker_count:
app_paths.append(import_path)
return app_paths
def _register_worker_apps(self, worker_pid, app_paths):
"""
Register which apps a worker has loaded.
Updates both app_worker_map and worker_app_map to track the
bidirectional relationship between workers and apps.
Args:
worker_pid: The PID of the worker
app_paths: List of app import paths loaded by this worker
"""
self.worker_app_map[worker_pid] = list(app_paths)
for app_path in app_paths:
if app_path not in self.app_worker_map:
self.app_worker_map[app_path] = set()
self.app_worker_map[app_path].add(worker_pid)
def _unregister_worker(self, worker_pid):
"""
Unregister a worker's apps when it exits.
Removes the worker from all tracking maps.
Args:
worker_pid: The PID of the worker to unregister
"""
# Get the apps this worker had
app_paths = self.worker_app_map.pop(worker_pid, [])
# Remove worker from each app's worker set
for app_path in app_paths:
if app_path in self.app_worker_map:
self.app_worker_map[app_path].discard(worker_pid)
def run(self):
"""Run the dirty arbiter (blocking call)."""
self.pid = os.getpid()
@ -256,14 +356,20 @@ class DirtyArbiter:
client_writer: StreamWriter to send responses to client
"""
request_id = request.get("id", "unknown")
app_path = request.get("app_path")
# Find an available worker
worker_pid = await self._get_available_worker()
# Find an available worker (filtered by app if specified)
worker_pid = await self._get_available_worker(app_path)
if worker_pid is None:
response = make_error_response(
request_id,
DirtyError("No dirty workers available")
)
# Distinguish between no workers at all vs. no workers for this app
if not self.workers:
error = DirtyError("No dirty workers available")
elif app_path and self.app_specs:
# Per-app allocation is configured and no workers have this app
error = DirtyNoWorkersAvailableError(app_path)
else:
error = DirtyError("No dirty workers available")
response = make_error_response(request_id, error)
await DirtyProtocol.write_message_async(client_writer, response)
return
@ -373,20 +479,47 @@ class DirtyArbiter:
)
await DirtyProtocol.write_message_async(client_writer, response)
async def _get_available_worker(self):
async def _get_available_worker(self, app_path=None):
"""
Get an available worker PID using round-robin selection.
Distributes requests across all available workers evenly to
maximize throughput when multiple workers are configured.
If app_path is provided, only returns workers that have loaded
that specific app. Uses per-app round-robin to ensure fair
distribution among eligible workers.
Args:
app_path: Optional import path of the target app. If None,
returns any worker using global round-robin.
Returns:
Worker PID or None if no eligible workers are available.
"""
worker_pids = list(self.workers.keys())
if not worker_pids:
# Determine eligible workers
if app_path and self.app_specs:
# Per-app allocation is configured - must return a worker
# that has this specific app
if app_path in self.app_worker_map:
eligible_pids = list(self.app_worker_map[app_path])
else:
# App not known or no workers have it
return None
else:
# No specific app requested, or no app specs configured
# (backward compatible) - any worker will do
eligible_pids = list(self.workers.keys())
if not eligible_pids:
return None
# Round-robin selection
self._worker_rr_index = (self._worker_rr_index + 1) % len(worker_pids)
return worker_pids[self._worker_rr_index]
# Per-app round-robin for fairness
if app_path and self.app_specs:
idx = self._app_rr_indices.get(app_path, 0)
self._app_rr_indices[app_path] = (idx + 1) % len(eligible_pids)
else:
idx = self._worker_rr_index
self._worker_rr_index = (idx + 1) % len(eligible_pids)
return eligible_pids[idx % len(eligible_pids)]
async def _get_worker_connection(self, worker_pid):
"""Get or create connection to a worker."""
@ -424,7 +557,10 @@ class DirtyArbiter:
# Spawn workers if needed
while self.alive and len(self.workers) < num_workers:
self.spawn_worker()
result = self.spawn_worker()
if result is None:
# No apps need more workers - stop spawning
break
await asyncio.sleep(0.1)
# Kill excess workers
@ -436,7 +572,27 @@ class DirtyArbiter:
await asyncio.sleep(0.1)
def spawn_worker(self):
"""Spawn a new dirty worker."""
"""
Spawn a new dirty worker.
Worker app assignment follows these priorities:
1. If there are pending respawns (from dead workers), use those apps
2. Otherwise, determine apps for a new worker based on allocation
Returns:
Worker PID in parent process, or None if no apps need workers
"""
# Priority 1: Respawn dead worker with same apps
if self._pending_respawns:
app_paths = self._pending_respawns.pop(0)
else:
# Priority 2: New worker for initial pool
app_paths = self._get_apps_for_new_worker()
if not app_paths:
self.log.warning("No apps need more workers, skipping spawn")
return None
self.worker_age += 1
socket_path = os.path.join(
self.tmpdir, f"worker-{self.worker_age}.sock"
@ -445,7 +601,7 @@ class DirtyArbiter:
worker = DirtyWorker(
age=self.worker_age,
ppid=self.pid,
app_paths=self.cfg.dirty_apps,
app_paths=app_paths, # Only assigned apps, not all apps
cfg=self.cfg,
log=self.log,
socket_path=socket_path
@ -457,8 +613,13 @@ class DirtyArbiter:
worker.pid = pid
self.workers[pid] = worker
self.worker_sockets[pid] = socket_path
# Register which apps this worker has
self._register_worker_apps(pid, app_paths)
self.cfg.dirty_post_fork(self, worker)
self.log.info("Spawned dirty worker (pid: %s)", pid)
self.log.info("Spawned dirty worker (pid: %s) with apps: %s",
pid, app_paths)
return pid
# Child process
@ -484,7 +645,12 @@ class DirtyArbiter:
self._cleanup_worker(pid)
def _cleanup_worker(self, pid):
"""Clean up after a worker exits."""
"""
Clean up after a worker exits.
Saves the dead worker's app list to pending respawns so the
replacement worker gets the same apps.
"""
self._close_worker_connection(pid)
# Cancel consumer task
@ -495,6 +661,15 @@ class DirtyArbiter:
# Remove queue
self.worker_queues.pop(pid, None)
# Save dead worker's apps for respawn BEFORE unregistering
if pid in self.worker_app_map:
dead_apps = list(self.worker_app_map[pid])
if dead_apps:
self._pending_respawns.append(dead_apps)
# Now safe to unregister the worker's apps
self._unregister_worker(pid)
worker = self.workers.pop(pid, None)
if worker:
self.cfg.dirty_worker_exit(self, worker)

View File

@ -46,6 +46,7 @@ class DirtyError(Exception):
"DirtyWorkerError": DirtyWorkerError,
"DirtyAppError": DirtyAppError,
"DirtyAppNotFoundError": DirtyAppNotFoundError,
"DirtyNoWorkersAvailableError": DirtyNoWorkersAvailableError,
"DirtyProtocolError": DirtyProtocolError,
}
error_type = data.get("error_type", "DirtyError")
@ -70,6 +71,8 @@ class DirtyError(Exception):
error.app_path = error.details.get("app_path")
error.action = error.details.get("action")
error.traceback = error.details.get("traceback")
elif error_class == DirtyNoWorkersAvailableError:
error.app_path = error.details.get("app_path")
return error
@ -130,6 +133,40 @@ class DirtyAppNotFoundError(DirtyAppError):
super().__init__(f"Dirty app not found: {app_path}", app_path=app_path)
class DirtyNoWorkersAvailableError(DirtyError):
"""
Raised when no workers are available for the requested app.
This exception is raised when a request targets an app that has
worker limits configured, and no workers with that app are currently
available (e.g., all workers for that app crashed and haven't been
respawned yet).
Web applications can catch this exception to provide graceful
degradation, such as queuing requests for retry or showing a
maintenance page.
Example::
from gunicorn.dirty import get_dirty_client
from gunicorn.dirty.errors import DirtyNoWorkersAvailableError
def my_view(request):
client = get_dirty_client()
try:
result = client.execute("myapp.ml:HeavyModel", "predict", data)
except DirtyNoWorkersAvailableError as e:
return {"error": "Service temporarily unavailable",
"app": e.app_path}
"""
def __init__(self, app_path, message=None):
if message is None:
message = f"No workers available for app: {app_path}"
super().__init__(message, details={"app_path": app_path})
self.app_path = app_path
class DirtyProtocolError(DirtyError):
"""Raised when there is a protocol-level error."""

View File

@ -0,0 +1,273 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Integration tests for per-app worker allocation."""
import pytest
from gunicorn.config import Config
from gunicorn.dirty.arbiter import DirtyArbiter
class MockLog:
"""Mock logger for testing."""
def __init__(self):
self.messages = []
def debug(self, msg, *args):
self.messages.append(("debug", msg % args if args else msg))
def info(self, msg, *args):
self.messages.append(("info", msg % args if args else msg))
def warning(self, msg, *args):
self.messages.append(("warning", msg % args if args else msg))
def error(self, msg, *args):
self.messages.append(("error", msg % args if args else msg))
def critical(self, msg, *args):
self.messages.append(("critical", msg % args if args else msg))
def exception(self, msg, *args):
self.messages.append(("exception", msg % args if args else msg))
def close_on_exec(self):
pass
def reopen_files(self):
pass
class TestPerAppWorkerAllocation:
"""Integration tests for per-app worker allocation."""
def test_heavy_app_loaded_on_limited_workers(self):
"""App with workers=2 only loaded on 2 of 4 workers."""
cfg = Config()
cfg.set("dirty_workers", 4)
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp", # unlimited
"tests.support_dirty_app:SlowDirtyApp:2", # limited to 2
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Simulate spawning 4 workers
for i in range(4):
apps = arbiter._get_apps_for_new_worker()
arbiter._register_worker_apps(1000 + i, apps)
# Check distribution
unlimited_app = "tests.support_dirty_app:TestDirtyApp"
limited_app = "tests.support_dirty_app:SlowDirtyApp"
# Unlimited app should be on all 4 workers
assert len(arbiter.app_worker_map[unlimited_app]) == 4
# Limited app should only be on 2 workers
assert len(arbiter.app_worker_map[limited_app]) == 2
arbiter._cleanup_sync()
def test_light_app_loaded_on_all_workers(self):
"""App with workers=None loaded on all workers."""
cfg = Config()
cfg.set("dirty_workers", 4)
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Simulate spawning 4 workers
for i in range(4):
apps = arbiter._get_apps_for_new_worker()
arbiter._register_worker_apps(1000 + i, apps)
# App should be on all 4 workers
app_path = "tests.support_dirty_app:TestDirtyApp"
assert len(arbiter.app_worker_map[app_path]) == 4
arbiter._cleanup_sync()
def test_mixed_apps_correct_distribution(self):
"""Mix of limited and unlimited apps distributed correctly."""
cfg = Config()
cfg.set("dirty_workers", 4)
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp", # unlimited
"tests.support_dirty_app:SlowDirtyApp:1", # limited to 1
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Simulate spawning 4 workers
for i in range(4):
apps = arbiter._get_apps_for_new_worker()
arbiter._register_worker_apps(1000 + i, apps)
unlimited_app = "tests.support_dirty_app:TestDirtyApp"
limited_app = "tests.support_dirty_app:SlowDirtyApp"
# Unlimited app on all workers
assert len(arbiter.app_worker_map[unlimited_app]) == 4
# Limited app on only 1 worker
assert len(arbiter.app_worker_map[limited_app]) == 1
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_request_routing_respects_allocation(self):
"""Requests only routed to workers with the target app."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp:1",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Set up workers
arbiter.workers[1001] = "worker1"
arbiter.workers[1002] = "worker2"
# Worker 1001 has both apps, worker 1002 has only TestDirtyApp
arbiter._register_worker_apps(1001, [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp",
])
arbiter._register_worker_apps(1002, [
"tests.support_dirty_app:TestDirtyApp",
])
# Request for SlowDirtyApp should only go to worker 1001
worker = await arbiter._get_available_worker("tests.support_dirty_app:SlowDirtyApp")
assert worker == 1001
# Request for TestDirtyApp should go to either
worker = await arbiter._get_available_worker("tests.support_dirty_app:TestDirtyApp")
assert worker in [1001, 1002]
arbiter._cleanup_sync()
def test_worker_crash_app_reassigned_to_new_worker(self):
"""When worker dies, new worker gets the app it had."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp:1",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
# Set up initial workers
arbiter.workers[1001] = "worker1"
arbiter.worker_sockets[1001] = "/tmp/fake1.sock"
# Worker 1001 has both apps
arbiter._register_worker_apps(1001, [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp",
])
# Simulate worker crash
arbiter._cleanup_worker(1001)
# Apps should be queued for respawn
assert len(arbiter._pending_respawns) == 1
pending_apps = arbiter._pending_respawns[0]
assert "tests.support_dirty_app:TestDirtyApp" in pending_apps
assert "tests.support_dirty_app:SlowDirtyApp" in pending_apps
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_worker_crash_other_workers_still_serve_app(self):
"""When one of two workers dies, other still serves requests."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
# Set up two workers for the same app
arbiter.workers[1001] = "worker1"
arbiter.worker_sockets[1001] = "/tmp/fake1.sock"
arbiter.workers[1002] = "worker2"
arbiter.worker_sockets[1002] = "/tmp/fake2.sock"
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter._register_worker_apps(1001, [app_path])
arbiter._register_worker_apps(1002, [app_path])
# Both workers serve the app
assert len(arbiter.app_worker_map[app_path]) == 2
# Worker 1001 crashes
arbiter._cleanup_worker(1001)
# Worker 1002 still serves requests
assert len(arbiter.app_worker_map[app_path]) == 1
assert 1002 in arbiter.app_worker_map[app_path]
worker = await arbiter._get_available_worker(app_path)
assert worker == 1002
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_worker_crash_sole_worker_app_unavailable_until_respawn(self):
"""When sole worker for app dies, requests fail until respawn."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:SlowDirtyApp:1",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
# Only one worker for this app
arbiter.workers[1001] = "worker1"
arbiter.worker_sockets[1001] = "/tmp/fake1.sock"
app_path = "tests.support_dirty_app:SlowDirtyApp"
arbiter._register_worker_apps(1001, [app_path])
# Worker crashes
arbiter._cleanup_worker(1001)
# No workers available for the app
worker = await arbiter._get_available_worker(app_path)
assert worker is None
arbiter._cleanup_sync()
def test_config_format_module_class_n(self):
"""Config 'mod:Class:2' correctly limits to 2 workers."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp:2",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Check parsed spec
app_path = "tests.support_dirty_app:TestDirtyApp"
assert arbiter.app_specs[app_path]["worker_count"] == 2
arbiter._cleanup_sync()

View File

@ -6,7 +6,12 @@
import pytest
from gunicorn.dirty.app import DirtyApp, load_dirty_app, load_dirty_apps
from gunicorn.dirty.app import (
DirtyApp,
load_dirty_app,
load_dirty_apps,
parse_dirty_app_spec,
)
from gunicorn.dirty.errors import DirtyAppError, DirtyAppNotFoundError
@ -185,3 +190,108 @@ class TestDirtyAppStateful:
with pytest.raises(ValueError) as exc_info:
app("compute", 1, 2, operation="invalid")
assert "Unknown operation" in str(exc_info.value)
class TestDirtyAppWorkersAttribute:
"""Tests for DirtyApp workers class attribute."""
def test_default_workers_is_none(self):
"""Base DirtyApp has workers=None (all workers)."""
assert DirtyApp.workers is None
def test_subclass_can_set_workers(self):
"""Subclass can override workers=2."""
class LimitedApp(DirtyApp):
workers = 2
assert LimitedApp.workers == 2
def test_workers_inherited_by_default(self):
"""Subclass without workers attr inherits None."""
class InheritedApp(DirtyApp):
pass
assert InheritedApp.workers is None
def test_instance_has_workers_attribute(self):
"""Instance should have access to workers attribute."""
app = DirtyApp()
assert app.workers is None
class LimitedApp(DirtyApp):
workers = 3
limited = LimitedApp()
assert limited.workers == 3
class TestParseDirtyAppSpec:
"""Tests for parse_dirty_app_spec function."""
def test_standard_format(self):
"""'mod:Class' returns ('mod:Class', None)."""
import_path, count = parse_dirty_app_spec("mod:Class")
assert import_path == "mod:Class"
assert count is None
def test_standard_format_with_dots(self):
"""'mod.sub.pkg:Class' returns ('mod.sub.pkg:Class', None)."""
import_path, count = parse_dirty_app_spec("mod.sub.pkg:Class")
assert import_path == "mod.sub.pkg:Class"
assert count is None
def test_with_worker_count(self):
"""'mod:Class:2' returns ('mod:Class', 2)."""
import_path, count = parse_dirty_app_spec("mod:Class:2")
assert import_path == "mod:Class"
assert count == 2
def test_worker_count_one(self):
"""'mod:Class:1' returns ('mod:Class', 1)."""
import_path, count = parse_dirty_app_spec("mod:Class:1")
assert import_path == "mod:Class"
assert count == 1
def test_worker_count_large(self):
"""'mod:Class:100' returns ('mod:Class', 100)."""
import_path, count = parse_dirty_app_spec("mod:Class:100")
assert import_path == "mod:Class"
assert count == 100
def test_worker_count_zero_raises(self):
"""'mod:Class:0' raises DirtyAppError."""
with pytest.raises(DirtyAppError) as exc_info:
parse_dirty_app_spec("mod:Class:0")
assert "must be >= 1" in str(exc_info.value)
def test_worker_count_negative_raises(self):
"""'mod:Class:-1' raises DirtyAppError."""
with pytest.raises(DirtyAppError) as exc_info:
parse_dirty_app_spec("mod:Class:-1")
assert "must be >= 1" in str(exc_info.value)
def test_non_numeric_raises(self):
"""'mod:Class:abc' raises DirtyAppError."""
with pytest.raises(DirtyAppError) as exc_info:
parse_dirty_app_spec("mod:Class:abc")
assert "Expected integer" in str(exc_info.value)
def test_no_colon_raises(self):
"""'mod.Class' (no colon) raises DirtyAppError."""
with pytest.raises(DirtyAppError) as exc_info:
parse_dirty_app_spec("mod.Class")
assert "Invalid import path format" in str(exc_info.value)
def test_too_many_colons_raises(self):
"""'mod:Class:2:extra' raises DirtyAppError."""
with pytest.raises(DirtyAppError) as exc_info:
parse_dirty_app_spec("mod:Class:2:extra")
assert "Invalid import path format" in str(exc_info.value)
def test_dotted_module_with_count(self):
"""'mod.sub:Class:2' handles dots correctly."""
import_path, count = parse_dirty_app_spec("mod.sub:Class:2")
assert import_path == "mod.sub:Class"
assert count == 2

View File

@ -1144,3 +1144,275 @@ class TestDirtyArbiterQueueBehavior:
assert task2.done()
arbiter._cleanup_sync()
class TestDirtyArbiterAppTracking:
"""Tests for per-app worker tracking."""
def test_parse_app_specs_standard_format(self):
"""All standard format apps have worker_count=None."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
assert len(arbiter.app_specs) == 2
assert arbiter.app_specs["tests.support_dirty_app:TestDirtyApp"]["worker_count"] is None
assert arbiter.app_specs["tests.support_dirty_app:SlowDirtyApp"]["worker_count"] is None
arbiter._cleanup_sync()
def test_parse_app_specs_with_worker_count(self):
"""Apps with :N have correct worker_count."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp:2",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
assert arbiter.app_specs["tests.support_dirty_app:TestDirtyApp"]["worker_count"] is None
assert arbiter.app_specs["tests.support_dirty_app:SlowDirtyApp"]["worker_count"] == 2
arbiter._cleanup_sync()
def test_get_apps_for_new_worker_all_standard(self):
"""All apps returned when all have workers=None."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp",
"tests.support_dirty_app:SlowDirtyApp",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
apps = arbiter._get_apps_for_new_worker()
assert len(apps) == 2
assert "tests.support_dirty_app:TestDirtyApp" in apps
assert "tests.support_dirty_app:SlowDirtyApp" in apps
arbiter._cleanup_sync()
def test_get_apps_for_new_worker_respects_limit(self):
"""App with workers=2 stops assigning after 2 workers."""
cfg = Config()
cfg.set("dirty_apps", [
"tests.support_dirty_app:TestDirtyApp", # unlimited
"tests.support_dirty_app:SlowDirtyApp:2", # limited to 2
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# First worker should get both apps
apps1 = arbiter._get_apps_for_new_worker()
assert len(apps1) == 2
arbiter._register_worker_apps(1001, apps1)
# Second worker should get both apps
apps2 = arbiter._get_apps_for_new_worker()
assert len(apps2) == 2
arbiter._register_worker_apps(1002, apps2)
# Third worker should only get unlimited app
apps3 = arbiter._get_apps_for_new_worker()
assert len(apps3) == 1
assert "tests.support_dirty_app:TestDirtyApp" in apps3
assert "tests.support_dirty_app:SlowDirtyApp" not in apps3
arbiter._cleanup_sync()
def test_register_worker_apps_updates_both_maps(self):
"""Both app_worker_map and worker_app_map updated."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter._register_worker_apps(1001, [app_path])
# Check app_worker_map
assert 1001 in arbiter.app_worker_map[app_path]
# Check worker_app_map
assert app_path in arbiter.worker_app_map[1001]
arbiter._cleanup_sync()
def test_unregister_worker_cleans_both_maps(self):
"""Worker removal updates both maps correctly."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter._register_worker_apps(1001, [app_path])
# Verify registered
assert 1001 in arbiter.app_worker_map[app_path]
assert 1001 in arbiter.worker_app_map
# Unregister
arbiter._unregister_worker(1001)
# Verify cleaned up
assert 1001 not in arbiter.app_worker_map[app_path]
assert 1001 not in arbiter.worker_app_map
arbiter._cleanup_sync()
class TestDirtyArbiterSpawnWorkerPerApp:
"""Tests for spawn_worker with per-app allocation."""
def test_cleanup_worker_queues_apps_for_respawn(self):
"""Dead worker's apps added to _pending_respawns."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
# Simulate worker registration
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter.workers[1001] = "fake_worker"
arbiter.worker_sockets[1001] = "/tmp/fake.sock"
arbiter._register_worker_apps(1001, [app_path])
# Cleanup should queue apps for respawn
assert len(arbiter._pending_respawns) == 0
arbiter._cleanup_worker(1001)
assert len(arbiter._pending_respawns) == 1
assert app_path in arbiter._pending_respawns[0]
arbiter._cleanup_sync()
def test_pending_respawns_cleared_after_spawn(self):
"""Pending respawns consumed when spawning new worker."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
# Add pending respawn
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter._pending_respawns.append([app_path])
# Get apps for new worker should use pending first
# But since spawn_worker forks, we test the logic directly
assert len(arbiter._pending_respawns) == 1
# When spawn_worker pops from pending_respawns
apps = arbiter._pending_respawns.pop(0)
assert apps == [app_path]
assert len(arbiter._pending_respawns) == 0
arbiter._cleanup_sync()
class TestDirtyArbiterRoutingPerApp:
"""Tests for app-aware routing."""
@pytest.mark.asyncio
async def test_get_available_worker_no_filter(self):
"""Without app_path, returns any worker round-robin."""
cfg = Config()
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.workers[1001] = "worker1"
arbiter.workers[1002] = "worker2"
# Should return workers in round-robin
w1 = await arbiter._get_available_worker()
w2 = await arbiter._get_available_worker()
assert w1 in [1001, 1002]
assert w2 in [1001, 1002]
# They should be different (round-robin)
if len(arbiter.workers) >= 2:
assert w1 != w2 or len(arbiter.workers) == 1
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_get_available_worker_with_app_filter(self):
"""With app_path, returns only workers that have it."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.workers[1001] = "worker1"
arbiter.workers[1002] = "worker2"
# Only register 1001 for the app
app_path = "tests.support_dirty_app:TestDirtyApp"
arbiter._register_worker_apps(1001, [app_path])
# Should only return 1001
worker = await arbiter._get_available_worker(app_path)
assert worker == 1001
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_get_available_worker_app_no_workers_returns_none(self):
"""Returns None if no workers have the app."""
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.workers[1001] = "worker1"
# Worker 1001 has no apps registered - request for unknown app returns None
worker = await arbiter._get_available_worker("unknown:App")
assert worker is None
arbiter._cleanup_sync()
@pytest.mark.asyncio
async def test_route_request_app_not_loaded_error(self):
"""Error response when no worker has the app."""
from gunicorn.dirty.protocol import DirtyProtocol
cfg = Config()
cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
arbiter.pid = 12345
arbiter.workers[1001] = "worker1"
# No apps registered for this worker (worker exists but has no apps)
request = make_request(
request_id="test-123",
app_path="unknown:App",
action="test"
)
writer = MockStreamWriter()
await arbiter.route_request(request, writer)
assert len(writer.messages) == 1
response = writer.messages[0]
assert response["type"] == DirtyProtocol.MSG_TYPE_ERROR
assert "No workers available for app" in response["error"]["message"]
assert response["error"]["error_type"] == "DirtyNoWorkersAvailableError"
arbiter._cleanup_sync()

View File

@ -0,0 +1,76 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for dirty errors module."""
import pytest
from gunicorn.dirty.errors import (
DirtyError,
DirtyNoWorkersAvailableError,
)
class TestDirtyNoWorkersAvailableError:
"""Tests for DirtyNoWorkersAvailableError exception."""
def test_error_contains_app_path(self):
"""Error includes the app_path."""
error = DirtyNoWorkersAvailableError("myapp:Model")
assert error.app_path == "myapp:Model"
assert "myapp:Model" in str(error)
assert "No workers available" in str(error)
def test_error_with_custom_message(self):
"""Error can have a custom message."""
error = DirtyNoWorkersAvailableError(
"myapp:Model",
message="Custom: no workers for heavy model"
)
assert error.app_path == "myapp:Model"
assert "Custom: no workers" in str(error)
def test_error_serialization_roundtrip(self):
"""Error survives to_dict/from_dict cycle."""
original = DirtyNoWorkersAvailableError("myapp.ml:HugeModel")
# Serialize
data = original.to_dict()
assert data["error_type"] == "DirtyNoWorkersAvailableError"
assert data["details"]["app_path"] == "myapp.ml:HugeModel"
# Deserialize
restored = DirtyError.from_dict(data)
assert isinstance(restored, DirtyNoWorkersAvailableError)
assert restored.app_path == "myapp.ml:HugeModel"
assert "No workers available" in str(restored)
def test_error_is_dirty_error_subclass(self):
"""DirtyNoWorkersAvailableError is a DirtyError subclass."""
error = DirtyNoWorkersAvailableError("app:Class")
assert isinstance(error, DirtyError)
def test_web_app_can_catch_specific_error(self):
"""Web app can catch DirtyNoWorkersAvailableError specifically."""
def simulate_execute():
raise DirtyNoWorkersAvailableError("myapp:HeavyModel")
# Catch specific error
try:
simulate_execute()
assert False, "Should have raised"
except DirtyNoWorkersAvailableError as e:
assert e.app_path == "myapp:HeavyModel"
def test_can_catch_as_base_error(self):
"""Can catch DirtyNoWorkersAvailableError as DirtyError."""
def simulate_execute():
raise DirtyNoWorkersAvailableError("myapp:Model")
try:
simulate_execute()
assert False, "Should have raised"
except DirtyError as e:
# Should catch it as the base class
assert hasattr(e, "app_path")