feat(ctl): query dirty arbiter for worker info in 'show all'

Add MSG_TYPE_STATUS to dirty protocol to allow querying the dirty
arbiter for its workers. The control socket now connects to the
dirty arbiter socket to retrieve worker information.
This commit is contained in:
Benoit Chesneau 2026-02-13 01:52:43 +01:00
parent 9f7000ff63
commit 3e6d6b94c5
5 changed files with 105 additions and 30 deletions

View File

@ -206,17 +206,21 @@ def format_all(data: dict) -> str:
dirty_workers = data.get("dirty_workers", [])
lines.append(f"DIRTY WORKERS ({data.get('dirty_worker_count', 0)})")
if dirty_workers:
lines.append(f" {'PID':<10} {'AGE':<6} {'APPS':<30} {'LAST_BEAT'}")
lines.append(f" {'-' * 58}")
lines.append(f" {'PID':<10} {'AGE':<6} {'APPS'}")
lines.append(f" {'-' * 50}")
for w in dirty_workers:
pid = w.get("pid", "?")
age = w.get("age", "?")
apps = ", ".join(w.get("apps", []))
if len(apps) > 28:
apps = apps[:25] + "..."
hb = w.get("last_heartbeat")
hb_str = f"{hb}s ago" if hb is not None else "n/a"
lines.append(f" {pid:<10} {age:<6} {apps:<30} {hb_str}")
apps = w.get("apps", [])
# Show each app on its own line if multiple
if apps:
first_app = apps[0].split(":")[-1] # Just the class name
lines.append(f" {pid:<10} {age:<6} {first_app}")
for app in apps[1:]:
app_name = app.split(":")[-1]
lines.append(f" {'':<10} {'':<6} {app_name}")
else:
lines.append(f" {pid:<10} {age:<6} (no apps)")
else:
lines.append(" (none)")
else:

View File

@ -441,7 +441,7 @@ class CommandHandlers:
# Sort by age
web_workers.sort(key=lambda w: w["age"])
# Dirty arbiter and workers
# Dirty arbiter info (runs in separate process)
dirty_arbiter_info = None
dirty_workers = []
@ -452,26 +452,8 @@ class CommandHandlers:
"role": "dirty master",
}
# Get dirty workers if we have access
dirty_arbiter = getattr(self.arbiter, 'dirty_arbiter', None)
if dirty_arbiter and hasattr(dirty_arbiter, 'workers'):
for pid, worker in dirty_arbiter.workers.items():
try:
last_update = worker.tmp.last_update()
last_heartbeat = round(now - last_update, 2)
except (OSError, ValueError, AttributeError):
last_heartbeat = None
dirty_workers.append({
"pid": pid,
"type": "dirty",
"age": worker.age,
"apps": getattr(worker, 'app_paths', []),
"booted": getattr(worker, 'booted', False),
"last_heartbeat": last_heartbeat,
})
dirty_workers.sort(key=lambda w: w["age"])
# Query dirty arbiter for worker info via its socket
dirty_workers = self._query_dirty_workers()
return {
"arbiter": arbiter_info,
@ -482,6 +464,47 @@ class CommandHandlers:
"dirty_worker_count": len(dirty_workers),
}
def _query_dirty_workers(self) -> list:
"""
Query the dirty arbiter for worker information.
Connects to the dirty arbiter socket and sends a status request.
Returns:
List of dirty worker info dicts, or empty list on error
"""
import socket
dirty_socket_path = os.environ.get('GUNICORN_DIRTY_SOCKET')
if not dirty_socket_path:
return []
try:
from gunicorn.dirty.protocol import DirtyProtocol
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(2.0)
sock.connect(dirty_socket_path)
# Send status request
request = {
"type": DirtyProtocol.MSG_TYPE_STATUS,
"id": "ctl-status-1",
}
DirtyProtocol.write_message(sock, request)
# Read response
response = DirtyProtocol.read_message(sock)
sock.close()
if response.get("type") == DirtyProtocol.MSG_TYPE_RESPONSE:
data = response.get("data", {})
return data.get("workers", [])
except Exception:
pass
return []
def help(self) -> dict:
"""
Return list of available commands.

View File

@ -423,6 +423,9 @@ class DirtyArbiter:
# Handle stash operations
if msg_type == DirtyProtocol.MSG_TYPE_STASH:
await self.handle_stash_request(message, writer)
# Handle status queries
elif msg_type == DirtyProtocol.MSG_TYPE_STATUS:
await self.handle_status_request(message, writer)
else:
# Route request to a dirty worker - pass writer for streaming
await self.route_request(message, writer)
@ -646,6 +649,47 @@ class DirtyArbiter:
# Stash (shared state) operations - handled directly in arbiter
# -------------------------------------------------------------------------
async def handle_status_request(self, message, client_writer):
"""
Handle a status query request.
Returns information about the dirty arbiter and its workers.
Args:
message: Status request message
client_writer: StreamWriter to send response to client
"""
request_id = message.get("id", "unknown")
now = time.monotonic()
workers_info = []
for pid, worker in self.workers.items():
try:
last_update = worker.tmp.last_update()
last_heartbeat = round(now - last_update, 2)
except (OSError, ValueError, AttributeError):
last_heartbeat = None
workers_info.append({
"pid": pid,
"age": worker.age,
"apps": getattr(worker, 'app_paths', []),
"booted": getattr(worker, 'booted', False),
"last_heartbeat": last_heartbeat,
})
workers_info.sort(key=lambda w: w["age"])
result = {
"arbiter_pid": self.pid,
"workers": workers_info,
"worker_count": len(workers_info),
"apps": list(self.app_specs.keys()) if self.app_specs else [],
}
response = make_response(request_id, result)
await DirtyProtocol.write_message_async(client_writer, response)
async def handle_stash_request(self, message, client_writer):
"""
Handle a stash operation directly in the arbiter.

