feat(dirty): add TTIN/TTOU signal support for dynamic worker scaling

Add support for SIGTTIN and SIGTTOU signals to the dirty arbiter,
allowing dynamic scaling of dirty workers at runtime without restarting
gunicorn.

Changes:
- Add TTIN/TTOU to DirtyArbiter.SIGNALS
- Add num_workers instance variable for dynamic count
- Add _get_minimum_workers() to enforce app worker constraints
- Add signal handlers for TTIN (increase) and TTOU (decrease)
- Update manage_workers() to use dynamic count
- Add documentation for dynamic scaling
- Add unit tests for signal handling
- Add Docker integration tests

The minimum worker constraint ensures TTOU cannot reduce workers below
what apps require (e.g., if an app has workers=3, minimum is 3).

Closes #3489
This commit is contained in:
Benoit Chesneau 2026-02-12 23:52:12 +01:00
parent ac00c862d7
commit 2639215aa3
9 changed files with 645 additions and 2 deletions

View File

@ -912,9 +912,40 @@ Dirty Arbiters integrate with the main arbiter's signal handling. Signals are fo
| `SIGQUIT` | Immediate exit via `sys.exit(0)` | Killed immediately | Fast shutdown, no cleanup |
| `SIGHUP` | Kills all workers, spawns new ones | Exits immediately | Hot reload of workers |
| `SIGUSR1` | Reopens log files, forwards to workers | Reopens log files | Log rotation support |
| `SIGTTIN` | Increases worker count by 1 | N/A | Dynamic scaling up |
| `SIGTTOU` | Decreases worker count by 1 | N/A | Dynamic scaling down |
| `SIGCHLD` | Handled by event loop, triggers reap | N/A | Worker death detection |
| `SIGINT` | Same as SIGTERM | Same as SIGTERM | Ctrl-C handling |
### Dynamic Scaling with TTIN/TTOU
You can dynamically scale the number of dirty workers at runtime using signals, without restarting gunicorn:
```bash
# Find the dirty arbiter process
ps aux | grep dirty-arbiter
# Or use the PID file (location depends on your app name)
cat /tmp/gunicorn-dirty-myapp.pid
# Increase dirty workers by 1
kill -TTIN <dirty-arbiter-pid>
# Decrease dirty workers by 1
kill -TTOU <dirty-arbiter-pid>
```
**Minimum Worker Constraint:** The dirty arbiter will not decrease below the minimum number of workers required by your app configurations. For example, if you have an app with `workers = 3`, you cannot scale below 3 dirty workers. When this limit is reached, a warning is logged:
```
WARNING: SIGTTOU: Cannot decrease below 3 workers (required by app specs)
```
**Use Cases:**
- **Burst handling** - Scale up when you anticipate heavy load
- **Cost optimization** - Scale down during low-traffic periods
- **Recovery** - Scale up if workers are busy with long-running tasks
### Forwarded Signals
The main arbiter forwards these signals to the dirty arbiter process:

View File

@ -57,7 +57,7 @@ class DirtyArbiter:
"""
SIGNALS = [getattr(signal, "SIG%s" % x) for x in
"HUP QUIT INT TERM USR1 USR2 CHLD".split()]
"HUP QUIT INT TERM TTIN TTOU USR1 USR2 CHLD".split()]
# Worker boot error code
WORKER_BOOT_ERROR = 3
@ -92,6 +92,7 @@ class DirtyArbiter:
self._worker_rr_index = 0 # Round-robin index for worker selection
self.worker_age = 0
self.alive = True
self.num_workers = self.cfg.dirty_workers # Dynamic count for TTIN/TTOU
self._server = None
self._loop = None
@ -150,6 +151,23 @@ class DirtyArbiter:
# Initialize the app_worker_map for this app
self.app_worker_map[import_path] = set()
def _get_minimum_workers(self):
"""
Calculate minimum number of workers required by app specs.
Returns the maximum worker_count across all apps that have limits.
Apps with worker_count=None don't impose a minimum.
Returns:
int: Minimum workers required (at least 1)
"""
min_required = 1
for spec in self.app_specs.values():
worker_count = spec['worker_count']
if worker_count is not None:
min_required = max(min_required, worker_count)
return min_required
def _get_apps_for_new_worker(self):
"""
Determine which apps a new worker should load.
@ -255,6 +273,8 @@ class DirtyArbiter:
signal.signal(signal.SIGHUP, self._signal_handler)
signal.signal(signal.SIGUSR1, self._signal_handler)
signal.signal(signal.SIGCHLD, self._signal_handler)
signal.signal(signal.SIGTTIN, self._signal_handler)
signal.signal(signal.SIGTTOU, self._signal_handler)
def _signal_handler(self, sig, frame):
"""Handle signals."""
@ -279,6 +299,36 @@ class DirtyArbiter:
)
return
if sig == signal.SIGTTIN:
# Increase number of workers
self.num_workers += 1
self.log.info("SIGTTIN: Increasing dirty workers to %s",
self.num_workers)
if self._loop:
self._loop.call_soon_threadsafe(
lambda: asyncio.create_task(self.manage_workers())
)
return
if sig == signal.SIGTTOU:
# Decrease number of workers (respecting minimum)
min_workers = self._get_minimum_workers()
if self.num_workers <= min_workers:
self.log.warning(
"SIGTTOU: Cannot decrease below %s workers "
"(required by app specs)",
min_workers
)
return
self.num_workers -= 1
self.log.info("SIGTTOU: Decreasing dirty workers to %s",
self.num_workers)
if self._loop:
self._loop.call_soon_threadsafe(
lambda: asyncio.create_task(self.manage_workers())
)
return
# Shutdown signals
self.alive = False
if self._loop:
@ -717,7 +767,7 @@ class DirtyArbiter:
if not self.alive:
return
num_workers = self.cfg.dirty_workers
num_workers = self.num_workers
# Spawn workers if needed
while self.alive and len(self.workers) < num_workers:

