diff --git a/README.rst b/README.rst index 98adea5b..adf12d7c 100644 --- a/README.rst +++ b/README.rst @@ -1,5 +1,5 @@ Fork Information --------- +---------------- This is a fork of gunicorn with the following changes: diff --git a/docs/design/companion-process-manager.md b/docs/design/companion-process-manager.md new file mode 100644 index 00000000..15066de4 --- /dev/null +++ b/docs/design/companion-process-manager.md @@ -0,0 +1,758 @@ + +Status: proposal / draft +Author: Tanmoy Sarkar +Scope: `gunicorn/arbiter.py`, `gunicorn/config.py`, `gunicorn/companion/` + +## 1. Problem + +A Frappe deployment is not only HTTP workers. + +Alongside Gunicorn, we usually run persistent non-HTTP processes: + +- RQ worker pools +- scheduler +- socket.io / websocket server +- custom background daemons + +Today these are usually managed separately through supervisor/systemd. + +That causes: + +- repeated app memory usage +- separate lifecycle for web and side processes +- reload drift between HTTP workers and background processes +- inconsistent shutdown behavior +- harder production process control + +With `preload_app=True`, Gunicorn workers already share preloaded app memory using copy-on-write. The goal is to give non-HTTP processes the same lifecycle and memory-sharing benefit without making them HTTP workers. + +## 2. Goal + +Gunicorn manages one extra child process: the **Companion Manager**. + +The Companion Manager manages all configured companion processes. + +```text +gunicorn master + ├── HTTP worker + ├── HTTP worker + └── companion manager + ├── rq-default + ├── rq-long + ├── scheduler + └── socketio +``` + +Core rule: + +```text +Gunicorn Arbiter manages one Companion Manager. +Companion Manager manages companion processes. +Each companion process manages its own internals. +``` + +## 3. Architecture + +```text + gunicorn master + preload_app=True + │ + ┌─────────────────────┼─────────────────────┐ + │ │ │ + ▼ ▼ ▼ + HTTP worker HTTP worker companion manager + serves HTTP serves HTTP manages companions + │ + ┌──────────────────────┼──────────────────────┐ + │ │ │ + ▼ ▼ ▼ + rq-default scheduler socketio +``` + +Memory sharing still works: + +```text +gunicorn master preloads app + └── forks companion manager + └── forks rq / scheduler / socketio +``` + +The manager is forked from the preloaded master. Companion processes are forked from the manager, so they can inherit preloaded application memory. + +## 4. Responsibility Boundary + +### Gunicorn Arbiter + +The Arbiter should: + +- start the Companion Manager +- restart it if it crashes +- stop it during Gunicorn shutdown +- ask it to `reread` config when needed +- avoid per-companion process logic + +### Companion Manager + +The manager should: + +- load and validate companion config +- spawn/reap companions +- stop/start/restart companions +- restart unexpected exits after a fixed delay +- track state and expose `status` +- expose a Unix control socket +- redirect stdout/stderr +- apply env and cwd +- log lifecycle events + +### Companion Process + +A companion runs the actual service, such as RQ, scheduler, socket.io, or a custom daemon. + +The companion process owns its own internals: + +- signal handling +- job draining +- child workers +- sockets +- event loops + +## 5. Companion Is Not an HTTP Worker + +A companion must not: + +- serve Gunicorn HTTP traffic +- use Gunicorn listener sockets +- use Gunicorn worker heartbeat files +- trigger HTTP worker boot-error halt behavior +- call HTTP worker lifecycle hooks + +If a companion exits with `WORKER_BOOT_ERROR` or `APP_LOAD_ERROR`, the web tier must not halt. The manager treats it as a normal companion exit. + +## 6. Configuration + +Use dict-based config. + +```python +preload_app = True + +companion_config_file = "/home/frappe/frappe-bench/companion.conf.py" +companion_control_socket = "/run/gunicorn/companion.sock" + +companion_workers = [ + { + "name": "rq-default", + "target": "frappe_companions:start_rq_default", + "cwd": "/home/frappe/frappe-bench", + "env": {"QUEUE": "default"}, + "stop_signal": "SIGTERM", + "stop_timeout": 300, + "reload_timeout": 60, + "stdout": "/var/log/frappe/rq-default.log", + "stderr": "/var/log/frappe/rq-default.error.log", + }, + { + "name": "socketio", + "target": "frappe_companions:start_socketio", + "cwd": "/home/frappe/frappe-bench", + "stop_signal": "SIGTERM", + "stop_timeout": 60, + "reload_timeout": 30, + "stdout": "/var/log/frappe/socketio.log", + "stderr": "/var/log/frappe/socketio.error.log", + }, +] +``` + +Global defaults: + +```python +companion_stop_signal = "SIGTERM" +companion_stop_timeout = 60 +companion_reload_timeout = 60 + +companion_stdout = None +companion_stderr = None +companion_cwd = None +companion_env = {} + +companion_startsecs = 1 +companion_restart_delay = 5 + +# seconds; used when manager timeout is computed dynamically +companion_manager_shutdown_buffer = 10 +companion_manager_stop_timeout = None + +companion_control_socket_mode = 0o600 +``` + +If manager timeouts are unset, compute them dynamically: + +```text +manager_stop_timeout = max(companion.stop_timeout) + companion_manager_shutdown_buffer +manager_reload_timeout = max(companion.reload_timeout) + companion_manager_shutdown_buffer +``` + +## 7. Config Fields + +Required: + +| Field | Meaning | +| -------- | --------------------------------------- | +| `name` | Unique process name | +| `target` | Zero-argument callable or import string | + +Optional: + +| Field | Meaning | +| ---------------- | -------------------------------------------------------------------------- | +| `cwd` | Working directory before target | +| `env` | Extra environment variables | +| `stop_signal` | Signal used on stop | +| `stop_timeout` | Max wait during shutdown | +| `reload_timeout` | Max wait during restart/reread | +| `stdout` | Stdout log file or inherit | +| `stderr` | Stderr log file, `stdout`, or inherit | +| `startsecs` | Seconds process must survive before `RUNNING`; makes `STARTING` meaningful | + +Validation must reject unknown keys, duplicate names, invalid signals/timeouts, invalid stdout/stderr values, and targets that are not zero-argument callables/import strings. + +Not supported: groups, disable/fatal state, max restart count, exponential backoff, process groups, per-companion user switching, HTTP/TCP health checks, process-specific RQ/socket.io behavior. + +## 8. Public States + +Status should mimic `supervisorctl status`. + +```text +STOPPED +STARTING +RUNNING +BACKOFF +STOPPING +``` + +| State | Meaning | +| ---------- | ---------------------------------------------------------------------------- | +| `STOPPED` | Manually stopped or not started | +| `STARTING` | Forked, but has not survived `startsecs` | +| `RUNNING` | Alive and survived `startsecs` | +| `BACKOFF` | Exited unexpectedly; will restart after `companion_restart_delay` | +| `STOPPING` | Stop is in progress, from first signal through optional `SIGKILL` until exit | + +No public `EXITED`, `UNKNOWN`, or `FATAL`. + +Exit metadata is tracked separately: + +```text +last_exit_code +last_exit_signal +last_exited_at +exit_count +``` + +## 9. State Transitions + +```text +STOPPED + └─ start + → STARTING + +STARTING + ├─ survives startsecs + │ → RUNNING + ├─ exits unexpectedly + │ → BACKOFF + └─ stop / restart / removed-by-reread + → STOPPING + +RUNNING + ├─ exits unexpectedly + │ → BACKOFF + └─ stop / restart / removed-by-reread + → STOPPING + +BACKOFF + ├─ retry timer expires + │ → STARTING + └─ stop + → STOPPED + +STOPPING + ├─ process exits + │ → STOPPED + └─ timeout exceeded + → SIGKILL + → STOPPED +``` + +When `waitpid` reaps a child, the manager records exit metadata and immediately moves to the next public state. + +Early exit during `STARTING` and unexpected exit after `RUNNING` both use the same fixed restart delay. + +## 10. Restart Behavior + +Configured companions are expected to stay running. + +Unexpected exit: + +```text +record exit metadata +state = BACKOFF +next_retry_at = now + companion_restart_delay +restart after companion_restart_delay +``` + +Default: + +```python +companion_restart_delay = 5 +``` + +There is no exponential backoff, max restart count, disable state, or fatal state. + +A configured process restarts forever unless: + +- manually stopped +- removed from config by `reread` +- Gunicorn is stopping/reloading + +## 11. Control Socket + +The manager exposes a Unix domain socket: + +```python +companion_control_socket = "/run/gunicorn/companion.sock" +``` + +Default permissions: + +```python +companion_control_socket_mode = 0o600 +``` + +Gunicorn runs as a non-root user, so the socket is owned by that user and no +group ownership switching is supported. + +Protocol: newline-delimited JSON. + +Commands: + +```text +status +reread +start +stop +restart +``` + +The manager creates the socket before entering the main loop. During full manager replacement, clients should retry on `ENOENT`, `ECONNREFUSED`, or timeout. + +## 12. Command Semantics + +### `status` + +Request: + +```json +{"cmd": "status"} +``` + +Human output should mimic `supervisorctl status`: + +```text +rq-default RUNNING pid 1234, uptime 2 days, 03:12:44 +rq-long BACKOFF exited with status 1, retrying in 3s +scheduler STOPPED stopped manually +``` + +JSON response: + +```json +{ + "ok": true, + "companions": [ + { + "name": "rq-default", + "state": "RUNNING", + "pid": 1234, + "description": "pid 1234, uptime 2 days, 03:12:44" + }, + { + "name": "rq-long", + "state": "BACKOFF", + "pid": null, + "description": "exited with status 1, retrying in 3s", + "next_retry_at": 1730000000, + "restart_delay": 5, + "last_exit_code": 1 + } + ] +} +``` + +### `start ` + +```json +{"cmd": "start", "name": "rq-default"} +``` + +Uses latest validated config. + +```text +STOPPED -> clear manual_stop, start now +BACKOFF -> cancel pending retry, clear manual_stop, start now +RUNNING -> success: already running +STARTING -> success: already starting +STOPPING -> error: process is stopping; poll status and retry +``` + +### `stop ` + +```json +{"cmd": "stop", "name": "rq-default"} +``` + +```text +RUNNING -> send stop_signal, wait stop_timeout, SIGKILL if needed, STOPPED +STARTING -> send stop_signal, wait stop_timeout, SIGKILL if needed, STOPPED +BACKOFF -> cancel pending retry, STOPPED +STOPPED -> success: already stopped +STOPPING -> success: already stopping +``` + +`stop` sets `manual_stop = True`. + +If stopping while `STARTING`, `stop_timeout` governs the stop window, not `startsecs`. + +### `restart ` + +```json +{"cmd": "restart", "name": "rq-default"} +``` + +```text +RUNNING -> clear manual_stop, stop using reload_timeout, start +STARTING -> enter STOPPING, stop current child using reload_timeout, start +BACKOFF -> cancel pending retry, clear manual_stop, start immediately +STOPPED -> clear manual_stop, start immediately +STOPPING -> error: process is stopping; poll status and retry +``` + +`restart` does not reread config. + +### `reread` + +```json +{"cmd": "reread"} +``` + +Transactional config reload: + +```text +new process -> add and start +removed process -> stop and remove +changed process -> update config; restart unless manual_stop=True +unchanged process -> keep current state +``` + +If a manually stopped process changes config: + +```text +update stored config +keep STOPPED +next start uses latest config +``` + +Success: + +```json +{ + "ok": true, + "added": ["new-worker"], + "removed": ["old-worker"], + "restarted": ["rq-default"], + "unchanged": ["socketio"] +} +``` + +`unchanged` means no process action was taken. It may include manually stopped companions whose config changed; the new config is accepted and stored, and the next `start ` uses it. + +Failure: + +```json +{ + "ok": false, + "error": "invalid config: duplicate companion name rq-default", + "kept_old_config": true +} +``` + +`kept_old_config=true` means no running process was changed and previous validated config remains active. + +## 13. Reread Diff + +Use one stable config hash per companion. + +```text +new name -> add/start +missing name -> stop/remove +hash changed -> update config; restart unless manual_stop=True +hash unchanged -> no process action +``` + +This intentionally restarts even if only `stop_timeout`, `stdout`, or `env` changes. Simpler and easier to test. + +`reread` flow: + +1. Read config file. +2. Extract companion settings. +3. Validate full config. +4. Compute one config hash per companion. +5. Diff old/new config. +6. Apply only if validation succeeds. + +Prefer a dedicated config file: + +```python +companion_config_file = "/home/frappe/frappe-bench/companion.conf.py" +``` + +If unset, the manager may fall back to Gunicorn config file, but must read only companion settings. + +## 14. stdout/stderr, env, cwd + +### stdout/stderr + +```python +"stdout": "/var/log/frappe/rq-default.log", +"stderr": "/var/log/frappe/rq-default.error.log", +``` + +Allowed: + +```python +None +"inherit" +"stdout" # only for stderr +"/path/to/file.log" +``` + +The companion child opens stdout/stderr after fork and before `target()`. + +Files are opened in append mode. + +Log rotation is external: + +- `copytruncate` works without restart +- `create`/rename rotation needs companion restart +- live fd reopen for already-running companions is out of scope + +### env/cwd + +Before `target()`: + +```python +os.chdir(cwd) +os.environ.update(env) +``` + +Changing stdout/stderr/env/cwd changes the config hash and causes restart unless manually stopped. + +## 15. File Descriptors + +Manager child must close Gunicorn-only fds: + +- master signal pipe +- HTTP listener sockets +- worker heartbeat tmp files + +Companion children must close manager-only fds before running target. + +Companions must not keep Gunicorn HTTP listener sockets open. + +## 16. Parent Death / Orphan Cleanup + +Manager exits if Gunicorn master dies. + +Linux: + +```python +prctl(PR_SET_PDEATHSIG, SIGTERM) +``` + +Non-Linux fallback: + +```text +manager records parent pid +manager checks os.getppid() every 5 seconds +if os.getppid() returns 1, manager exits +``` + +Companion children should also use parent-death signal where available. Without Linux `prctl`, cleanup after manager death is best-effort because target code takes over. + +## 17. Internal State + +Maintain enough state for `status`: + +- name +- state +- pid +- uptime +- restart count +- exit count +- last exit code/signal +- last started/exited time +- next retry time +- stop timeout kills +- manual stop flag +- stdout/stderr path + +No Prometheus exporter inside the manager. + +## 18. Implementation Layout + +```text +gunicorn/companion/ + __init__.py + config.py + process.py + manager.py + control.py +``` + +`config.py`: + +- load config +- validate config +- normalize defaults +- compute config hash + +`process.py`: + +- `CompanionConfig` +- `CompanionProcess` +- state model + +`manager.py`: + +- run loop +- spawn/reap +- start/stop/restart +- fixed restart delay +- state transitions +- stdout/stderr/env/cwd setup + +`control.py`: + +- Unix socket server +- JSON command parser +- JSON response writer + +## 19. Arbiter Changes + +Keep Arbiter changes small: + +- manager state +- spawn manager +- reap manager +- stop manager +- reload/reread manager +- helper to call control socket if needed + +No per-companion logic in Arbiter. + +## 20. Implementation Tasks + +- [x] Add companion config settings in `gunicorn/config.py`. +- [x] Add config validation for `companion_workers`. +- [x] Add `CompanionConfig` and config hash generation. +- [x] Add public process states. +- [x] Add `CompanionProcess` runtime state. +- [x] Add status description helpers. +- [x] Add `CompanionManager` skeleton. +- [x] Spawn one companion process from the manager. +- [x] Apply `cwd` and `env` before target. +- [x] Redirect `stdout` and `stderr`. +- [x] Reap exited companion processes. +- [x] Implement `STARTING -> RUNNING` using `startsecs`. +- [x] Implement `BACKOFF` with fixed `companion_restart_delay`. +- [x] Implement `start_process`. +- [x] Implement `stop_process`. +- [x] Implement `restart_process`. +- [x] Preserve and clear `manual_stop` correctly. +- [x] Add Unix control socket. +- [x] Implement JSON command protocol. +- [x] Implement `status`. +- [x] Implement `start`. +- [x] Implement `stop`. +- [x] Implement `restart`. +- [x] Implement transactional `reread`. +- [x] Add manager spawn/reap logic in Arbiter. +- [x] Add manager shutdown handling in Arbiter. +- [x] Wire Gunicorn reload to restart the manager only when companion config changed. +- [x] Close Gunicorn-only fds in manager child. +- [x] Close manager-only fds in companion child. +- [x] Add parent-death cleanup. +- [x] Add lifecycle logs. +- [x] Add tests for config validation. +- [x] Add tests for state transitions. +- [x] Add tests for control commands. +- [x] Add tests for transactional reread. +- [x] Add tests that HTTP worker behavior is unchanged. + +## 21. Test Plan + +Test: + +- config validation +- config hash diff +- transactional reread +- `reread` success/failure response +- manual stop + reread behavior +- `start`, `stop`, `restart` on all public states +- control socket commands and permissions +- control socket unavailable retry behavior +- supervisord-like status output +- state transitions +- manager lifecycle from Arbiter +- companion spawn/reap +- fixed 5s restart delay +- `startsecs` behavior +- stdout/stderr redirection +- env and cwd +- fd cleanup +- parent-death cleanup +- HTTP worker behavior unchanged + +## 22. Out of Scope + +Not supported: + +- groups +- dependency ordering +- process group killing +- disable/fatal state +- max restart count +- exponential backoff +- CLI config for companion specs +- RQ/socket.io/scheduler-specific behavior +- per-companion user switching +- HTTP/TCP/custom health checks +- live log fd reopen for already-running companions + +## 23. Summary + +Use a Companion Manager, not direct companion management inside Arbiter. + +This gives: + +- shared memory through `preload_app=True` +- small Arbiter changes +- supervisord-like process management and status +- controlled `start`, `stop`, `restart`, `reread`, `status` +- transactional config reread +- fixed restart delay +- simple process-running health +- per-companion env/cwd/stdout/stderr +- simple public state machine +- safer shutdown/reload behavior diff --git a/docs/source/companion.rst b/docs/source/companion.rst new file mode 100644 index 00000000..012d6bf0 --- /dev/null +++ b/docs/source/companion.rst @@ -0,0 +1,206 @@ +.. _companion: + +=================== +Companion Processes +=================== + +Most real deployments run more than HTTP workers. Alongside the web server you +often have background processes: task queues (RQ, Celery), a scheduler, a +websocket / socket.io server, or custom daemons. Normally these are started and +supervised separately with systemd or supervisor. + +The **companion process manager** lets Gunicorn run those processes for you, as +children of the same master. They get the same lifecycle as your web workers +and, when you use :ref:`preload-app`, they share the preloaded application +memory through copy-on-write. + +Why use it +========== + +- **One thing to run.** Web workers and background processes start, stop, and + reload together under a single Gunicorn command. +- **Less memory.** With ``--preload`` the application is loaded once in the + master; companions fork from it and share that memory instead of each loading + their own copy. +- **No drift.** There is one place that owns the lifecycle, so background + processes don't get out of step with the web workers. + +If you only run HTTP workers, you don't need this feature and can ignore it. + +How it works +============ + +Gunicorn forks **one** extra child after preload: the *companion manager*. The +manager forks and supervises each companion you configured. The arbiter only +watches the single manager; the manager handles everything below it. + +.. code-block:: text + + gunicorn master (preloaded app) + ├── HTTP worker + ├── HTTP worker + └── companion manager + ├── rq-default + ├── scheduler + └── socketio + +Each companion is just a Python callable you point Gunicorn at. The manager +forks a fresh process, runs the callable, and keeps it alive: if it crashes, the +manager restarts it after a short delay. + +Quick start +=========== + +A companion is configured in your normal Gunicorn config file (the one you pass +with ``-c``). Each entry needs a ``name`` and a ``target``. The target is a +``"module:callable"`` string; the callable takes no arguments and runs the +process (it is expected to block, like a worker's main loop). + +.. code-block:: python + + # gunicorn.conf.py + preload_app = True # required to share memory with companions + + companion_workers = [ + {"name": "scheduler", "target": "myapp.tasks:run_scheduler"}, + {"name": "rq-default", "target": "myapp.tasks:run_rq", "env": {"QUEUE": "default"}}, + ] + +Run Gunicorn as usual:: + + gunicorn -c gunicorn.conf.py --preload myapp:application + +You'll see the companion manager and each companion start in the logs. + +Per-companion options +--------------------- + +Each entry in ``companion_workers`` may set these keys in addition to ``name`` +and ``target``: + +.. list-table:: + :header-rows: 1 + :widths: 20 80 + + * - Key + - Meaning + * - ``cwd`` + - Directory to change into before running the target. + * - ``env`` + - Extra environment variables, merged onto the inherited env. + * - ``stop_signal`` + - Signal sent to ask the companion to stop (default ``SIGTERM``). + * - ``stop_timeout`` + - Seconds to wait after the stop signal before ``SIGKILL``. + * - ``reload_timeout`` + - Seconds to wait for the old process to exit on restart. + * - ``startsecs`` + - Seconds a companion must stay up to count as started. + * - ``stdout`` + - File path for stdout, or ``"inherit"`` (the default). + * - ``stderr`` + - File path, ``"stdout"`` to merge with stdout, or ``"inherit"``. + +Any key you leave out falls back to the matching global setting +(``companion_stop_signal``, ``companion_stop_timeout``, and so on), so you can +set a default once and override it per companion. + +Keeping companions in a separate file +-------------------------------------- + +If you want to change companion specs without touching your web config, put the +``companion_*`` settings in their own Python file and point Gunicorn at it:: + + companion_config_file = "/etc/gunicorn/companions.py" + +The manager reads its companion settings from that file instead of the main +config. + +States +====== + +A companion is always in one of these states (the same vocabulary as +``supervisorctl``): + +.. list-table:: + :header-rows: 1 + :widths: 20 80 + + * - State + - Meaning + * - ``STARTING`` + - Just forked, not yet past ``startsecs``. + * - ``RUNNING`` + - Up and healthy. + * - ``BACKOFF`` + - Crashed; waiting ``restart_delay`` seconds before retrying. + * - ``STOPPING`` + - Was asked to stop; draining before exit. + * - ``STOPPED`` + - Stopped on purpose and will not auto-restart. + +A companion that exits on its own goes to ``BACKOFF`` and is restarted. One you +stop by hand stays ``STOPPED`` until you start it again. + +Controlling companions at runtime +================================== + +Set a control socket and you can inspect and steer companions while Gunicorn +runs:: + + companion_control_socket = "/run/gunicorn/companion.sock" + +A small CLI, ``gunicorn-companion``, talks to it:: + + gunicorn-companion -s /run/gunicorn/companion.sock status + gunicorn-companion -s /run/gunicorn/companion.sock restart scheduler + gunicorn-companion -s /run/gunicorn/companion.sock stop rq-default + gunicorn-companion -s /run/gunicorn/companion.sock start rq-default + +You can also set ``GUNICORN_COMPANION_SOCKET`` instead of passing ``-s`` every +time. The protocol is plain newline-delimited JSON, so ``socat`` works too:: + + echo '{"cmd": "status"}' | socat - UNIX-CONNECT:/run/gunicorn/companion.sock + +Commands: + +- ``status`` — show every companion's state. +- ``start `` / ``stop `` / ``restart `` — act on one. +- ``reread`` — re-read the config file and apply only what changed: new + companions start, removed ones stop, changed ones restart, untouched ones are + left alone. It is transactional — if the new config is invalid, nothing + changes and the old one keeps running. + +The socket is created mode ``0o600`` (owner only). Change it with +``companion_control_socket_mode`` if you need group access. + +Reload and shutdown +=================== + +**Reload (SIGHUP).** A reload recycles your HTTP workers and re-reads config. +The companion manager is restarted **only if the companion config actually +changed** — an ordinary web reload leaves your companions running untouched, so +it stays fast. Note that, just like HTTP workers under ``--preload``, companions +pick up new *application code* only on a full restart, not on ``SIGHUP``. For +fine-grained changes without a full reload, use the ``reread`` command. + +**Shutdown (SIGTERM).** Gunicorn asks the manager to stop, which sends each +companion its ``stop_signal`` and waits up to ``stop_timeout`` before forcing it +down with ``SIGKILL``. Gunicorn gives the manager enough time to drain all its +companions before it gives up; tune that with +``companion_manager_stop_timeout`` (or it is derived from the slowest companion +plus ``companion_manager_shutdown_buffer``). + +Limitations +=========== + +- **Hot upgrade (USR2) is not supported with companions.** During a ``USR2`` + upgrade the old and new masters run side by side, so each runs its own + companion manager and every companion runs twice — bad for singletons like a + scheduler. Restart the master instead of using ``USR2`` when companions are + configured, or keep singletons out of the companion set. A ``SIGHUP`` reload + is fine. +- **Linux is the primary target.** Orphan cleanup uses ``prctl`` on Linux, with + a portable parent-watch fallback elsewhere. + +See the :ref:`settings` page for every ``companion_*`` option. diff --git a/docs/source/index.rst b/docs/source/index.rst index 3f89ce1e..61021f59 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -23,6 +23,7 @@ Features * Simple Python configuration * Multiple worker configurations * Various server hooks for extensibility +* Supervise non-HTTP :ref:`companion processes ` in the same master * Compatible with Python 3.x >= 3.7 @@ -37,6 +38,7 @@ Contents configure settings instrumentation + companion deploy signals custom diff --git a/docs/source/settings.rst b/docs/source/settings.rst index e1e91fa7..330ae7d9 100644 --- a/docs/source/settings.rst +++ b/docs/source/settings.rst @@ -20,6 +20,192 @@ for reference on setting at the command line. .. versionadded:: 19.7 +Companion Processes +------------------- + +.. _companion-config-file: + +``companion_config_file`` +~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Command line:** ``--companion-config`` + +**Default:** ``None`` + +Dedicated Python file the manager reads for companion specs. + +Lets companions reload independently of the main Gunicorn config. +Example: ``companion_config_file = "/home/frappe/bench/companion.conf.py"``. +If unset, specs are read from the main Gunicorn config. + +.. _companion-workers: + +``companion_workers`` +~~~~~~~~~~~~~~~~~~~~~ + +**Default:** ``[]`` + +List of companion process specs, each a dict. + +A spec requires ``name`` and ``target``; other keys override defaults. +Example: ``[{"name": "scheduler", "target": "app:run_scheduler"}]``. + +.. _companion-control-socket: + +``companion_control_socket`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Command line:** ``--companion-control-socket`` + +**Default:** ``None`` + +Unix socket the manager listens on for control commands. + +Clients send ``status``/``start``/``stop``/``restart``/``reread`` as JSON. +Example: ``companion_control_socket = "/run/gunicorn/companion.sock"``. + +.. _companion-control-socket-mode: + +``companion_control_socket_mode`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Default:** ``384`` + +Octal file mode set on the control socket after it is created. + +Default ``0o600`` lets only the owner connect. +Example: ``companion_control_socket_mode = 0o660``. + +.. _companion-stop-signal: + +``companion_stop_signal`` +~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Default:** ``'SIGTERM'`` + +Signal sent first when stopping a companion. + +The companion should catch it to drain work before exiting. +Example: ``companion_stop_signal = "SIGINT"``. + +.. _companion-stop-timeout: + +``companion_stop_timeout`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Default:** ``60`` + +Seconds to wait after the stop signal before sending SIGKILL. + +Give slow-draining workers a larger value. +Example: ``companion_stop_timeout = 300``. + +.. _companion-reload-timeout: + +``companion_reload_timeout`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Default:** ``60`` + +Seconds to wait for the old child to exit during restart/reread. + +Used instead of ``stop_timeout`` when replacing a companion. +Example: ``companion_reload_timeout = 30``. + +.. _companion-stdout: + +``companion_stdout`` +~~~~~~~~~~~~~~~~~~~~ + +**Default:** ``None`` + +Where to send companion stdout: a file path or ``"inherit"``. + +Files are opened in append mode; ``None`` inherits the manager's stdout. +Example: ``companion_stdout = "/var/log/frappe/rq.log"``. + +.. _companion-stderr: + +``companion_stderr`` +~~~~~~~~~~~~~~~~~~~~ + +**Default:** ``None`` + +Where to send companion stderr: a path, ``"stdout"``, or ``"inherit"``. + +Use ``"stdout"`` to merge stderr into the stdout target. +Example: ``companion_stderr = "stdout"``. + +.. _companion-cwd: + +``companion_cwd`` +~~~~~~~~~~~~~~~~~ + +**Default:** ``None`` + +Directory the child changes into before running the target. + +Example: ``companion_cwd = "/home/frappe/frappe-bench"``. + +.. _companion-env: + +``companion_env`` +~~~~~~~~~~~~~~~~~ + +**Default:** ``{}`` + +Extra environment variables merged into the child environment. + +Example: ``companion_env = {"QUEUE": "default"}``. + +.. _companion-startsecs: + +``companion_startsecs`` +~~~~~~~~~~~~~~~~~~~~~~~ + +**Default:** ``1`` + +Seconds a freshly forked companion must survive to reach RUNNING. + +Exiting earlier counts as a failed start and triggers BACKOFF. +Example: ``companion_startsecs = 5``. + +.. _companion-restart-delay: + +``companion_restart_delay`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Default:** ``5`` + +Fixed seconds to wait before restarting a crashed companion. + +No exponential backoff; the same delay is used every time. +Example: ``companion_restart_delay = 10``. + +.. _companion-manager-shutdown-buffer: + +``companion_manager_shutdown_buffer`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Default:** ``10`` + +Extra seconds added to the slowest companion timeout for the manager. + +Gives the manager headroom to stop all children before Gunicorn kills it. +Example: ``companion_manager_shutdown_buffer = 15``. + +.. _companion-manager-stop-timeout: + +``companion_manager_stop_timeout`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Default:** ``None`` + +Seconds Gunicorn waits for the manager to stop during shutdown. + +By default it uses the slowest companion ``stop_timeout`` plus a buffer. +Example: ``companion_manager_stop_timeout = 120``. + Config File ----------- @@ -1148,7 +1334,7 @@ change the worker process user. Switch worker process to run as this group. A valid group id (as an integer) or the name of a user that can be -retrieved with a call to ``pwd.getgrnam(value)`` or ``None`` to not +retrieved with a call to ``grp.getgrnam(value)`` or ``None`` to not change the worker processes group. .. _umask: @@ -1703,6 +1889,56 @@ The maximum number of simultaneous clients. This setting only affects the ``gthread``, ``eventlet`` and ``gevent`` worker types. +.. _enable-adaptive-queueing: + +``enable_adaptive_queueing`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Command line:** ``--enable-adaptive-queueing`` + +**Default:** ``False`` + +Enable adaptive multi-queue routing in the ``gthread`` worker. + +Can also be enabled by setting the ``GUNICORN_ENABLE_ADAPTIVE_QUEUEING`` +environment variable to ``true``. + +When enabled, the worker splits its :ref:`threads` roughly evenly into +two lanes — a *fast* lane and a *slow* lane — and routes each request +to one of them by predicting, from previously observed timings of the +same route (method + path), whether it will exceed +:ref:`slow-request-threshold`. Slow-predicted requests go to the slow +lane so they can never starve the fast lane, even under a flood of +slow requests. + +Requires :ref:`threads` to be at least 2. + +This setting only affects the ``gthread`` worker type. + +.. versionadded:: 23.1.0 + +.. _slow-request-threshold: + +``slow_request_threshold`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Command line:** ``--slow-request-threshold FLOAT`` + +**Default:** ``5.0`` + +Processing time (in seconds) above which a request route is treated as +"slow" by the ``gthread`` worker when :ref:`enable-adaptive-queueing` +is enabled. + +A route is learned as slow once it has been observed exceeding this +threshold (either on completion or while still running); its timing +decays back below the threshold if it becomes fast again. + +Only used by the ``gthread`` worker when :ref:`enable-adaptive-queueing` +is enabled. + +.. versionadded:: 23.1.0 + .. _max-requests: ``max_requests`` diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 646d684e..3f04381f 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -14,6 +14,9 @@ import socket from gunicorn.errors import HaltServer, AppImportError from gunicorn.pidfile import Pidfile from gunicorn import sock, systemd, util +from gunicorn.companion.config import build_companion_configs +from gunicorn.companion.control import ControlServer +from gunicorn.companion.manager import CompanionManager from gunicorn import __version__, SERVER_SOFTWARE @@ -33,6 +36,10 @@ class Arbiter: # A flag indicating if an application failed to be loaded APP_LOAD_ERROR = 4 + # Cap on the crash-backoff delay before respawning a companion manager + # that keeps exiting unexpectedly, so a crash loop cannot busy-spin. + COMPANION_MANAGER_MAX_RESPAWN_DELAY = 30 + START_CTX = {} LISTENERS = [] @@ -63,6 +70,18 @@ class Arbiter: self.reexec_pid = 0 self.master_pid = 0 self.master_name = "Master" + self.companion_manager_pid = 0 + # True while a manager exit is expected (deliberate stop or reload), so + # the reaper logs it as info instead of an unexpected-crash error. + self._companion_manager_stopping = False + # Crash backoff: earliest monotonic time the main loop may respawn a + # manager that exited unexpectedly, plus the consecutive-crash count + # that sizes the delay. + self._companion_manager_respawn_at = 0 + self._companion_manager_failures = 0 + # Configs of the currently running companion manager, cached at spawn so + # shutdown can size its wait without re-reading the config file. + self._companion_configs = [] cwd = util.getcwd() @@ -201,6 +220,7 @@ class Arbiter: try: self.manage_workers() + self.manage_companion_manager() while True: self.maybe_promote_master() @@ -210,6 +230,7 @@ class Arbiter: self.sleep() self.murder_workers() self.manage_workers() + self.manage_companion_manager() continue if sig not in self.SIG_NAMES: @@ -389,14 +410,18 @@ class Arbiter: sig = signal.SIGTERM if not graceful: sig = signal.SIGQUIT - limit = time.time() + self.cfg.graceful_timeout - # instruct the workers to exit + # The manager may need longer than a worker to drain its companions. + limit = time.time() + max( + self.cfg.graceful_timeout, self.companion_manager_stop_timeout()) + # instruct the workers and the companion manager to exit self.kill_workers(sig) + self.stop_companion_manager(sig) # wait until the graceful timeout - while self.WORKERS and time.time() < limit: + while (self.WORKERS or self.companion_manager_pid) and time.time() < limit: time.sleep(0.1) self.kill_workers(signal.SIGKILL) + self.stop_companion_manager(signal.SIGKILL) def reexec(self): """\ @@ -487,6 +512,9 @@ class Arbiter: # manage workers self.manage_workers() + # reload companions with the new configuration + self.reload_companion_manager() + def murder_workers(self): """\ Kill unused/idle workers @@ -519,6 +547,27 @@ class Arbiter: break if self.reexec_pid == wpid: self.reexec_pid = 0 + elif self.companion_manager_pid == wpid: + # The manager itself exited; clear its pid so the main + # loop respawns it. It owns its companions' lifecycles. + self.companion_manager_pid = 0 + if self._companion_manager_stopping: + # Expected exit from a deliberate stop or reload. + self._companion_manager_stopping = False + self.log.info( + "Companion manager (pid:%s) exited", wpid) + else: + # Unexpected crash: back off before respawning so a + # manager that cannot boot does not busy-spin. + self._companion_manager_failures += 1 + delay = min( + 2 ** (self._companion_manager_failures - 1), + self.COMPANION_MANAGER_MAX_RESPAWN_DELAY) + self._companion_manager_respawn_at = ( + time.monotonic() + delay) + self.log.error( + "Companion manager (pid:%s) exited unexpectedly; " + "respawning in %ss", wpid, delay) else: # A worker was terminated. If the termination reason was # that it could not boot, we'll shut it down to avoid @@ -645,6 +694,162 @@ class Arbiter: self.spawn_worker() time.sleep(0.1 * random.random()) + def manage_companion_manager(self): + """Keep the companion manager alive, spawning it if it is not running. + + Does nothing unless companions are configured. The manager is a single + child of the arbiter; per-companion supervision lives entirely inside + it, so the arbiter only ensures the one manager process exists. + """ + if not (self.companion_manager_pid == 0 and self.cfg.companion_workers): + return + if time.monotonic() < self._companion_manager_respawn_at: + # Still inside the crash-backoff window; wait before respawning. + return + self.spawn_companion_manager() + + def spawn_companion_manager(self): + """Fork the companion manager process. + + The configs are built in the parent before the fork; the parent then + records the manager pid and returns, while the child runs the manager's + supervision loop and exits when the loop returns. The manager forks the + individual companions itself. + """ + configs = build_companion_configs(self.cfg) + if not configs: + return + manager = CompanionManager(configs, self.log) + manager.config_loader = lambda: build_companion_configs(self.cfg) + if self.cfg.companion_control_socket: + manager.control = ControlServer( + manager.handle_command, + self.cfg.companion_control_socket, + mode=self.cfg.companion_control_socket_mode or 0o600, + log=self.log) + + pid = os.fork() + if pid != 0: + self.companion_manager_pid = pid + self._companion_configs = configs + self.log.info("Companion manager started (pid:%s)", pid) + return + + # Process Child + try: + self._close_gunicorn_fds() + util._setproctitle("companion manager [%s]" % self.proc_name) + manager.run() + sys.exit(0) + except SystemExit: + raise + except Exception: + self.log.exception("Exception in companion manager process") + sys.exit(-1) + + def _close_gunicorn_fds(self): + """Close fds the manager inherited from the arbiter but never uses. + + The companion manager serves no HTTP traffic and does not run the + arbiter's signal loop, so it drops the listening sockets, the arbiter's + wakeup pipe, and the worker heartbeat files. Closing them keeps the + manager (and the companions it forks) from pinning the arbiter's fds. + """ + for listener in self.LISTENERS: + listener.close() + for pipe_fd in self.PIPE: + try: + os.close(pipe_fd) + except OSError: + pass + for worker in self.WORKERS.values(): + worker.tmp.close() + + def reload_companion_manager(self): + """Reconcile the companion manager with the reloaded configuration. + + A web reload (SIGHUP) recycles HTTP workers and re-reads config, but + with ``--preload`` it does not reload application code -- the WSGI + callable is loaded once and cached -- so the running companions are + already current. Restarting them on every reload would bounce them for + nothing, so the manager is only reloaded when the companion config + actually changed. + + When it did change, a running manager is asked to stop (it drains its + companions first); the SIGCHLD reaper clears its pid so the main loop + respawns it from the fresh cfg, and a manager started where none ran + comes up right away. An unchanged config leaves the manager and its + companions untouched. Per-companion transactional reread stays + available separately through the control socket. + """ + try: + new_configs = build_companion_configs(self.cfg) + except Exception: + self.log.exception( + "Could not read companion config on reload; " + "leaving companion manager unchanged") + return + if not self._companion_configs_changed(new_configs): + return + if self.companion_manager_pid != 0: + self.log.info("Companion config changed, reloading companion manager") + self.stop_companion_manager(signal.SIGTERM) + self.manage_companion_manager() + + def _companion_configs_changed(self, new_configs): + """True when the companion config differs from the running manager's. + + Compares the sorted ``config_hash`` of every companion, so a changed + field, an added name, or a removed name all count, while a pure web + reload with the same companion specs does not. + """ + old_hashes = sorted(c.config_hash for c in self._companion_configs) + new_hashes = sorted(c.config_hash for c in new_configs) + return old_hashes != new_hashes + + def stop_companion_manager(self, sig): + """Signal the companion manager to exit, if it is running. + + A graceful SIGTERM lets the manager stop its own companions before it + exits; SIGKILL forces it down. The reaper clears the pid once it dies, + so a manager that is already gone is a no-op here. + """ + if self.companion_manager_pid == 0: + return + # This exit is on purpose: mark it expected and clear any crash backoff + # so the reaper logs info and a reload can respawn without delay. + self._companion_manager_stopping = True + self._companion_manager_failures = 0 + self._companion_manager_respawn_at = 0 + try: + os.kill(self.companion_manager_pid, sig) + except OSError as e: + if e.errno == errno.ESRCH: + self.companion_manager_pid = 0 + return + raise + + def companion_manager_stop_timeout(self): + """Seconds to wait for the companion manager during shutdown: the + explicit setting, else the slowest companion stop_timeout plus the + shutdown buffer, else 0 when no companions are configured. + """ + if self.cfg.companion_manager_stop_timeout is not None: + return self.cfg.companion_manager_stop_timeout + # Prefer the configs cached at spawn over re-reading the config file + # mid-shutdown, where a since-changed or removed file could raise. + configs = self._companion_configs + if not configs: + try: + configs = build_companion_configs(self.cfg) + except Exception: + self.log.exception("could not read companion config for shutdown") + return 0 + if not configs: + return 0 + slowest = max(config.stop_timeout for config in configs) + return slowest + self.cfg.companion_manager_shutdown_buffer + def kill_workers(self, sig): """\ Kill all workers with the signal `sig` diff --git a/gunicorn/companion/README.md b/gunicorn/companion/README.md new file mode 100644 index 00000000..4d7e7daf --- /dev/null +++ b/gunicorn/companion/README.md @@ -0,0 +1,155 @@ +# Companion processes + +Gunicorn runs HTTP workers. Many apps also need non-HTTP side processes next to +them: RQ workers, a scheduler, socket.io, a custom daemon. This package lets +Gunicorn supervise those too, so they share the preloaded application memory +(copy-on-write) and one process tree instead of running under a separate +supervisor. + +## Architecture + +``` + clients (HTTP) + │ + ▼ + ┌──────────────────────────────────────────────────────────┐ + │ arbiter (master) preloaded app — shared (COW) │ + └──┬──────────────────┬────────────────────────┬───────────┘ + │ fork │ fork │ fork (after preload) + ▼ ▼ ▼ +┌───────────┐ ┌───────────┐ ┌────────────────────┐ +│HTTP Worker│ ... │HTTP worker│ │ companion manager │◀─── control +└───────────┘ └───────────┘ └─────────┬──────────┘ socket (JSON) + │ fork + supervise ▲ + ┌──────────────┼──────────────┐ │ + ▼ ▼ ▼ gunicorn-companion + ┌─────────┐ ┌─────────┐ ┌─────────┐ / socat + │companion│ │companion│ │companion│ + │ rq │ │scheduler│ │socketio │ + └─────────┘ └─────────┘ └─────────┘ +``` + +The arbiter forks one **companion manager** after `preload_app`. The manager +forks and supervises each configured companion, owns the control socket, and +exits when the arbiter does. It is the only companion-aware part of the arbiter; +all per-process logic lives in the manager. Companions inherit the preloaded +application memory copy-on-write, the same way HTTP workers do. + +## States + +``` +STOPPED ──start──▶ STARTING ──(survives startsecs)──▶ RUNNING + ▲ │ + │ stop / crash + │ ▼ + └────────────── STOPPED / STOPPING ◀── BACKOFF (unexpected exit) +``` + +- An unexpected exit goes to `BACKOFF` and restarts after a fixed + `companion_restart_delay` (no exponential backoff, no retry cap). +- A manual `stop` exits to `STOPPED` and stays there. +- `stop` sends `companion_stop_signal`, then `SIGKILL` after + `companion_stop_timeout`. + +## Configuration + +Companions live in the normal Gunicorn config — a Python file you pass with `-c`. To reload companions independently of the web config, put them in a dedicated file and point `companion_config_file` (or `--companion-config`) at it; the manager then reads its companion settings from there instead. + +Save a `gunicorn.conf.py`: + +```python +preload_app = True # required to share memory +companion_control_socket = "/run/gunicorn/companion.sock" +companion_workers = [ + { + "name": "ticker", + "target": "myapp.jobs:run", # callable or "module:attr" + "stdout": "/var/log/myapp/ticker.log", + "stderr": "stdout", # path, "stdout", or "inherit" + }, +] +``` + +Start Gunicorn pointing at it; the manager and companions come up with the HTTP +workers: + +```sh +gunicorn -c gunicorn.conf.py myapp.wsgi:application +``` + +`companion_workers` is a list of dicts. `name` and `target` are required; every +other field falls back to the matching global `companion_*` setting, so a dict +only names what differs from the defaults: + +| Setting | Per-companion key | Meaning | +|-------------------------------|-------------------|-------------------------------------------| +| `companion_control_socket` | — | Unix socket the manager listens on | +| `companion_cwd` | `cwd` | working directory before the target runs | +| `companion_env` | `env` | extra environment variables (merged) | +| `companion_stop_signal` | `stop_signal` | signal sent first on stop (`SIGTERM`) | +| `companion_stop_timeout` | `stop_timeout` | seconds before `SIGKILL` | +| `companion_startsecs` | `startsecs` | seconds alive to reach `RUNNING` | +| `companion_restart_delay` | — | seconds before restarting a crash | +| `companion_stdout` | `stdout` | stdout file, or `"inherit"` | +| `companion_stderr` | `stderr` | stderr file, `"stdout"`, or `"inherit"` | + +`target` is either an import string `"module:attr"` or a zero-argument callable. +The child applies `cwd`/`env`, redirects `stdout`/`stderr`, then calls the +target. Log rotation stays external. + +## Control + +The manager listens on the Unix socket at `companion_control_socket` (0o600, +owned by the user Gunicorn runs as). The protocol is one JSON object per line. + +Use the CLI: + +```sh +export GUNICORN_COMPANION_SOCKET=/run/gunicorn/companion.sock +gunicorn-companion status +gunicorn-companion stop ticker +gunicorn-companion restart ticker +gunicorn-companion reread # re-read config; restart only changed companions +``` + +Or talk to the socket directly: + +```sh +echo '{"cmd": "status"}' | socat - UNIX-CONNECT:/run/gunicorn/companion.sock +``` + +Commands: `status`, `start `, `stop `, `restart `, `reread`. + +`reread` is transactional: the new config is validated first, and on any error +nothing changes and the old config keeps running. + +A `SIGHUP` to Gunicorn recycles the HTTP workers, then reloads the companion +manager **only if the companion config changed**. With `--preload`, a reload +does not re-import application code (the WSGI callable is loaded once and +cached), so unchanged companions are already current and are left running -- +the common web reload never bounces them. When the companion specs do change, +the manager is restarted with the new config. Note that, like the HTTP +workers, companions pick up new application code only on a full restart, not on +`SIGHUP`. Use `reread` for finer, per-companion changes without touching the +others. + +## Files + +| File | Responsibility | +|--------------|-----------------------------------------------------------| +| `config.py` | `CompanionConfig`, config hash, build configs from cfg | +| `process.py` | `CompanionProcess` runtime state, public states | +| `manager.py` | fork/reap, state transitions, restart delay, run loop | +| `control.py` | Unix socket server and JSON framing | +| `ctl.py` | `gunicorn-companion` command-line client | + +## Limitations + +Hot reexec (`SIGUSR2` zero-downtime upgrade) is not supported with +companions. During the overlap the old and new masters each run their own +companion manager, so every companion runs twice. For singletons such as a +scheduler this means duplicate work (e.g. double-fired jobs). Stop and +restart the master instead of reexec'ing when companions are configured, or +keep singleton companions out of the companion set. A graceful reload +(`SIGHUP`) is fine: it restarts the single manager in place. + diff --git a/gunicorn/companion/__init__.py b/gunicorn/companion/__init__.py new file mode 100644 index 00000000..9487cda3 --- /dev/null +++ b/gunicorn/companion/__init__.py @@ -0,0 +1,10 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +"""Companion process manager. + +Gunicorn manages one extra child, the Companion Manager, which manages all +configured non-HTTP companion processes (RQ workers, scheduler, socket.io, +custom daemons). See ``docs/design/companion-process-manager.md``. +""" diff --git a/gunicorn/companion/config.py b/gunicorn/companion/config.py new file mode 100644 index 00000000..b14bb18e --- /dev/null +++ b/gunicorn/companion/config.py @@ -0,0 +1,156 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import hashlib +import json +import signal + + +# Maps each optional companion field to the global setting build_companion_configs +# reads when a spec omits it. ``name`` and ``target`` are required per spec and +# have no global default. ``restart_delay`` is global-only, so it is absent here. +FIELD_DEFAULTS = { + "cwd": "companion_cwd", + "env": "companion_env", + "stop_signal": "companion_stop_signal", + "stop_timeout": "companion_stop_timeout", + "reload_timeout": "companion_reload_timeout", + "stdout": "companion_stdout", + "stderr": "companion_stderr", + "startsecs": "companion_startsecs", +} + + +class CompanionConfig: + """Validated, normalized config for a single companion. + + Built from one entry of ``companion_workers`` with global defaults already + applied. ``config_hash`` is a stable digest of every field; the manager + restarts a companion whenever its hash changes on reread. + """ + + def __init__( + self, + name, + target, + cwd=None, + env=None, + stop_signal="SIGTERM", + stop_timeout=60, + reload_timeout=60, + stdout=None, + stderr=None, + startsecs=1, + restart_delay=5, + ): + self.name = name + self.target = target + self.cwd = cwd + self.env = dict(env or {}) + self.stop_signal = stop_signal + self.stop_timeout = stop_timeout + self.reload_timeout = reload_timeout + self.stdout = stdout + self.stderr = stderr + self.startsecs = startsecs + self.restart_delay = restart_delay + + def to_dict(self): + return { + "name": self.name, + "target": self.target, + "cwd": self.cwd, + "env": self.env, + "stop_signal": self.stop_signal, + "stop_timeout": self.stop_timeout, + "reload_timeout": self.reload_timeout, + "stdout": self.stdout, + "stderr": self.stderr, + "startsecs": self.startsecs, + } + + @property + def config_hash(self): + # Sort keys so dict ordering never changes the digest. A callable + # target has no stable repr across runs, so use its qualified name. + data = self.to_dict() + data["target"] = self._target_key(self.target) + payload = json.dumps(data, sort_keys=True, default=str) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + @staticmethod + def _target_key(target): + if callable(target): + module = getattr(target, "__module__", "") + qualified_name = getattr(target, "__qualname__", repr(target)) + return "%s:%s" % (module, qualified_name) + return str(target) + + def __repr__(self): + return "" % self.name + + +ALLOWED_SPEC_KEYS = {"name", "target"} | set(FIELD_DEFAULTS) + + +def _validate_stop_signal(stop_signal, name): + """Reject a stop_signal that does not name a real signal. + + Caught here at build time so a typo like ``"SIGTRM"`` fails loudly when + config is loaded or rereaded, rather than crashing the manager later when + it tries to send the signal. + """ + if isinstance(stop_signal, str): + valid = stop_signal in signal.Signals.__members__ + else: + valid = stop_signal in set(signal.Signals) + if not valid: + raise ValueError( + "companion %s has unknown stop_signal %r" % (name, stop_signal)) + + +def _load_companion_settings(cfg): + """Return the ``companion_*`` settings from ``companion_config_file``, or + ``{}`` when no dedicated file is configured.""" + path = getattr(cfg, "companion_config_file", None) + if not path: + return {} + namespace = {} + with open(path) as config_file: + # The companion config file is trusted operator input, like the main + # Gunicorn config; running it is the point. + exec(compile(config_file.read(), path, "exec"), namespace) # pylint: disable=exec-used + return {name: value for name, value in namespace.items() + if name.startswith("companion_")} + + +def build_companion_configs(cfg): + """Build a CompanionConfig list from the companion settings. + + Settings come from ``companion_config_file`` when set, otherwise ``cfg``. A + spec is rejected if it is missing ``name``/``target`` or carries an unknown + key. + """ + overrides = _load_companion_settings(cfg) + + def setting(name): + return overrides.get(name, getattr(cfg, name)) + + configs = [] + for spec in setting("companion_workers"): + if "name" not in spec or "target" not in spec: + raise ValueError( + "each companion worker needs 'name' and 'target': %s" % spec) + unknown = set(spec) - ALLOWED_SPEC_KEYS + if unknown: + raise ValueError( + "unknown companion worker key(s) %s in %s" + % (sorted(unknown), spec)) + fields = {field: spec.get(field, setting(global_setting)) + for field, global_setting in FIELD_DEFAULTS.items()} + _validate_stop_signal(fields["stop_signal"], spec["name"]) + configs.append(CompanionConfig( + name=spec["name"], target=spec["target"], + restart_delay=setting("companion_restart_delay"), **fields)) + return configs diff --git a/gunicorn/companion/control.py b/gunicorn/companion/control.py new file mode 100644 index 00000000..2e15cc50 --- /dev/null +++ b/gunicorn/companion/control.py @@ -0,0 +1,136 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + + +import json +import os +import socket + + +class CommandError(Exception): + """A control request the manager understood but had to reject. + + Raised for malformed input (bad JSON, missing ``cmd``). It is turned into + an ``{"ok": false, "error": ...}`` response rather than crashing the + manager, so a buggy or hostile client can never take the socket down. + """ + + +def decode_command(line): + """Parse one request line into a command dict. + + The wire protocol is newline-delimited JSON: each request is a single JSON + object on its own line, e.g. ``{"cmd": "status"}``. Every request must be a + JSON object carrying a string ``cmd``; anything else is a ``CommandError``. + """ + try: + command = json.loads(line) + except (ValueError, TypeError): + raise CommandError("invalid JSON") + if not isinstance(command, dict): + raise CommandError("request must be a JSON object") + if not isinstance(command.get("cmd"), str): + raise CommandError("missing 'cmd'") + return command + + +def encode_response(response): + """Encode a response dict as one newline-terminated JSON line of bytes.""" + return (json.dumps(response) + "\n").encode("utf-8") + + +# A control request is a single short JSON line. Cap the unframed buffer so a +# client that never sends a newline cannot grow it without bound. +MAX_LINE_BYTES = 1 << 20 + + +class ControlServer: + """The manager's Unix-socket control endpoint. + + Owns the listening socket and the request framing only. Turning a decoded + command into an action is delegated to ``dispatch`` (wired to the manager's + command handlers in a later task); this class just decodes each line, runs + it through ``dispatch``, and writes back the encoded reply. + + The socket is created with mode 0o600 and owned by the (non-root) user + gunicorn runs as. There is no group-ownership switching. + """ + + def __init__(self, dispatch, path, mode=0o600, log=None, backlog=64): + self.dispatch = dispatch + self.path = path + self.mode = mode + self.log = log + self.backlog = backlog + self.listener = None + + def create(self): + """Bind and listen on the Unix socket, replacing any stale one. + + A leftover socket file from a previous manager would make ``bind`` + fail, so it is unlinked first. Called once before the manager enters + its run loop, as clients expect the socket to exist by then. + """ + if os.path.exists(self.path): + os.unlink(self.path) + listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + listener.bind(self.path) + os.chmod(self.path, self.mode) + listener.listen(self.backlog) + self.listener = listener + return listener + + def close(self): + """Close the listening socket and remove its file.""" + if self.listener is not None: + self.listener.close() + self.listener = None + if os.path.exists(self.path): + os.unlink(self.path) + + def handle_line(self, line): + """Run one request line and return the encoded response bytes. + + Decoding and dispatch failures are caught and rendered as an error + response, so one bad request never breaks the connection or the + manager. CommandError is the expected rejection; any other exception + from dispatch is an unexpected handler bug, caught here for the same + reason -- it must not escape and kill the manager's run loop. + """ + try: + response = self.dispatch(decode_command(line)) + except CommandError as error: + response = {"ok": False, "error": str(error)} + except Exception as error: + if self.log is not None: + self.log.exception("companion control command failed") + response = {"ok": False, "error": "internal error: %s" % error} + return encode_response(response) + + def serve_connection(self, connection): + """Serve newline-delimited requests on one accepted connection. + + Reads until the client hangs up, buffering partial reads and answering + each complete line as it arrives. A trailing fragment without a newline + is ignored. A single line that grows past ``MAX_LINE_BYTES`` without a + newline is treated as abuse: the connection is dropped so it cannot pin + unbounded memory. + """ + buffer = b"" + with connection: + while True: + chunk = connection.recv(65536) + if not chunk: + break + buffer += chunk + while b"\n" in buffer: + line, buffer = buffer.split(b"\n", 1) + if line.strip(): + connection.sendall(self.handle_line(line)) + if len(buffer) > MAX_LINE_BYTES: + if self.log is not None: + self.log.warning( + "companion control line exceeded %d bytes; closing", + MAX_LINE_BYTES) + break diff --git a/gunicorn/companion/ctl.py b/gunicorn/companion/ctl.py new file mode 100644 index 00000000..6441077b --- /dev/null +++ b/gunicorn/companion/ctl.py @@ -0,0 +1,83 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +"""Command-line client for the companion control socket. + +Speaks the newline-delimited JSON protocol the manager's ControlServer serves: +sends one command and prints the manager's reply. Installed as the +``gunicorn-companion`` console script. +""" + +import argparse +import json +import os +import socket +import sys + +# Commands that act on one named companion and so require a name argument. +PER_NAME_COMMANDS = ("start", "stop", "restart") +COMMANDS = ("status", "reread") + PER_NAME_COMMANDS + + +def send_command(socket_path, command): + """Send one command dict to the control socket and return the reply dict.""" + client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + client.connect(socket_path) + client.sendall((json.dumps(command) + "\n").encode("utf-8")) + chunks = [] + while True: + chunk = client.recv(65536) + if not chunk: + break + chunks.append(chunk) + if b"\n" in chunk: + break + finally: + client.close() + return json.loads(b"".join(chunks).decode("utf-8")) + + +def build_parser(): + parser = argparse.ArgumentParser( + prog="gunicorn-companion", + description="Control gunicorn companion processes.") + parser.add_argument( + "-s", "--socket", + default=os.environ.get("GUNICORN_COMPANION_SOCKET"), + help="path to the companion control socket " + "(defaults to $GUNICORN_COMPANION_SOCKET)") + parser.add_argument("command", choices=COMMANDS) + parser.add_argument( + "name", nargs="?", + help="companion name (required for start, stop, restart)") + return parser + + +def run(argv=None): + parser = build_parser() + args = parser.parse_args(argv) + if not args.socket: + parser.error("no control socket; pass --socket or set " + "GUNICORN_COMPANION_SOCKET") + if args.command in PER_NAME_COMMANDS and not args.name: + parser.error("%s requires a companion name" % args.command) + + command = {"cmd": args.command} + if args.name: + command["name"] = args.name + + try: + response = send_command(args.socket, command) + except OSError as error: + print("cannot reach companion socket %s: %s" % (args.socket, error), + file=sys.stderr) + return 2 + + print(json.dumps(response, indent=2)) + return 0 if response.get("ok") else 1 + + +if __name__ == "__main__": + sys.exit(run()) diff --git a/gunicorn/companion/manager.py b/gunicorn/companion/manager.py new file mode 100644 index 00000000..f03f81f9 --- /dev/null +++ b/gunicorn/companion/manager.py @@ -0,0 +1,682 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + + +from __future__ import annotations + +import ctypes +import importlib +import os +import select +import signal +import sys +import time +from typing import TYPE_CHECKING, Callable, Iterable, Union + +from gunicorn import util +from gunicorn.companion.control import CommandError +from gunicorn.companion.process import CompanionProcess, State + +if TYPE_CHECKING: + from gunicorn.companion.config import CompanionConfig + +# prctl option number for "send me this signal when my parent dies". +PR_SET_PDEATHSIG = 1 + + +# Signals the arbiter and manager install handlers for; a forked companion +# resets them to the default so its stop signal works and the target starts +# from a clean slate, the same way a gunicorn worker does. +INHERITED_SIGNALS = [ + "SIGCHLD", "SIGTERM", "SIGINT", "SIGHUP", "SIGQUIT", + "SIGUSR1", "SIGUSR2", "SIGWINCH", "SIGTTIN", "SIGTTOU", +] + + +def reset_child_signals() -> None: + """Restore default signal handling in a freshly forked companion. + + The companion inherits the manager's (and arbiter's) handlers across the + fork. Without this, a stop signal like SIGTERM would hit the manager's + handler -- which just flips a flag -- instead of terminating the companion. + """ + for name in INHERITED_SIGNALS: + number = getattr(signal, name, None) + if number is not None: + signal.signal(number, signal.SIG_DFL) + + +def set_parent_death_signal(stop_signal) -> bool: + """Ask the kernel to send ``stop_signal`` when this process's parent dies. + + Uses Linux ``prctl(PR_SET_PDEATHSIG)`` so an orphaned manager or companion + is signalled the moment its parent goes away, rather than lingering. Returns + True when armed and False on any non-Linux platform or error, so callers can + fall back to polling ``os.getppid()``. + """ + if not sys.platform.startswith("linux"): + return False + try: + libc = ctypes.CDLL("libc.so.6", use_errno=True) + return libc.prctl(PR_SET_PDEATHSIG, int(stop_signal), 0, 0, 0) == 0 + except (OSError, AttributeError): + return False + + +class CompanionManager: + """Forks and supervises companion processes. + + Created by the arbiter after preload. Holds one ``CompanionProcess`` per + configured companion and owns the fork lifecycle. This skeleton wires + construction and single-companion spawn; reaping, backoff, the control + socket, and the run loop arrive in later tasks. + """ + + def __init__(self, configs: Iterable[CompanionConfig], log): + self.log = log + self.pid = os.getpid() + self.processes = {c.name: CompanionProcess(c) for c in configs} + # Set by the arbiter wiring: a no-arg callable that re-reads and + # validates companion config, returning a fresh CompanionConfig list. + self.config_loader = None + # Set by the arbiter wiring: the ControlServer, or None when no control + # socket is configured. Created inside the child by run(). + self.control = None + self.stopping = False + self._wakeup_pipe = None + self.parent_pid = None + + def run(self) -> None: + """Run the manager's supervision loop. This is the forked child body. + + Installs signal handling, brings up the control socket, starts every + companion, then loops servicing the socket and the companions until a + SIGTERM or SIGINT asks it to stop, at which point it shuts the + companions down and returns. Each tick reaps exited companions, + retries any that are backing off, promotes those past ``startsecs``, + and kills any that overran their stop deadline. + + If the arbiter dies, the manager stops too: it arms a parent-death + signal on Linux and, as a portable fallback, watches ``getppid`` each + tick so it never keeps companions running under a dead arbiter. + """ + # __init__ ran in the arbiter, so refresh pid/parent now that this is + # the manager process: the companion parent-death guard compares its + # getppid() against self.pid. + self.pid = os.getpid() + self.parent_pid = os.getppid() + self._install_signals() + set_parent_death_signal(signal.SIGTERM) + if self.control is not None: + self.control.create() + for process in self.processes.values(): + self.spawn_process(process) + self.log.info("companion manager running (pid %s)", self.pid) + try: + while not self.stopping: + if self._parent_gone(): + self.log.info("companion manager parent gone, stopping") + break + self._tick() + self._wait() + self.stop_all() + finally: + if self.control is not None: + self.control.close() + self.log.info("companion manager stopped (pid %s)", self.pid) + + def _parent_gone(self) -> bool: + """True once the arbiter that forked the manager has exited.""" + return os.getppid() != self.parent_pid + + def _tick(self, now: float = None) -> None: + """One supervision pass over every companion.""" + now = now or time.time() + self.reap_processes() + self.retry_backoff(now) + self.promote_running(now) + self.enforce_deadlines(now) + + def _wait(self, timeout: float = 1.0) -> None: + """Block until a signal or a control request arrives, or we time out. + + The self-pipe carries signal wake-ups; the control listener carries + client connections. Either readable (or the timeout) ends the wait, so + the next loop pass reacts promptly without busy-spinning. + """ + readers = [self._wakeup_pipe[0]] + if self.control is not None and self.control.listener is not None: + readers.append(self.control.listener) + try: + ready, _, _ = select.select(readers, [], [], timeout) + except (InterruptedError, OSError): + return + for reader in ready: + if reader is self._wakeup_pipe[0]: + self._drain_wakeup() + else: + self._accept_control() + + def _accept_control(self) -> None: + """Accept one waiting control connection and answer its requests.""" + try: + connection, _ = self.control.listener.accept() + except OSError: + return + self.control.serve_connection(connection) + + def enforce_deadlines(self, now: float = None) -> None: + """SIGKILL companions that overran their stop deadline. + + A graceful ``stop_signal`` may be ignored or take too long; once the + deadline set by stop/restart passes, the companion is force-killed so + it cannot wedge the manager. The reaper picks up the exit afterwards. + """ + now = now or time.time() + for process in self.processes.values(): + if process.state != State.STOPPING or process.stop_deadline is None: + continue + if now >= process.stop_deadline and process.pid is not None: + self._safe_kill(process.pid, signal.SIGKILL) + process.kill_count += 1 + process.stop_deadline = None + self.log.warning("companion %s killed after timeout (pid %s)", + process.name, process.pid) + + def stop_all(self) -> None: + """Stop every companion and reap them as they exit. + + Sends each one its stop signal, then keeps reaping and enforcing stop + deadlines until they are all gone, so the manager exits without leaving + orphaned companions behind. + """ + self.log.info("stopping all companions") + for name in list(self.processes): + self.stop_process(name) + while any(process.pid is not None for process in self.processes.values()): + now = time.time() + self.enforce_deadlines(now) + self.reap_processes() + self._wait(timeout=0.2) + self.log.info("all companions stopped") + + def _install_signals(self) -> None: + """Set up the self-pipe and signal handlers for the supervision loop.""" + self._wakeup_pipe = os.pipe() + for fd in self._wakeup_pipe: + util.set_non_blocking(fd) + util.close_on_exec(fd) + signal.signal(signal.SIGCHLD, self._wakeup) + signal.signal(signal.SIGTERM, self._signal_stop) + signal.signal(signal.SIGINT, self._signal_stop) + + def _signal_stop(self, signum, frame) -> None: + self.stopping = True + self._wakeup() + + def _wakeup(self, signum=None, frame=None) -> None: + """Wake the loop out of ``select`` by writing to the self-pipe.""" + try: + os.write(self._wakeup_pipe[1], b".") + except OSError: + pass + + def _drain_wakeup(self) -> None: + try: + while os.read(self._wakeup_pipe[0], 4096): + pass + except OSError: + pass + + def handle_command(self, command: dict) -> dict: + """Route a decoded control command to its action. + + This is the ``dispatch`` the control socket calls. ``status`` returns a + snapshot of every companion; ``start``/``stop``/``restart`` act on the + one named companion and report ``(ok, message)``. Per-companion + commands need a string ``name``, and anything else raises ``CommandError`` so the + socket replies with an error envelope. + """ + command_name = command["cmd"] + if command_name == "status": + return {"ok": True, "companions": self.status()} + if command_name == "reread": + if self.config_loader is None: + raise CommandError("reread not configured") + try: + new_configs = self.config_loader() + except Exception as error: + return {"ok": False, "error": "invalid config: %s" % error, + "kept_old_config": True} + return self.reread_config(new_configs) + + # Every remaining command acts on one named companion. + name = command.get("name") + if not isinstance(name, str): + raise CommandError("'%s' requires a 'name'" % command_name) + if command_name == "start": + ok, message = self.start_process(name) + elif command_name == "stop": + ok, message = self.stop_process(name) + elif command_name == "restart": + ok, message = self.restart_process(name) + else: + raise CommandError("unknown command %r" % command_name) + return {"ok": ok, "message": message} + + def status(self, now: float = None) -> list: + """Status entry for every companion, for the ``status`` command.""" + now = now or time.time() + return [process.status_dict(now) for process in self.processes.values()] + + def reread_config(self, new_configs) -> dict: + """Transactionally apply a fresh set of companion configs. + + Each companion is compared with the running set by ``config_hash``: + a new name is added and started, a missing name is stopped and removed, + a changed hash stores the new config and restarts (unless the companion + was manually stopped, which keeps it STOPPED with the new config ready + for its next start), and an unchanged hash is left alone. Validation + runs first, so a bad config touches nothing and the old one stays live. + """ + try: + new_by_name = self._index_configs(new_configs) + except CommandError as e: + return {"ok": False, "error": str(e), "kept_old_config": True} + + added, removed, restarted, unchanged = [], [], [], [] + old_names = set(self.processes) + new_names = set(new_by_name) + + for name in old_names - new_names: + self.stop_process(name) + del self.processes[name] + removed.append(name) + + for name in new_names - old_names: + process = CompanionProcess(new_by_name[name]) + self.processes[name] = process + self.spawn_process(process) + added.append(name) + + for name in new_names & old_names: + process = self.processes[name] + if process.config.config_hash == new_by_name[name].config_hash: + unchanged.append(name) + continue + process.config = new_by_name[name] + if process.manual_stop: + unchanged.append(name) + else: + self.restart_process(name) + restarted.append(name) + + self.log.info( + "companion reread applied: added %s, removed %s, restarted %s, unchanged %s", + added, removed, restarted, unchanged) + return {"ok": True, "added": added, "removed": removed, + "restarted": restarted, "unchanged": unchanged} + + @staticmethod + def _index_configs(configs) -> dict: + """Index configs by name, rejecting duplicates.""" + by_name = {} + for config in configs: + if config.name in by_name: + raise CommandError( + "invalid config: duplicate companion name %s" % config.name) + by_name[config.name] = config + return by_name + + def spawn_process(self, process: CompanionProcess) -> int: + """Fork one companion. + + Parent records the pid and moves the companion to STARTING. Child + resolves and runs the target, exiting the worker on any failure so a + crashed companion never leaks back into the manager's control flow. + + Spawning is policy-neutral: it does not touch ``manual_stop``. Clearing + that flag is the job of the commands that intentionally bring a + companion back (:meth:`start_process`, :meth:`restart_process`), and a + companion only ever reaches a respawn path with the flag already false. + """ + pid = os.fork() + if pid != 0: + process.pid = pid + process.state = State.STARTING + process.started_at = time.time() + self.log.info("companion %s started (pid %s)", process.name, pid) + return pid + + try: + self._close_manager_fds() + reset_child_signals() + set_parent_death_signal(signal.SIGTERM) + if os.getppid() != self.pid: + # Manager already died between fork and arming: do not run. + os._exit(0) + self._apply_environment(process.config) + self._redirect_output(process.config) + target = self._resolve_target(process.config.target) + target() + except SystemExit: + raise + except BaseException: + self.log.exception("companion %s crashed", process.name) + os._exit(1) + os._exit(0) + + def _close_manager_fds(self) -> None: + """Close the manager's own fds in a freshly forked companion. + + A companion inherits the manager's control socket and wakeup pipe but + must not keep them: an open listener would let a companion answer + control requests, and the pipe is the manager's private signal path. + Both are closed before the target runs. + """ + if self.control is not None and self.control.listener is not None: + self.control.listener.close() + if self._wakeup_pipe is not None: + for fd in self._wakeup_pipe: + try: + os.close(fd) + except OSError: + pass + + def start_process(self, name: str): + """Start a companion by name (the control ``start`` command). + + Follows the supervisor-style rules: a STOPPED or BACKOFF companion + clears its ``manual_stop`` flag, drops any pending retry, and is spawned + right away. RUNNING and STARTING are already-up, so they report success + without doing anything. STOPPING is rejected so the caller polls status + and retries once the old child is gone. Returns ``(ok, message)``. + """ + process = self.processes.get(name) + if process is None: + return False, "unknown companion %s" % name + if process.state in (State.RUNNING, State.STARTING): + return True, "%s already %s" % (name, process.state.lower()) + if process.state == State.STOPPING: + return False, "%s is stopping; retry" % name + process.manual_stop = False + process.next_retry_at = None + self.spawn_process(process) + return True, "%s started" % name + + def stop_process(self, name: str, now: float = None): + """Stop a companion by name (the control ``stop`` command). + + Sets ``manual_stop`` so the companion will not auto-restart. A live + companion (RUNNING or STARTING) is sent its ``stop_signal`` and moved + to STOPPING with a ``stop_deadline``; the run loop reaps it, or SIGKILLs + it once the deadline passes. BACKOFF just cancels the pending retry and + settles in STOPPED. STOPPED and STOPPING are already-there success + no-ops. Returns ``(ok, message)``. + """ + process = self.processes.get(name) + if process is None: + return False, "unknown companion %s" % name + process.manual_stop = True + # A stop must win over an in-flight restart: clearing this keeps + # handle_exit from respawning a companion the user asked to stop. + process.restart_pending = False + if process.state in (State.STOPPED, State.STOPPING): + return True, "%s already %s" % (name, process.state.lower()) + if process.state == State.BACKOFF: + process.next_retry_at = None + process.state = State.STOPPED + return True, "%s stopped" % name + now = now or time.time() + self._safe_kill(process.pid, self._signal_number(process.config.stop_signal)) + process.state = State.STOPPING + process.stop_deadline = now + process.config.stop_timeout + self.log.info("companion %s stopping (pid %s)", name, process.pid) + return True, "%s stopping" % name + + def restart_process(self, name: str, now: float = None): + """Restart a companion by name (the control ``restart`` command). + + Always clears ``manual_stop`` so the companion comes back. A live + companion (RUNNING or STARTING) is asked to stop -- it goes STOPPING + with ``restart_pending`` set and a deadline based on ``reload_timeout``, + and the reaper respawns it as soon as the old child exits. BACKOFF and + STOPPED start again immediately. STOPPING is rejected so the caller + retries. This never rereads config. Returns ``(ok, message)``. + """ + process = self.processes.get(name) + if process is None: + return False, "unknown companion %s" % name + if process.state == State.STOPPING: + return False, "%s is stopping; retry" % name + process.manual_stop = False + if process.state in (State.RUNNING, State.STARTING): + now = now or time.time() + process.restart_pending = True + self._safe_kill(process.pid, self._signal_number(process.config.stop_signal)) + process.state = State.STOPPING + process.stop_deadline = now + process.config.reload_timeout + self.log.info("companion %s restarting (pid %s)", name, process.pid) + return True, "%s restarting" % name + process.next_retry_at = None + self.spawn_process(process) + return True, "%s started" % name + + @staticmethod + def _safe_kill(pid: int, sig) -> None: + """Send ``sig`` to ``pid``, ignoring an already-dead target. + + A companion can exit between the manager deciding to signal it and the + kill itself (the exit is only reaped on the next tick). Without this the + resulting ``ProcessLookupError`` would escape and take the manager down. + """ + try: + os.kill(pid, sig) + except ProcessLookupError: + pass + + @staticmethod + def _signal_number(stop_signal) -> int: + """Resolve a stop signal to its number, e.g. ``"SIGTERM"`` -> 15. + + Accepts a signal name or a raw number and validates both against the + real signal table, so a typo like ``"SIGTRM"`` fails loudly here rather + than silently sending the wrong signal (or none). + """ + try: + if isinstance(stop_signal, str): + return signal.Signals[stop_signal] + return signal.Signals(stop_signal) + except (KeyError, ValueError): + raise ValueError("unknown stop signal %r" % (stop_signal,)) + + def reap_processes(self) -> list: + """Reap any companions that have exited and record their exit info. + + A companion runs as a forked child of the manager, so when it dies the + kernel hands its exit status back to us as a zombie until we collect it + with ``waitpid``. This method does that collecting, and is meant to be + called once per run-loop tick (typically after a ``SIGCHLD``). + + ``waitpid(-1, WNOHANG)`` asks the kernel for any one dead child without + blocking. It returns ``(pid, status)`` for a child it reaped, or + ``(0, 0)`` when children are still alive but none have exited. Several + companions can die between two ticks, so we loop until one of those two + stop conditions is hit: ``(0, 0)`` (nothing more to reap right now) or + ``ChildProcessError`` (no child processes exist at all). + + For each reaped pid we look up its companion, then in order: record the + exit (signal or code, time, count), free the pid, and move it to its + next public state via :meth:`handle_exit` -- STOPPED if it was stopped + on purpose, otherwise BACKOFF for a later restart. Pids we don't + recognise are ignored. Returns the list of companions reaped this call. + """ + reaped = [] + while True: + try: + pid, status = os.waitpid(-1, os.WNOHANG) + except ChildProcessError: + break + if pid == 0: + break + process = self._process_by_pid(pid) + if process is not None: + self._record_exit(process, status) + self._log_exit(process) + self.handle_exit(process) + reaped.append(process) + return reaped + + def _log_exit(self, process: CompanionProcess) -> None: + """Log how a reaped companion exited, before its fate is decided.""" + if process.last_exit_signal is not None: + self.log.info("companion %s exited on signal %s", + process.name, process.last_exit_signal) + else: + self.log.info("companion %s exited with status %s", + process.name, process.last_exit_code) + + def handle_exit(self, process: CompanionProcess, now: float = None) -> None: + """Decide a companion's fate after it exits: restart, stop, or back off. + + A pending restart wins: the old child was asked to stop only so a fresh + one could take its place, so it is respawned immediately. Otherwise a + companion that was stopped on purpose settles in STOPPED and stays + there, and any other exit is unexpected, so it enters BACKOFF and is + scheduled to restart after a fixed ``restart_delay`` (no exponential + backoff, no retry cap). + """ + now = now or time.time() + if process.restart_pending: + process.restart_pending = False + process.restart_count += 1 + self.log.info("companion %s restarting", process.name) + self.spawn_process(process) + return + if process.manual_stop: + process.state = State.STOPPED + process.next_retry_at = None + self.log.info("companion %s stopped", process.name) + return + process.state = State.BACKOFF + process.next_retry_at = now + process.restart_delay + self.log.info("companion %s backing off, retrying in %ss", + process.name, process.restart_delay) + + def retry_backoff(self, now: float = None) -> list: + """Respawn BACKOFF companions whose fixed retry delay has elapsed. + + Each retry bumps ``restart_count`` and re-forks the companion, which + puts it back into STARTING. Returns the companions that were retried. + """ + now = now or time.time() + retried = [] + for process in self.processes.values(): + if process.state != State.BACKOFF or process.next_retry_at is None: + continue + if now >= process.next_retry_at: + process.restart_count += 1 + process.next_retry_at = None + self.spawn_process(process) + retried.append(process) + return retried + + def promote_running(self, now: float = None) -> list: + """Move companions that survived ``startsecs`` from STARTING to RUNNING. + + A freshly spawned companion starts in STARTING. If it stays alive for + its ``startsecs`` window it is considered up and becomes RUNNING; if it + dies first, reaping handles it instead. Returns the promoted ones. + """ + now = now or time.time() + promoted = [] + for process in self.processes.values(): + if process.state != State.STARTING or process.started_at is None: + continue + if now - process.started_at >= process.config.startsecs: + process.state = State.RUNNING + self.log.info("companion %s running (pid %s)", process.name, process.pid) + promoted.append(process) + return promoted + + def _process_by_pid(self, pid: int): + for process in self.processes.values(): + if process.pid == pid: + return process + return None + + @staticmethod + def _record_exit(process: CompanionProcess, status: int) -> None: + """Store how a companion died: signal number or exit code, plus time. + + ``status`` is the packed value from ``waitpid``. ``WIFSIGNALED`` tells + us a signal killed it, in which case ``WTERMSIG`` gives the signal + number; otherwise it exited normally and ``WEXITSTATUS`` gives its exit + code. Only one of the two is ever set, so the other is cleared. + """ + if os.WIFSIGNALED(status): + process.last_exit_signal = os.WTERMSIG(status) + process.last_exit_code = None + else: + process.last_exit_code = os.WEXITSTATUS(status) + process.last_exit_signal = None + process.exited_at = time.time() + process.exit_count += 1 + process.pid = None + + @staticmethod + def _apply_environment(config: CompanionConfig) -> None: + """Apply ``cwd`` and ``env`` in the child before running the target. + + cwd is changed first so a relative path in env (or the target itself) + resolves against it. env is merged onto the inherited environment, not + replaced, so the companion keeps the manager's variables. + """ + if config.cwd: + os.chdir(config.cwd) + if config.env: + os.environ.update(config.env) + + @staticmethod + def _redirect_output(config: CompanionConfig) -> None: + """Send the companion's stdout and stderr to its configured log files. + + By default a companion just inherits the manager's stdout/stderr, so + leaving these unset (or ``"inherit"``) keeps that. Give a file path and + we append the output there instead. For stderr you can also pass + ``"stdout"`` to fold the two streams into one file. + """ + stdout_fd = CompanionManager._open_output(config.stdout) + if stdout_fd is not None: + os.dup2(stdout_fd, 1) + os.close(stdout_fd) + if config.stderr == "stdout": + os.dup2(1, 2) + else: + stderr_fd = CompanionManager._open_output(config.stderr) + if stderr_fd is not None: + os.dup2(stderr_fd, 2) + os.close(stderr_fd) + + @staticmethod + def _open_output(value): + """Open one log file for writing, or return None to leave the stream + as-is when the companion should keep inheriting it.""" + if value in (None, "inherit"): + return None + return os.open(value, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644) + + @staticmethod + def _resolve_target(target: Union[Callable, str]) -> Callable: + """Return the zero-arg callable for a companion target. + + Accepts an already-callable target or a ``"module:attr"`` import + string, e.g. ``"frappe_companions:start_rq_default"``. + """ + if callable(target): + return target + module_name, separator, attribute = target.partition(":") + if not separator: + raise ValueError("companion target %r must be 'module:callable'" % target) + return getattr(importlib.import_module(module_name), attribute) diff --git a/gunicorn/companion/process.py b/gunicorn/companion/process.py new file mode 100644 index 00000000..bbc28ae6 --- /dev/null +++ b/gunicorn/companion/process.py @@ -0,0 +1,117 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + + +import time +from enum import Enum + +from gunicorn.util import format_uptime + + +class State(str, Enum): + """Public states, mimicking ``supervisorctl status``. + + The manager never exposes EXITED/FATAL/UNKNOWN; an exited companion is + either STOPPED (manual) or BACKOFF (waiting to restart). Members subclass + ``str`` so they compare and JSON-serialize as their plain value. + """ + + STOPPED = "STOPPED" + STARTING = "STARTING" + RUNNING = "RUNNING" + BACKOFF = "BACKOFF" + STOPPING = "STOPPING" + + +class CompanionProcess: + """Runtime state for one companion, separate from its static config. + + Holds everything ``status`` needs: current public state, live pid, restart + and exit counters, last exit info, and the ``manual_stop`` flag that keeps a + user-stopped companion from auto-restarting. + """ + + def __init__(self, config): + self.config = config + self.state = State.STOPPED + self.pid = None + self.restart_delay = config.restart_delay + + self.started_at = None + self.exited_at = None + self.next_retry_at = None + self.stop_deadline = None + + self.restart_count = 0 + self.exit_count = 0 + self.kill_count = 0 + + self.last_exit_code = None + self.last_exit_signal = None + + self.manual_stop = False + self.restart_pending = False + + @property + def name(self): + return self.config.name + + def uptime(self, now=None): + """Seconds since this companion last started, or ``None`` if not up.""" + if self.state not in (State.RUNNING, State.STARTING) or self.started_at is None: + return None + return (now or time.time()) - self.started_at + + def description(self, now=None): + """Human one-liner: state label plus runtime details.""" + now = now or time.time() + label = self.state.lower() + detail = self._detail(now) + return "%s, %s" % (label, detail) if detail else label + + def _detail(self, now): + if self.state == State.RUNNING: + return "pid %s, uptime %s" % ( + self.pid, + format_uptime(self.uptime(now) or 0), + ) + if self.state == State.BACKOFF: + return self._backoff_detail(now) + if self.state == State.STOPPED: + return self._stopped_detail() + return "" + + def _backoff_detail(self, now): + if self.next_retry_at is not None: + seconds_left = max(0, int(self.next_retry_at - now)) + return "exited with %s, retrying in %ds" % (self._exit_status(), seconds_left) + return "exited with %s" % self._exit_status() + + def _stopped_detail(self): + if self.manual_stop: + return "stopped manually" + if self.exited_at is not None: + return "exited with %s" % self._exit_status() + return "not started" + + def _exit_status(self): + if self.last_exit_signal is not None: + return "signal %s" % self.last_exit_signal + return "status %s" % self.last_exit_code + + def status_dict(self, now=None): + """Machine-readable status entry for the JSON control protocol.""" + backoff = self.state == State.BACKOFF + return { + "name": self.name, + "state": self.state, + "pid": self.pid, + "description": self.description(now or time.time()), + "next_retry_at": self.next_retry_at if backoff else None, + "restart_delay": self.restart_delay if backoff else None, + "last_exit_code": self.last_exit_code if backoff else None, + } + + def __repr__(self): + return "" % (self.name, self.state) diff --git a/gunicorn/config.py b/gunicorn/config.py index f6c69a5f..ccaa7844 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -354,6 +354,31 @@ def validate_dict(val): return val +def validate_pos_int_or_none(val): + if val is None: + return None + return validate_pos_int(val) + + +def validate_companion_workers(val): + if val is None: + return [] + if not isinstance(val, list): + raise TypeError("companion_workers must be a list: %s" % val) + for item in val: + if not isinstance(item, dict): + raise TypeError("each companion worker must be a dict: %s" % item) + return val + + +def validate_octal_or_none(val): + if val is None: + return None + if isinstance(val, str): + val = int(val, 8) + return int(val) + + def validate_pos_int(val): if not isinstance(val, int): val = int(val, 0) @@ -2502,3 +2527,199 @@ class HeaderMap(Setting): .. versionadded:: 22.0.0 """ + + +class CompanionConfigFile(Setting): + name = "companion_config_file" + section = "Companion Processes" + cli = ["--companion-config"] + validator = validate_string + default = None + desc = """\ + Dedicated Python file the manager reads for companion specs. + + Lets companions reload independently of the main Gunicorn config. + Example: ``companion_config_file = "/home/frappe/bench/companion.conf.py"``. + If unset, specs are read from the main Gunicorn config. + """ + + +class CompanionWorkers(Setting): + name = "companion_workers" + section = "Companion Processes" + validator = validate_companion_workers + default = [] + desc = """\ + List of companion process specs, each a dict. + + A spec requires ``name`` and ``target``; other keys override defaults. + Example: ``[{"name": "scheduler", "target": "app:run_scheduler"}]``. + """ + + +class CompanionControlSocket(Setting): + name = "companion_control_socket" + section = "Companion Processes" + cli = ["--companion-control-socket"] + validator = validate_string + default = None + desc = """\ + Unix socket the manager listens on for control commands. + + Clients send ``status``/``start``/``stop``/``restart``/``reread`` as JSON. + Example: ``companion_control_socket = "/run/gunicorn/companion.sock"``. + """ + + +class CompanionControlSocketMode(Setting): + name = "companion_control_socket_mode" + section = "Companion Processes" + validator = validate_octal_or_none + default = 0o600 + desc = """\ + Octal file mode set on the control socket after it is created. + + Default ``0o600`` lets only the owner connect. + Example: ``companion_control_socket_mode = 0o660``. + """ + + +class CompanionStopSignal(Setting): + name = "companion_stop_signal" + section = "Companion Processes" + validator = validate_string + default = "SIGTERM" + desc = """\ + Signal sent first when stopping a companion. + + The companion should catch it to drain work before exiting. + Example: ``companion_stop_signal = "SIGINT"``. + """ + + +class CompanionStopTimeout(Setting): + name = "companion_stop_timeout" + section = "Companion Processes" + validator = validate_pos_int + default = 60 + desc = """\ + Seconds to wait after the stop signal before sending SIGKILL. + + Give slow-draining workers a larger value. + Example: ``companion_stop_timeout = 300``. + """ + + +class CompanionReloadTimeout(Setting): + name = "companion_reload_timeout" + section = "Companion Processes" + validator = validate_pos_int + default = 60 + desc = """\ + Seconds to wait for the old child to exit during restart/reread. + + Used instead of ``stop_timeout`` when replacing a companion. + Example: ``companion_reload_timeout = 30``. + """ + + +class CompanionStdout(Setting): + name = "companion_stdout" + section = "Companion Processes" + validator = validate_string + default = None + desc = """\ + Where to send companion stdout: a file path or ``"inherit"``. + + Files are opened in append mode; ``None`` inherits the manager's stdout. + Example: ``companion_stdout = "/var/log/frappe/rq.log"``. + """ + + +class CompanionStderr(Setting): + name = "companion_stderr" + section = "Companion Processes" + validator = validate_string + default = None + desc = """\ + Where to send companion stderr: a path, ``"stdout"``, or ``"inherit"``. + + Use ``"stdout"`` to merge stderr into the stdout target. + Example: ``companion_stderr = "stdout"``. + """ + + +class CompanionCwd(Setting): + name = "companion_cwd" + section = "Companion Processes" + validator = validate_string + default = None + desc = """\ + Directory the child changes into before running the target. + + Example: ``companion_cwd = "/home/frappe/frappe-bench"``. + """ + + +class CompanionEnv(Setting): + name = "companion_env" + section = "Companion Processes" + validator = validate_dict + default = {} + desc = """\ + Extra environment variables merged into the child environment. + + Example: ``companion_env = {"QUEUE": "default"}``. + """ + + +class CompanionStartsecs(Setting): + name = "companion_startsecs" + section = "Companion Processes" + validator = validate_pos_int + default = 1 + desc = """\ + Seconds a freshly forked companion must survive to reach RUNNING. + + Exiting earlier counts as a failed start and triggers BACKOFF. + Example: ``companion_startsecs = 5``. + """ + + +class CompanionRestartDelay(Setting): + name = "companion_restart_delay" + section = "Companion Processes" + validator = validate_pos_int + default = 5 + desc = """\ + Fixed seconds to wait before restarting a crashed companion. + + No exponential backoff; the same delay is used every time. + Example: ``companion_restart_delay = 10``. + """ + + +class CompanionManagerShutdownBuffer(Setting): + name = "companion_manager_shutdown_buffer" + section = "Companion Processes" + validator = validate_pos_int + default = 10 + desc = """\ + Extra seconds added to the slowest companion timeout for the manager. + + Gives the manager headroom to stop all children before Gunicorn kills it. + Example: ``companion_manager_shutdown_buffer = 15``. + """ + + +class CompanionManagerStopTimeout(Setting): + name = "companion_manager_stop_timeout" + section = "Companion Processes" + validator = validate_pos_int_or_none + default = None + desc = """\ + Seconds Gunicorn waits for the manager to stop during shutdown. + + By default it uses the slowest companion ``stop_timeout`` plus a buffer. + Example: ``companion_manager_stop_timeout = 120``. + """ diff --git a/gunicorn/util.py b/gunicorn/util.py index e66dbebf..73afb70c 100644 --- a/gunicorn/util.py +++ b/gunicorn/util.py @@ -647,3 +647,15 @@ def bytes_to_str(b): def unquote_to_wsgi_str(string): return urllib.parse.unquote_to_bytes(string).decode('latin-1') + + +def format_uptime(seconds): + """Render a duration like supervisor: ``2 days, 03:12:44`` or ``0:05:12``.""" + seconds = int(seconds) + days, remainder = divmod(seconds, 86400) + hours, remainder = divmod(remainder, 3600) + minutes, remaining_seconds = divmod(remainder, 60) + if days: + unit = "day" if days == 1 else "days" + return "%d %s, %02d:%02d:%02d" % (days, unit, hours, minutes, remaining_seconds) + return "%d:%02d:%02d" % (hours, minutes, remaining_seconds) diff --git a/pyproject.toml b/pyproject.toml index eaca1eac..b84366a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,6 +66,7 @@ testing = [ [project.scripts] # duplicates "python -m gunicorn" handling in __main__.py gunicorn = "gunicorn.app.wsgiapp:run" +gunicorn-companion = "gunicorn.companion.ctl:run" # note the quotes around "paste.server_runner" to escape the dot [project.entry-points."paste.server_runner"] diff --git a/tests/test_arbiter.py b/tests/test_arbiter.py index 8c1527e2..c7747194 100644 --- a/tests/test_arbiter.py +++ b/tests/test_arbiter.py @@ -2,11 +2,15 @@ # This file is part of gunicorn released under the MIT license. # See the NOTICE for more information. +import errno import os +import signal +import time from unittest import mock import gunicorn.app.base import gunicorn.arbiter +from gunicorn.companion.config import build_companion_configs from gunicorn.config import ReusePort @@ -147,6 +151,244 @@ def test_arbiter_reap_workers(mock_os_waitpid): arbiter.cfg.child_exit.assert_called_with(arbiter, mock_worker) +def test_arbiter_manage_companion_manager_spawns_when_configured(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_workers", [{"name": "rq", "target": "pkg:run"}]) + arbiter.spawn_companion_manager = mock.Mock() + arbiter.manage_companion_manager() + arbiter.spawn_companion_manager.assert_called_once_with() + + +def test_arbiter_manage_companion_manager_noop_without_companions(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.spawn_companion_manager = mock.Mock() + arbiter.manage_companion_manager() + arbiter.spawn_companion_manager.assert_not_called() + + +def test_arbiter_manage_companion_manager_noop_when_running(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_workers", [{"name": "rq", "target": "pkg:run"}]) + arbiter.companion_manager_pid = 4242 + arbiter.spawn_companion_manager = mock.Mock() + arbiter.manage_companion_manager() + arbiter.spawn_companion_manager.assert_not_called() + + +@mock.patch('os.waitpid') +def test_arbiter_reap_clears_companion_manager_pid(mock_os_waitpid): + mock_os_waitpid.side_effect = [(4242, 0), (0, 0)] + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.companion_manager_pid = 4242 + arbiter.reap_workers() + assert arbiter.companion_manager_pid == 0 + + +@mock.patch('os.waitpid') +def test_arbiter_reap_unexpected_manager_exit_backs_off(mock_os_waitpid): + # An unexpected manager exit (no deliberate stop) is an error and arms the + # crash backoff so the main loop does not respawn it immediately. + mock_os_waitpid.side_effect = [(4242, 0), (0, 0)] + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.companion_manager_pid = 4242 + arbiter.log = mock.Mock() + arbiter.reap_workers() + assert arbiter.companion_manager_pid == 0 + assert arbiter._companion_manager_failures == 1 + assert arbiter._companion_manager_respawn_at > 0 + arbiter.log.error.assert_called_once() + arbiter.log.info.assert_not_called() + + +@mock.patch('os.waitpid') +def test_arbiter_reap_deliberate_manager_exit_is_info(mock_os_waitpid): + # A deliberate stop (stopping flag set) is an expected exit: logged as info + # with no backoff, so a reload respawns without delay. + mock_os_waitpid.side_effect = [(4242, 0), (0, 0)] + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.companion_manager_pid = 4242 + arbiter._companion_manager_stopping = True + arbiter.log = mock.Mock() + arbiter.reap_workers() + assert arbiter.companion_manager_pid == 0 + assert arbiter._companion_manager_stopping is False + assert arbiter._companion_manager_respawn_at == 0 + arbiter.log.info.assert_called_once() + arbiter.log.error.assert_not_called() + + +def test_arbiter_manage_companion_manager_waits_during_backoff(): + # While inside the crash-backoff window the manager is not respawned. + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_workers", [{"name": "rq", "target": "pkg:run"}]) + arbiter._companion_manager_respawn_at = time.monotonic() + 60 + arbiter.spawn_companion_manager = mock.Mock() + arbiter.manage_companion_manager() + arbiter.spawn_companion_manager.assert_not_called() + + +def test_stop_companion_manager_marks_expected_and_clears_backoff(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.companion_manager_pid = 4242 + arbiter._companion_manager_failures = 3 + arbiter._companion_manager_respawn_at = time.monotonic() + 60 + with mock.patch("os.kill"): + arbiter.stop_companion_manager(signal.SIGTERM) + assert arbiter._companion_manager_stopping is True + assert arbiter._companion_manager_failures == 0 + assert arbiter._companion_manager_respawn_at == 0 + + +def test_stop_companion_manager_signals_running(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.companion_manager_pid = 4242 + with mock.patch("os.kill") as kill: + arbiter.stop_companion_manager(signal.SIGTERM) + kill.assert_called_once_with(4242, signal.SIGTERM) + + +def test_stop_companion_manager_noop_when_not_running(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + with mock.patch("os.kill") as kill: + arbiter.stop_companion_manager(signal.SIGTERM) + kill.assert_not_called() + + +def test_stop_companion_manager_clears_pid_when_already_gone(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.companion_manager_pid = 4242 + with mock.patch("os.kill", side_effect=OSError(errno.ESRCH, "no such process")): + arbiter.stop_companion_manager(signal.SIGTERM) + assert arbiter.companion_manager_pid == 0 + + +@mock.patch('os.waitpid') +def test_worker_reap_unaffected_by_companion_manager(mock_os_waitpid): + # A worker exit is still reaped normally while a companion manager runs; + # the companion reap branch must not swallow worker exits. + mock_os_waitpid.side_effect = [(42, 0), (0, 0)] + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.settings['child_exit'] = mock.Mock() + arbiter.companion_manager_pid = 9999 + mock_worker = mock.Mock() + arbiter.WORKERS = {42: mock_worker} + arbiter.reap_workers() + mock_worker.tmp.close.assert_called_with() + arbiter.cfg.child_exit.assert_called_with(arbiter, mock_worker) + assert arbiter.companion_manager_pid == 9999 + + +@mock.patch('os.fork', return_value=77) +def test_spawn_worker_unaffected_by_companions(mock_os_fork): + # With companions configured, an HTTP worker is still spawned and recorded + # exactly as before; companion config does not touch the worker path. + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_workers", [{"name": "rq", "target": "pkg:run"}]) + arbiter.pid = 1234 + arbiter.WORKERS = {} # instance dict, do not mutate the shared class attr + mock_worker = mock.Mock() + arbiter.worker_class = mock.Mock(return_value=mock_worker) + pid = arbiter.spawn_worker() + assert pid == 77 + assert arbiter.WORKERS[77] is mock_worker + + +def test_close_gunicorn_fds_in_manager_child(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + listener = mock.Mock() + worker = mock.Mock() + arbiter.LISTENERS = [listener] + arbiter.WORKERS = {1: worker} + arbiter.PIPE = [7, 8] + with mock.patch("os.close") as os_close: + arbiter._close_gunicorn_fds() + listener.close.assert_called_once_with() + worker.tmp.close.assert_called_once_with() + assert os_close.call_count == 2 + + +def test_reload_companion_manager_restarts_running(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_workers", [{"name": "rq", "target": "pkg:run"}]) + arbiter.companion_manager_pid = 4242 + arbiter.stop_companion_manager = mock.Mock() + arbiter.spawn_companion_manager = mock.Mock() + arbiter.reload_companion_manager() + arbiter.stop_companion_manager.assert_called_once_with(signal.SIGTERM) + # pid still set (stop is mocked), so no respawn until the old one is reaped + arbiter.spawn_companion_manager.assert_not_called() + + +def test_reload_companion_manager_starts_when_none_running(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_workers", [{"name": "rq", "target": "pkg:run"}]) + arbiter.stop_companion_manager = mock.Mock() + arbiter.spawn_companion_manager = mock.Mock() + arbiter.reload_companion_manager() + arbiter.stop_companion_manager.assert_not_called() + arbiter.spawn_companion_manager.assert_called_once_with() + + +def test_reload_companion_manager_noop_when_config_unchanged(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_workers", [{"name": "rq", "target": "pkg:run"}]) + # The running manager was spawned with this exact config. + arbiter._companion_configs = build_companion_configs(arbiter.cfg) + arbiter.companion_manager_pid = 4242 + arbiter.stop_companion_manager = mock.Mock() + arbiter.spawn_companion_manager = mock.Mock() + arbiter.reload_companion_manager() + # A web reload with unchanged companion specs leaves the manager alone. + arbiter.stop_companion_manager.assert_not_called() + arbiter.spawn_companion_manager.assert_not_called() + + +def test_reload_companion_manager_restarts_when_field_changed(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_workers", + [{"name": "rq", "target": "pkg:run", "startsecs": 1}]) + arbiter._companion_configs = build_companion_configs(arbiter.cfg) + arbiter.companion_manager_pid = 4242 + # Same name, changed field -> different config_hash -> reload. + arbiter.cfg.set("companion_workers", + [{"name": "rq", "target": "pkg:run", "startsecs": 9}]) + arbiter.stop_companion_manager = mock.Mock() + arbiter.spawn_companion_manager = mock.Mock() + arbiter.reload_companion_manager() + arbiter.stop_companion_manager.assert_called_once_with(signal.SIGTERM) + + +@mock.patch('gunicorn.sock.close_sockets') +def test_arbiter_stop_signals_companion_manager(close_sockets): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.stop_companion_manager = mock.Mock() + arbiter.stop() + signals = [call.args[0] for call in arbiter.stop_companion_manager.call_args_list] + assert signal.SIGTERM in signals + assert signal.SIGKILL in signals + + +def test_companion_manager_stop_timeout_uses_explicit(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_manager_stop_timeout", 120) + assert arbiter.companion_manager_stop_timeout() == 120 + + +def test_companion_manager_stop_timeout_derives_from_slowest(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + arbiter.cfg.set("companion_workers", [ + {"name": "rq", "target": "pkg:run", "stop_timeout": 300}, + {"name": "scheduler", "target": "pkg:sched", "stop_timeout": 30}, + ]) + arbiter.cfg.set("companion_manager_shutdown_buffer", 10) + assert arbiter.companion_manager_stop_timeout() == 310 + + +def test_companion_manager_stop_timeout_zero_without_companions(): + arbiter = gunicorn.arbiter.Arbiter(DummyApplication()) + assert arbiter.companion_manager_stop_timeout() == 0 + + class PreloadedAppWithEnvSettings(DummyApplication): """ Simple application that makes use of the 'preload' feature to diff --git a/tests/test_companion_config.py b/tests/test_companion_config.py new file mode 100644 index 00000000..8a0a4f53 --- /dev/null +++ b/tests/test_companion_config.py @@ -0,0 +1,129 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import pytest + +from gunicorn.config import Config, validate_companion_workers +from gunicorn.companion.config import CompanionConfig, build_companion_configs + + +def make_config(workers, **overrides): + cfg = Config() + cfg.set("companion_workers", workers) + for key, value in overrides.items(): + cfg.set(key, value) + return cfg + + +def test_build_applies_global_defaults(): + cfg = make_config( + [{"name": "rq", "target": "pkg:run"}], + companion_stop_signal="SIGINT", + companion_startsecs=5) + config, = build_companion_configs(cfg) + assert config.name == "rq" + assert config.target == "pkg:run" + assert config.stop_signal == "SIGINT" + assert config.startsecs == 5 + + +def test_build_per_spec_overrides_global(): + cfg = make_config( + [{"name": "rq", "target": "pkg:run", "stop_signal": "SIGTERM"}], + companion_stop_signal="SIGINT") + config, = build_companion_configs(cfg) + assert config.stop_signal == "SIGTERM" + + +def test_build_applies_global_restart_delay(): + cfg = make_config( + [{"name": "rq", "target": "pkg:run"}], + companion_restart_delay=42) + config, = build_companion_configs(cfg) + assert config.restart_delay == 42 + + +def test_build_rejects_unknown_spec_key(): + with pytest.raises(ValueError): + build_companion_configs( + make_config([{"name": "rq", "target": "pkg:run", "stop_sigal": "X"}])) + + +def test_build_rejects_unknown_stop_signal(): + with pytest.raises(ValueError): + build_companion_configs( + make_config([{"name": "rq", "target": "pkg:run", + "stop_signal": "SIGTRM"}])) + + +def test_build_rejects_unknown_global_stop_signal(): + with pytest.raises(ValueError): + build_companion_configs( + make_config([{"name": "rq", "target": "pkg:run"}], + companion_stop_signal="NOPE")) + + +def test_build_reads_companion_config_file(tmp_path): + config_file = tmp_path / "companion.conf.py" + config_file.write_text( + 'companion_workers = [{"name": "scheduler", "target": "app:run"}]\n' + 'companion_stop_signal = "SIGINT"\n') + cfg = make_config([], companion_config_file=str(config_file)) + config, = build_companion_configs(cfg) + assert config.name == "scheduler" + assert config.stop_signal == "SIGINT" + + +def test_build_empty_when_none_configured(): + assert build_companion_configs(make_config([])) == [] + + +def test_build_requires_name_and_target(): + with pytest.raises(ValueError): + build_companion_configs(make_config([{"name": "rq"}])) + with pytest.raises(ValueError): + build_companion_configs(make_config([{"target": "pkg:run"}])) + + +def test_validate_companion_workers_accepts_none_and_list(): + assert validate_companion_workers(None) == [] + workers = [{"name": "rq", "target": "pkg:run"}] + assert validate_companion_workers(workers) == workers + + +def test_validate_companion_workers_rejects_non_list(): + with pytest.raises(TypeError): + validate_companion_workers("rq") + + +def test_validate_companion_workers_rejects_non_dict_item(): + with pytest.raises(TypeError): + validate_companion_workers(["rq"]) + + +def test_config_hash_stable_and_field_sensitive(): + base = CompanionConfig(name="rq", target="pkg:run") + same = CompanionConfig(name="rq", target="pkg:run") + changed = CompanionConfig(name="rq", target="pkg:run", stop_timeout=99) + assert base.config_hash == same.config_hash + assert base.config_hash != changed.config_hash + + +def test_config_hash_ignores_restart_delay(): + # restart_delay does not affect the spawned process, so changing it must not + # change the hash (and so must not trigger a restart on reread). + base = CompanionConfig(name="rq", target="pkg:run", restart_delay=5) + changed = CompanionConfig(name="rq", target="pkg:run", restart_delay=30) + assert base.config_hash == changed.config_hash + + +def test_config_hash_keys_callable_target_by_qualified_name(): + def run(): + pass + + keyed = CompanionConfig._target_key(run) + assert ":" in keyed and keyed.endswith("run") + # A callable target hashes stably across CompanionConfig instances. + assert (CompanionConfig(name="rq", target=run).config_hash + == CompanionConfig(name="rq", target=run).config_hash) diff --git a/tests/test_companion_control.py b/tests/test_companion_control.py new file mode 100644 index 00000000..8607583e --- /dev/null +++ b/tests/test_companion_control.py @@ -0,0 +1,185 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import json +from unittest import mock + +import pytest + +from gunicorn.companion.config import CompanionConfig +from gunicorn.companion.control import ( + MAX_LINE_BYTES, + CommandError, + ControlServer, + decode_command, + encode_response, +) +from gunicorn.companion.manager import CompanionManager + + +def make_manager(*names): + configs = [CompanionConfig(name=name, target=lambda: None) for name in names] + return CompanionManager(configs, log=mock.Mock()) + + +def server_for(manager): + return ControlServer(dispatch=manager.handle_command, path="/tmp/x.sock") + + +def test_decode_command_valid(): + assert decode_command('{"cmd": "status"}') == {"cmd": "status"} + + +def test_decode_command_bad_json(): + with pytest.raises(CommandError): + decode_command("{not json") + + +def test_decode_command_not_object(): + with pytest.raises(CommandError): + decode_command("[1, 2, 3]") + + +def test_decode_command_missing_cmd(): + with pytest.raises(CommandError): + decode_command('{"name": "rq"}') + + +def test_encode_response_newline_terminated(): + response = encode_response({"ok": True}) + assert response.endswith(b"\n") + assert json.loads(response) == {"ok": True} + + +def test_handle_line_dispatches(): + server = ControlServer( + dispatch=lambda command: {"ok": True, "echo": command["cmd"]}, + path="/tmp/x.sock") + response = server.handle_line('{"cmd": "status"}') + assert json.loads(response) == {"ok": True, "echo": "status"} + + +def test_handle_line_bad_json_error_envelope(): + server = ControlServer(dispatch=lambda command: {"ok": True}, path="/tmp/x.sock") + response = json.loads(server.handle_line("garbage")) + assert response["ok"] is False and "JSON" in response["error"] + + +def test_handle_line_dispatch_command_error(): + def dispatch(command): + raise CommandError("unknown command") + server = ControlServer(dispatch=dispatch, path="/tmp/x.sock") + response = json.loads(server.handle_line('{"cmd": "bogus"}')) + assert response["ok"] is False and response["error"] == "unknown command" + + +def test_handle_line_unexpected_exception_caught(): + def dispatch(command): + raise ValueError("unknown stop signal 'SIGTRM'") + server = ControlServer(dispatch=dispatch, path="/tmp/x.sock", + log=mock.Mock()) + response = json.loads(server.handle_line('{"cmd": "stop", "name": "rq"}')) + assert response["ok"] is False and "internal error" in response["error"] + + +def test_create_unlinks_stale_and_chmods(): + server = ControlServer(dispatch=lambda command: {}, path="/tmp/x.sock", + mode=0o600) + listener = mock.Mock() + with mock.patch("os.path.exists", return_value=True), \ + mock.patch("os.unlink") as unlink, \ + mock.patch("socket.socket", return_value=listener), \ + mock.patch("os.chmod") as chmod: + server.create() + unlink.assert_called_once_with("/tmp/x.sock") + listener.bind.assert_called_once_with("/tmp/x.sock") + chmod.assert_called_once_with("/tmp/x.sock", 0o600) + listener.listen.assert_called_once() + + +def test_close_unlinks(): + server = ControlServer(dispatch=lambda command: {}, path="/tmp/x.sock") + server.listener = mock.Mock() + with mock.patch("os.path.exists", return_value=True), \ + mock.patch("os.unlink") as unlink: + server.close() + unlink.assert_called_once_with("/tmp/x.sock") + assert server.listener is None + + +def test_control_status_command_end_to_end(): + manager = make_manager("rq") + response = json.loads(server_for(manager).handle_line('{"cmd": "status"}')) + assert response["ok"] is True + assert response["companions"][0]["name"] == "rq" + + +def test_control_start_command_end_to_end(): + manager = make_manager("rq") + with mock.patch("os.fork", return_value=10): + response = json.loads( + server_for(manager).handle_line('{"cmd": "start", "name": "rq"}')) + assert response["ok"] is True + assert "rq" in response["message"] + + +def test_control_unknown_command_error_envelope(): + manager = make_manager("rq") + response = json.loads( + server_for(manager).handle_line('{"cmd": "bogus", "name": "rq"}')) + assert response["ok"] is False + assert "unknown" in response["error"] + + +def test_control_missing_name_error_envelope(): + manager = make_manager("rq") + response = json.loads(server_for(manager).handle_line('{"cmd": "start"}')) + assert response["ok"] is False + assert "name" in response["error"] + + +def test_control_reread_without_loader_error_envelope(): + manager = make_manager("rq") + response = json.loads(server_for(manager).handle_line('{"cmd": "reread"}')) + assert response["ok"] is False + assert "reread" in response["error"] + + +class FakeConnection: + """Minimal connection stand-in for serve_connection: yields preset chunks.""" + + def __init__(self, chunks): + self._chunks = list(chunks) + self.sent = [] + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False + + def recv(self, _size): + return self._chunks.pop(0) if self._chunks else b"" + + def sendall(self, data): + self.sent.append(data) + + +def test_serve_connection_drops_oversized_line(): + log = mock.Mock() + server = ControlServer(dispatch=lambda command: {"ok": True}, + path="/tmp/x.sock", log=log) + # A flood with no newline: the connection is dropped, nothing dispatched. + connection = FakeConnection([b"x" * (MAX_LINE_BYTES + 1)]) + server.serve_connection(connection) + assert connection.sent == [] + log.warning.assert_called_once() + + +def test_serve_connection_answers_complete_line(): + manager = make_manager("rq") + connection = FakeConnection([b'{"cmd": "status"}\n']) + server_for(manager).serve_connection(connection) + assert len(connection.sent) == 1 + assert json.loads(connection.sent[0])["ok"] is True diff --git a/tests/test_companion_ctl.py b/tests/test_companion_ctl.py new file mode 100644 index 00000000..d9969762 --- /dev/null +++ b/tests/test_companion_ctl.py @@ -0,0 +1,61 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +from unittest import mock + +import pytest + +from gunicorn.companion import ctl + + +def test_run_status_prints_and_returns_zero(capsys): + with mock.patch.object( + ctl, "send_command", return_value={"ok": True, "companions": []} + ) as send: + code = ctl.run(["--socket", "/tmp/x.sock", "status"]) + assert code == 0 + send.assert_called_once_with("/tmp/x.sock", {"cmd": "status"}) + assert "ok" in capsys.readouterr().out + + +def test_run_per_name_command_sends_name(): + with mock.patch.object( + ctl, "send_command", return_value={"ok": True, "message": "x"} + ) as send: + code = ctl.run(["--socket", "/tmp/x.sock", "stop", "ticker"]) + assert code == 0 + send.assert_called_once_with("/tmp/x.sock", {"cmd": "stop", "name": "ticker"}) + + +def test_run_failure_response_returns_one(): + with mock.patch.object( + ctl, "send_command", return_value={"ok": False, "error": "bad"} + ): + assert ctl.run(["--socket", "/tmp/x.sock", "status"]) == 1 + + +def test_run_per_name_command_requires_name(): + with pytest.raises(SystemExit): + ctl.run(["--socket", "/tmp/x.sock", "stop"]) + + +def test_run_requires_socket(monkeypatch): + monkeypatch.delenv("GUNICORN_COMPANION_SOCKET", raising=False) + with pytest.raises(SystemExit): + ctl.run(["status"]) + + +def test_run_unreachable_socket_returns_two(): + with mock.patch.object(ctl, "send_command", side_effect=OSError("nope")): + assert ctl.run(["--socket", "/tmp/x.sock", "status"]) == 2 + + +def test_send_command_round_trip(): + client = mock.Mock() + client.recv.side_effect = [b'{"ok": true}\n'] + with mock.patch("socket.socket", return_value=client): + result = ctl.send_command("/tmp/x.sock", {"cmd": "status"}) + client.connect.assert_called_once_with("/tmp/x.sock") + assert client.sendall.call_args.args[0] == b'{"cmd": "status"}\n' + assert result == {"ok": True} diff --git a/tests/test_companion_manager.py b/tests/test_companion_manager.py new file mode 100644 index 00000000..243bffd5 --- /dev/null +++ b/tests/test_companion_manager.py @@ -0,0 +1,698 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import os +import signal +from unittest import mock + +import pytest + +from gunicorn.companion.control import CommandError +from gunicorn.companion.manager import ( + CompanionManager, + reset_child_signals, + set_parent_death_signal, +) +from gunicorn.companion.config import CompanionConfig +from gunicorn.companion.process import State + + +def make_manager(*names): + configs = [CompanionConfig(name=n, target=lambda: None) for n in names] + return CompanionManager(configs, log=mock.Mock()) + + +def test_manager_builds_one_process_per_config(): + manager = make_manager("rq", "scheduler") + assert set(manager.processes) == {"rq", "scheduler"} + assert manager.processes["rq"].state == State.STOPPED + + +def test_resolve_target_accepts_callable(): + fn = lambda: None + assert CompanionManager._resolve_target(fn) is fn + + +def test_resolve_target_import_string(): + # os.getpid is a real "module:attr" target. + assert CompanionManager._resolve_target("os:getpid") is __import__("os").getpid + + +def test_resolve_target_rejects_bad_string(): + with pytest.raises(ValueError): + CompanionManager._resolve_target("no_colon") + + +def test_log_exit_reports_signal_or_status(): + manager = make_manager("rq") + process = manager.processes["rq"] + process.last_exit_signal, process.last_exit_code = 9, None + manager._log_exit(process) + process.last_exit_signal, process.last_exit_code = None, 1 + manager._log_exit(process) + messages = [call.args[0] for call in manager.log.info.call_args_list] + assert any("signal" in message for message in messages) + assert any("status" in message for message in messages) + + +def test_reset_child_signals_restores_defaults(): + with mock.patch("signal.signal") as signal_signal: + reset_child_signals() + restored = {call.args[0]: call.args[1] for call in signal_signal.call_args_list} + # The stop signal must reach the default disposition, not the manager's + # inherited handler, so a forked companion actually terminates on SIGTERM. + assert restored[signal.SIGTERM] is signal.SIG_DFL + assert restored[signal.SIGINT] is signal.SIG_DFL + + +def test_set_parent_death_signal_noop_off_linux(): + with mock.patch("sys.platform", "darwin"): + assert set_parent_death_signal(signal.SIGTERM) is False + + +def test_set_parent_death_signal_arms_on_linux(): + libc = mock.Mock() + libc.prctl.return_value = 0 + with mock.patch("sys.platform", "linux"), \ + mock.patch("ctypes.CDLL", return_value=libc): + assert set_parent_death_signal(signal.SIGTERM) is True + libc.prctl.assert_called_once() + + +def test_parent_gone_detects_reparenting(): + manager = make_manager("rq") + manager.parent_pid = 4242 + with mock.patch("os.getppid", return_value=4242): + assert manager._parent_gone() is False + with mock.patch("os.getppid", return_value=1): + assert manager._parent_gone() is True + + +def test_close_manager_fds_closes_control_and_pipe(): + manager = make_manager("rq") + manager.control = mock.Mock() + manager._wakeup_pipe = (7, 8) + with mock.patch("os.close") as os_close: + manager._close_manager_fds() + manager.control.listener.close.assert_called_once_with() + assert os_close.call_count == 2 + + +def test_close_manager_fds_noop_when_unset(): + manager = make_manager("rq") + manager._close_manager_fds() # control and pipe are None: must not raise + + +def test_apply_environment_sets_cwd_and_env(): + config = CompanionConfig(name="rq", target=lambda: None, + cwd="/tmp", env={"COMPANION_X": "1"}) + with mock.patch("os.chdir") as chdir, \ + mock.patch.dict("os.environ", {}, clear=False): + CompanionManager._apply_environment(config) + chdir.assert_called_once_with("/tmp") + import os + assert os.environ["COMPANION_X"] == "1" + + +def test_apply_environment_noop_without_cwd_env(): + config = CompanionConfig(name="rq", target=lambda: None) + with mock.patch("os.chdir") as chdir: + CompanionManager._apply_environment(config) + chdir.assert_not_called() + + +def test_open_output_inherit_returns_none(): + assert CompanionManager._open_output(None) is None + assert CompanionManager._open_output("inherit") is None + + +def test_open_output_path_opens_append(): + with mock.patch("os.open", return_value=9) as open_mock: + fd = CompanionManager._open_output("/var/log/rq.log") + assert fd == 9 + flags = open_mock.call_args.args[1] + assert flags & os.O_APPEND and flags & os.O_CREAT + + +def test_redirect_output_files(): + config = CompanionConfig(name="rq", target=lambda: None, + stdout="/o.log", stderr="/e.log") + with mock.patch("os.open", side_effect=[10, 11]), \ + mock.patch("os.dup2") as dup2, \ + mock.patch("os.close") as close: + CompanionManager._redirect_output(config) + dup2.assert_any_call(10, 1) + dup2.assert_any_call(11, 2) + # The opened fds are closed after being duped onto 1/2, no leak. + close.assert_any_call(10) + close.assert_any_call(11) + + +def test_redirect_output_stderr_to_stdout(): + config = CompanionConfig(name="rq", target=lambda: None, + stdout="/o.log", stderr="stdout") + with mock.patch("os.open", return_value=10), \ + mock.patch("os.dup2") as dup2, \ + mock.patch("os.close") as close: + CompanionManager._redirect_output(config) + dup2.assert_any_call(10, 1) + dup2.assert_any_call(1, 2) + close.assert_called_once_with(10) + + +def test_redirect_output_inherit_noop(): + config = CompanionConfig(name="rq", target=lambda: None) + with mock.patch("os.dup2") as dup2: + CompanionManager._redirect_output(config) + dup2.assert_not_called() + + +def test_reap_records_exit_code(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.pid = 4321 + # exit code 1 -> status 1<<8; second call drains the queue. + with mock.patch("os.waitpid", side_effect=[(4321, 1 << 8), (0, 0)]): + reaped = manager.reap_processes() + assert reaped == [proc] + assert proc.last_exit_code == 1 + assert proc.last_exit_signal is None + assert proc.exit_count == 1 + assert proc.pid is None + + +def test_reap_records_signal(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.pid = 4321 + with mock.patch("os.waitpid", side_effect=[(4321, 9), (0, 0)]): + manager.reap_processes() + assert proc.last_exit_signal == 9 + assert proc.last_exit_code is None + + +def test_reap_no_children(): + manager = make_manager("rq") + with mock.patch("os.waitpid", side_effect=ChildProcessError): + assert manager.reap_processes() == [] + + +def test_status_lists_all_companions(): + manager = make_manager("rq", "scheduler") + entries = manager.status(now=100.0) + assert {e["name"] for e in entries} == {"rq", "scheduler"} + assert all("state" in e and "description" in e for e in entries) + + +def test_handle_command_status(): + manager = make_manager("rq") + resp = manager.handle_command({"cmd": "status"}) + assert resp["ok"] is True + assert resp["companions"][0]["name"] == "rq" + + +def test_handle_command_start_routes(): + manager = make_manager("rq") + with mock.patch.object(manager, "start_process", + return_value=(True, "rq started")) as start_mock: + resp = manager.handle_command({"cmd": "start", "name": "rq"}) + start_mock.assert_called_once_with("rq") + assert resp == {"ok": True, "message": "rq started"} + + +def test_handle_command_stop_and_restart_route(): + manager = make_manager("rq") + with mock.patch.object(manager, "stop_process", return_value=(True, "s")) as stop_mock, \ + mock.patch.object(manager, "restart_process", return_value=(True, "r")) as restart_mock: + manager.handle_command({"cmd": "stop", "name": "rq"}) + manager.handle_command({"cmd": "restart", "name": "rq"}) + stop_mock.assert_called_once_with("rq") + restart_mock.assert_called_once_with("rq") + + +def test_handle_command_missing_name(): + manager = make_manager("rq") + with pytest.raises(CommandError): + manager.handle_command({"cmd": "start"}) + + +def test_handle_command_unknown(): + manager = make_manager("rq") + with pytest.raises(CommandError): + manager.handle_command({"cmd": "reread"}) + + +def make_config(name, **kwargs): + return CompanionConfig(name=name, target=lambda: None, **kwargs) + + +def test_reread_adds_new(): + manager = make_manager("rq") + new = [make_config("rq"), make_config("scheduler")] + with mock.patch("os.fork", return_value=10): + result = manager.reread_config(new) + assert result["added"] == ["scheduler"] + assert "scheduler" in manager.processes + assert manager.processes["scheduler"].state == State.STARTING + + +def test_reread_removes_missing(): + manager = make_manager("rq", "scheduler") + manager.processes["scheduler"].state = State.RUNNING + manager.processes["scheduler"].pid = 11 + with mock.patch("os.kill"): + result = manager.reread_config([make_config("rq")]) + assert result["removed"] == ["scheduler"] + assert "scheduler" not in manager.processes + + +def test_reread_restarts_changed(): + manager = make_manager("rq") + manager.processes["rq"].state = State.RUNNING + manager.processes["rq"].pid = 12 + changed = make_config("rq", env={"X": "1"}) # different hash + with mock.patch("os.kill"): + result = manager.reread_config([changed]) + assert result["restarted"] == ["rq"] + assert manager.processes["rq"].config is changed + assert manager.processes["rq"].state == State.STOPPING + + +def test_reread_changed_manual_stop_keeps_stopped(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.manual_stop = True + proc.state = State.STOPPED + changed = make_config("rq", env={"X": "1"}) + result = manager.reread_config([changed]) + assert result["unchanged"] == ["rq"] + assert proc.config is changed and proc.state == State.STOPPED + + +def test_reread_unchanged_noop(): + manager = make_manager("rq") + same = manager.processes["rq"].config + result = manager.reread_config([same]) + assert result["unchanged"] == ["rq"] + assert result["restarted"] == [] + + +def test_reread_duplicate_name_keeps_old(): + manager = make_manager("rq") + result = manager.reread_config([make_config("rq"), make_config("rq")]) + assert result["ok"] is False and result["kept_old_config"] is True + assert "duplicate" in result["error"] + + +def test_reread_validation_failure_mutates_nothing(): + manager = make_manager("rq") + original_config = manager.processes["rq"].config + original_names = set(manager.processes) + # This batch would change "rq" and add "scheduler", but the duplicate + # "scheduler" makes the whole reread invalid: nothing must be applied. + bad = [make_config("rq", env={"X": "1"}), make_config("scheduler"), + make_config("scheduler")] + with mock.patch("os.fork") as fork, mock.patch("os.kill") as kill: + result = manager.reread_config(bad) + assert result["ok"] is False and result["kept_old_config"] is True + assert set(manager.processes) == original_names + assert manager.processes["rq"].config is original_config + fork.assert_not_called() + kill.assert_not_called() + + +def test_handle_command_reread_no_loader(): + manager = make_manager("rq") + with pytest.raises(CommandError): + manager.handle_command({"cmd": "reread"}) + + +def test_handle_command_reread_runs_loader(): + manager = make_manager("rq") + manager.config_loader = lambda: [manager.processes["rq"].config] + resp = manager.handle_command({"cmd": "reread"}) + assert resp["ok"] is True and resp["unchanged"] == ["rq"] + + +def test_handle_command_reread_bad_config(): + manager = make_manager("rq") + def boom(): + raise ValueError("duplicate companion name rq") + manager.config_loader = boom + resp = manager.handle_command({"cmd": "reread"}) + assert resp["ok"] is False and resp["kept_old_config"] is True + + +def test_start_process_stopped_spawns(): + manager = make_manager("rq") + proc = manager.processes["rq"] + with mock.patch("os.fork", return_value=70) as fork: + ok, _ = manager.start_process("rq") + fork.assert_called_once() + assert ok and proc.state == State.STARTING and proc.manual_stop is False + + +def test_start_process_backoff_cancels_retry(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.BACKOFF + proc.next_retry_at = 999.0 + proc.manual_stop = True + with mock.patch("os.fork", return_value=71): + ok, _ = manager.start_process("rq") + assert ok and proc.state == State.STARTING + assert proc.next_retry_at is None and proc.manual_stop is False + + +def test_start_process_running_is_noop(): + manager = make_manager("rq") + manager.processes["rq"].state = State.RUNNING + with mock.patch("os.fork") as fork: + ok, _ = manager.start_process("rq") + assert ok + fork.assert_not_called() + + +def test_start_process_stopping_rejected(): + manager = make_manager("rq") + manager.processes["rq"].state = State.STOPPING + ok, msg = manager.start_process("rq") + assert not ok and "stopping" in msg + + +def test_start_process_unknown(): + manager = make_manager("rq") + ok, _ = manager.start_process("nope") + assert not ok + + +def test_stop_process_running_signals_and_stopping(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.RUNNING + proc.pid = 80 + proc.config.stop_timeout = 60 + with mock.patch("os.kill") as kill: + ok, _ = manager.stop_process("rq", now=200.0) + kill.assert_called_once_with(80, signal.SIGTERM) + assert ok and proc.state == State.STOPPING + assert proc.manual_stop is True and proc.stop_deadline == 260.0 + + +def test_stop_process_backoff_to_stopped(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.BACKOFF + proc.next_retry_at = 999.0 + with mock.patch("os.kill") as kill: + ok, _ = manager.stop_process("rq") + kill.assert_not_called() + assert ok and proc.state == State.STOPPED + assert proc.next_retry_at is None and proc.manual_stop is True + + +def test_stop_process_already_stopped(): + manager = make_manager("rq") + with mock.patch("os.kill") as kill: + ok, _ = manager.stop_process("rq") + kill.assert_not_called() + assert ok and manager.processes["rq"].manual_stop is True + + +def test_stop_process_unknown(): + manager = make_manager("rq") + ok, _ = manager.stop_process("nope") + assert not ok + + +def test_stop_during_restart_cancels_pending_restart(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.STOPPING + proc.pid = 70 + proc.restart_pending = True + with mock.patch("os.kill") as kill: + ok, _ = manager.stop_process("rq") + kill.assert_not_called() + assert ok and proc.manual_stop is True and proc.restart_pending is False + # On exit the companion now settles STOPPED instead of being respawned. + with mock.patch.object(manager, "spawn_process") as spawn: + manager.handle_exit(proc) + spawn.assert_not_called() + assert proc.state == State.STOPPED + + +def test_safe_kill_ignores_dead_process(): + with mock.patch("os.kill", side_effect=ProcessLookupError): + CompanionManager._safe_kill(123, signal.SIGTERM) # must not raise + + +def test_stop_process_survives_dead_companion(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.RUNNING + proc.pid = 80 + with mock.patch("os.kill", side_effect=ProcessLookupError): + ok, _ = manager.stop_process("rq", now=1.0) + assert ok and proc.state == State.STOPPING + + +def test_signal_number_resolves_name(): + assert CompanionManager._signal_number("SIGKILL") == signal.SIGKILL + assert CompanionManager._signal_number(9) == 9 + + +def test_signal_number_rejects_bad(): + with pytest.raises(ValueError): + CompanionManager._signal_number("SIGTRM") + + +def test_restart_process_running_stops_with_reload_timeout(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.RUNNING + proc.pid = 90 + proc.config.reload_timeout = 30 + proc.manual_stop = True + with mock.patch("os.kill") as kill: + ok, _ = manager.restart_process("rq", now=300.0) + kill.assert_called_once_with(90, signal.SIGTERM) + assert ok and proc.state == State.STOPPING + assert proc.restart_pending is True and proc.stop_deadline == 330.0 + assert proc.manual_stop is False + + +def test_restart_pending_reap_respawns_immediately(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.STOPPING + proc.restart_pending = True + proc.pid = 91 + with mock.patch("os.waitpid", side_effect=[(91, 0), (0, 0)]), \ + mock.patch("os.fork", return_value=92): + manager.reap_processes() + assert proc.state == State.STARTING + assert proc.pid == 92 + assert proc.restart_pending is False + assert proc.restart_count == 1 + + +def test_restart_process_stopped_starts_now(): + manager = make_manager("rq") + proc = manager.processes["rq"] + with mock.patch("os.fork", return_value=93), mock.patch("os.kill") as kill: + ok, _ = manager.restart_process("rq") + kill.assert_not_called() + assert ok and proc.state == State.STARTING + + +def test_restart_process_backoff_starts_now(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.BACKOFF + proc.next_retry_at = 999.0 + with mock.patch("os.fork", return_value=94): + ok, _ = manager.restart_process("rq") + assert ok and proc.state == State.STARTING and proc.next_retry_at is None + + +def test_restart_process_stopping_rejected(): + manager = make_manager("rq") + manager.processes["rq"].state = State.STOPPING + ok, msg = manager.restart_process("rq") + assert not ok and "stopping" in msg + + +def test_manual_stop_preserved_through_exit(): + # stop a running companion, then reap its child: it must settle in STOPPED + # with manual_stop still set so it is not auto-restarted. + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.RUNNING + proc.pid = 60 + with mock.patch("os.kill"): + manager.stop_process("rq", now=10.0) + with mock.patch("os.waitpid", side_effect=[(60, 0), (0, 0)]), \ + mock.patch("os.fork") as fork: + manager.reap_processes() + fork.assert_not_called() + assert proc.state == State.STOPPED and proc.manual_stop is True + + +def test_start_clears_manual_stop(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.manual_stop = True + with mock.patch("os.fork", return_value=61): + manager.start_process("rq") + assert proc.manual_stop is False + + +def test_spawn_does_not_touch_manual_stop(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.manual_stop = True + with mock.patch("os.fork", return_value=62): + manager.spawn_process(proc) + assert proc.manual_stop is True + + +def test_handle_exit_unexpected_backoff(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.restart_delay = 5 + manager.handle_exit(proc, now=100.0) + assert proc.state == State.BACKOFF + assert proc.next_retry_at == 105.0 + + +def test_handle_exit_manual_stop_stays_stopped(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.manual_stop = True + manager.handle_exit(proc, now=100.0) + assert proc.state == State.STOPPED + assert proc.next_retry_at is None + + +def test_retry_backoff_respawns_when_due(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.BACKOFF + proc.next_retry_at = 100.0 + with mock.patch("os.fork", return_value=555): + retried = manager.retry_backoff(now=101.0) + assert retried == [proc] + assert proc.restart_count == 1 + assert proc.state == State.STARTING + assert proc.pid == 555 + + +def test_retry_backoff_waits_until_due(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.BACKOFF + proc.next_retry_at = 100.0 + assert manager.retry_backoff(now=99.0) == [] + assert proc.state == State.BACKOFF + + +def test_reap_unexpected_exit_enters_backoff(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.pid = 4321 + with mock.patch("os.waitpid", side_effect=[(4321, 1 << 8), (0, 0)]): + manager.reap_processes() + assert proc.state == State.BACKOFF + assert proc.next_retry_at is not None + + +def test_promote_running_after_startsecs(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.config.startsecs = 1 + proc.state = State.STARTING + proc.started_at = 100.0 + promoted = manager.promote_running(now=101.5) + assert promoted == [proc] + assert proc.state == State.RUNNING + + +def test_promote_running_too_early(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.config.startsecs = 5 + proc.state = State.STARTING + proc.started_at = 100.0 + assert manager.promote_running(now=102.0) == [] + assert proc.state == State.STARTING + + +def test_promote_running_ignores_non_starting(): + manager = make_manager("rq") + proc = manager.processes["rq"] + proc.state = State.BACKOFF + proc.started_at = 100.0 + assert manager.promote_running(now=999.0) == [] + assert proc.state == State.BACKOFF + + +def test_spawn_parent_records_pid_and_starting(): + manager = make_manager("rq") + proc = manager.processes["rq"] + with mock.patch("os.fork", return_value=4321): + pid = manager.spawn_process(proc) + assert pid == 4321 + assert proc.pid == 4321 + assert proc.state == State.STARTING + assert proc.started_at is not None + assert proc.manual_stop is False + + +def test_lifecycle_running_crash_backoff_retry(): + manager = make_manager("rq") + process = manager.processes["rq"] + assert process.state == State.STOPPED + with mock.patch("os.fork", return_value=100): + manager.spawn_process(process) + assert process.state == State.STARTING + manager.promote_running(now=process.started_at + process.config.startsecs) + assert process.state == State.RUNNING + manager.handle_exit(process, now=1000.0) + assert process.state == State.BACKOFF + assert process.next_retry_at == 1000.0 + process.restart_delay + with mock.patch("os.fork", return_value=101): + manager.retry_backoff(now=process.next_retry_at) + assert process.state == State.STARTING + + +def test_lifecycle_stop_to_stopped(): + manager = make_manager("rq") + process = manager.processes["rq"] + with mock.patch("os.fork", return_value=200): + manager.spawn_process(process) + manager.promote_running(now=process.started_at + process.config.startsecs) + with mock.patch("os.kill") as kill: + manager.stop_process("rq", now=500.0) + assert process.state == State.STOPPING + assert process.manual_stop is True + kill.assert_called_once() + manager.handle_exit(process, now=501.0) + assert process.state == State.STOPPED + + +def test_lifecycle_restart_respawns_after_exit(): + manager = make_manager("rq") + process = manager.processes["rq"] + with mock.patch("os.fork", return_value=300): + manager.spawn_process(process) + manager.promote_running(now=process.started_at + process.config.startsecs) + with mock.patch("os.kill"): + manager.restart_process("rq", now=600.0) + assert process.state == State.STOPPING + assert process.restart_pending is True + with mock.patch("os.fork", return_value=301): + manager.handle_exit(process, now=601.0) + assert process.state == State.STARTING + assert process.restart_pending is False