feat(dirty): add class attribute workers support and e2e tests

- Add get_app_workers_attribute() to read workers class attribute
- Update _parse_app_specs() to check class attribute when no config override
- Add Docker-based e2e tests for per-app worker allocation
- Add test apps: HeavyModelApp (workers=2), LightweightApp
- Add unit tests for get_app_workers_attribute function
- Add integration tests for class attribute detection
This commit is contained in:
Benoit Chesneau 2026-02-01 03:04:35 +01:00
parent 8559854b4f
commit 1d0df29796
11 changed files with 944 additions and 1 deletions

View File

@ -295,3 +295,56 @@ def load_dirty_apps(import_paths):
for import_path in import_paths:
apps[import_path] = load_dirty_app(import_path)
return apps
def get_app_workers_attribute(import_path):
"""
Get the workers class attribute from a dirty app without instantiating it.
This is used by the arbiter to determine how many workers should load
an app based on the class attribute, without needing to actually load
the app.
Args:
import_path: String in format 'module.path:ClassName'
Returns:
The workers class attribute value (int or None)
Raises:
DirtyAppNotFoundError: If the module or class cannot be found
DirtyAppError: If the import path format is invalid
"""
if ':' not in import_path:
raise DirtyAppError(
f"Invalid import path format: {import_path}. "
f"Expected 'module.path:ClassName'",
app_path=import_path
)
module_path, class_name = import_path.rsplit(':', 1)
try:
# Import the module
if module_path in sys.modules:
module = sys.modules[module_path]
else:
module = importlib.import_module(module_path)
except ImportError as e:
raise DirtyAppNotFoundError(import_path) from e
# Get the class from the module
try:
app_class = getattr(module, class_name)
except AttributeError:
raise DirtyAppNotFoundError(import_path) from None
# Validate it's a class
if not isinstance(app_class, type):
raise DirtyAppError(
f"{import_path} is not a class",
app_path=import_path
)
# Return the workers attribute (defaults to None if not set)
return getattr(app_class, 'workers', None)

View File