View File

@ -0,0 +1,234 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Tests for dirty arbiter TTIN/TTOU signal handling."""
import signal
from unittest.mock import Mock
import pytest
class TestDirtyArbiterSignals:
"""Test TTIN/TTOU signal handling in DirtyArbiter."""
@pytest.fixture
def arbiter(self, tmp_path):
"""Create a DirtyArbiter for testing."""
from gunicorn.dirty.arbiter import DirtyArbiter
cfg = Mock()
cfg.dirty_workers = 2
cfg.dirty_apps = []
cfg.dirty_timeout = 30
cfg.dirty_graceful_timeout = 30
cfg.on_dirty_starting = Mock()
log = Mock()
arbiter = DirtyArbiter(cfg, log, socket_path=str(tmp_path / "test.sock"))
return arbiter
def test_initial_num_workers_from_config(self, arbiter):
"""num_workers should be initialized from config."""
assert arbiter.num_workers == 2
def test_ttin_increases_num_workers(self, arbiter):
"""SIGTTIN should increase num_workers by 1."""
assert arbiter.num_workers == 2
arbiter._signal_handler(signal.SIGTTIN, None)
assert arbiter.num_workers == 3
def test_ttin_logs_info(self, arbiter):
"""SIGTTIN should log info about the change."""
arbiter._signal_handler(signal.SIGTTIN, None)
arbiter.log.info.assert_called()
call_args = arbiter.log.info.call_args[0]
assert "SIGTTIN" in call_args[0]
assert "3" in str(call_args)
def test_ttou_decreases_num_workers(self, arbiter):
"""SIGTTOU should decrease num_workers by 1."""
arbiter.num_workers = 3
arbiter._signal_handler(signal.SIGTTOU, None)
assert arbiter.num_workers == 2
def test_ttou_logs_info(self, arbiter):
"""SIGTTOU should log info about the change."""
arbiter.num_workers = 3
arbiter._signal_handler(signal.SIGTTOU, None)
arbiter.log.info.assert_called()
call_args = arbiter.log.info.call_args[0]
assert "SIGTTOU" in call_args[0]
assert "2" in str(call_args)
def test_ttou_respects_minimum_one_worker(self, arbiter):
"""SIGTTOU should not go below 1 worker by default."""
arbiter.num_workers = 1
arbiter._signal_handler(signal.SIGTTOU, None)
assert arbiter.num_workers == 1
def test_ttou_logs_warning_at_minimum(self, arbiter):
"""SIGTTOU should log warning when at minimum."""
arbiter.num_workers = 1
arbiter._signal_handler(signal.SIGTTOU, None)
arbiter.log.warning.assert_called()
call_args = arbiter.log.warning.call_args[0]
assert "Cannot decrease below" in call_args[0]
def test_ttou_respects_app_minimum(self, arbiter):
"""SIGTTOU should not go below app-required minimum."""
# App requires 3 workers
arbiter.app_specs = {
'myapp:HeavyTask': {
'import_path': 'myapp:HeavyTask',
'worker_count': 3,
'original_spec': 'myapp:HeavyTask:3',
}
}
arbiter.num_workers = 3
# Should not decrease below 3
arbiter._signal_handler(signal.SIGTTOU, None)
assert arbiter.num_workers == 3
arbiter.log.warning.assert_called()
def test_ttou_with_unlimited_app(self, arbiter):
"""Apps with worker_count=None should not impose minimum."""
arbiter.app_specs = {
'myapp:UnlimitedTask': {
'import_path': 'myapp:UnlimitedTask',
'worker_count': None,
'original_spec': 'myapp:UnlimitedTask',
}
}
arbiter.num_workers = 2
# Should decrease to 1 (default minimum)
arbiter._signal_handler(signal.SIGTTOU, None)
assert arbiter.num_workers == 1
def test_multiple_ttin_signals(self, arbiter):
"""Multiple TTIN signals should keep incrementing."""
assert arbiter.num_workers == 2
arbiter._signal_handler(signal.SIGTTIN, None)
arbiter._signal_handler(signal.SIGTTIN, None)
arbiter._signal_handler(signal.SIGTTIN, None)
assert arbiter.num_workers == 5
def test_multiple_ttou_signals(self, arbiter):
"""Multiple TTOU signals should decrement until minimum."""
arbiter.num_workers = 5
arbiter._signal_handler(signal.SIGTTOU, None)
arbiter._signal_handler(signal.SIGTTOU, None)
arbiter._signal_handler(signal.SIGTTOU, None)
arbiter._signal_handler(signal.SIGTTOU, None)
# Should stop at 1
assert arbiter.num_workers == 1
class TestGetMinimumWorkers:
"""Test _get_minimum_workers calculation."""
@pytest.fixture
def arbiter(self, tmp_path):
"""Create a DirtyArbiter for testing."""
from gunicorn.dirty.arbiter import DirtyArbiter
cfg = Mock()
cfg.dirty_workers = 2
cfg.dirty_apps = []
cfg.dirty_timeout = 30
cfg.dirty_graceful_timeout = 30
cfg.on_dirty_starting = Mock()
log = Mock()
arbiter = DirtyArbiter(cfg, log, socket_path=str(tmp_path / "test.sock"))
return arbiter
def test_minimum_workers_no_apps(self, arbiter):
"""With no apps, minimum should be 1."""
arbiter.app_specs = {}
assert arbiter._get_minimum_workers() == 1
def test_minimum_workers_single_app_with_limit(self, arbiter):
"""Single app with worker_count should set minimum."""
arbiter.app_specs = {
'app:Task': {
'import_path': 'app:Task',
'worker_count': 3,
'original_spec': 'app:Task:3',
}
}
assert arbiter._get_minimum_workers() == 3
def test_minimum_workers_single_app_unlimited(self, arbiter):
"""Single app with worker_count=None should use default minimum."""
arbiter.app_specs = {
'app:Task': {
'import_path': 'app:Task',
'worker_count': None,
'original_spec': 'app:Task',
}
}
assert arbiter._get_minimum_workers() == 1
def test_minimum_workers_multiple_apps_with_limits(self, arbiter):
"""Multiple apps should use the maximum worker_count."""
arbiter.app_specs = {
'app1:Task1': {
'import_path': 'app1:Task1',
'worker_count': 2,
'original_spec': 'app1:Task1:2',
},
'app2:Task2': {
'import_path': 'app2:Task2',
'worker_count': 4,
'original_spec': 'app2:Task2:4',
},
'app3:Task3': {
'import_path': 'app3:Task3',
'worker_count': 3,
'original_spec': 'app3:Task3:3',
},
}
# Maximum of (2, 4, 3) = 4
assert arbiter._get_minimum_workers() == 4
def test_minimum_workers_mixed_limited_and_unlimited(self, arbiter):
"""Mixed apps should use max of limited apps only."""
arbiter.app_specs = {
'app1:Task1': {
'import_path': 'app1:Task1',
'worker_count': 2,
'original_spec': 'app1:Task1:2',
},
'app2:Task2': {
'import_path': 'app2:Task2',
'worker_count': None,
'original_spec': 'app2:Task2',
},
'app3:Task3': {
'import_path': 'app3:Task3',
'worker_count': 4,
'original_spec': 'app3:Task3:4',
},
}
# Maximum of (2, 4) = 4, None is ignored
assert arbiter._get_minimum_workers() == 4
def test_minimum_workers_all_unlimited(self, arbiter):
"""All unlimited apps should use default minimum."""
arbiter.app_specs = {
'app1:Task1': {
'import_path': 'app1:Task1',
'worker_count': None,
'original_spec': 'app1:Task1',
},
'app2:Task2': {
'import_path': 'app2:Task2',
'worker_count': None,
'original_spec': 'app2:Task2',
},
}
assert arbiter._get_minimum_workers() == 1

View File

@ -0,0 +1,17 @@
FROM python:3.12-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
curl procps \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Install gunicorn from source
COPY . /gunicorn-src/
RUN pip install --no-cache-dir /gunicorn-src/
# Copy test app
COPY tests/docker/dirty_ttin_ttou/app.py /app/
COPY tests/docker/dirty_ttin_ttou/gunicorn_conf.py /app/
CMD ["gunicorn", "-c", "gunicorn_conf.py", "app:app"]

View File

@ -0,0 +1,5 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Docker integration tests for dirty arbiter TTIN/TTOU signals."""

