feat(dirty): add thread pool with execution timeout control

- Use dirty_threads config for thread pool size (default: 1)
- Enforce dirty_timeout at worker level via asyncio.wait_for
- Heartbeat runs independently, not blocked by task execution
- Document thread safety and state persistence in docstrings
Benoit Chesneau 2026-01-24 02:47:21 +01:00
parent 21f769ce16
commit 06aba09251
3 changed files with 309 additions and 35 deletions

@ -23,38 +23,59 @@ class DirtyApp:
persist in memory for the lifetime of the worker. They are designed
for stateful resources like ML models, connection pools, etc.
Lifecycle
---------
1. ``__init__()``: Called when the app is instantiated (once per worker)
2. ``init()``: Called after instantiation to initialize resources
3. ``__call__()``: Called for each request from HTTP workers
4. ``close()``: Called when the worker shuts down
State Persistence
-----------------
Instance variables persist across requests. This is the key feature
that enables loading heavy resources once and reusing them::
class MLApp(DirtyApp):
def init(self):
self.model = load_model() # Loaded once, reused forever
def predict(self, data):
return self.model.predict(data) # Same model for all requests
Thread Safety
-------------
With ``dirty_threads=1`` (default): Only one request runs at a time,
so no thread safety concerns.
With ``dirty_threads > 1``: Multiple requests may run concurrently
in the same worker. Your app MUST be thread-safe. Options:
- Use locks: ``threading.Lock()`` for shared state
- Use thread-local: ``threading.local()`` for per-thread state
- Use read-only state: Load models once in init(), never mutate
Example::
import threading
class ThreadSafeMLApp(DirtyApp):
def __init__(self):
self.models = {}
self._lock = threading.Lock()
def init(self):
self.models['default'] = load_model('base-model')
def load_model(self, name):
with self._lock:
if name not in self.models:
self.models[name] = load_model(name)
return {"loaded": True, "name": name}
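The thread-local option listed above might look like the following minimal sketch. ``ThreadLocalApp``, ``record`` and ``_get_buffer`` are hypothetical names used only for illustration, and the class does not inherit from ``DirtyApp`` so that the snippet runs on its own. Each pool thread lazily creates its own buffer, so no lock is needed around it.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ThreadLocalApp:
    """Hypothetical stand-in for a DirtyApp subclass (base class omitted)."""

    def __init__(self):
        # Attributes set on this object are visible only to the setting thread.
        self._local = threading.local()

    def _get_buffer(self):
        # Lazily create one buffer per thread on first use.
        buf = getattr(self._local, "buffer", None)
        if buf is None:
            buf = []
            self._local.buffer = buf
        return buf

    def record(self, value):
        buf = self._get_buffer()
        buf.append(value)
        # Report which thread handled the call and how full its buffer is.
        return threading.get_ident(), len(buf)


app = ThreadLocalApp()
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(app.record, range(8)))

# Largest buffer length seen per thread; the lengths add up to the call count.
per_thread = {}
for thread_id, length in results:
    per_thread[thread_id] = max(per_thread.get(thread_id, 0), length)
```

Thread-local state suits per-thread scratch data such as buffers or connections. State that must be shared across threads, like the ``models`` dict above, still needs a lock or must stay read-only.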
Subclasses should implement:
- init(): Called once at worker startup to initialize resources
- __call__(action, *args, **kwargs): Handle requests from HTTP workers
- close(): Called at worker shutdown to cleanup resources
Example::
class MLApp(DirtyApp):
def __init__(self):
self.models = {}
def init(self):
# Load default models at startup
self.models['default'] = load_model('base-model')
def __call__(self, action, *args, **kwargs):
method = getattr(self, action, None)
if method is None:
raise ValueError(f"Unknown action: {action}")
return method(*args, **kwargs)
def load_model(self, name):
if name not in self.models:
self.models[name] = load_model(name)
return {"loaded": True, "name": name}
def inference(self, model_name, input_data):
return self.models[model_name].predict(input_data)
def close(self):
# Drop the references so the models can be garbage collected
self.models.clear()
"""
def init(self):

@ -7,9 +7,68 @@ Dirty Worker Process
Asyncio-based worker that loads dirty apps and handles requests
from the DirtyArbiter.
Threading Model
---------------
Each dirty worker runs an asyncio event loop in the main thread for:
- Handling connections from the arbiter
- Managing heartbeat updates
- Coordinating task execution
Actual app execution runs in a ThreadPoolExecutor (separate threads):
- The number of threads is controlled by ``dirty_threads`` config (default: 1)
- Each thread can execute one app action at a time
- The asyncio event loop is NOT blocked by task execution
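As a rough illustration of the model above, the sketch below submits several blocking actions to a ``ThreadPoolExecutor`` from an event loop and records how many ran at the same time. ``peak_concurrency`` and ``action`` are hypothetical helpers, not part of the worker, and the pool size is passed in directly rather than read from ``dirty_threads``.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(num_threads, num_tasks=4):
    """Return the highest number of actions observed running at once."""
    active = 0
    peak = 0
    lock = threading.Lock()

    def action():
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate blocking work inside the app
        with lock:
            active -= 1

    async def main():
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            # Each action occupies one pool thread; extra actions wait in the queue.
            await asyncio.gather(
                *(loop.run_in_executor(executor, action) for _ in range(num_tasks))
            )

    asyncio.run(main())
    return peak
```

With one thread the actions run strictly one after another. With more threads the peak can rise up to the pool size, which is why apps must be thread-safe in that configuration.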
State and Global Objects
------------------------
Apps can maintain persistent state because:
1. Apps are loaded ONCE when the worker starts (in ``load_apps()``)
2. The same app instances are reused for ALL requests
3. App state (instance variables, loaded models, etc.) persists
Example::
class MLApp(DirtyApp):
def init(self):
self.model = load_heavy_model() # Loaded once, reused
self.cache = {} # Persistent cache
def predict(self, data):
return self.model.predict(data) # Uses loaded model
Thread Safety:
- With ``dirty_threads=1`` (default): No concurrent access, thread-safe by design
- With ``dirty_threads > 1``: Multiple threads share the same app instances,
apps MUST be thread-safe (use locks, thread-local storage, etc.)
Heartbeat and Liveness
----------------------
The worker sends heartbeat updates to prove it's alive:
1. A dedicated asyncio task (``_heartbeat_loop``) runs independently
2. It updates the heartbeat file every ``dirty_timeout / 2`` seconds
3. Since tasks run in executor threads, they do NOT block heartbeats
4. The arbiter kills workers that miss heartbeat updates
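The independence of the heartbeat from task execution can be sketched as follows. This is an illustration under assumptions, not the worker's actual code: ``heartbeat_loop`` here appends timestamps to a list instead of touching a heartbeat file, and the interval is a fixed value rather than ``dirty_timeout / 2``.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


async def heartbeat_loop(beats, interval):
    # Hypothetical stand-in for the heartbeat task: record one timestamp per tick.
    while True:
        beats.append(time.monotonic())
        await asyncio.sleep(interval)


async def run_blocking_task(duration=0.3, interval=0.05):
    beats = []
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=1)
    heartbeat = asyncio.create_task(heartbeat_loop(beats, interval))
    try:
        # time.sleep blocks the executor thread, not the event loop,
        # so the heartbeat task keeps ticking while we wait here.
        await loop.run_in_executor(executor, time.sleep, duration)
    finally:
        heartbeat.cancel()
        executor.shutdown(wait=True)
    return len(beats)


beat_count = asyncio.run(run_blocking_task())
```

If the blocking call ran directly on the event loop instead of in the executor, at most one beat would be recorded before the task finished.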
Timeout Control
---------------
Execution timeout is enforced at two levels:
1. **Worker level**: Each task execution has a timeout (``dirty_timeout``).
If exceeded, the worker returns a timeout error, but the thread keeps
running and occupies its pool thread until the action returns (Python
threads cannot be cancelled).
2. **Arbiter level**: The arbiter also enforces timeout when waiting
for worker response. Workers that don't respond are killed via SIGABRT.
Note: Since Python threads cannot be forcibly cancelled, a truly stuck
operation will continue until the worker is killed by the arbiter.
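The note above can be observed with a small standalone sketch. ``demo`` and ``slow_action`` are hypothetical names, and the durations are arbitrary values chosen to keep the example short. ``asyncio.wait_for`` stops waiting once the timeout elapses, but the function running in the pool thread still completes.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


async def demo(timeout=0.05, work=0.2):
    finished = threading.Event()

    def slow_action():
        time.sleep(work)  # simulate an action that outlives the timeout
        finished.set()

    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=1)
    timed_out = False
    try:
        await asyncio.wait_for(
            loop.run_in_executor(executor, slow_action), timeout=timeout
        )
    except asyncio.TimeoutError:
        # Only the wait is abandoned; the thread is not interrupted.
        timed_out = True

    still_running = not finished.is_set()
    # Block until the pool thread has actually returned.
    executor.shutdown(wait=True)
    return timed_out, still_running, finished.is_set()


timed_out, still_running, completed_later = asyncio.run(demo())
```

This is why the arbiter-level kill remains the backstop: a pool thread that never returns can only be reclaimed by ending the worker process.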
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
import os
import signal
import traceback
@ -19,7 +78,12 @@ from gunicorn import util
from gunicorn.workers.workertmp import WorkerTmp
from .app import load_dirty_apps
from .errors import DirtyAppError, DirtyAppNotFoundError, DirtyWorkerError
from .errors import (
DirtyAppError,
DirtyAppNotFoundError,
DirtyTimeoutError,
DirtyWorkerError,
)
from .protocol import (
DirtyProtocol,
make_response,
@ -64,6 +128,7 @@ class DirtyWorker:
self.apps = {}
self._server = None
self._loop = None
self._executor = None
def __str__(self):
return f"<DirtyWorker {self.pid}>"
@ -159,6 +224,14 @@ class DirtyWorker:
def run(self):
"""Run the main asyncio event loop."""
# Create thread pool for executing app actions
num_threads = self.cfg.dirty_threads
self._executor = ThreadPoolExecutor(
max_workers=num_threads,
thread_name_prefix=f"dirty-worker-{self.pid}-"
)
self.log.debug("Created thread pool with %d threads", num_threads)
try:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
@ -281,6 +354,10 @@ class DirtyWorker:
"""
Execute an action on a dirty app.
The action runs in a thread pool executor to avoid blocking the
asyncio event loop. Execution timeout is enforced using
``dirty_timeout`` config.
Args:
app_path: Import path of the dirty app
action: Action name to execute
@ -292,24 +369,46 @@ class DirtyWorker:
Raises:
DirtyAppNotFoundError: If app is not loaded
DirtyTimeoutError: If execution exceeds timeout
DirtyAppError: If execution fails
"""
if app_path not in self.apps:
raise DirtyAppNotFoundError(app_path)
app = self.apps[app_path]
timeout = self.cfg.dirty_timeout if self.cfg.dirty_timeout > 0 else None
# Run the app call in a thread pool to avoid blocking
# Run the app call in the thread pool to avoid blocking
# the event loop for CPU-bound operations
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None,
lambda: app(action, *args, **kwargs)
)
return result
try:
result = await asyncio.wait_for(
loop.run_in_executor(
self._executor,
lambda: app(action, *args, **kwargs)
),
timeout=timeout
)
return result
except asyncio.TimeoutError:
# Note: The thread continues running - we just stop waiting
self.log.warning(
"Execution timeout for %s.%s after %ds",
app_path, action, timeout
)
raise DirtyTimeoutError(
f"Execution of {app_path}.{action} timed out",
timeout=timeout
)
def _cleanup(self):
"""Clean up resources on shutdown."""
# Shutdown thread pool executor
if self._executor:
self._executor.shutdown(wait=False, cancel_futures=True)
self._executor = None
# Close all apps
for path, app in self.apps.items():
try:

@ -906,3 +906,157 @@ class TestDirtyWorkerLoadAppsInit:
# Error should be logged
assert any("Failed to initialize" in msg for level, msg in log.messages)
class TestDirtyWorkerExecutionTimeout:
"""Tests for execution timeout control."""
@pytest.mark.asyncio
async def test_execute_with_timeout(self):
"""Test that execute enforces timeout."""
from concurrent.futures import ThreadPoolExecutor
cfg = Config()
cfg.set("dirty_timeout", 1) # 1 second timeout
cfg.set("dirty_threads", 1)
log = MockLog()
with tempfile.TemporaryDirectory() as tmpdir:
socket_path = os.path.join(tmpdir, "worker.sock")
worker = DirtyWorker(
age=1,
ppid=os.getpid(),
app_paths=["tests.support_dirty_app:SlowDirtyApp"],
cfg=cfg,
log=log,
socket_path=socket_path
)
worker.pid = os.getpid()
# Create executor manually for test
worker._executor = ThreadPoolExecutor(max_workers=1)
try:
worker.load_apps()
# Execute slow action that exceeds timeout
from gunicorn.dirty.errors import DirtyTimeoutError
with pytest.raises(DirtyTimeoutError):
await worker.execute(
"tests.support_dirty_app:SlowDirtyApp",
"slow_action",
[],
{"delay": 5.0} # 5 second delay, 1 second timeout
)
finally:
worker._cleanup()
@pytest.mark.asyncio
async def test_execute_within_timeout(self):
"""Test that execute succeeds within timeout."""
from concurrent.futures import ThreadPoolExecutor
cfg = Config()
cfg.set("dirty_timeout", 10) # 10 second timeout
cfg.set("dirty_threads", 1)
log = MockLog()
with tempfile.TemporaryDirectory() as tmpdir:
socket_path = os.path.join(tmpdir, "worker.sock")
worker = DirtyWorker(
age=1,
ppid=os.getpid(),
app_paths=["tests.support_dirty_app:SlowDirtyApp"],
cfg=cfg,
log=log,
socket_path=socket_path
)
worker.pid = os.getpid()
# Create executor manually for test
worker._executor = ThreadPoolExecutor(max_workers=1)
try:
worker.load_apps()
# Execute fast action that completes within timeout
result = await worker.execute(
"tests.support_dirty_app:SlowDirtyApp",
"fast_action",
[],
{}
)
assert result == {"fast": True}
finally:
worker._cleanup()
@pytest.mark.asyncio
async def test_execute_no_timeout_when_zero(self):
"""Test that timeout is disabled when dirty_timeout is 0."""
from concurrent.futures import ThreadPoolExecutor
cfg = Config()
cfg.set("dirty_timeout", 0) # Disabled
cfg.set("dirty_threads", 1)
log = MockLog()
with tempfile.TemporaryDirectory() as tmpdir:
socket_path = os.path.join(tmpdir, "worker.sock")
worker = DirtyWorker(
age=1,
ppid=os.getpid(),
app_paths=["tests.support_dirty_app:TestDirtyApp"],
cfg=cfg,
log=log,
socket_path=socket_path
)
worker.pid = os.getpid()
# Create executor manually for test
worker._executor = ThreadPoolExecutor(max_workers=1)
try:
worker.load_apps()
# Should work with no timeout
result = await worker.execute(
"tests.support_dirty_app:TestDirtyApp",
"compute",
[2, 3],
{"operation": "add"}
)
assert result == 5
finally:
worker._cleanup()
def test_run_creates_executor_with_threads(self):
"""Test that run() creates executor with dirty_threads config."""
cfg = Config()
cfg.set("dirty_timeout", 300)
cfg.set("dirty_threads", 4)
log = MockLog()
with tempfile.TemporaryDirectory() as tmpdir:
socket_path = os.path.join(tmpdir, "worker.sock")
worker = DirtyWorker(
age=1,
ppid=os.getpid(),
app_paths=["tests.support_dirty_app:TestDirtyApp"],
cfg=cfg,
log=log,
socket_path=socket_path
)
worker.pid = os.getpid()
worker.load_apps()
# Simulate what run() does
from concurrent.futures import ThreadPoolExecutor
worker._executor = ThreadPoolExecutor(
max_workers=cfg.dirty_threads,
thread_name_prefix=f"dirty-worker-{worker.pid}-"
)
assert worker._executor._max_workers == 4
worker._cleanup()
assert worker._executor is None