@ -19,7 +19,7 @@ import time
from gunicorn import util
from .app import parse_dirty_app_spec
from .app import get_app_workers_attribute, parse_dirty_app_spec
from .errors import (
DirtyError,
DirtyNoWorkersAvailableError,
@ -106,9 +106,26 @@ class DirtyArbiter:
Populates self.app_specs with parsed information about each app,
including the import path and worker count limits.
Worker count priority:
1. Config override (e.g., "module:Class:2") - highest priority
2. Class attribute (e.g., workers = 2 on the class)
3. None (all workers) - default
"""
for spec in self.cfg.dirty_apps:
import_path, worker_count = parse_dirty_app_spec(spec)
# If no config override, check class attribute
if worker_count is None:
try:
worker_count = get_app_workers_attribute(import_path)
except Exception as e:
# Log but don't fail - we'll discover the error when loading
self.log.warning(
"Could not read workers attribute from %s: %s",
import_path, e
)
self.app_specs[import_path] = {
'import_path': import_path,
'worker_count': worker_count,

View File

@ -271,3 +271,54 @@ class TestPerAppWorkerAllocation:
assert arbiter.app_specs[app_path]["worker_count"] == 2
arbiter._cleanup_sync()
def test_class_attribute_workers_detected(self):
"""App with workers=2 class attribute is detected by arbiter."""
cfg = Config()
cfg.set("dirty_workers", 4)
cfg.set("dirty_apps", [
"tests.support_dirty_app:HeavyModelApp", # Has workers=2 class attr
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Check parsed spec - should read workers=2 from class
app_path = "tests.support_dirty_app:HeavyModelApp"
assert arbiter.app_specs[app_path]["worker_count"] == 2
# Simulate spawning 4 workers
for i in range(4):
apps = arbiter._get_apps_for_new_worker()
arbiter._register_worker_apps(1000 + i, apps)
# HeavyModelApp should only be on 2 workers
assert len(arbiter.app_worker_map[app_path]) == 2
arbiter._cleanup_sync()
def test_config_override_takes_precedence_over_class_attribute(self):
"""Config :N takes precedence over class workers attribute."""
cfg = Config()
cfg.set("dirty_workers", 4)
cfg.set("dirty_apps", [
# HeavyModelApp has workers=2, but config says 1
"tests.support_dirty_app:HeavyModelApp:1",
])
log = MockLog()
arbiter = DirtyArbiter(cfg=cfg, log=log)
# Config override (1) should take precedence
app_path = "tests.support_dirty_app:HeavyModelApp"
assert arbiter.app_specs[app_path]["worker_count"] == 1
# Simulate spawning 4 workers
for i in range(4):
apps = arbiter._get_apps_for_new_worker()
arbiter._register_worker_apps(1000 + i, apps)
# Should only be on 1 worker (config override)
assert len(arbiter.app_worker_map[app_path]) == 1
arbiter._cleanup_sync()

View File

@ -0,0 +1,20 @@
FROM python:3.12-slim
WORKDIR /app
# Copy gunicorn source
COPY . /app/gunicorn-src
# Install gunicorn and test dependencies
# setproctitle is needed for process title changes (master, dirty-arbiter, etc.)
RUN pip install --no-cache-dir /app/gunicorn-src pytest requests setproctitle
# Copy test app files
COPY tests/docker/per_app_allocation/app.py /app/
COPY tests/docker/per_app_allocation/gunicorn_conf.py /app/
# Install procps for process inspection and curl for healthcheck
RUN apt-get update && apt-get install -y procps curl && rm -rf /var/lib/apt/lists/*
# Default command - run gunicorn
CMD ["gunicorn", "app:application", "-c", "gunicorn_conf.py"]

View File

@ -0,0 +1,62 @@
# Per-App Worker Allocation E2E Tests
End-to-end Docker-based tests for the per-app worker allocation feature.
## Overview
These tests verify that:
- Apps with worker limits are only loaded on the specified number of workers
- Requests are routed only to workers that have the target app loaded
- Round-robin distribution works correctly within limited worker sets
- Worker crash scenarios maintain correct app allocation
- Class attribute `workers=N` is respected
- Config-based `:N` overrides class attributes
## Configuration
The tests use 4 dirty workers with 3 apps:
- **LightweightApp**: No limit (loads on all 4 workers)
- **HeavyApp**: `workers=2` class attribute (loads on 2 workers)
- **ConfigLimitedApp**: `:1` config (loads on 1 worker)
## Running Tests
```bash
# From this directory
cd tests/docker/per_app_allocation
# Build the Docker image
docker compose build
# Run all tests
pytest test_per_app_e2e.py -v
# Run specific test
pytest test_per_app_e2e.py::TestPerAppAllocation::test_config_limited_app_uses_one_worker -v
```
## Test Categories
### TestPerAppAllocation
- Tests basic functionality of per-app worker allocation
- Verifies round-robin distribution
- Tests app accessibility
### TestPerAppWorkerCrash
- Tests behavior when workers crash
- Verifies app recovery after worker respawn
### TestPerAppLogs
- Verifies logging output contains expected information
## Requirements
- Docker and Docker Compose
- Python 3.8+
- pytest
- requests
## Notes
- Tests run on port 8001 to avoid conflicts with the existing dirty_arbiter tests on 8000
- The container uses a keep-alive wrapper to allow testing worker crash scenarios

View File

@ -0,0 +1,184 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
WSGI and Dirty applications for per-app worker allocation testing.
Contains:
- A WSGI app that can make dirty client requests
- A lightweight dirty app (loads on all workers)
- A heavy dirty app (limited to 2 workers via class attribute)
- A config-limited app (limited to 1 worker via config)
"""
import json
import os
from gunicorn.dirty.app import DirtyApp
def application(environ, start_response):
"""
WSGI application that invokes dirty apps and returns worker info.
Routes:
- GET /lightweight/ping - Call LightweightApp.ping()
- GET /heavy/predict/<data> - Call HeavyApp.predict(data)
- GET /config_limited/info - Call ConfigLimitedApp.get_info()
- GET /status - Get overall status
"""
path = environ.get('PATH_INFO', '/')
method = environ.get('REQUEST_METHOD', 'GET')
if method != 'GET':
start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
return [b'Method not allowed']
# Import dirty client here to avoid import at module load
from gunicorn.dirty import get_dirty_client
try:
client = get_dirty_client()
if path == '/status':
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps({"status": "ok"}).encode()]
elif path == '/lightweight/ping':
result = client.execute("app:LightweightApp", "ping")
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps(result).encode()]
elif path.startswith('/heavy/predict/'):
data = path.split('/')[-1]
result = client.execute("app:HeavyApp", "predict", data)
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps(result).encode()]
elif path == '/heavy/get_worker_id':
result = client.execute("app:HeavyApp", "get_worker_id")
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps({"worker_id": result}).encode()]
elif path == '/config_limited/info':
result = client.execute("app:ConfigLimitedApp", "get_info")
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps(result).encode()]
elif path == '/config_limited/get_worker_id':
result = client.execute("app:ConfigLimitedApp", "get_worker_id")
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps({"worker_id": result}).encode()]
elif path == '/lightweight/get_worker_id':
result = client.execute("app:LightweightApp", "get_worker_id")
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps({"worker_id": result}).encode()]
else:
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return [b'Not found']
except Exception as e:
start_response('500 Internal Server Error', [('Content-Type', 'application/json')])
return [json.dumps({"error": str(e), "type": type(e).__name__}).encode()]
class LightweightApp(DirtyApp):
"""
A lightweight app that should load on ALL dirty workers.
workers=None (default) means all workers load this app.
"""
def __init__(self):
self.initialized = False
self.worker_id = None
self.call_count = 0
def init(self):
self.initialized = True
self.worker_id = os.getpid()
def ping(self):
"""Simple ping action."""
self.call_count += 1
return {
"pong": True,
"worker_id": self.worker_id,
"call_count": self.call_count,
}
def get_worker_id(self):
"""Return the worker ID that loaded this app."""
return self.worker_id
def close(self):
pass
class HeavyApp(DirtyApp):
"""
A heavy app that uses the workers class attribute to limit allocation.
workers=2 means only 2 dirty workers will load this app.
This simulates a large ML model that shouldn't be replicated everywhere.
"""
workers = 2 # Only 2 workers should load this app
def __init__(self):
self.initialized = False
self.worker_id = None
self.model_data = None
def init(self):
self.initialized = True
self.worker_id = os.getpid()
# Simulate loading a heavy model
self.model_data = {"loaded": True, "worker": self.worker_id}
def predict(self, data):
"""Simulate model prediction."""
return {
"prediction": f"result_for_{data}",
"worker_id": self.worker_id,
}
def get_worker_id(self):
"""Return the worker ID that loaded this app."""
return self.worker_id
def close(self):
self.model_data = None
class ConfigLimitedApp(DirtyApp):
"""
An app whose worker limit is specified in config (not class attribute).
The config will specify this app as "app:ConfigLimitedApp:1" to limit
it to a single worker.
"""
def __init__(self):
self.initialized = False
self.worker_id = None
def init(self):
self.initialized = True
self.worker_id = os.getpid()
def get_info(self):
"""Get app info."""
return {
"app": "ConfigLimitedApp",
"worker_id": self.worker_id,
}
def get_worker_id(self):
"""Return the worker ID that loaded this app."""
return self.worker_id
def close(self):
pass