View File

@ -0,0 +1,71 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Test app with multiple dirty tasks for TTIN/TTOU testing."""
import json
import time
from gunicorn.dirty import DirtyApp, get_dirty_client
# Unlimited workers - runs on all dirty workers
class UnlimitedTask(DirtyApp):
"""Task that runs on all dirty workers."""
def setup(self):
pass
def process(self, data):
return {"task": "unlimited", "data": data}
# Limited to 2 workers
class LimitedTask(DirtyApp):
"""Task limited to 2 workers."""
workers = 2
def setup(self):
pass
def process(self, data):
delay = data.get("delay", 0)
if delay:
time.sleep(delay)
return {"task": "limited", "data": data}
def app(environ, start_response):
"""Simple WSGI app for testing."""
path = environ.get('PATH_INFO', '/')
if path == '/health':
start_response('200 OK', [('Content-Type', 'text/plain')])
return [b'OK']
if path == '/unlimited':
try:
client = get_dirty_client()
result = client.execute('app:UnlimitedTask', {'test': 'data'})
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps(result).encode()]
except Exception as e:
start_response('500 Internal Server Error',
[('Content-Type', 'text/plain')])
return [str(e).encode()]
if path == '/limited':
try:
client = get_dirty_client()
result = client.execute('app:LimitedTask', {'test': 'data'})
start_response('200 OK', [('Content-Type', 'application/json')])
return [json.dumps(result).encode()]
except Exception as e:
start_response('500 Internal Server Error',
[('Content-Type', 'text/plain')])
return [str(e).encode()]
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return [b'Not Found']

View File

@ -0,0 +1,14 @@
services:
gunicorn:
build:
context: ../../..
dockerfile: tests/docker/dirty_ttin_ttou/Dockerfile
ports:
- "18000:8000"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 2s
timeout: 5s
retries: 15
start_period: 5s
stop_grace_period: 10s

View File

@ -0,0 +1,23 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Gunicorn configuration for TTIN/TTOU testing."""
bind = "0.0.0.0:8000"
workers = 2
worker_class = "gthread"
threads = 2
# Dirty arbiter config
dirty_apps = [
"app:UnlimitedTask",
"app:LimitedTask", # Has workers=2 attribute
]
dirty_workers = 3
dirty_timeout = 30
# Logging
loglevel = "debug"
accesslog = "-"
errorlog = "-"