View File

@ -43,6 +43,7 @@ MSG_TYPE_ERROR = 0x03
MSG_TYPE_CHUNK = 0x04
MSG_TYPE_END = 0x05
MSG_TYPE_STASH = 0x10 # Stash operations (shared state between workers)
MSG_TYPE_STATUS = 0x11 # Status query for arbiter/workers
# Message type names (for backwards compatibility with old API)
MSG_TYPE_REQUEST_STR = "request"
@ -51,6 +52,7 @@ MSG_TYPE_ERROR_STR = "error"
MSG_TYPE_CHUNK_STR = "chunk"
MSG_TYPE_END_STR = "end"
MSG_TYPE_STASH_STR = "stash"
MSG_TYPE_STATUS_STR = "status"
# Map int types to string names
MSG_TYPE_TO_STR = {
@ -60,6 +62,7 @@ MSG_TYPE_TO_STR = {
MSG_TYPE_CHUNK: MSG_TYPE_CHUNK_STR,
MSG_TYPE_END: MSG_TYPE_END_STR,
MSG_TYPE_STASH: MSG_TYPE_STASH_STR,
MSG_TYPE_STATUS: MSG_TYPE_STATUS_STR,
}
# Map string names to int types
@ -98,6 +101,7 @@ class BinaryProtocol:
MSG_TYPE_CHUNK = MSG_TYPE_CHUNK_STR
MSG_TYPE_END = MSG_TYPE_END_STR
MSG_TYPE_STASH = MSG_TYPE_STASH_STR
MSG_TYPE_STATUS = MSG_TYPE_STATUS_STR
@staticmethod
def encode_header(msg_type: int, request_id: int, payload_length: int) -> bytes:

View File

@ -381,7 +381,7 @@ class TestShowAll:
assert "dirty_arbiter" in result
assert result["dirty_arbiter"] is None
assert "dirty_workers" in result
# No dirty workers when no dirty arbiter
assert result["dirty_worker_count"] == 0
def test_show_all_with_dirty(self):