View File

@ -0,0 +1,13 @@
services:
gunicorn:
build:
context: ../../..
dockerfile: tests/docker/per_app_allocation/Dockerfile
ports:
- "8001:8000"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/status"]
interval: 1s
timeout: 1s
retries: 30
stop_grace_period: 10s

View File

@ -0,0 +1,38 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
Gunicorn configuration for per-app worker allocation e2e tests.
Configuration:
- 4 dirty workers total
- LightweightApp: loads on ALL 4 workers (workers=None)
- HeavyApp: loads on 2 workers (via class attribute workers=2)
- ConfigLimitedApp: loads on 1 worker (via config :1 suffix)
"""
bind = "0.0.0.0:8000"
workers = 1 # HTTP workers
worker_class = "sync"
# 4 dirty workers - enough to test distribution
dirty_workers = 4
# App configuration:
# - LightweightApp: no limit, loads on all 4
# - HeavyApp: workers=2 class attribute, loads on 2
# - ConfigLimitedApp: config override :1, loads on 1
dirty_apps = [
"app:LightweightApp",
"app:HeavyApp",
"app:ConfigLimitedApp:1",
]
dirty_timeout = 30
dirty_graceful_timeout = 5
timeout = 30
graceful_timeout = 5
loglevel = "debug"
accesslog = "-"
errorlog = "-"

View File

@ -0,0 +1,393 @@
#!/usr/bin/env python
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""
Docker-based end-to-end tests for per-app worker allocation.
These tests verify:
1. Apps with worker limits are only loaded on limited workers
2. Requests are routed to workers that have the target app
3. Round-robin distribution works within limited worker sets
4. Worker crash scenarios maintain correct app allocation
Usage:
# Build the container first
docker compose build
# Run all tests
pytest test_per_app_e2e.py -v
# Run specific test
pytest test_per_app_e2e.py::TestPerAppAllocation::test_lightweight_app_round_robins -v
"""
import os
import re
import subprocess
import time
import pytest
import requests
class DockerContainer:
"""Context manager for managing a Docker container for per-app tests."""
def __init__(self, name="gunicorn-per-app-test", build=True):
self.name = name
self.build = build
self.container_id = None
self.base_url = "http://127.0.0.1:8001"
def __enter__(self):
# Build if requested
if self.build:
result = subprocess.run(
["docker", "compose", "build"],
cwd=os.path.dirname(__file__),
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"Docker build failed: {result.stderr}")
# Remove any existing container with same name
subprocess.run(
["docker", "rm", "-f", self.name],
capture_output=True,
)
# Start container with a keep-alive wrapper
result = subprocess.run(
[
"docker", "run", "-d",
"--name", self.name,
"-p", "8001:8000",
"per_app_allocation-gunicorn",
"sh", "-c",
"gunicorn app:application -c gunicorn_conf.py & "
"GUNICORN_PID=$!; "
"trap 'kill $GUNICORN_PID 2>/dev/null' TERM; "
"while true; do sleep 1; done"
],
capture_output=True,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"Docker run failed: {result.stderr}")
self.container_id = result.stdout.strip()
# Wait for gunicorn to be ready
self._wait_for_ready()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.container_id:
# Get logs before cleanup
logs = self.get_logs()
if exc_val:
print(f"\n=== Container logs ===\n{logs}\n=== End logs ===\n")
# Stop and remove container
subprocess.run(
["docker", "rm", "-f", self.name],
capture_output=True,
)
def _wait_for_ready(self, timeout=60):
"""Wait for gunicorn to be ready and serving requests."""
start = time.time()
while time.time() - start < timeout:
try:
resp = requests.get(f"{self.base_url}/status", timeout=1)
if resp.status_code == 200:
# Also verify dirty workers are up by testing an app
resp = requests.get(f"{self.base_url}/lightweight/ping", timeout=2)
if resp.status_code == 200:
return
except requests.exceptions.RequestException:
pass
time.sleep(0.5)
raise TimeoutError("Gunicorn did not start within timeout")
def exec(self, cmd, check=True):
"""Execute a command in the container."""
result = subprocess.run(
["docker", "exec", self.name] + cmd,
capture_output=True,
text=True,
)
if check and result.returncode != 0:
raise RuntimeError(f"Command failed: {cmd}\n{result.stderr}")
return result
def get_logs(self):
"""Get container logs."""
result = subprocess.run(
["docker", "logs", self.name],
capture_output=True,
text=True,
)
return result.stdout + result.stderr
def get_gunicorn_pids(self):
"""Get PIDs of gunicorn processes."""
pids = {
"master": None,
"dirty-arbiter": None,
"workers": [],
"dirty-workers": [],
}
result = self.exec(["ps", "aux"], check=False)
for line in result.stdout.split("\n"):
if "gunicorn:" not in line:
continue
parts = line.split()
if len(parts) < 2:
continue
pid = int(parts[1])
if "gunicorn: master" in line:
pids["master"] = pid
elif "gunicorn: dirty-arbiter" in line:
pids["dirty-arbiter"] = pid
elif "gunicorn: dirty-worker" in line:
pids["dirty-workers"].append(pid)
elif "gunicorn: worker" in line:
pids["workers"].append(pid)
return pids
def kill_process(self, pid, signal=9):
"""Send a signal to a process in the container."""
self.exec(
["kill", f"-{signal}", str(pid)],
check=False,
)
def wait_for_dirty_worker_count(self, expected_count, timeout=10):
"""Wait for specific number of dirty workers."""
start = time.time()
while time.time() - start < timeout:
pids = self.get_gunicorn_pids()
if len(pids["dirty-workers"]) == expected_count:
return True
time.sleep(0.5)
return False
def http_get(self, path, timeout=5):
"""Make HTTP GET request to the container."""
return requests.get(f"{self.base_url}{path}", timeout=timeout)
class TestPerAppAllocation:
"""Test per-app worker allocation functionality."""
@pytest.fixture(autouse=True)
def setup(self):
"""Check Docker is available."""
result = subprocess.run(
["docker", "info"],
capture_output=True,
)
if result.returncode != 0:
pytest.skip("Docker is not available")
def test_lightweight_app_responds(self):
"""LightweightApp should be accessible and respond correctly."""
with DockerContainer() as container:
resp = container.http_get("/lightweight/ping")
assert resp.status_code == 200
data = resp.json()
assert data["pong"] is True
assert "worker_id" in data
def test_lightweight_app_round_robins(self):
"""LightweightApp requests should round-robin across all 4 workers."""
with DockerContainer() as container:
# Make multiple requests to collect worker IDs
worker_ids = set()
for _ in range(20): # More than 4 to ensure round-robin
resp = container.http_get("/lightweight/get_worker_id")
assert resp.status_code == 200
data = resp.json()
worker_ids.add(data["worker_id"])
# Should see all 4 workers (or at least more than 1)
# Note: Due to timing, we might not hit all 4 in exactly 20 requests
assert len(worker_ids) >= 2, (
f"Expected requests to go to multiple workers, got {len(worker_ids)}"
)
def test_config_limited_app_uses_one_worker(self):
"""ConfigLimitedApp (limited to 1 via config) should use only one worker."""
with DockerContainer() as container:
# Make multiple requests
worker_ids = set()
for _ in range(10):
resp = container.http_get("/config_limited/get_worker_id")
assert resp.status_code == 200
data = resp.json()
worker_ids.add(data["worker_id"])
# Should only see 1 worker (the app is limited to 1)
assert len(worker_ids) == 1, (
f"Expected ConfigLimitedApp to use only 1 worker, got {len(worker_ids)}"
)
def test_heavy_app_uses_limited_workers(self):
"""HeavyApp (workers=2) should use only 2 workers."""
with DockerContainer() as container:
# Make multiple requests
worker_ids = set()
for _ in range(20):
resp = container.http_get("/heavy/get_worker_id")
# HeavyApp uses class attribute workers=2
# But currently the arbiter only reads config :N format
# This test documents expected behavior
if resp.status_code == 200:
data = resp.json()
worker_ids.add(data["worker_id"])
else:
# If class attribute isn't supported yet, skip
pytest.skip("HeavyApp class attribute workers=2 not implemented")
return
# Should see at most 2 workers
assert len(worker_ids) <= 2, (
f"Expected HeavyApp to use at most 2 workers, got {len(worker_ids)}"
)
def test_heavy_app_prediction_works(self):
"""HeavyApp.predict() should return correct results."""
with DockerContainer() as container:
resp = container.http_get("/heavy/predict/test_input")
if resp.status_code == 200:
data = resp.json()
assert data["prediction"] == "result_for_test_input"
assert "worker_id" in data
else:
# If class attribute isn't supported, document the error
data = resp.json()
print(f"HeavyApp error: {data}")
def test_all_apps_accessible(self):
"""All configured apps should be accessible."""
with DockerContainer() as container:
# LightweightApp
resp = container.http_get("/lightweight/ping")
assert resp.status_code == 200
# ConfigLimitedApp
resp = container.http_get("/config_limited/info")
assert resp.status_code == 200
data = resp.json()
assert data["app"] == "ConfigLimitedApp"
def test_four_dirty_workers_running(self):
"""Should have 4 dirty workers as configured."""
with DockerContainer() as container:
pids = container.get_gunicorn_pids()
assert len(pids["dirty-workers"]) == 4, (
f"Expected 4 dirty workers, got {len(pids['dirty-workers'])}"
)
class TestPerAppWorkerCrash:
"""Test per-app allocation behavior when workers crash."""
@pytest.fixture(autouse=True)
def setup(self):
"""Check Docker is available."""
result = subprocess.run(
["docker", "info"],
capture_output=True,
)
if result.returncode != 0:
pytest.skip("Docker is not available")
def test_worker_crash_app_still_accessible(self):
"""When a dirty worker crashes, apps should still be accessible."""
with DockerContainer() as container:
pids = container.get_gunicorn_pids()
assert len(pids["dirty-workers"]) == 4
# Kill one dirty worker
container.kill_process(pids["dirty-workers"][0], signal=9)
# Wait for respawn (dirty arbiter should respawn it)
assert container.wait_for_dirty_worker_count(4, timeout=15), (
"Dirty arbiter should respawn killed worker"
)
# Apps should still work
resp = container.http_get("/lightweight/ping")
assert resp.status_code == 200
resp = container.http_get("/config_limited/info")
assert resp.status_code == 200
def test_config_limited_worker_crash_recovery(self):
"""When the sole worker for ConfigLimitedApp crashes, it should recover."""
with DockerContainer() as container:
# Get the worker ID that handles ConfigLimitedApp
resp = container.http_get("/config_limited/get_worker_id")
assert resp.status_code == 200
original_worker_id = resp.json()["worker_id"]
# Kill that specific worker
container.kill_process(original_worker_id, signal=9)
# Wait for respawn
time.sleep(3)
# The new worker should handle ConfigLimitedApp
resp = container.http_get("/config_limited/get_worker_id")
# Note: There might be a brief period where no worker has the app
# In production, this would return an error until respawn
if resp.status_code == 200:
new_worker_id = resp.json()["worker_id"]
# Worker ID should be different (new process)
assert new_worker_id != original_worker_id, (
"New worker should have different PID"
)
class TestPerAppLogs:
"""Test that per-app allocation is logged correctly."""
@pytest.fixture(autouse=True)
def setup(self):
"""Check Docker is available."""
result = subprocess.run(
["docker", "info"],
capture_output=True,
)
if result.returncode != 0:
pytest.skip("Docker is not available")
def test_logs_show_app_allocation(self):
"""Logs should indicate which apps are loaded on which workers."""
with DockerContainer() as container:
logs = container.get_logs()
# Should see dirty arbiter starting
assert "Dirty arbiter" in logs or "dirty arbiter" in logs.lower()
# Should see dirty workers spawning
assert "dirty" in logs.lower() and "worker" in logs.lower()
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -93,3 +93,65 @@ class SlowDirtyApp(DirtyApp):
def close(self):
self.closed = True
class HeavyModelApp(DirtyApp):
"""A dirty app that simulates a heavy model requiring limited workers.
Uses the workers class attribute to limit how many workers load this app.
"""
workers = 2 # Only 2 workers should load this app
def __init__(self):
self.initialized = False
self.closed = False
self.model_data = None
self.worker_id = None
def init(self):
import os
self.initialized = True
# Store the worker PID to verify which worker handled the request
self.worker_id = os.getpid()
# Simulate loading a heavy model
self.model_data = {"loaded": True, "worker": self.worker_id}
def predict(self, data):
"""Simulate model prediction."""
return {
"prediction": f"result_for_{data}",
"worker_id": self.worker_id,
}
def get_worker_id(self):
"""Return the worker ID that loaded this app."""
return self.worker_id
def close(self):
self.closed = True
self.model_data = None
class LightweightApp(DirtyApp):
"""A lightweight app that should load on all workers."""
def __init__(self):
self.initialized = False
self.closed = False
self.worker_id = None
def init(self):
import os
self.initialized = True
self.worker_id = os.getpid()
def ping(self):
"""Simple ping action."""
return {"pong": True, "worker_id": self.worker_id}
def get_worker_id(self):
"""Return the worker ID that loaded this app."""
return self.worker_id
def close(self):
self.closed = True