View File

@ -0,0 +1,198 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Docker integration tests for dirty arbiter TTIN/TTOU signals."""
import os
import subprocess
import time
from pathlib import Path
import pytest
import requests
pytestmark = [
pytest.mark.docker,
pytest.mark.integration,
]
# Directory containing this test file
TEST_DIR = Path(__file__).parent
COMPOSE_FILE = TEST_DIR / "docker-compose.yml"
BASE_URL = "http://localhost:18000"
@pytest.fixture(scope="module")
def docker_services():
"""Start Docker services for the test module."""
# Start services
subprocess.run(
["docker", "compose", "-f", str(COMPOSE_FILE), "up", "-d", "--build"],
check=True,
cwd=TEST_DIR
)
# Wait for health
for _ in range(30):
try:
resp = requests.get(f"{BASE_URL}/health", timeout=2)
if resp.status_code == 200:
break
except requests.RequestException:
pass
time.sleep(1)
else:
# Print logs for debugging
subprocess.run(
["docker", "compose", "-f", str(COMPOSE_FILE), "logs"],
cwd=TEST_DIR
)
pytest.fail("Services did not become healthy")
yield
# Cleanup
subprocess.run(
["docker", "compose", "-f", str(COMPOSE_FILE), "down", "-v"],
cwd=TEST_DIR
)
def get_dirty_arbiter_pid():
"""Get the dirty arbiter PID from the container."""
result = subprocess.run(
["docker", "compose", "-f", str(COMPOSE_FILE),
"exec", "-T", "gunicorn", "pgrep", "-f", "dirty-arbiter"],
capture_output=True,
text=True,
cwd=TEST_DIR
)
pids = result.stdout.strip().split('\n')
# Return the first PID (there should only be one dirty-arbiter)
return int(pids[0]) if pids and pids[0] else None
def get_dirty_worker_count():
"""Get the current number of dirty workers."""
result = subprocess.run(
["docker", "compose", "-f", str(COMPOSE_FILE),
"exec", "-T", "gunicorn", "pgrep", "-c", "-f", "dirty-worker"],
capture_output=True,
text=True,
cwd=TEST_DIR
)
count = result.stdout.strip()
return int(count) if count else 0
def send_signal_to_dirty_arbiter(sig):
"""Send a signal to the dirty arbiter."""
pid = get_dirty_arbiter_pid()
if pid is None:
raise RuntimeError("Could not find dirty arbiter PID")
subprocess.run(
["docker", "compose", "-f", str(COMPOSE_FILE),
"exec", "-T", "gunicorn", "kill", f"-{sig}", str(pid)],
check=True,
cwd=TEST_DIR
)
class TestTTINSignal:
"""Test SIGTTIN increases dirty workers."""
def test_ttin_increases_workers(self, docker_services):
"""TTIN should spawn additional dirty worker."""
initial_count = get_dirty_worker_count()
assert initial_count == 3, f"Expected 3 initial workers, got {initial_count}"
send_signal_to_dirty_arbiter("TTIN")
time.sleep(2) # Wait for worker to spawn
new_count = get_dirty_worker_count()
assert new_count == 4, f"Expected 4 workers after TTIN, got {new_count}"
def test_multiple_ttin_increases(self, docker_services):
"""Multiple TTIN signals should keep increasing workers."""
# Get current count (may be 4 from previous test)
current_count = get_dirty_worker_count()
send_signal_to_dirty_arbiter("TTIN")
time.sleep(2)
new_count = get_dirty_worker_count()
assert new_count == current_count + 1
class TestTTOUSignal:
"""Test SIGTTOU decreases dirty workers."""
def test_ttou_decreases_workers(self, docker_services):
"""TTOU should kill a dirty worker."""
# First make sure we have more than minimum
send_signal_to_dirty_arbiter("TTIN")
time.sleep(2)
count_before = get_dirty_worker_count()
send_signal_to_dirty_arbiter("TTOU")
time.sleep(2)
count_after = get_dirty_worker_count()
assert count_after == count_before - 1
def test_ttou_respects_minimum(self, docker_services):
"""TTOU should not go below app minimum (2 for LimitedTask)."""
# Try to decrease multiple times
for _ in range(10):
send_signal_to_dirty_arbiter("TTOU")
time.sleep(0.5)
time.sleep(2) # Wait for all signals to be processed
# Should not go below 2 (LimitedTask.workers = 2)
final_count = get_dirty_worker_count()
assert final_count >= 2, f"Worker count {final_count} is below minimum of 2"
class TestUnlimitedApps:
"""Test apps with worker_count=None work correctly."""
def test_unlimited_app_works(self, docker_services):
"""UnlimitedTask should work."""
resp = requests.get(f"{BASE_URL}/unlimited", timeout=10)
assert resp.status_code == 200
data = resp.json()
assert data["task"] == "unlimited"
def test_limited_app_works(self, docker_services):
"""LimitedTask should work."""
resp = requests.get(f"{BASE_URL}/limited", timeout=10)
assert resp.status_code == 200
data = resp.json()
assert data["task"] == "limited"
def test_apps_work_after_scaling(self, docker_services):
"""Both apps should work after scaling up and down."""
# Scale up
send_signal_to_dirty_arbiter("TTIN")
time.sleep(2)
# Test both apps
resp = requests.get(f"{BASE_URL}/unlimited", timeout=10)
assert resp.status_code == 200
resp = requests.get(f"{BASE_URL}/limited", timeout=10)
assert resp.status_code == 200
# Scale down
send_signal_to_dirty_arbiter("TTOU")
time.sleep(2)
# Test both apps again
resp = requests.get(f"{BASE_URL}/unlimited", timeout=10)
assert resp.status_code == 200
resp = requests.get(f"{BASE_URL}/limited", timeout=10)
assert resp.status_code == 200