From 8559854b4f09601920ed852b9bcfcbb55f726593 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 1 Feb 2026 02:40:09 +0100 Subject: [PATCH] feat(dirty): add per-app worker allocation for memory optimization Allow dirty apps to specify how many workers should load them, enabling significant memory savings for heavy applications like ML models. - Add `workers` class attribute to DirtyApp (None = all workers) - Add `parse_dirty_app_spec()` to parse "module:Class:N" format - Add `DirtyNoWorkersAvailableError` for app-specific error handling - Update DirtyArbiter with per-app worker tracking and routing - Maintain backward compatibility when no dirty_apps configured Example: 8 workers x 10GB model = 80GB RAM needed With workers=2: 2 x 10GB = 20GB RAM (75% savings) Configuration formats: - Class attribute: `workers = 2` on DirtyApp subclass - Config format: `module:class:N` (e.g., `myapp.ml:HugeModel:2`) --- gunicorn/config.py | 30 +- gunicorn/dirty/app.py | 92 ++++++ gunicorn/dirty/arbiter.py | 215 ++++++++++++-- gunicorn/dirty/errors.py | 37 +++ tests/dirty/test_per_app_worker_allocation.py | 273 ++++++++++++++++++ tests/test_dirty_app.py | 112 ++++++- tests/test_dirty_arbiter.py | 272 +++++++++++++++++ tests/test_dirty_errors.py | 76 +++++ 8 files changed, 1083 insertions(+), 24 deletions(-) create mode 100644 tests/dirty/test_per_app_worker_allocation.py create mode 100644 tests/test_dirty_errors.py diff --git a/gunicorn/config.py b/gunicorn/config.py index 73264657..91132f16 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -2885,23 +2885,47 @@ class DirtyApps(Setting): desc = """\ Dirty applications to load in the dirty worker pool. - A list of application paths in pattern ``$(MODULE_NAME):$(CLASS_NAME)``. + A list of application paths in one of these formats: + + - ``$(MODULE_NAME):$(CLASS_NAME)`` - all workers load this app + - ``$(MODULE_NAME):$(CLASS_NAME):$(N)`` - only N workers load this app + Each dirty app must be a class that inherits from ``DirtyApp`` base class and implements the ``init()``, ``__call__()``, and ``close()`` methods. Example:: dirty_apps = [ - "myapp.ml:MLApp", - "myapp.images:ImageApp", + "myapp.ml:MLApp", # All workers load this + "myapp.images:ImageApp", # All workers load this + "myapp.heavy:HugeModel:2", # Only 2 workers load this ] + The per-app worker limit is useful for memory-intensive applications + like large ML models. Instead of all 8 workers loading a 10GB model + (80GB total), you can limit it to 2 workers (20GB total). + + Alternatively, you can set the ``workers`` class attribute on your + DirtyApp subclass:: + + class HugeModelApp(DirtyApp): + workers = 2 # Only 2 workers load this app + + def init(self): + self.model = load_10gb_model() + + Note: The config format (``module:Class:N``) takes precedence over + the class attribute if both are specified. + Dirty apps are loaded once when the dirty worker starts and persist in memory for the lifetime of the worker. This is ideal for loading ML models, database connection pools, or other stateful resources that are expensive to initialize. .. versionadded:: 25.0.0 + + .. versionchanged:: 25.1.0 + Added per-app worker allocation via ``:N`` format suffix. """ diff --git a/gunicorn/dirty/app.py b/gunicorn/dirty/app.py index 17489d02..307269b2 100644 --- a/gunicorn/dirty/app.py +++ b/gunicorn/dirty/app.py @@ -72,12 +72,29 @@ class DirtyApp: self.models[name] = load_model(name) return {"loaded": True, "name": name} + Worker Allocation + ----------------- + By default, all dirty workers load all apps. For apps that consume + significant memory (like large ML models), you can limit how many + workers load the app by setting the ``workers`` class attribute:: + + class HeavyModelApp(DirtyApp): + workers = 2 # Only 2 workers will load this app + + def init(self): + self.model = load_10gb_model() + Subclasses should implement: - init(): Called once at worker startup to initialize resources - __call__(action, *args, **kwargs): Handle requests from HTTP workers - close(): Called at worker shutdown to cleanup resources """ + # Number of workers that should load this app. + # None means all workers (default, backward compatible). + # Set to an integer to limit how many workers load this app. + workers = None + def init(self): """ Initialize the application. @@ -120,6 +137,81 @@ class DirtyApp: """ +def parse_dirty_app_spec(spec): + """ + Parse a dirty app specification. + + Supports two formats: + - ``"module:Class"`` - standard format, all workers load the app + - ``"module:Class:N"`` - worker-limited format, only N workers load the app + + Args: + spec: The app specification string + + Returns: + tuple: (import_path, worker_count) + - import_path: The "module:Class" part for importing + - worker_count: Integer limit or None for all workers + + Raises: + DirtyAppError: If the spec format is invalid or worker_count is < 1 + + Examples:: + + >>> parse_dirty_app_spec("myapp:App") + ("myapp:App", None) + + >>> parse_dirty_app_spec("myapp:App:2") + ("myapp:App", 2) + + >>> parse_dirty_app_spec("myapp.sub:App:1") + ("myapp.sub:App", 1) + """ + if ':' not in spec: + raise DirtyAppError( + f"Invalid import path format: {spec}. " + f"Expected 'module.path:ClassName' or 'module.path:ClassName:N'", + app_path=spec + ) + + parts = spec.split(':') + + # Standard format: "module:Class" or "module.sub:Class" + if len(parts) == 2: + return (spec, None) + + # Worker-limited format: "module:Class:N" + if len(parts) == 3: + module_path, class_name, count_str = parts + import_path = f"{module_path}:{class_name}" + + # Validate the worker count + try: + worker_count = int(count_str) + except ValueError: + raise DirtyAppError( + f"Invalid worker count in spec: {spec}. " + f"Expected integer, got '{count_str}'", + app_path=spec + ) + + if worker_count < 1: + raise DirtyAppError( + f"Invalid worker count in spec: {spec}. " + f"Worker count must be >= 1, got {worker_count}", + app_path=spec + ) + + return (import_path, worker_count) + + # Too many colons + raise DirtyAppError( + f"Invalid import path format: {spec}. " + f"Expected 'module.path:ClassName' or 'module.path:ClassName:N'", + app_path=spec + ) + + def load_dirty_app(import_path): """ Load a dirty app class from an import path. diff --git a/gunicorn/dirty/arbiter.py b/gunicorn/dirty/arbiter.py index 3f2350e4..eceb6773 100644 --- a/gunicorn/dirty/arbiter.py +++ b/gunicorn/dirty/arbiter.py @@ -19,7 +19,13 @@ import time from gunicorn import util -from .errors import DirtyError, DirtyTimeoutError, DirtyWorkerError +from .app import parse_dirty_app_spec +from .errors import ( + DirtyError, + DirtyNoWorkersAvailableError, + DirtyTimeoutError, + DirtyWorkerError, +) from .protocol import ( DirtyProtocol, make_error_response, @@ -79,6 +85,100 @@ class DirtyArbiter: self._loop = None self._pending_requests = {} # request_id -> Future + # Per-app worker allocation tracking + # Maps import_path -> {import_path, worker_count, original_spec} + self.app_specs = {} + # Maps import_path -> set of worker PIDs that have loaded the app + self.app_worker_map = {} + # Maps worker_pid -> list of import_paths loaded by this worker + self.worker_app_map = {} + # Per-app round-robin indices for routing + self._app_rr_indices = {} + # Queue of app lists from dead workers to respawn with same apps + self._pending_respawns = [] + + # Parse app specs on init + self._parse_app_specs() + + def _parse_app_specs(self): + """ + Parse all app specifications from config. + + Populates self.app_specs with parsed information about each app, + including the import path and worker count limits. + """ + for spec in self.cfg.dirty_apps: + import_path, worker_count = parse_dirty_app_spec(spec) + self.app_specs[import_path] = { + 'import_path': import_path, + 'worker_count': worker_count, + 'original_spec': spec, + } + # Initialize the app_worker_map for this app + self.app_worker_map[import_path] = set() + + def _get_apps_for_new_worker(self): + """ + Determine which apps a new worker should load. + + Returns a list of import paths for apps that need more workers. + Apps with workers=None (all workers) are always included. + Apps with worker limits are included only if they haven't + reached their limit yet. + + Returns: + List of import paths to load, or empty list if no apps need workers + """ + app_paths = [] + + for import_path, spec in self.app_specs.items(): + worker_count = spec['worker_count'] + current_workers = len(self.app_worker_map.get(import_path, set())) + + # None means all workers should load this app + if worker_count is None: + app_paths.append(import_path) + # Otherwise check if we've reached the limit + elif current_workers < worker_count: + app_paths.append(import_path) + + return app_paths + + def _register_worker_apps(self, worker_pid, app_paths): + """ + Register which apps a worker has loaded. + + Updates both app_worker_map and worker_app_map to track the + bidirectional relationship between workers and apps. + + Args: + worker_pid: The PID of the worker + app_paths: List of app import paths loaded by this worker + """ + self.worker_app_map[worker_pid] = list(app_paths) + + for app_path in app_paths: + if app_path not in self.app_worker_map: + self.app_worker_map[app_path] = set() + self.app_worker_map[app_path].add(worker_pid) + + def _unregister_worker(self, worker_pid): + """ + Unregister a worker's apps when it exits. + + Removes the worker from all tracking maps. + + Args: + worker_pid: The PID of the worker to unregister + """ + # Get the apps this worker had + app_paths = self.worker_app_map.pop(worker_pid, []) + + # Remove worker from each app's worker set + for app_path in app_paths: + if app_path in self.app_worker_map: + self.app_worker_map[app_path].discard(worker_pid) + def run(self): """Run the dirty arbiter (blocking call).""" self.pid = os.getpid() @@ -256,14 +356,20 @@ class DirtyArbiter: client_writer: StreamWriter to send responses to client """ request_id = request.get("id", "unknown") + app_path = request.get("app_path") - # Find an available worker - worker_pid = await self._get_available_worker() + # Find an available worker (filtered by app if specified) + worker_pid = await self._get_available_worker(app_path) if worker_pid is None: - response = make_error_response( - request_id, - DirtyError("No dirty workers available") - ) + # Distinguish between no workers at all vs. no workers for this app + if not self.workers: + error = DirtyError("No dirty workers available") + elif app_path and self.app_specs: + # Per-app allocation is configured and no workers have this app + error = DirtyNoWorkersAvailableError(app_path) + else: + error = DirtyError("No dirty workers available") + response = make_error_response(request_id, error) await DirtyProtocol.write_message_async(client_writer, response) return @@ -373,20 +479,47 @@ class DirtyArbiter: ) await DirtyProtocol.write_message_async(client_writer, response) - async def _get_available_worker(self): + async def _get_available_worker(self, app_path=None): """ Get an available worker PID using round-robin selection. - Distributes requests across all available workers evenly to - maximize throughput when multiple workers are configured. + If app_path is provided, only returns workers that have loaded + that specific app. Uses per-app round-robin to ensure fair + distribution among eligible workers. + + Args: + app_path: Optional import path of the target app. If None, + returns any worker using global round-robin. + + Returns: + Worker PID or None if no eligible workers are available. """ - worker_pids = list(self.workers.keys()) - if not worker_pids: + # Determine eligible workers + if app_path and self.app_specs: + # Per-app allocation is configured - must return a worker + # that has this specific app + if app_path in self.app_worker_map: + eligible_pids = list(self.app_worker_map[app_path]) + else: + # App not known or no workers have it + return None + else: + # No specific app requested, or no app specs configured + # (backward compatible) - any worker will do + eligible_pids = list(self.workers.keys()) + + if not eligible_pids: return None - # Round-robin selection - self._worker_rr_index = (self._worker_rr_index + 1) % len(worker_pids) - return worker_pids[self._worker_rr_index] + # Per-app round-robin for fairness + if app_path and self.app_specs: + idx = self._app_rr_indices.get(app_path, 0) + self._app_rr_indices[app_path] = (idx + 1) % len(eligible_pids) + else: + idx = self._worker_rr_index + self._worker_rr_index = (idx + 1) % len(eligible_pids) + + return eligible_pids[idx % len(eligible_pids)] async def _get_worker_connection(self, worker_pid): """Get or create connection to a worker.""" @@ -424,7 +557,10 @@ class DirtyArbiter: # Spawn workers if needed while self.alive and len(self.workers) < num_workers: - self.spawn_worker() + result = self.spawn_worker() + if result is None: + # No apps need more workers - stop spawning + break await asyncio.sleep(0.1) # Kill excess workers @@ -436,7 +572,27 @@ class DirtyArbiter: await asyncio.sleep(0.1) def spawn_worker(self): - """Spawn a new dirty worker.""" + """ + Spawn a new dirty worker. + + Worker app assignment follows these priorities: + 1. If there are pending respawns (from dead workers), use those apps + 2. Otherwise, determine apps for a new worker based on allocation + + Returns: + Worker PID in parent process, or None if no apps need workers + """ + # Priority 1: Respawn dead worker with same apps + if self._pending_respawns: + app_paths = self._pending_respawns.pop(0) + else: + # Priority 2: New worker for initial pool + app_paths = self._get_apps_for_new_worker() + + if not app_paths: + self.log.warning("No apps need more workers, skipping spawn") + return None + self.worker_age += 1 socket_path = os.path.join( self.tmpdir, f"worker-{self.worker_age}.sock" @@ -445,7 +601,7 @@ class DirtyArbiter: worker = DirtyWorker( age=self.worker_age, ppid=self.pid, - app_paths=self.cfg.dirty_apps, + app_paths=app_paths, # Only assigned apps, not all apps cfg=self.cfg, log=self.log, socket_path=socket_path @@ -457,8 +613,13 @@ class DirtyArbiter: worker.pid = pid self.workers[pid] = worker self.worker_sockets[pid] = socket_path + + # Register which apps this worker has + self._register_worker_apps(pid, app_paths) + self.cfg.dirty_post_fork(self, worker) - self.log.info("Spawned dirty worker (pid: %s)", pid) + self.log.info("Spawned dirty worker (pid: %s) with apps: %s", + pid, app_paths) return pid # Child process @@ -484,7 +645,12 @@ class DirtyArbiter: self._cleanup_worker(pid) def _cleanup_worker(self, pid): - """Clean up after a worker exits.""" + """ + Clean up after a worker exits. + + Saves the dead worker's app list to pending respawns so the + replacement worker gets the same apps. + """ self._close_worker_connection(pid) # Cancel consumer task @@ -495,6 +661,15 @@ class DirtyArbiter: # Remove queue self.worker_queues.pop(pid, None) + # Save dead worker's apps for respawn BEFORE unregistering + if pid in self.worker_app_map: + dead_apps = list(self.worker_app_map[pid]) + if dead_apps: + self._pending_respawns.append(dead_apps) + + # Now safe to unregister the worker's apps + self._unregister_worker(pid) + worker = self.workers.pop(pid, None) if worker: self.cfg.dirty_worker_exit(self, worker) diff --git a/gunicorn/dirty/errors.py b/gunicorn/dirty/errors.py index 4f39f186..5ce25705 100644 --- a/gunicorn/dirty/errors.py +++ b/gunicorn/dirty/errors.py @@ -46,6 +46,7 @@ class DirtyError(Exception): "DirtyWorkerError": DirtyWorkerError, "DirtyAppError": DirtyAppError, "DirtyAppNotFoundError": DirtyAppNotFoundError, + "DirtyNoWorkersAvailableError": DirtyNoWorkersAvailableError, "DirtyProtocolError": DirtyProtocolError, } error_type = data.get("error_type", "DirtyError") @@ -70,6 +71,8 @@ class DirtyError(Exception): error.app_path = error.details.get("app_path") error.action = error.details.get("action") error.traceback = error.details.get("traceback") + elif error_class == DirtyNoWorkersAvailableError: + error.app_path = error.details.get("app_path") return error @@ -130,6 +133,40 @@ class DirtyAppNotFoundError(DirtyAppError): super().__init__(f"Dirty app not found: {app_path}", app_path=app_path) +class DirtyNoWorkersAvailableError(DirtyError): + """ + Raised when no workers are available for the requested app. + + This exception is raised when a request targets an app that has + worker limits configured, and no workers with that app are currently + available (e.g., all workers for that app crashed and haven't been + respawned yet). + + Web applications can catch this exception to provide graceful + degradation, such as queuing requests for retry or showing a + maintenance page. + + Example:: + + from gunicorn.dirty import get_dirty_client + from gunicorn.dirty.errors import DirtyNoWorkersAvailableError + + def my_view(request): + client = get_dirty_client() + try: + result = client.execute("myapp.ml:HeavyModel", "predict", data) + except DirtyNoWorkersAvailableError as e: + return {"error": "Service temporarily unavailable", + "app": e.app_path} + """ + + def __init__(self, app_path, message=None): + if message is None: + message = f"No workers available for app: {app_path}" + super().__init__(message, details={"app_path": app_path}) + self.app_path = app_path + + class DirtyProtocolError(DirtyError): """Raised when there is a protocol-level error.""" diff --git a/tests/dirty/test_per_app_worker_allocation.py b/tests/dirty/test_per_app_worker_allocation.py new file mode 100644 index 00000000..fd0ac5fe --- /dev/null +++ b/tests/dirty/test_per_app_worker_allocation.py @@ -0,0 +1,273 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +"""Integration tests for per-app worker allocation.""" + +import pytest + +from gunicorn.config import Config +from gunicorn.dirty.arbiter import DirtyArbiter + + +class MockLog: + """Mock logger for testing.""" + + def __init__(self): + self.messages = [] + + def debug(self, msg, *args): + self.messages.append(("debug", msg % args if args else msg)) + + def info(self, msg, *args): + self.messages.append(("info", msg % args if args else msg)) + + def warning(self, msg, *args): + self.messages.append(("warning", msg % args if args else msg)) + + def error(self, msg, *args): + self.messages.append(("error", msg % args if args else msg)) + + def critical(self, msg, *args): + self.messages.append(("critical", msg % args if args else msg)) + + def exception(self, msg, *args): + self.messages.append(("exception", msg % args if args else msg)) + + def close_on_exec(self): + pass + + def reopen_files(self): + pass + + +class TestPerAppWorkerAllocation: + """Integration tests for per-app worker allocation.""" + + def test_heavy_app_loaded_on_limited_workers(self): + """App with workers=2 only loaded on 2 of 4 workers.""" + cfg = Config() + cfg.set("dirty_workers", 4) + cfg.set("dirty_apps", [ + "tests.support_dirty_app:TestDirtyApp", # unlimited + "tests.support_dirty_app:SlowDirtyApp:2", # limited to 2 + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + # Simulate spawning 4 workers + for i in range(4): + apps = arbiter._get_apps_for_new_worker() + arbiter._register_worker_apps(1000 + i, apps) + + # Check distribution + unlimited_app = "tests.support_dirty_app:TestDirtyApp" + limited_app = "tests.support_dirty_app:SlowDirtyApp" + + # Unlimited app should be on all 4 workers + assert len(arbiter.app_worker_map[unlimited_app]) == 4 + + # Limited app should only be on 2 workers + assert len(arbiter.app_worker_map[limited_app]) == 2 + + arbiter._cleanup_sync() + + def test_light_app_loaded_on_all_workers(self): + """App with workers=None loaded on all workers.""" + cfg = Config() + cfg.set("dirty_workers", 4) + cfg.set("dirty_apps", [ + "tests.support_dirty_app:TestDirtyApp", + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + # Simulate spawning 4 workers + for i in range(4): + apps = arbiter._get_apps_for_new_worker() + arbiter._register_worker_apps(1000 + i, apps) + + # App should be on all 4 workers + app_path = "tests.support_dirty_app:TestDirtyApp" + assert len(arbiter.app_worker_map[app_path]) == 4 + + arbiter._cleanup_sync() + + def test_mixed_apps_correct_distribution(self): + """Mix of limited and unlimited apps distributed correctly.""" + cfg = Config() + cfg.set("dirty_workers", 4) + cfg.set("dirty_apps", [ + "tests.support_dirty_app:TestDirtyApp", # unlimited + "tests.support_dirty_app:SlowDirtyApp:1", # limited to 1 + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + # Simulate spawning 4 workers + for i in range(4): + apps = arbiter._get_apps_for_new_worker() + arbiter._register_worker_apps(1000 + i, apps) + + unlimited_app = "tests.support_dirty_app:TestDirtyApp" + limited_app = "tests.support_dirty_app:SlowDirtyApp" + + # Unlimited app on all workers + assert len(arbiter.app_worker_map[unlimited_app]) == 4 + + # Limited app on only 1 worker + assert len(arbiter.app_worker_map[limited_app]) == 1 + + arbiter._cleanup_sync() + + @pytest.mark.asyncio + async def test_request_routing_respects_allocation(self): + """Requests only routed to workers with the target app.""" + cfg = Config() + cfg.set("dirty_apps", [ + "tests.support_dirty_app:TestDirtyApp", + "tests.support_dirty_app:SlowDirtyApp:1", + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + # Set up workers + arbiter.workers[1001] = "worker1" + arbiter.workers[1002] = "worker2" + + # Worker 1001 has both apps, worker 1002 has only TestDirtyApp + arbiter._register_worker_apps(1001, [ + "tests.support_dirty_app:TestDirtyApp", + "tests.support_dirty_app:SlowDirtyApp", + ]) + arbiter._register_worker_apps(1002, [ + "tests.support_dirty_app:TestDirtyApp", + ]) + + # Request for SlowDirtyApp should only go to worker 1001 + worker = await arbiter._get_available_worker("tests.support_dirty_app:SlowDirtyApp") + assert worker == 1001 + + # Request for TestDirtyApp should go to either + worker = await arbiter._get_available_worker("tests.support_dirty_app:TestDirtyApp") + assert worker in [1001, 1002] + + arbiter._cleanup_sync() + + def test_worker_crash_app_reassigned_to_new_worker(self): + """When worker dies, new worker gets the app it had.""" + cfg = Config() + cfg.set("dirty_apps", [ + "tests.support_dirty_app:TestDirtyApp", + "tests.support_dirty_app:SlowDirtyApp:1", + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + arbiter.pid = 12345 + + # Set up initial workers + arbiter.workers[1001] = "worker1" + arbiter.worker_sockets[1001] = "/tmp/fake1.sock" + + # Worker 1001 has both apps + arbiter._register_worker_apps(1001, [ + "tests.support_dirty_app:TestDirtyApp", + "tests.support_dirty_app:SlowDirtyApp", + ]) + + # Simulate worker crash + arbiter._cleanup_worker(1001) + + # Apps should be queued for respawn + assert len(arbiter._pending_respawns) == 1 + pending_apps = arbiter._pending_respawns[0] + assert "tests.support_dirty_app:TestDirtyApp" in pending_apps + assert "tests.support_dirty_app:SlowDirtyApp" in pending_apps + + arbiter._cleanup_sync() + + @pytest.mark.asyncio + async def test_worker_crash_other_workers_still_serve_app(self): + """When one of two workers dies, other still serves requests.""" + cfg = Config() + cfg.set("dirty_apps", [ + "tests.support_dirty_app:TestDirtyApp", + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + arbiter.pid = 12345 + + # Set up two workers for the same app + arbiter.workers[1001] = "worker1" + arbiter.worker_sockets[1001] = "/tmp/fake1.sock" + arbiter.workers[1002] = "worker2" + arbiter.worker_sockets[1002] = "/tmp/fake2.sock" + + app_path = "tests.support_dirty_app:TestDirtyApp" + arbiter._register_worker_apps(1001, [app_path]) + arbiter._register_worker_apps(1002, [app_path]) + + # Both workers serve the app + assert len(arbiter.app_worker_map[app_path]) == 2 + + # Worker 1001 crashes + arbiter._cleanup_worker(1001) + + # Worker 1002 still serves requests + assert len(arbiter.app_worker_map[app_path]) == 1 + assert 1002 in arbiter.app_worker_map[app_path] + + worker = await arbiter._get_available_worker(app_path) + assert worker == 1002 + + arbiter._cleanup_sync() + + @pytest.mark.asyncio + async def test_worker_crash_sole_worker_app_unavailable_until_respawn(self): + """When sole worker for app dies, requests fail until respawn.""" + cfg = Config() + cfg.set("dirty_apps", [ + "tests.support_dirty_app:SlowDirtyApp:1", + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + arbiter.pid = 12345 + + # Only one worker for this app + arbiter.workers[1001] = "worker1" + arbiter.worker_sockets[1001] = "/tmp/fake1.sock" + + app_path = "tests.support_dirty_app:SlowDirtyApp" + arbiter._register_worker_apps(1001, [app_path]) + + # Worker crashes + arbiter._cleanup_worker(1001) + + # No workers available for the app + worker = await arbiter._get_available_worker(app_path) + assert worker is None + + arbiter._cleanup_sync() + + def test_config_format_module_class_n(self): + """Config 'mod:Class:2' correctly limits to 2 workers.""" + cfg = Config() + cfg.set("dirty_apps", [ + "tests.support_dirty_app:TestDirtyApp:2", + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + # Check parsed spec + app_path = "tests.support_dirty_app:TestDirtyApp" + assert arbiter.app_specs[app_path]["worker_count"] == 2 + + arbiter._cleanup_sync() diff --git a/tests/test_dirty_app.py b/tests/test_dirty_app.py index dcbe243c..791069e0 100644 --- a/tests/test_dirty_app.py +++ b/tests/test_dirty_app.py @@ -6,7 +6,12 @@ import pytest -from gunicorn.dirty.app import DirtyApp, load_dirty_app, load_dirty_apps +from gunicorn.dirty.app import ( + DirtyApp, + load_dirty_app, + load_dirty_apps, + parse_dirty_app_spec, +) from gunicorn.dirty.errors import DirtyAppError, DirtyAppNotFoundError @@ -185,3 +190,108 @@ class TestDirtyAppStateful: with pytest.raises(ValueError) as exc_info: app("compute", 1, 2, operation="invalid") assert "Unknown operation" in str(exc_info.value) + + +class TestDirtyAppWorkersAttribute: + """Tests for DirtyApp workers class attribute.""" + + def test_default_workers_is_none(self): + """Base DirtyApp has workers=None (all workers).""" + assert DirtyApp.workers is None + + def test_subclass_can_set_workers(self): + """Subclass can override workers=2.""" + + class LimitedApp(DirtyApp): + workers = 2 + + assert LimitedApp.workers == 2 + + def test_workers_inherited_by_default(self): + """Subclass without workers attr inherits None.""" + + class InheritedApp(DirtyApp): + pass + + assert InheritedApp.workers is None + + def test_instance_has_workers_attribute(self): + """Instance should have access to workers attribute.""" + app = DirtyApp() + assert app.workers is None + + class LimitedApp(DirtyApp): + workers = 3 + + limited = LimitedApp() + assert limited.workers == 3 + + +class TestParseDirtyAppSpec: + """Tests for parse_dirty_app_spec function.""" + + def test_standard_format(self): + """'mod:Class' returns ('mod:Class', None).""" + import_path, count = parse_dirty_app_spec("mod:Class") + assert import_path == "mod:Class" + assert count is None + + def test_standard_format_with_dots(self): + """'mod.sub.pkg:Class' returns ('mod.sub.pkg:Class', None).""" + import_path, count = parse_dirty_app_spec("mod.sub.pkg:Class") + assert import_path == "mod.sub.pkg:Class" + assert count is None + + def test_with_worker_count(self): + """'mod:Class:2' returns ('mod:Class', 2).""" + import_path, count = parse_dirty_app_spec("mod:Class:2") + assert import_path == "mod:Class" + assert count == 2 + + def test_worker_count_one(self): + """'mod:Class:1' returns ('mod:Class', 1).""" + import_path, count = parse_dirty_app_spec("mod:Class:1") + assert import_path == "mod:Class" + assert count == 1 + + def test_worker_count_large(self): + """'mod:Class:100' returns ('mod:Class', 100).""" + import_path, count = parse_dirty_app_spec("mod:Class:100") + assert import_path == "mod:Class" + assert count == 100 + + def test_worker_count_zero_raises(self): + """'mod:Class:0' raises DirtyAppError.""" + with pytest.raises(DirtyAppError) as exc_info: + parse_dirty_app_spec("mod:Class:0") + assert "must be >= 1" in str(exc_info.value) + + def test_worker_count_negative_raises(self): + """'mod:Class:-1' raises DirtyAppError.""" + with pytest.raises(DirtyAppError) as exc_info: + parse_dirty_app_spec("mod:Class:-1") + assert "must be >= 1" in str(exc_info.value) + + def test_non_numeric_raises(self): + """'mod:Class:abc' raises DirtyAppError.""" + with pytest.raises(DirtyAppError) as exc_info: + parse_dirty_app_spec("mod:Class:abc") + assert "Expected integer" in str(exc_info.value) + + def test_no_colon_raises(self): + """'mod.Class' (no colon) raises DirtyAppError.""" + with pytest.raises(DirtyAppError) as exc_info: + parse_dirty_app_spec("mod.Class") + assert "Invalid import path format" in str(exc_info.value) + + def test_too_many_colons_raises(self): + """'mod:Class:2:extra' raises DirtyAppError.""" + with pytest.raises(DirtyAppError) as exc_info: + parse_dirty_app_spec("mod:Class:2:extra") + assert "Invalid import path format" in str(exc_info.value) + + def test_dotted_module_with_count(self): + """'mod.sub:Class:2' handles dots correctly.""" + import_path, count = parse_dirty_app_spec("mod.sub:Class:2") + assert import_path == "mod.sub:Class" + assert count == 2 diff --git a/tests/test_dirty_arbiter.py b/tests/test_dirty_arbiter.py index 1c69942a..40abb504 100644 --- a/tests/test_dirty_arbiter.py +++ b/tests/test_dirty_arbiter.py @@ -1144,3 +1144,275 @@ class TestDirtyArbiterQueueBehavior: assert task2.done() arbiter._cleanup_sync() + + +class TestDirtyArbiterAppTracking: + """Tests for per-app worker tracking.""" + + def test_parse_app_specs_standard_format(self): + """All standard format apps have worker_count=None.""" + cfg = Config() + cfg.set("dirty_apps", [ + "tests.support_dirty_app:TestDirtyApp", + "tests.support_dirty_app:SlowDirtyApp", + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + assert len(arbiter.app_specs) == 2 + assert arbiter.app_specs["tests.support_dirty_app:TestDirtyApp"]["worker_count"] is None + assert arbiter.app_specs["tests.support_dirty_app:SlowDirtyApp"]["worker_count"] is None + + arbiter._cleanup_sync() + + def test_parse_app_specs_with_worker_count(self): + """Apps with :N have correct worker_count.""" + cfg = Config() + cfg.set("dirty_apps", [ + "tests.support_dirty_app:TestDirtyApp", + "tests.support_dirty_app:SlowDirtyApp:2", + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + assert arbiter.app_specs["tests.support_dirty_app:TestDirtyApp"]["worker_count"] is None + assert arbiter.app_specs["tests.support_dirty_app:SlowDirtyApp"]["worker_count"] == 2 + + arbiter._cleanup_sync() + + def test_get_apps_for_new_worker_all_standard(self): + """All apps returned when all have workers=None.""" + cfg = Config() + cfg.set("dirty_apps", [ + "tests.support_dirty_app:TestDirtyApp", + "tests.support_dirty_app:SlowDirtyApp", + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + apps = arbiter._get_apps_for_new_worker() + assert len(apps) == 2 + assert "tests.support_dirty_app:TestDirtyApp" in apps + assert "tests.support_dirty_app:SlowDirtyApp" in apps + + arbiter._cleanup_sync() + + def test_get_apps_for_new_worker_respects_limit(self): + """App with workers=2 stops assigning after 2 workers.""" + cfg = Config() + cfg.set("dirty_apps", [ + "tests.support_dirty_app:TestDirtyApp", # unlimited + "tests.support_dirty_app:SlowDirtyApp:2", # limited to 2 + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + # First worker should get both apps + apps1 = arbiter._get_apps_for_new_worker() + assert len(apps1) == 2 + arbiter._register_worker_apps(1001, apps1) + + # Second worker should get both apps + apps2 = arbiter._get_apps_for_new_worker() + assert len(apps2) == 2 + arbiter._register_worker_apps(1002, apps2) + + # Third worker should only get unlimited app + apps3 = arbiter._get_apps_for_new_worker() + assert len(apps3) == 1 + assert "tests.support_dirty_app:TestDirtyApp" in apps3 + assert "tests.support_dirty_app:SlowDirtyApp" not in apps3 + + arbiter._cleanup_sync() + + def test_register_worker_apps_updates_both_maps(self): + """Both app_worker_map and worker_app_map updated.""" + cfg = Config() + cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + app_path = "tests.support_dirty_app:TestDirtyApp" + arbiter._register_worker_apps(1001, [app_path]) + + # Check app_worker_map + assert 1001 in arbiter.app_worker_map[app_path] + + # Check worker_app_map + assert app_path in arbiter.worker_app_map[1001] + + arbiter._cleanup_sync() + + def test_unregister_worker_cleans_both_maps(self): + """Worker removal updates both maps correctly.""" + cfg = Config() + cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + app_path = "tests.support_dirty_app:TestDirtyApp" + arbiter._register_worker_apps(1001, [app_path]) + + # Verify registered + assert 1001 in arbiter.app_worker_map[app_path] + assert 1001 in arbiter.worker_app_map + + # Unregister + arbiter._unregister_worker(1001) + + # Verify cleaned up + assert 1001 not in arbiter.app_worker_map[app_path] + assert 1001 not in arbiter.worker_app_map + + arbiter._cleanup_sync() + + +class TestDirtyArbiterSpawnWorkerPerApp: + """Tests for spawn_worker with per-app allocation.""" + + def test_cleanup_worker_queues_apps_for_respawn(self): + """Dead worker's apps added to _pending_respawns.""" + cfg = Config() + cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + arbiter.pid = 12345 + + # Simulate worker registration + app_path = "tests.support_dirty_app:TestDirtyApp" + arbiter.workers[1001] = "fake_worker" + arbiter.worker_sockets[1001] = "/tmp/fake.sock" + arbiter._register_worker_apps(1001, [app_path]) + + # Cleanup should queue apps for respawn + assert len(arbiter._pending_respawns) == 0 + arbiter._cleanup_worker(1001) + assert len(arbiter._pending_respawns) == 1 + assert app_path in arbiter._pending_respawns[0] + + arbiter._cleanup_sync() + + def test_pending_respawns_cleared_after_spawn(self): + """Pending respawns consumed when spawning new worker.""" + cfg = Config() + cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + arbiter.pid = 12345 + + # Add pending respawn + app_path = "tests.support_dirty_app:TestDirtyApp" + arbiter._pending_respawns.append([app_path]) + + # Get apps for new worker should use pending first + # But since spawn_worker forks, we test the logic directly + assert len(arbiter._pending_respawns) == 1 + + # When spawn_worker pops from pending_respawns + apps = arbiter._pending_respawns.pop(0) + assert apps == [app_path] + assert len(arbiter._pending_respawns) == 0 + + arbiter._cleanup_sync() + + +class TestDirtyArbiterRoutingPerApp: + """Tests for app-aware routing.""" + + @pytest.mark.asyncio + async def test_get_available_worker_no_filter(self): + """Without app_path, returns any worker round-robin.""" + cfg = Config() + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + arbiter.workers[1001] = "worker1" + arbiter.workers[1002] = "worker2" + + # Should return workers in round-robin + w1 = await arbiter._get_available_worker() + w2 = await arbiter._get_available_worker() + + assert w1 in [1001, 1002] + assert w2 in [1001, 1002] + # They should be different (round-robin) + if len(arbiter.workers) >= 2: + assert w1 != w2 or len(arbiter.workers) == 1 + + arbiter._cleanup_sync() + + @pytest.mark.asyncio + async def test_get_available_worker_with_app_filter(self): + """With app_path, returns only workers that have it.""" + cfg = Config() + cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + arbiter.workers[1001] = "worker1" + arbiter.workers[1002] = "worker2" + + # Only register 1001 for the app + app_path = "tests.support_dirty_app:TestDirtyApp" + arbiter._register_worker_apps(1001, [app_path]) + + # Should only return 1001 + worker = await arbiter._get_available_worker(app_path) + assert worker == 1001 + + arbiter._cleanup_sync() + + @pytest.mark.asyncio + async def test_get_available_worker_app_no_workers_returns_none(self): + """Returns None if no workers have the app.""" + cfg = Config() + cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + arbiter.workers[1001] = "worker1" + + # Worker 1001 has no apps registered - request for unknown app returns None + worker = await arbiter._get_available_worker("unknown:App") + assert worker is None + + arbiter._cleanup_sync() + + @pytest.mark.asyncio + async def test_route_request_app_not_loaded_error(self): + """Error response when no worker has the app.""" + from gunicorn.dirty.protocol import DirtyProtocol + + cfg = Config() + cfg.set("dirty_apps", ["tests.support_dirty_app:TestDirtyApp"]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + arbiter.pid = 12345 + arbiter.workers[1001] = "worker1" + # No apps registered for this worker (worker exists but has no apps) + + request = make_request( + request_id="test-123", + app_path="unknown:App", + action="test" + ) + + writer = MockStreamWriter() + await arbiter.route_request(request, writer) + + assert len(writer.messages) == 1 + response = writer.messages[0] + assert response["type"] == DirtyProtocol.MSG_TYPE_ERROR + assert "No workers available for app" in response["error"]["message"] + assert response["error"]["error_type"] == "DirtyNoWorkersAvailableError" + + arbiter._cleanup_sync() diff --git a/tests/test_dirty_errors.py b/tests/test_dirty_errors.py new file mode 100644 index 00000000..207e68af --- /dev/null +++ b/tests/test_dirty_errors.py @@ -0,0 +1,76 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +"""Tests for dirty errors module.""" + +import pytest + +from gunicorn.dirty.errors import ( + DirtyError, + DirtyNoWorkersAvailableError, +) + + +class TestDirtyNoWorkersAvailableError: + """Tests for DirtyNoWorkersAvailableError exception.""" + + def test_error_contains_app_path(self): + """Error includes the app_path.""" + error = DirtyNoWorkersAvailableError("myapp:Model") + assert error.app_path == "myapp:Model" + assert "myapp:Model" in str(error) + assert "No workers available" in str(error) + + def test_error_with_custom_message(self): + """Error can have a custom message.""" + error = DirtyNoWorkersAvailableError( + "myapp:Model", + message="Custom: no workers for heavy model" + ) + assert error.app_path == "myapp:Model" + assert "Custom: no workers" in str(error) + + def test_error_serialization_roundtrip(self): + """Error survives to_dict/from_dict cycle.""" + original = DirtyNoWorkersAvailableError("myapp.ml:HugeModel") + + # Serialize + data = original.to_dict() + assert data["error_type"] == "DirtyNoWorkersAvailableError" + assert data["details"]["app_path"] == "myapp.ml:HugeModel" + + # Deserialize + restored = DirtyError.from_dict(data) + assert isinstance(restored, DirtyNoWorkersAvailableError) + assert restored.app_path == "myapp.ml:HugeModel" + assert "No workers available" in str(restored) + + def test_error_is_dirty_error_subclass(self): + """DirtyNoWorkersAvailableError is a DirtyError subclass.""" + error = DirtyNoWorkersAvailableError("app:Class") + assert isinstance(error, DirtyError) + + def test_web_app_can_catch_specific_error(self): + """Web app can catch DirtyNoWorkersAvailableError specifically.""" + def simulate_execute(): + raise DirtyNoWorkersAvailableError("myapp:HeavyModel") + + # Catch specific error + try: + simulate_execute() + assert False, "Should have raised" + except DirtyNoWorkersAvailableError as e: + assert e.app_path == "myapp:HeavyModel" + + def test_can_catch_as_base_error(self): + """Can catch DirtyNoWorkersAvailableError as DirtyError.""" + def simulate_execute(): + raise DirtyNoWorkersAvailableError("myapp:Model") + + try: + simulate_execute() + assert False, "Should have raised" + except DirtyError as e: + # Should catch it as the base class + assert hasattr(e, "app_path")