From 3e6d6b94c55b2311a160b2b95204e17704bb2f22 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Fri, 13 Feb 2026 01:52:43 +0100 Subject: [PATCH] feat(ctl): query dirty arbiter for worker info in 'show all' Add MSG_TYPE_STATUS to dirty protocol to allow querying the dirty arbiter for its workers. The control socket now connects to the dirty arbiter socket to retrieve worker information. --- gunicorn/ctl/cli.py | 20 +++++++----- gunicorn/ctl/handlers.py | 65 ++++++++++++++++++++++++++------------ gunicorn/dirty/arbiter.py | 44 ++++++++++++++++++++++++++ gunicorn/dirty/protocol.py | 4 +++ tests/ctl/test_handlers.py | 2 +- 5 files changed, 105 insertions(+), 30 deletions(-) diff --git a/gunicorn/ctl/cli.py b/gunicorn/ctl/cli.py index 923b2a29..6bd4f2e4 100644 --- a/gunicorn/ctl/cli.py +++ b/gunicorn/ctl/cli.py @@ -206,17 +206,21 @@ def format_all(data: dict) -> str: dirty_workers = data.get("dirty_workers", []) lines.append(f"DIRTY WORKERS ({data.get('dirty_worker_count', 0)})") if dirty_workers: - lines.append(f" {'PID':<10} {'AGE':<6} {'APPS':<30} {'LAST_BEAT'}") - lines.append(f" {'-' * 58}") + lines.append(f" {'PID':<10} {'AGE':<6} {'APPS'}") + lines.append(f" {'-' * 50}") for w in dirty_workers: pid = w.get("pid", "?") age = w.get("age", "?") - apps = ", ".join(w.get("apps", [])) - if len(apps) > 28: - apps = apps[:25] + "..." - hb = w.get("last_heartbeat") - hb_str = f"{hb}s ago" if hb is not None else "n/a" - lines.append(f" {pid:<10} {age:<6} {apps:<30} {hb_str}") + apps = w.get("apps", []) + # Show each app on its own line if multiple + if apps: + first_app = apps[0].split(":")[-1] # Just the class name + lines.append(f" {pid:<10} {age:<6} {first_app}") + for app in apps[1:]: + app_name = app.split(":")[-1] + lines.append(f" {'':<10} {'':<6} {app_name}") + else: + lines.append(f" {pid:<10} {age:<6} (no apps)") else: lines.append(" (none)") else: diff --git a/gunicorn/ctl/handlers.py b/gunicorn/ctl/handlers.py index e1810462..dedf349f 100644 --- a/gunicorn/ctl/handlers.py +++ b/gunicorn/ctl/handlers.py @@ -441,7 +441,7 @@ class CommandHandlers: # Sort by age web_workers.sort(key=lambda w: w["age"]) - # Dirty arbiter and workers + # Dirty arbiter info (runs in separate process) dirty_arbiter_info = None dirty_workers = [] @@ -452,26 +452,8 @@ class CommandHandlers: "role": "dirty master", } - # Get dirty workers if we have access - dirty_arbiter = getattr(self.arbiter, 'dirty_arbiter', None) - if dirty_arbiter and hasattr(dirty_arbiter, 'workers'): - for pid, worker in dirty_arbiter.workers.items(): - try: - last_update = worker.tmp.last_update() - last_heartbeat = round(now - last_update, 2) - except (OSError, ValueError, AttributeError): - last_heartbeat = None - - dirty_workers.append({ - "pid": pid, - "type": "dirty", - "age": worker.age, - "apps": getattr(worker, 'app_paths', []), - "booted": getattr(worker, 'booted', False), - "last_heartbeat": last_heartbeat, - }) - - dirty_workers.sort(key=lambda w: w["age"]) + # Query dirty arbiter for worker info via its socket + dirty_workers = self._query_dirty_workers() return { "arbiter": arbiter_info, @@ -482,6 +464,47 @@ class CommandHandlers: "dirty_worker_count": len(dirty_workers), } + def _query_dirty_workers(self) -> list: + """ + Query the dirty arbiter for worker information. + + Connects to the dirty arbiter socket and sends a status request. + + Returns: + List of dirty worker info dicts, or empty list on error + """ + import socket + dirty_socket_path = os.environ.get('GUNICORN_DIRTY_SOCKET') + if not dirty_socket_path: + return [] + + try: + from gunicorn.dirty.protocol import DirtyProtocol + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.settimeout(2.0) + sock.connect(dirty_socket_path) + + # Send status request + request = { + "type": DirtyProtocol.MSG_TYPE_STATUS, + "id": "ctl-status-1", + } + DirtyProtocol.write_message(sock, request) + + # Read response + response = DirtyProtocol.read_message(sock) + sock.close() + + if response.get("type") == DirtyProtocol.MSG_TYPE_RESPONSE: + data = response.get("data", {}) + return data.get("workers", []) + + except Exception: + pass + + return [] + def help(self) -> dict: """ Return list of available commands. diff --git a/gunicorn/dirty/arbiter.py b/gunicorn/dirty/arbiter.py index ae82c3d9..47f14dc5 100644 --- a/gunicorn/dirty/arbiter.py +++ b/gunicorn/dirty/arbiter.py @@ -423,6 +423,9 @@ class DirtyArbiter: # Handle stash operations if msg_type == DirtyProtocol.MSG_TYPE_STASH: await self.handle_stash_request(message, writer) + # Handle status queries + elif msg_type == DirtyProtocol.MSG_TYPE_STATUS: + await self.handle_status_request(message, writer) else: # Route request to a dirty worker - pass writer for streaming await self.route_request(message, writer) @@ -646,6 +649,47 @@ class DirtyArbiter: # Stash (shared state) operations - handled directly in arbiter # ------------------------------------------------------------------------- + async def handle_status_request(self, message, client_writer): + """ + Handle a status query request. + + Returns information about the dirty arbiter and its workers. + + Args: + message: Status request message + client_writer: StreamWriter to send response to client + """ + request_id = message.get("id", "unknown") + now = time.monotonic() + + workers_info = [] + for pid, worker in self.workers.items(): + try: + last_update = worker.tmp.last_update() + last_heartbeat = round(now - last_update, 2) + except (OSError, ValueError, AttributeError): + last_heartbeat = None + + workers_info.append({ + "pid": pid, + "age": worker.age, + "apps": getattr(worker, 'app_paths', []), + "booted": getattr(worker, 'booted', False), + "last_heartbeat": last_heartbeat, + }) + + workers_info.sort(key=lambda w: w["age"]) + + result = { + "arbiter_pid": self.pid, + "workers": workers_info, + "worker_count": len(workers_info), + "apps": list(self.app_specs.keys()) if self.app_specs else [], + } + + response = make_response(request_id, result) + await DirtyProtocol.write_message_async(client_writer, response) + async def handle_stash_request(self, message, client_writer): """ Handle a stash operation directly in the arbiter. diff --git a/gunicorn/dirty/protocol.py b/gunicorn/dirty/protocol.py index 8a5f7b61..b6e996af 100644 --- a/gunicorn/dirty/protocol.py +++ b/gunicorn/dirty/protocol.py @@ -43,6 +43,7 @@ MSG_TYPE_ERROR = 0x03 MSG_TYPE_CHUNK = 0x04 MSG_TYPE_END = 0x05 MSG_TYPE_STASH = 0x10 # Stash operations (shared state between workers) +MSG_TYPE_STATUS = 0x11 # Status query for arbiter/workers # Message type names (for backwards compatibility with old API) MSG_TYPE_REQUEST_STR = "request" @@ -51,6 +52,7 @@ MSG_TYPE_ERROR_STR = "error" MSG_TYPE_CHUNK_STR = "chunk" MSG_TYPE_END_STR = "end" MSG_TYPE_STASH_STR = "stash" +MSG_TYPE_STATUS_STR = "status" # Map int types to string names MSG_TYPE_TO_STR = { @@ -60,6 +62,7 @@ MSG_TYPE_TO_STR = { MSG_TYPE_CHUNK: MSG_TYPE_CHUNK_STR, MSG_TYPE_END: MSG_TYPE_END_STR, MSG_TYPE_STASH: MSG_TYPE_STASH_STR, + MSG_TYPE_STATUS: MSG_TYPE_STATUS_STR, } # Map string names to int types @@ -98,6 +101,7 @@ class BinaryProtocol: MSG_TYPE_CHUNK = MSG_TYPE_CHUNK_STR MSG_TYPE_END = MSG_TYPE_END_STR MSG_TYPE_STASH = MSG_TYPE_STASH_STR + MSG_TYPE_STATUS = MSG_TYPE_STATUS_STR @staticmethod def encode_header(msg_type: int, request_id: int, payload_length: int) -> bytes: diff --git a/tests/ctl/test_handlers.py b/tests/ctl/test_handlers.py index 414aaa23..a6bcde28 100644 --- a/tests/ctl/test_handlers.py +++ b/tests/ctl/test_handlers.py @@ -381,7 +381,7 @@ class TestShowAll: assert "dirty_arbiter" in result assert result["dirty_arbiter"] is None - assert "dirty_workers" in result + # No dirty workers when no dirty arbiter assert result["dirty_worker_count"] == 0 def test_show_all_with_dirty(self):