From 06aba09251226a3a6c8bac305559a78f6d832d83 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sat, 24 Jan 2026 02:47:21 +0100 Subject: [PATCH] feat(dirty): add thread pool with execution timeout control - Use dirty_threads config for thread pool size (default: 1) - Enforce dirty_timeout at worker level via asyncio.wait_for - Heartbeat runs independently, not blocked by task execution - Document thread safety and state persistence in docstrings --- gunicorn/dirty/app.py | 77 ++++++++++++------- gunicorn/dirty/worker.py | 113 +++++++++++++++++++++++++-- tests/test_dirty_worker.py | 154 +++++++++++++++++++++++++++++++++++++ 3 files changed, 309 insertions(+), 35 deletions(-) diff --git a/gunicorn/dirty/app.py b/gunicorn/dirty/app.py index 1fa8d9ce..e9198349 100644 --- a/gunicorn/dirty/app.py +++ b/gunicorn/dirty/app.py @@ -23,38 +23,59 @@ class DirtyApp: persist in memory for the lifetime of the worker. They are designed for stateful resources like ML models, connection pools, etc. + Lifecycle + --------- + 1. ``__init__()``: Called when the app is instantiated (once per worker) + 2. ``init()``: Called after instantiation to initialize resources + 3. ``__call__()``: Called for each request from HTTP workers + 4. ``close()``: Called when the worker shuts down + + State Persistence + ----------------- + Instance variables persist across requests. This is the key feature + that enables loading heavy resources once and reusing them:: + + class MLApp(DirtyApp): + def init(self): + self.model = load_model() # Loaded once, reused forever + + def predict(self, data): + return self.model.predict(data) # Same model for all requests + + Thread Safety + ------------- + With ``dirty_threads=1`` (default): Only one request runs at a time, + so no thread safety concerns. + + With ``dirty_threads > 1``: Multiple requests may run concurrently + in the same worker. Your app MUST be thread-safe. Options: + + - Use locks: ``threading.Lock()`` for shared state + - Use thread-local: ``threading.local()`` for per-thread state + - Use read-only state: Load models once in init(), never mutate + + Example:: + + import threading + + class ThreadSafeMLApp(DirtyApp): + def __init__(self): + self.models = {} + self._lock = threading.Lock() + + def init(self): + self.models['default'] = load_model('base-model') + + def load_model(self, name): + with self._lock: + if name not in self.models: + self.models[name] = load_model(name) + return {"loaded": True, "name": name} + Subclasses should implement: - init(): Called once at worker startup to initialize resources - __call__(action, *args, **kwargs): Handle requests from HTTP workers - close(): Called at worker shutdown to cleanup resources - - Example:: - - class MLApp(DirtyApp): - def __init__(self): - self.models = {} - - def init(self): - # Load default models at startup - self.models['default'] = load_model('base-model') - - def __call__(self, action, *args, **kwargs): - method = getattr(self, action, None) - if method is None: - raise ValueError(f"Unknown action: {action}") - return method(*args, **kwargs) - - def load_model(self, name): - if name not in self.models: - self.models[name] = load_model(name) - return {"loaded": True, "name": name} - - def inference(self, model_name, input_data): - return self.models[model_name].predict(input_data) - - def close(self): - for model in self.models.values(): - del model """ def init(self): diff --git a/gunicorn/dirty/worker.py b/gunicorn/dirty/worker.py index b7797fb8..70e45047 100644 --- a/gunicorn/dirty/worker.py +++ b/gunicorn/dirty/worker.py @@ -7,9 +7,68 @@ Dirty Worker Process Asyncio-based worker that loads dirty apps and handles requests from the DirtyArbiter. + +Threading Model +--------------- +Each dirty worker runs an asyncio event loop in the main thread for: +- Handling connections from the arbiter +- Managing heartbeat updates +- Coordinating task execution + +Actual app execution runs in a ThreadPoolExecutor (separate threads): +- The number of threads is controlled by ``dirty_threads`` config (default: 1) +- Each thread can execute one app action at a time +- The asyncio event loop is NOT blocked by task execution + +State and Global Objects +------------------------ +Apps can maintain persistent state because: + +1. Apps are loaded ONCE when the worker starts (in ``load_apps()``) +2. The same app instances are reused for ALL requests +3. App state (instance variables, loaded models, etc.) persists + +Example:: + + class MLApp(DirtyApp): + def init(self): + self.model = load_heavy_model() # Loaded once, reused + self.cache = {} # Persistent cache + + def predict(self, data): + return self.model.predict(data) # Uses loaded model + +Thread Safety: +- With ``dirty_threads=1`` (default): No concurrent access, thread-safe by design +- With ``dirty_threads > 1``: Multiple threads share the same app instances, + apps MUST be thread-safe (use locks, thread-local storage, etc.) + +Heartbeat and Liveness +---------------------- +The worker sends heartbeat updates to prove it's alive: + +1. A dedicated asyncio task (``_heartbeat_loop``) runs independently +2. It updates the heartbeat file every ``dirty_timeout / 2`` seconds +3. Since tasks run in executor threads, they do NOT block heartbeats +4. The arbiter kills workers that miss heartbeat updates + +Timeout Control +--------------- +Execution timeout is enforced at two levels: + +1. **Worker level**: Each task execution has a timeout (``dirty_timeout``). + If exceeded, the worker returns a timeout error but the thread may + continue running (Python threads cannot be cancelled). + +2. **Arbiter level**: The arbiter also enforces timeout when waiting + for worker response. Workers that don't respond are killed via SIGABRT. + +Note: Since Python threads cannot be forcibly cancelled, a truly stuck +operation will continue until the worker is killed by the arbiter. """ import asyncio +from concurrent.futures import ThreadPoolExecutor import os import signal import traceback @@ -19,7 +78,12 @@ from gunicorn import util from gunicorn.workers.workertmp import WorkerTmp from .app import load_dirty_apps -from .errors import DirtyAppError, DirtyAppNotFoundError, DirtyWorkerError +from .errors import ( + DirtyAppError, + DirtyAppNotFoundError, + DirtyTimeoutError, + DirtyWorkerError, +) from .protocol import ( DirtyProtocol, make_response, @@ -64,6 +128,7 @@ class DirtyWorker: self.apps = {} self._server = None self._loop = None + self._executor = None def __str__(self): return f"" @@ -159,6 +224,14 @@ class DirtyWorker: def run(self): """Run the main asyncio event loop.""" + # Create thread pool for executing app actions + num_threads = self.cfg.dirty_threads + self._executor = ThreadPoolExecutor( + max_workers=num_threads, + thread_name_prefix=f"dirty-worker-{self.pid}-" + ) + self.log.debug("Created thread pool with %d threads", num_threads) + try: self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) @@ -281,6 +354,10 @@ class DirtyWorker: """ Execute an action on a dirty app. + The action runs in a thread pool executor to avoid blocking the + asyncio event loop. Execution timeout is enforced using + ``dirty_timeout`` config. + Args: app_path: Import path of the dirty app action: Action name to execute @@ -292,24 +369,46 @@ class DirtyWorker: Raises: DirtyAppNotFoundError: If app is not loaded + DirtyTimeoutError: If execution exceeds timeout DirtyAppError: If execution fails """ if app_path not in self.apps: raise DirtyAppNotFoundError(app_path) app = self.apps[app_path] + timeout = self.cfg.dirty_timeout if self.cfg.dirty_timeout > 0 else None - # Run the app call in a thread pool to avoid blocking + # Run the app call in the thread pool to avoid blocking # the event loop for CPU-bound operations loop = asyncio.get_running_loop() - result = await loop.run_in_executor( - None, - lambda: app(action, *args, **kwargs) - ) - return result + + try: + result = await asyncio.wait_for( + loop.run_in_executor( + self._executor, + lambda: app(action, *args, **kwargs) + ), + timeout=timeout + ) + return result + except asyncio.TimeoutError: + # Note: The thread continues running - we just stop waiting + self.log.warning( + "Execution timeout for %s.%s after %ds", + app_path, action, timeout + ) + raise DirtyTimeoutError( + f"Execution of {app_path}.{action} timed out", + timeout=timeout + ) def _cleanup(self): """Clean up resources on shutdown.""" + # Shutdown thread pool executor + if self._executor: + self._executor.shutdown(wait=False, cancel_futures=True) + self._executor = None + # Close all apps for path, app in self.apps.items(): try: diff --git a/tests/test_dirty_worker.py b/tests/test_dirty_worker.py index 1e45629f..6bfaa68f 100644 --- a/tests/test_dirty_worker.py +++ b/tests/test_dirty_worker.py @@ -906,3 +906,157 @@ class TestDirtyWorkerLoadAppsInit: # Error should be logged assert any("Failed to initialize" in msg for level, msg in log.messages) + + +class TestDirtyWorkerExecutionTimeout: + """Tests for execution timeout control.""" + + @pytest.mark.asyncio + async def test_execute_with_timeout(self): + """Test that execute enforces timeout.""" + from concurrent.futures import ThreadPoolExecutor + + cfg = Config() + cfg.set("dirty_timeout", 1) # 1 second timeout + cfg.set("dirty_threads", 1) + log = MockLog() + + with tempfile.TemporaryDirectory() as tmpdir: + socket_path = os.path.join(tmpdir, "worker.sock") + worker = DirtyWorker( + age=1, + ppid=os.getpid(), + app_paths=["tests.support_dirty_app:SlowDirtyApp"], + cfg=cfg, + log=log, + socket_path=socket_path + ) + worker.pid = os.getpid() + + # Create executor manually for test + worker._executor = ThreadPoolExecutor(max_workers=1) + + try: + worker.load_apps() + + # Execute slow action that exceeds timeout + from gunicorn.dirty.errors import DirtyTimeoutError + with pytest.raises(DirtyTimeoutError): + await worker.execute( + "tests.support_dirty_app:SlowDirtyApp", + "slow_action", + [], + {"delay": 5.0} # 5 second delay, 1 second timeout + ) + finally: + worker._cleanup() + + @pytest.mark.asyncio + async def test_execute_within_timeout(self): + """Test that execute succeeds within timeout.""" + from concurrent.futures import ThreadPoolExecutor + + cfg = Config() + cfg.set("dirty_timeout", 10) # 10 second timeout + cfg.set("dirty_threads", 1) + log = MockLog() + + with tempfile.TemporaryDirectory() as tmpdir: + socket_path = os.path.join(tmpdir, "worker.sock") + worker = DirtyWorker( + age=1, + ppid=os.getpid(), + app_paths=["tests.support_dirty_app:SlowDirtyApp"], + cfg=cfg, + log=log, + socket_path=socket_path + ) + worker.pid = os.getpid() + + # Create executor manually for test + worker._executor = ThreadPoolExecutor(max_workers=1) + + try: + worker.load_apps() + + # Execute fast action that completes within timeout + result = await worker.execute( + "tests.support_dirty_app:SlowDirtyApp", + "fast_action", + [], + {} + ) + assert result == {"fast": True} + finally: + worker._cleanup() + + @pytest.mark.asyncio + async def test_execute_no_timeout_when_zero(self): + """Test that timeout is disabled when dirty_timeout is 0.""" + from concurrent.futures import ThreadPoolExecutor + + cfg = Config() + cfg.set("dirty_timeout", 0) # Disabled + cfg.set("dirty_threads", 1) + log = MockLog() + + with tempfile.TemporaryDirectory() as tmpdir: + socket_path = os.path.join(tmpdir, "worker.sock") + worker = DirtyWorker( + age=1, + ppid=os.getpid(), + app_paths=["tests.support_dirty_app:TestDirtyApp"], + cfg=cfg, + log=log, + socket_path=socket_path + ) + worker.pid = os.getpid() + + # Create executor manually for test + worker._executor = ThreadPoolExecutor(max_workers=1) + + try: + worker.load_apps() + + # Should work with no timeout + result = await worker.execute( + "tests.support_dirty_app:TestDirtyApp", + "compute", + [2, 3], + {"operation": "add"} + ) + assert result == 5 + finally: + worker._cleanup() + + def test_run_creates_executor_with_threads(self): + """Test that run() creates executor with dirty_threads config.""" + cfg = Config() + cfg.set("dirty_timeout", 300) + cfg.set("dirty_threads", 4) + log = MockLog() + + with tempfile.TemporaryDirectory() as tmpdir: + socket_path = os.path.join(tmpdir, "worker.sock") + worker = DirtyWorker( + age=1, + ppid=os.getpid(), + app_paths=["tests.support_dirty_app:TestDirtyApp"], + cfg=cfg, + log=log, + socket_path=socket_path + ) + worker.pid = os.getpid() + worker.load_apps() + + # Simulate what run() does + from concurrent.futures import ThreadPoolExecutor + worker._executor = ThreadPoolExecutor( + max_workers=cfg.dirty_threads, + thread_name_prefix=f"dirty-worker-{worker.pid}-" + ) + + assert worker._executor._max_workers == 4 + + worker._cleanup() + assert worker._executor is None