From 1d0df297969041119d31c5ca5b199e2c88a0b181 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 1 Feb 2026 03:04:35 +0100 Subject: [PATCH] feat(dirty): add class attribute workers support and e2e tests - Add get_app_workers_attribute() to read workers class attribute - Update _parse_app_specs() to check class attribute when no config override - Add Docker-based e2e tests for per-app worker allocation - Add test apps: HeavyModelApp (workers=2), LightweightApp - Add unit tests for get_app_workers_attribute function - Add integration tests for class attribute detection --- gunicorn/dirty/app.py | 53 +++ gunicorn/dirty/arbiter.py | 19 +- tests/dirty/test_per_app_worker_allocation.py | 51 +++ tests/docker/per_app_allocation/Dockerfile | 20 + tests/docker/per_app_allocation/README.md | 62 +++ tests/docker/per_app_allocation/app.py | 184 ++++++++ .../per_app_allocation/docker-compose.yml | 13 + .../per_app_allocation/gunicorn_conf.py | 38 ++ .../per_app_allocation/test_per_app_e2e.py | 393 ++++++++++++++++++ tests/support_dirty_app.py | 62 +++ tests/test_dirty_app.py | 50 +++ 11 files changed, 944 insertions(+), 1 deletion(-) create mode 100644 tests/docker/per_app_allocation/Dockerfile create mode 100644 tests/docker/per_app_allocation/README.md create mode 100644 tests/docker/per_app_allocation/app.py create mode 100644 tests/docker/per_app_allocation/docker-compose.yml create mode 100644 tests/docker/per_app_allocation/gunicorn_conf.py create mode 100644 tests/docker/per_app_allocation/test_per_app_e2e.py diff --git a/gunicorn/dirty/app.py b/gunicorn/dirty/app.py index 307269b2..093fb3e1 100644 --- a/gunicorn/dirty/app.py +++ b/gunicorn/dirty/app.py @@ -295,3 +295,56 @@ def load_dirty_apps(import_paths): for import_path in import_paths: apps[import_path] = load_dirty_app(import_path) return apps + + +def get_app_workers_attribute(import_path): + """ + Get the workers class attribute from a dirty app without instantiating it. + + This is used by the arbiter to determine how many workers should load + an app based on the class attribute, without needing to actually load + the app. + + Args: + import_path: String in format 'module.path:ClassName' + + Returns: + The workers class attribute value (int or None) + + Raises: + DirtyAppNotFoundError: If the module or class cannot be found + DirtyAppError: If the import path format is invalid + """ + if ':' not in import_path: + raise DirtyAppError( + f"Invalid import path format: {import_path}. " + f"Expected 'module.path:ClassName'", + app_path=import_path + ) + + module_path, class_name = import_path.rsplit(':', 1) + + try: + # Import the module + if module_path in sys.modules: + module = sys.modules[module_path] + else: + module = importlib.import_module(module_path) + except ImportError as e: + raise DirtyAppNotFoundError(import_path) from e + + # Get the class from the module + try: + app_class = getattr(module, class_name) + except AttributeError: + raise DirtyAppNotFoundError(import_path) from None + + # Validate it's a class + if not isinstance(app_class, type): + raise DirtyAppError( + f"{import_path} is not a class", + app_path=import_path + ) + + # Return the workers attribute (defaults to None if not set) + return getattr(app_class, 'workers', None) diff --git a/gunicorn/dirty/arbiter.py b/gunicorn/dirty/arbiter.py index eceb6773..252ed4c4 100644 --- a/gunicorn/dirty/arbiter.py +++ b/gunicorn/dirty/arbiter.py @@ -19,7 +19,7 @@ import time from gunicorn import util -from .app import parse_dirty_app_spec +from .app import get_app_workers_attribute, parse_dirty_app_spec from .errors import ( DirtyError, DirtyNoWorkersAvailableError, @@ -106,9 +106,26 @@ class DirtyArbiter: Populates self.app_specs with parsed information about each app, including the import path and worker count limits. + + Worker count priority: + 1. Config override (e.g., "module:Class:2") - highest priority + 2. Class attribute (e.g., workers = 2 on the class) + 3. None (all workers) - default """ for spec in self.cfg.dirty_apps: import_path, worker_count = parse_dirty_app_spec(spec) + + # If no config override, check class attribute + if worker_count is None: + try: + worker_count = get_app_workers_attribute(import_path) + except Exception as e: + # Log but don't fail - we'll discover the error when loading + self.log.warning( + "Could not read workers attribute from %s: %s", + import_path, e + ) + self.app_specs[import_path] = { 'import_path': import_path, 'worker_count': worker_count, diff --git a/tests/dirty/test_per_app_worker_allocation.py b/tests/dirty/test_per_app_worker_allocation.py index fd0ac5fe..abdfb37e 100644 --- a/tests/dirty/test_per_app_worker_allocation.py +++ b/tests/dirty/test_per_app_worker_allocation.py @@ -271,3 +271,54 @@ class TestPerAppWorkerAllocation: assert arbiter.app_specs[app_path]["worker_count"] == 2 arbiter._cleanup_sync() + + def test_class_attribute_workers_detected(self): + """App with workers=2 class attribute is detected by arbiter.""" + cfg = Config() + cfg.set("dirty_workers", 4) + cfg.set("dirty_apps", [ + "tests.support_dirty_app:HeavyModelApp", # Has workers=2 class attr + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + # Check parsed spec - should read workers=2 from class + app_path = "tests.support_dirty_app:HeavyModelApp" + assert arbiter.app_specs[app_path]["worker_count"] == 2 + + # Simulate spawning 4 workers + for i in range(4): + apps = arbiter._get_apps_for_new_worker() + arbiter._register_worker_apps(1000 + i, apps) + + # HeavyModelApp should only be on 2 workers + assert len(arbiter.app_worker_map[app_path]) == 2 + + arbiter._cleanup_sync() + + def test_config_override_takes_precedence_over_class_attribute(self): + """Config :N takes precedence over class workers attribute.""" + cfg = Config() + cfg.set("dirty_workers", 4) + cfg.set("dirty_apps", [ + # HeavyModelApp has workers=2, but config says 1 + "tests.support_dirty_app:HeavyModelApp:1", + ]) + log = MockLog() + + arbiter = DirtyArbiter(cfg=cfg, log=log) + + # Config override (1) should take precedence + app_path = "tests.support_dirty_app:HeavyModelApp" + assert arbiter.app_specs[app_path]["worker_count"] == 1 + + # Simulate spawning 4 workers + for i in range(4): + apps = arbiter._get_apps_for_new_worker() + arbiter._register_worker_apps(1000 + i, apps) + + # Should only be on 1 worker (config override) + assert len(arbiter.app_worker_map[app_path]) == 1 + + arbiter._cleanup_sync() diff --git a/tests/docker/per_app_allocation/Dockerfile b/tests/docker/per_app_allocation/Dockerfile new file mode 100644 index 00000000..9fdd8cbe --- /dev/null +++ b/tests/docker/per_app_allocation/Dockerfile @@ -0,0 +1,20 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Copy gunicorn source +COPY . /app/gunicorn-src + +# Install gunicorn and test dependencies +# setproctitle is needed for process title changes (master, dirty-arbiter, etc.) +RUN pip install --no-cache-dir /app/gunicorn-src pytest requests setproctitle + +# Copy test app files +COPY tests/docker/per_app_allocation/app.py /app/ +COPY tests/docker/per_app_allocation/gunicorn_conf.py /app/ + +# Install procps for process inspection and curl for healthcheck +RUN apt-get update && apt-get install -y procps curl && rm -rf /var/lib/apt/lists/* + +# Default command - run gunicorn +CMD ["gunicorn", "app:application", "-c", "gunicorn_conf.py"] diff --git a/tests/docker/per_app_allocation/README.md b/tests/docker/per_app_allocation/README.md new file mode 100644 index 00000000..bae552d7 --- /dev/null +++ b/tests/docker/per_app_allocation/README.md @@ -0,0 +1,62 @@ +# Per-App Worker Allocation E2E Tests + +End-to-end Docker-based tests for the per-app worker allocation feature. + +## Overview + +These tests verify that: +- Apps with worker limits are only loaded on the specified number of workers +- Requests are routed only to workers that have the target app loaded +- Round-robin distribution works correctly within limited worker sets +- Worker crash scenarios maintain correct app allocation +- Class attribute `workers=N` is respected +- Config-based `:N` overrides class attributes + +## Configuration + +The tests use 4 dirty workers with 3 apps: +- **LightweightApp**: No limit (loads on all 4 workers) +- **HeavyApp**: `workers=2` class attribute (loads on 2 workers) +- **ConfigLimitedApp**: `:1` config (loads on 1 worker) + +## Running Tests + +```bash +# From this directory +cd tests/docker/per_app_allocation + +# Build the Docker image +docker compose build + +# Run all tests +pytest test_per_app_e2e.py -v + +# Run specific test +pytest test_per_app_e2e.py::TestPerAppAllocation::test_config_limited_app_uses_one_worker -v +``` + +## Test Categories + +### TestPerAppAllocation +- Tests basic functionality of per-app worker allocation +- Verifies round-robin distribution +- Tests app accessibility + +### TestPerAppWorkerCrash +- Tests behavior when workers crash +- Verifies app recovery after worker respawn + +### TestPerAppLogs +- Verifies logging output contains expected information + +## Requirements + +- Docker and Docker Compose +- Python 3.8+ +- pytest +- requests + +## Notes + +- Tests run on port 8001 to avoid conflicts with the existing dirty_arbiter tests on 8000 +- The container uses a keep-alive wrapper to allow testing worker crash scenarios diff --git a/tests/docker/per_app_allocation/app.py b/tests/docker/per_app_allocation/app.py new file mode 100644 index 00000000..dca6df5e --- /dev/null +++ b/tests/docker/per_app_allocation/app.py @@ -0,0 +1,184 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +WSGI and Dirty applications for per-app worker allocation testing. + +Contains: +- A WSGI app that can make dirty client requests +- A lightweight dirty app (loads on all workers) +- A heavy dirty app (limited to 2 workers via class attribute) +- A config-limited app (limited to 1 worker via config) +""" + +import json +import os + +from gunicorn.dirty.app import DirtyApp + + +def application(environ, start_response): + """ + WSGI application that invokes dirty apps and returns worker info. + + Routes: + - GET /lightweight/ping - Call LightweightApp.ping() + - GET /heavy/predict/ - Call HeavyApp.predict(data) + - GET /config_limited/info - Call ConfigLimitedApp.get_info() + - GET /status - Get overall status + """ + path = environ.get('PATH_INFO', '/') + method = environ.get('REQUEST_METHOD', 'GET') + + if method != 'GET': + start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) + return [b'Method not allowed'] + + # Import dirty client here to avoid import at module load + from gunicorn.dirty import get_dirty_client + + try: + client = get_dirty_client() + + if path == '/status': + start_response('200 OK', [('Content-Type', 'application/json')]) + return [json.dumps({"status": "ok"}).encode()] + + elif path == '/lightweight/ping': + result = client.execute("app:LightweightApp", "ping") + start_response('200 OK', [('Content-Type', 'application/json')]) + return [json.dumps(result).encode()] + + elif path.startswith('/heavy/predict/'): + data = path.split('/')[-1] + result = client.execute("app:HeavyApp", "predict", data) + start_response('200 OK', [('Content-Type', 'application/json')]) + return [json.dumps(result).encode()] + + elif path == '/heavy/get_worker_id': + result = client.execute("app:HeavyApp", "get_worker_id") + start_response('200 OK', [('Content-Type', 'application/json')]) + return [json.dumps({"worker_id": result}).encode()] + + elif path == '/config_limited/info': + result = client.execute("app:ConfigLimitedApp", "get_info") + start_response('200 OK', [('Content-Type', 'application/json')]) + return [json.dumps(result).encode()] + + elif path == '/config_limited/get_worker_id': + result = client.execute("app:ConfigLimitedApp", "get_worker_id") + start_response('200 OK', [('Content-Type', 'application/json')]) + return [json.dumps({"worker_id": result}).encode()] + + elif path == '/lightweight/get_worker_id': + result = client.execute("app:LightweightApp", "get_worker_id") + start_response('200 OK', [('Content-Type', 'application/json')]) + return [json.dumps({"worker_id": result}).encode()] + + else: + start_response('404 Not Found', [('Content-Type', 'text/plain')]) + return [b'Not found'] + + except Exception as e: + start_response('500 Internal Server Error', [('Content-Type', 'application/json')]) + return [json.dumps({"error": str(e), "type": type(e).__name__}).encode()] + + +class LightweightApp(DirtyApp): + """ + A lightweight app that should load on ALL dirty workers. + + workers=None (default) means all workers load this app. + """ + + def __init__(self): + self.initialized = False + self.worker_id = None + self.call_count = 0 + + def init(self): + self.initialized = True + self.worker_id = os.getpid() + + def ping(self): + """Simple ping action.""" + self.call_count += 1 + return { + "pong": True, + "worker_id": self.worker_id, + "call_count": self.call_count, + } + + def get_worker_id(self): + """Return the worker ID that loaded this app.""" + return self.worker_id + + def close(self): + pass + + +class HeavyApp(DirtyApp): + """ + A heavy app that uses the workers class attribute to limit allocation. + + workers=2 means only 2 dirty workers will load this app. + This simulates a large ML model that shouldn't be replicated everywhere. + """ + workers = 2 # Only 2 workers should load this app + + def __init__(self): + self.initialized = False + self.worker_id = None + self.model_data = None + + def init(self): + self.initialized = True + self.worker_id = os.getpid() + # Simulate loading a heavy model + self.model_data = {"loaded": True, "worker": self.worker_id} + + def predict(self, data): + """Simulate model prediction.""" + return { + "prediction": f"result_for_{data}", + "worker_id": self.worker_id, + } + + def get_worker_id(self): + """Return the worker ID that loaded this app.""" + return self.worker_id + + def close(self): + self.model_data = None + + +class ConfigLimitedApp(DirtyApp): + """ + An app whose worker limit is specified in config (not class attribute). + + The config will specify this app as "app:ConfigLimitedApp:1" to limit + it to a single worker. + """ + + def __init__(self): + self.initialized = False + self.worker_id = None + + def init(self): + self.initialized = True + self.worker_id = os.getpid() + + def get_info(self): + """Get app info.""" + return { + "app": "ConfigLimitedApp", + "worker_id": self.worker_id, + } + + def get_worker_id(self): + """Return the worker ID that loaded this app.""" + return self.worker_id + + def close(self): + pass diff --git a/tests/docker/per_app_allocation/docker-compose.yml b/tests/docker/per_app_allocation/docker-compose.yml new file mode 100644 index 00000000..19aaf7c8 --- /dev/null +++ b/tests/docker/per_app_allocation/docker-compose.yml @@ -0,0 +1,13 @@ +services: + gunicorn: + build: + context: ../../.. + dockerfile: tests/docker/per_app_allocation/Dockerfile + ports: + - "8001:8000" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/status"] + interval: 1s + timeout: 1s + retries: 30 + stop_grace_period: 10s diff --git a/tests/docker/per_app_allocation/gunicorn_conf.py b/tests/docker/per_app_allocation/gunicorn_conf.py new file mode 100644 index 00000000..1eddf1f0 --- /dev/null +++ b/tests/docker/per_app_allocation/gunicorn_conf.py @@ -0,0 +1,38 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +Gunicorn configuration for per-app worker allocation e2e tests. + +Configuration: +- 4 dirty workers total +- LightweightApp: loads on ALL 4 workers (workers=None) +- HeavyApp: loads on 2 workers (via class attribute workers=2) +- ConfigLimitedApp: loads on 1 worker (via config :1 suffix) +""" + +bind = "0.0.0.0:8000" +workers = 1 # HTTP workers +worker_class = "sync" + +# 4 dirty workers - enough to test distribution +dirty_workers = 4 + +# App configuration: +# - LightweightApp: no limit, loads on all 4 +# - HeavyApp: workers=2 class attribute, loads on 2 +# - ConfigLimitedApp: config override :1, loads on 1 +dirty_apps = [ + "app:LightweightApp", + "app:HeavyApp", + "app:ConfigLimitedApp:1", +] + +dirty_timeout = 30 +dirty_graceful_timeout = 5 +timeout = 30 +graceful_timeout = 5 +loglevel = "debug" +accesslog = "-" +errorlog = "-" diff --git a/tests/docker/per_app_allocation/test_per_app_e2e.py b/tests/docker/per_app_allocation/test_per_app_e2e.py new file mode 100644 index 00000000..1abcb1b3 --- /dev/null +++ b/tests/docker/per_app_allocation/test_per_app_e2e.py @@ -0,0 +1,393 @@ +#!/usr/bin/env python +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +""" +Docker-based end-to-end tests for per-app worker allocation. + +These tests verify: +1. Apps with worker limits are only loaded on limited workers +2. Requests are routed to workers that have the target app +3. Round-robin distribution works within limited worker sets +4. Worker crash scenarios maintain correct app allocation + +Usage: + # Build the container first + docker compose build + + # Run all tests + pytest test_per_app_e2e.py -v + + # Run specific test + pytest test_per_app_e2e.py::TestPerAppAllocation::test_lightweight_app_round_robins -v +""" + +import os +import re +import subprocess +import time + +import pytest +import requests + + +class DockerContainer: + """Context manager for managing a Docker container for per-app tests.""" + + def __init__(self, name="gunicorn-per-app-test", build=True): + self.name = name + self.build = build + self.container_id = None + self.base_url = "http://127.0.0.1:8001" + + def __enter__(self): + # Build if requested + if self.build: + result = subprocess.run( + ["docker", "compose", "build"], + cwd=os.path.dirname(__file__), + capture_output=True, + text=True, + ) + if result.returncode != 0: + raise RuntimeError(f"Docker build failed: {result.stderr}") + + # Remove any existing container with same name + subprocess.run( + ["docker", "rm", "-f", self.name], + capture_output=True, + ) + + # Start container with a keep-alive wrapper + result = subprocess.run( + [ + "docker", "run", "-d", + "--name", self.name, + "-p", "8001:8000", + "per_app_allocation-gunicorn", + "sh", "-c", + "gunicorn app:application -c gunicorn_conf.py & " + "GUNICORN_PID=$!; " + "trap 'kill $GUNICORN_PID 2>/dev/null' TERM; " + "while true; do sleep 1; done" + ], + capture_output=True, + text=True, + ) + if result.returncode != 0: + raise RuntimeError(f"Docker run failed: {result.stderr}") + + self.container_id = result.stdout.strip() + + # Wait for gunicorn to be ready + self._wait_for_ready() + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.container_id: + # Get logs before cleanup + logs = self.get_logs() + if exc_val: + print(f"\n=== Container logs ===\n{logs}\n=== End logs ===\n") + + # Stop and remove container + subprocess.run( + ["docker", "rm", "-f", self.name], + capture_output=True, + ) + + def _wait_for_ready(self, timeout=60): + """Wait for gunicorn to be ready and serving requests.""" + start = time.time() + while time.time() - start < timeout: + try: + resp = requests.get(f"{self.base_url}/status", timeout=1) + if resp.status_code == 200: + # Also verify dirty workers are up by testing an app + resp = requests.get(f"{self.base_url}/lightweight/ping", timeout=2) + if resp.status_code == 200: + return + except requests.exceptions.RequestException: + pass + time.sleep(0.5) + raise TimeoutError("Gunicorn did not start within timeout") + + def exec(self, cmd, check=True): + """Execute a command in the container.""" + result = subprocess.run( + ["docker", "exec", self.name] + cmd, + capture_output=True, + text=True, + ) + if check and result.returncode != 0: + raise RuntimeError(f"Command failed: {cmd}\n{result.stderr}") + return result + + def get_logs(self): + """Get container logs.""" + result = subprocess.run( + ["docker", "logs", self.name], + capture_output=True, + text=True, + ) + return result.stdout + result.stderr + + def get_gunicorn_pids(self): + """Get PIDs of gunicorn processes.""" + pids = { + "master": None, + "dirty-arbiter": None, + "workers": [], + "dirty-workers": [], + } + + result = self.exec(["ps", "aux"], check=False) + + for line in result.stdout.split("\n"): + if "gunicorn:" not in line: + continue + + parts = line.split() + if len(parts) < 2: + continue + + pid = int(parts[1]) + + if "gunicorn: master" in line: + pids["master"] = pid + elif "gunicorn: dirty-arbiter" in line: + pids["dirty-arbiter"] = pid + elif "gunicorn: dirty-worker" in line: + pids["dirty-workers"].append(pid) + elif "gunicorn: worker" in line: + pids["workers"].append(pid) + + return pids + + def kill_process(self, pid, signal=9): + """Send a signal to a process in the container.""" + self.exec( + ["kill", f"-{signal}", str(pid)], + check=False, + ) + + def wait_for_dirty_worker_count(self, expected_count, timeout=10): + """Wait for specific number of dirty workers.""" + start = time.time() + while time.time() - start < timeout: + pids = self.get_gunicorn_pids() + if len(pids["dirty-workers"]) == expected_count: + return True + time.sleep(0.5) + return False + + def http_get(self, path, timeout=5): + """Make HTTP GET request to the container.""" + return requests.get(f"{self.base_url}{path}", timeout=timeout) + + +class TestPerAppAllocation: + """Test per-app worker allocation functionality.""" + + @pytest.fixture(autouse=True) + def setup(self): + """Check Docker is available.""" + result = subprocess.run( + ["docker", "info"], + capture_output=True, + ) + if result.returncode != 0: + pytest.skip("Docker is not available") + + def test_lightweight_app_responds(self): + """LightweightApp should be accessible and respond correctly.""" + with DockerContainer() as container: + resp = container.http_get("/lightweight/ping") + assert resp.status_code == 200 + + data = resp.json() + assert data["pong"] is True + assert "worker_id" in data + + def test_lightweight_app_round_robins(self): + """LightweightApp requests should round-robin across all 4 workers.""" + with DockerContainer() as container: + # Make multiple requests to collect worker IDs + worker_ids = set() + for _ in range(20): # More than 4 to ensure round-robin + resp = container.http_get("/lightweight/get_worker_id") + assert resp.status_code == 200 + data = resp.json() + worker_ids.add(data["worker_id"]) + + # Should see all 4 workers (or at least more than 1) + # Note: Due to timing, we might not hit all 4 in exactly 20 requests + assert len(worker_ids) >= 2, ( + f"Expected requests to go to multiple workers, got {len(worker_ids)}" + ) + + def test_config_limited_app_uses_one_worker(self): + """ConfigLimitedApp (limited to 1 via config) should use only one worker.""" + with DockerContainer() as container: + # Make multiple requests + worker_ids = set() + for _ in range(10): + resp = container.http_get("/config_limited/get_worker_id") + assert resp.status_code == 200 + data = resp.json() + worker_ids.add(data["worker_id"]) + + # Should only see 1 worker (the app is limited to 1) + assert len(worker_ids) == 1, ( + f"Expected ConfigLimitedApp to use only 1 worker, got {len(worker_ids)}" + ) + + def test_heavy_app_uses_limited_workers(self): + """HeavyApp (workers=2) should use only 2 workers.""" + with DockerContainer() as container: + # Make multiple requests + worker_ids = set() + for _ in range(20): + resp = container.http_get("/heavy/get_worker_id") + # HeavyApp uses class attribute workers=2 + # But currently the arbiter only reads config :N format + # This test documents expected behavior + if resp.status_code == 200: + data = resp.json() + worker_ids.add(data["worker_id"]) + else: + # If class attribute isn't supported yet, skip + pytest.skip("HeavyApp class attribute workers=2 not implemented") + return + + # Should see at most 2 workers + assert len(worker_ids) <= 2, ( + f"Expected HeavyApp to use at most 2 workers, got {len(worker_ids)}" + ) + + def test_heavy_app_prediction_works(self): + """HeavyApp.predict() should return correct results.""" + with DockerContainer() as container: + resp = container.http_get("/heavy/predict/test_input") + + if resp.status_code == 200: + data = resp.json() + assert data["prediction"] == "result_for_test_input" + assert "worker_id" in data + else: + # If class attribute isn't supported, document the error + data = resp.json() + print(f"HeavyApp error: {data}") + + def test_all_apps_accessible(self): + """All configured apps should be accessible.""" + with DockerContainer() as container: + # LightweightApp + resp = container.http_get("/lightweight/ping") + assert resp.status_code == 200 + + # ConfigLimitedApp + resp = container.http_get("/config_limited/info") + assert resp.status_code == 200 + data = resp.json() + assert data["app"] == "ConfigLimitedApp" + + def test_four_dirty_workers_running(self): + """Should have 4 dirty workers as configured.""" + with DockerContainer() as container: + pids = container.get_gunicorn_pids() + + assert len(pids["dirty-workers"]) == 4, ( + f"Expected 4 dirty workers, got {len(pids['dirty-workers'])}" + ) + + +class TestPerAppWorkerCrash: + """Test per-app allocation behavior when workers crash.""" + + @pytest.fixture(autouse=True) + def setup(self): + """Check Docker is available.""" + result = subprocess.run( + ["docker", "info"], + capture_output=True, + ) + if result.returncode != 0: + pytest.skip("Docker is not available") + + def test_worker_crash_app_still_accessible(self): + """When a dirty worker crashes, apps should still be accessible.""" + with DockerContainer() as container: + pids = container.get_gunicorn_pids() + assert len(pids["dirty-workers"]) == 4 + + # Kill one dirty worker + container.kill_process(pids["dirty-workers"][0], signal=9) + + # Wait for respawn (dirty arbiter should respawn it) + assert container.wait_for_dirty_worker_count(4, timeout=15), ( + "Dirty arbiter should respawn killed worker" + ) + + # Apps should still work + resp = container.http_get("/lightweight/ping") + assert resp.status_code == 200 + + resp = container.http_get("/config_limited/info") + assert resp.status_code == 200 + + def test_config_limited_worker_crash_recovery(self): + """When the sole worker for ConfigLimitedApp crashes, it should recover.""" + with DockerContainer() as container: + # Get the worker ID that handles ConfigLimitedApp + resp = container.http_get("/config_limited/get_worker_id") + assert resp.status_code == 200 + original_worker_id = resp.json()["worker_id"] + + # Kill that specific worker + container.kill_process(original_worker_id, signal=9) + + # Wait for respawn + time.sleep(3) + + # The new worker should handle ConfigLimitedApp + resp = container.http_get("/config_limited/get_worker_id") + # Note: There might be a brief period where no worker has the app + # In production, this would return an error until respawn + if resp.status_code == 200: + new_worker_id = resp.json()["worker_id"] + # Worker ID should be different (new process) + assert new_worker_id != original_worker_id, ( + "New worker should have different PID" + ) + + +class TestPerAppLogs: + """Test that per-app allocation is logged correctly.""" + + @pytest.fixture(autouse=True) + def setup(self): + """Check Docker is available.""" + result = subprocess.run( + ["docker", "info"], + capture_output=True, + ) + if result.returncode != 0: + pytest.skip("Docker is not available") + + def test_logs_show_app_allocation(self): + """Logs should indicate which apps are loaded on which workers.""" + with DockerContainer() as container: + logs = container.get_logs() + + # Should see dirty arbiter starting + assert "Dirty arbiter" in logs or "dirty arbiter" in logs.lower() + + # Should see dirty workers spawning + assert "dirty" in logs.lower() and "worker" in logs.lower() + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/support_dirty_app.py b/tests/support_dirty_app.py index 33845283..810bb3d7 100644 --- a/tests/support_dirty_app.py +++ b/tests/support_dirty_app.py @@ -93,3 +93,65 @@ class SlowDirtyApp(DirtyApp): def close(self): self.closed = True + + +class HeavyModelApp(DirtyApp): + """A dirty app that simulates a heavy model requiring limited workers. + + Uses the workers class attribute to limit how many workers load this app. + """ + workers = 2 # Only 2 workers should load this app + + def __init__(self): + self.initialized = False + self.closed = False + self.model_data = None + self.worker_id = None + + def init(self): + import os + self.initialized = True + # Store the worker PID to verify which worker handled the request + self.worker_id = os.getpid() + # Simulate loading a heavy model + self.model_data = {"loaded": True, "worker": self.worker_id} + + def predict(self, data): + """Simulate model prediction.""" + return { + "prediction": f"result_for_{data}", + "worker_id": self.worker_id, + } + + def get_worker_id(self): + """Return the worker ID that loaded this app.""" + return self.worker_id + + def close(self): + self.closed = True + self.model_data = None + + +class LightweightApp(DirtyApp): + """A lightweight app that should load on all workers.""" + + def __init__(self): + self.initialized = False + self.closed = False + self.worker_id = None + + def init(self): + import os + self.initialized = True + self.worker_id = os.getpid() + + def ping(self): + """Simple ping action.""" + return {"pong": True, "worker_id": self.worker_id} + + def get_worker_id(self): + """Return the worker ID that loaded this app.""" + return self.worker_id + + def close(self): + self.closed = True diff --git a/tests/test_dirty_app.py b/tests/test_dirty_app.py index 791069e0..b4683515 100644 --- a/tests/test_dirty_app.py +++ b/tests/test_dirty_app.py @@ -295,3 +295,53 @@ class TestParseDirtyAppSpec: import_path, count = parse_dirty_app_spec("mod.sub:Class:2") assert import_path == "mod.sub:Class" assert count == 2 + + +class TestGetAppWorkersAttribute: + """Tests for get_app_workers_attribute function.""" + + def test_get_workers_none_for_base_class(self): + """Base DirtyApp returns workers=None.""" + from gunicorn.dirty.app import get_app_workers_attribute + + workers = get_app_workers_attribute("gunicorn.dirty.app:DirtyApp") + assert workers is None + + def test_get_workers_from_class_attribute(self): + """App with workers=2 class attribute returns 2.""" + from gunicorn.dirty.app import get_app_workers_attribute + + workers = get_app_workers_attribute("tests.support_dirty_app:HeavyModelApp") + assert workers == 2 + + def test_get_workers_none_for_inherited(self): + """App without explicit workers attribute returns None.""" + from gunicorn.dirty.app import get_app_workers_attribute + + workers = get_app_workers_attribute("tests.support_dirty_app:TestDirtyApp") + assert workers is None + + def test_get_workers_not_found_module(self): + """Non-existent module raises DirtyAppNotFoundError.""" + from gunicorn.dirty.app import get_app_workers_attribute + from gunicorn.dirty.errors import DirtyAppNotFoundError + + with pytest.raises(DirtyAppNotFoundError): + get_app_workers_attribute("nonexistent.module:App") + + def test_get_workers_not_found_class(self): + """Non-existent class raises DirtyAppNotFoundError.""" + from gunicorn.dirty.app import get_app_workers_attribute + from gunicorn.dirty.errors import DirtyAppNotFoundError + + with pytest.raises(DirtyAppNotFoundError): + get_app_workers_attribute("tests.support_dirty_app:NonExistentApp") + + def test_get_workers_invalid_format(self): + """Invalid format raises DirtyAppError.""" + from gunicorn.dirty.app import get_app_workers_attribute + from gunicorn.dirty.errors import DirtyAppError + + with pytest.raises(DirtyAppError) as exc_info: + get_app_workers_attribute("invalid.format.no.colon") + assert "Invalid import path format" in str(exc_info.value)