View File

@ -295,3 +295,53 @@ class TestParseDirtyAppSpec:
import_path, count = parse_dirty_app_spec("mod.sub:Class:2")
assert import_path == "mod.sub:Class"
assert count == 2
class TestGetAppWorkersAttribute:
"""Tests for get_app_workers_attribute function."""
def test_get_workers_none_for_base_class(self):
"""Base DirtyApp returns workers=None."""
from gunicorn.dirty.app import get_app_workers_attribute
workers = get_app_workers_attribute("gunicorn.dirty.app:DirtyApp")
assert workers is None
def test_get_workers_from_class_attribute(self):
"""App with workers=2 class attribute returns 2."""
from gunicorn.dirty.app import get_app_workers_attribute
workers = get_app_workers_attribute("tests.support_dirty_app:HeavyModelApp")
assert workers == 2
def test_get_workers_none_for_inherited(self):
"""App without explicit workers attribute returns None."""
from gunicorn.dirty.app import get_app_workers_attribute
workers = get_app_workers_attribute("tests.support_dirty_app:TestDirtyApp")
assert workers is None
def test_get_workers_not_found_module(self):
"""Non-existent module raises DirtyAppNotFoundError."""
from gunicorn.dirty.app import get_app_workers_attribute
from gunicorn.dirty.errors import DirtyAppNotFoundError
with pytest.raises(DirtyAppNotFoundError):
get_app_workers_attribute("nonexistent.module:App")
def test_get_workers_not_found_class(self):
"""Non-existent class raises DirtyAppNotFoundError."""
from gunicorn.dirty.app import get_app_workers_attribute
from gunicorn.dirty.errors import DirtyAppNotFoundError
with pytest.raises(DirtyAppNotFoundError):
get_app_workers_attribute("tests.support_dirty_app:NonExistentApp")
def test_get_workers_invalid_format(self):
"""Invalid format raises DirtyAppError."""
from gunicorn.dirty.app import get_app_workers_attribute
from gunicorn.dirty.errors import DirtyAppError
with pytest.raises(DirtyAppError) as exc_info:
get_app_workers_attribute("invalid.format.no.colon")
assert "Invalid import path format" in str(exc_info.value)