From 7dc4d184bee02b7a3233e4c964385e78b96720bd Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 28 May 2026 16:33:54 +0530 Subject: [PATCH] refactor: Simpler implementation Just do multiple queues, nothing else. --- docs/design/gthread-slow-request-isolation.md | 86 +++++++------- gunicorn/config.py | 109 +++++------------- gunicorn/workers/gthread.py | 59 +++------- 3 files changed, 92 insertions(+), 162 deletions(-) diff --git a/docs/design/gthread-slow-request-isolation.md b/docs/design/gthread-slow-request-isolation.md index cc62b8f7..18ed8797 100644 --- a/docs/design/gthread-slow-request-isolation.md +++ b/docs/design/gthread-slow-request-isolation.md @@ -57,23 +57,23 @@ the actual failure mode — prediction is effective after the first sample. │ fast │ slow ▼ ▼ ┌───────────┐ ┌───────────┐ - │ fast_pool │ │ slow_pool │ (bounded, 503 on full) + │ fast_pool │ │ slow_pool │ │ F threads │ │ S threads │ └─────┬─────┘ └─────┬─────┘ └───────┬───────┘ on completion: predictor.update(route, duration) ``` -- **Fast lane**: a `ThreadPoolExecutor` of `F = cfg.threads` threads. Only ever - runs fast-classified work. -- **Slow lane**: a separate `ThreadPoolExecutor` of `S = cfg.slow_threads` - threads (default 1). Only ever runs slow-classified work. -- Total OS threads per worker = `F + S`. -- The slow lane is bounded: a counter (`nr_slow`) tracks slow requests - submitted-but-not-finished; once it reaches `S + cfg.slow_queue_maxsize` (i.e. - running plus queued), further slow requests are rejected with `503` instead of - growing the executor's unbounded internal queue. The fast lane is governed by - the existing `worker_connections` admission like today. +- **Fast lane**: a `ThreadPoolExecutor` of `F = ceil(cfg.threads / 2)` threads. + Only ever runs fast-classified work. +- **Slow lane**: a separate `ThreadPoolExecutor` of `S = cfg.threads // 2` + threads. Only ever runs slow-classified work. +- Total OS threads per worker stays at `cfg.threads` — adaptive-queueing mode splits + the existing budget, it does not expand it. `cfg.threads` must be at least 2 + for the split to be meaningful; otherwise the worker logs a warning and runs + with a single pool. +- Both lanes share the existing `worker_connections` admission like today; the + slow lane is not separately bounded. ### Why two plain pools (and not a custom dual-queue scheduler) @@ -157,13 +157,13 @@ the handshake completes. For SSL connections in this first cut: New settings, mirroring `WorkerThreads` (`config.py:697`): +- `enable_adaptive_queueing` — boolean; when true, the `gthread` worker splits its + `cfg.threads` budget between a fast and a slow lane and routes by prediction. + Default `False` (single pool, today's behavior). Requires `cfg.threads >= 2`; + otherwise the worker logs a warning and falls back to the single pool. - `slow_request_threshold` — float seconds; a route whose learned timing meets/ - exceeds this is "slow". Default e.g. `1.0`. **`0` disables the whole feature** - and restores today's single-pool behavior exactly. -- `slow_threads` — `S`, slow-lane worker count. Default `1`. -- `slow_queue_maxsize` — bound on `slow_q`; overflow ⇒ `503`. Default e.g. `100` - (`0` = unbounded). -- `slow_lane_retry_after` — seconds for the `Retry-After` header on 503. + exceeds this is "slow". Default `1.0`. Only consulted when `enable_adaptive_queueing` is + enabled. A `slow_route_key` hook to customize the route key (e.g. collapse `/users/`) is a possible future addition; the default key is method + path @@ -171,17 +171,17 @@ with the query string stripped. ### 5.2 Two thread pools -`init_process` builds two plain `ThreadPoolExecutor`s when routing is enabled — -`fast_pool` (`F = cfg.threads`) and `slow_pool` (`S = cfg.slow_threads`) — and -falls back to the single `get_thread_pool()` executor when it is disabled. -`enqueue_req(conn, slow)` submits to the matching pool; both produce ordinary +`init_process` builds two plain `ThreadPoolExecutor`s when `enable_adaptive_queueing` is +enabled — `fast_pool` with `F = ceil(cfg.threads / 2)` workers and `slow_pool` +with `S = cfg.threads // 2` workers — and falls back to the single +`get_thread_pool()` executor when it is disabled. `enqueue_req(conn, slow)` +submits to the matching pool; both produce ordinary `concurrent.futures.Future`s, so `_wrap_future`, `add_done_callback`, `self.futures` tracking, and `futures.wait` all keep working unchanged. -- Bounding the slow lane: `nr_slow` counts slow requests submitted-but-not-yet- - finished. `enqueue_req` rejects (503) when `nr_slow >= S + slow_queue_maxsize`; - `finish_request` decrements it. This caps the slow executor's otherwise - unbounded internal queue. +- The slow lane is not separately bounded; its executor's internal queue is + capped indirectly by the worker's existing `worker_connections` admission, + the same as the single-pool path today. - Shutdown drains both pools via a `_shutdown_pools` helper, replacing the single `tpool.shutdown` calls; the `graceful_timeout` `futures.wait` is unchanged. @@ -215,16 +215,16 @@ A small, self-contained, thread-safe object: ## 6. Behavior under load (the cases that matter) -- **Flood of a previously-seen slow route**: every such request - is routed to the slow pool. The `F` fast threads are never given this work and - keep serving fast traffic at full capacity. When the slow lane reaches - `S + slow_queue_maxsize`, further slow requests get a fast `503` — backpressure - is contained to the slow lane. +- **Flood of a previously-seen slow route**: every such request is routed to + the slow pool. The `F` fast threads are never given this work and keep + serving fast traffic at full capacity. Excess slow requests sit in the slow + pool's queue, gated overall by `worker_connections`. - **Flood of a never-seen slow route**: the first occurrence(s) run in the fast lane; mid-flight learning (§5.4.2) flips the route to slow after one threshold interval, so the flood is contained quickly. - **Mixed fast traffic, idle slow lane**: the `S` slow threads stay parked (no - work stealing in this design — see §3), so fast throughput is `F`, not `F + S`. + work stealing in this design — see §3), so fast throughput is `F`, not + `F + S`. This is the cost of splitting a fixed `cfg.threads` budget. - **Misprediction (route marked slow but now fast)**: handled gracefully — it runs in the slow lane, and EWMA decay restores it to the fast lane over time. @@ -232,23 +232,23 @@ A small, self-contained, thread-safe object: Implemented: -- `config.py` — `slow_request_threshold`, `slow_threads`, `slow_queue_maxsize`, - `slow_lane_retry_after`, plus `validate_pos_float`. +- `config.py` — `enable_adaptive_queueing`, `slow_request_threshold`, plus + `validate_pos_float`. - `gthread.py` `init_process`/`get_thread_pool` — build `fast_pool` and - `slow_pool` (or the single legacy pool when disabled); `_shutdown_pools`. -- `gthread.py` `enqueue_req` — route to the matching pool; `nr_slow` bound + - `reject_overloaded` (503). + `slow_pool` (split from `cfg.threads`) when `enable_adaptive_queueing` is on, or the + single legacy pool when off; `_shutdown_pools`. +- `gthread.py` `enqueue_req` — route to the matching pool. - `gthread.py` `accept`/`park_for_request`/`classify_and_dispatch`/ `_peek_request_line`/`_route_key` — poller-driven request-line peek + routing. -- `gthread.py` `finish_request` — `predictor.update`, `nr_slow` decrement, - routing-aware keepalive re-park. +- `gthread.py` `finish_request` — `predictor.update`, routing-aware keepalive + re-park. - `gthread.py` run-loop sweep — mid-flight learning. - `gthread_routing.py` — `SlowRoutePredictor`. ## 8. Backward compatibility -- `slow_request_threshold = 0` ⇒ feature off: single pool, no classification, no - rejection — byte-for-byte current behavior. +- `enable_adaptive_queueing = False` (the default) ⇒ feature off: single pool, no + classification — byte-for-byte current behavior. - Hard per-request timeout (`gthread.py:243-250`) preserved unchanged; this adds a softer, non-fatal classification on top. - Worker `handle`/`handle_request`, keepalive semantics, and the @@ -265,8 +265,8 @@ Implemented: dispatches to the expected lane. - **Integration — flood isolation**: app with a known-slow route flooded concurrently; assert fast-route latency stays low and slow requests never - occupy fast workers; assert 503 once the slow lane is full. + occupy fast workers. - **Integration — cold start**: never-seen slow route burst ⇒ confirm the lane flips to slow within ~one threshold interval via mid-flight learning. -- **Regression**: `slow_request_threshold = 0` ⇒ current behavior; keepalive, - SSL, and graceful shutdown paths still pass existing tests. +- **Regression**: `enable_adaptive_queueing = False` ⇒ current behavior; keepalive, SSL, and + graceful shutdown paths still pass existing tests. diff --git a/gunicorn/config.py b/gunicorn/config.py index 0bbfd3fc..7681b043 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -744,32 +744,26 @@ class WorkerConnections(Setting): """ -class SlowRequestThreshold(Setting): - name = "slow_request_threshold" +class EnableAdaptiveQueueing(Setting): + name = "enable_adaptive_queueing" section = "Worker Processes" - cli = ["--slow-request-threshold"] - meta = "FLOAT" - validator = validate_pos_float - type = float - default = 0.0 + cli = ["--enable-adaptive-queueing"] + validator = validate_bool + action = "store_true" + type = bool + default = False desc = """\ - Processing time (in seconds) above which a request route is treated as - "slow" by the ``gthread`` worker. + Enable adaptive multi-queue routing in the ``gthread`` worker. - When set to a positive value, the ``gthread`` worker runs two separate - lanes of threads: a *fast* lane (sized by :ref:`threads`) and a *slow* - lane (sized by :ref:`slow-threads`). Requests are routed by predicting, - from previously observed timings of the same route (method + path), - whether they will take longer than this threshold. Slow-predicted - requests go to the slow lane so they can never starve the fast lane, - even under a flood. + When enabled, the worker splits its :ref:`threads` roughly evenly into + two lanes — a *fast* lane and a *slow* lane — and routes each request + to one of them by predicting, from previously observed timings of the + same route (method + path), whether it will exceed + :ref:`slow-request-threshold`. Slow-predicted requests go to the slow + lane so they can never starve the fast lane, even under a flood of + slow requests. - A route is learned as slow once it has been observed exceeding this - threshold (either on completion or while still running); its timing - decays back below the threshold if it becomes fast again. - - The default of ``0`` disables the feature and restores the single - thread-pool behaviour. + Requires :ref:`threads` to be at least 2. This setting only affects the ``gthread`` worker type. @@ -777,66 +771,25 @@ class SlowRequestThreshold(Setting): """ -class SlowThreads(Setting): - name = "slow_threads" +class SlowRequestThreshold(Setting): + name = "slow_request_threshold" section = "Worker Processes" - cli = ["--slow-threads"] - meta = "INT" - validator = validate_pos_int - type = int - default = 1 + cli = ["--slow-request-threshold"] + meta = "FLOAT" + validator = validate_pos_float + type = float + default = 1.0 desc = """\ - The number of worker threads dedicated to the slow lane. + Processing time (in seconds) above which a request route is treated as + "slow" by the ``gthread`` worker when :ref:`enable-adaptive-queueing` + is enabled. - These threads only ever run requests predicted to be slow. When the slow - lane has no pending work, they help drain the fast lane, so they are - never idle while fast work is waiting. Total threads per worker is - ``threads + slow_threads``. + A route is learned as slow once it has been observed exceeding this + threshold (either on completion or while still running); its timing + decays back below the threshold if it becomes fast again. - Only used by the ``gthread`` worker when - :ref:`slow-request-threshold` is set. - - .. versionadded:: 23.1.0 - """ - - -class SlowQueueMaxsize(Setting): - name = "slow_queue_maxsize" - section = "Worker Processes" - cli = ["--slow-queue-maxsize"] - meta = "INT" - validator = validate_pos_int - type = int - default = 100 - desc = """\ - Maximum number of requests allowed to wait in the slow lane queue. - - When the slow lane queue is full, additional slow-predicted requests are - rejected immediately with a ``503 Service Unavailable`` response instead - of being queued, keeping the slow lane's backpressure contained. A value - of ``0`` means the slow queue is unbounded. - - Only used by the ``gthread`` worker when - :ref:`slow-request-threshold` is set. - - .. versionadded:: 23.1.0 - """ - - -class SlowLaneRetryAfter(Setting): - name = "slow_lane_retry_after" - section = "Worker Processes" - cli = ["--slow-lane-retry-after"] - meta = "INT" - validator = validate_pos_int - type = int - default = 1 - desc = """\ - Value (in seconds) of the ``Retry-After`` header sent with the ``503`` - response when the slow lane queue is full. - - Only used by the ``gthread`` worker when - :ref:`slow-request-threshold` is set. + Only used by the ``gthread`` worker when :ref:`enable-adaptive-queueing` + is enabled. .. versionadded:: 23.1.0 """ diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index c74e62f2..9bb0e1f1 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -76,24 +76,22 @@ class ThreadWorker(base.Worker): super().__init__(*args, **kwargs) self.worker_connections = self.cfg.worker_connections self.max_keepalived = self.cfg.worker_connections - self.cfg.threads + # request routing: when adaptive queueing is enabled, the configured + # threads are split into a fast lane and a slow lane so slow requests + # cannot starve fast ones + self.routing_enabled = ( + self.cfg.enable_adaptive_queueing and self.cfg.threads >= 2) + self.slow_threshold = self.cfg.slow_request_threshold # initialise the pool(s): a single pool when routing is disabled, or a # separate fast (``self.tpool``) and slow pool when it is enabled self.tpool = None self.slow_pool = None - # number of slow requests submitted but not yet finished, used to bound - # the slow lane (running + queued) and shed load with 503 - self.nr_slow = 0 self.poller = None self.shutdown_event = os.eventfd(0) self._lock = None self.futures = deque() self._keep = deque() self.nr_conns = 0 - - # request routing: when a slow-request threshold is configured, slow - # requests are routed to a separate lane so they cannot starve fast ones - self.slow_threshold = self.cfg.slow_request_threshold - self.routing_enabled = self.slow_threshold > 0 self.predictor = None @classmethod @@ -104,14 +102,21 @@ class ThreadWorker(base.Worker): log.warning("No keepalived connections can be handled. " + "Check the number of worker connections and threads.") + if cfg.enable_adaptive_queueing and cfg.threads < 2: + log.warning("enable_adaptive_queueing requires at least 2 threads; " + "running with a single pool.") + def init_process(self): - self.tpool = self.get_thread_pool() if self.routing_enabled: + # split the configured threads roughly evenly between the two + # lanes; the fast lane gets the extra thread when threads is odd + slow = self.cfg.threads // 2 + fast = self.cfg.threads - slow + self.tpool = futures.ThreadPoolExecutor(max_workers=fast) + self.slow_pool = futures.ThreadPoolExecutor(max_workers=slow) self.predictor = SlowRoutePredictor(self.slow_threshold) - # a dedicated pool for the slow lane: slow requests can never - # occupy the fast pool's (``self.tpool``) threads - self.slow_pool = futures.ThreadPoolExecutor( - max_workers=self.cfg.slow_threads) + else: + self.tpool = self.get_thread_pool() self.poller = selectors.DefaultSelector() self._lock = RLock() super().init_process() @@ -148,15 +153,7 @@ class ThreadWorker(base.Worker): def enqueue_req(self, conn, slow=False): conn.init() - # submit the connection to the appropriate pool if self.routing_enabled and slow: - cap = self.cfg.slow_queue_maxsize - if cap and self.nr_slow >= self.cfg.slow_threads + cap: - # slow lane (running + queued) is full; shed load with 503 - # instead of letting the slow queue grow unbounded - self.reject_overloaded(conn) - return - self.nr_slow += 1 fs = self.slow_pool.submit(self.handle, conn) else: fs = self.tpool.submit(self.handle, conn) @@ -262,22 +259,6 @@ class ThreadWorker(base.Worker): return None return method + " " + path - def reject_overloaded(self, conn): - """Reject a connection with 503 because the slow lane is saturated.""" - self.nr_conns -= 1 - try: - conn.sock.setblocking(True) - conn.sock.sendall( - b"HTTP/1.1 503 Service Unavailable\r\n" - b"Connection: close\r\n" - b"Content-Length: 0\r\n" - b"Retry-After: %d\r\n\r\n" - % int(self.cfg.slow_lane_retry_after)) - except OSError: - pass - finally: - conn.close() - def reuse_connection(self, conn, client): with self._lock: # unregister the client from the poller @@ -410,10 +391,6 @@ class ThreadWorker(base.Worker): futures.wait(self.futures, timeout=self.cfg.graceful_timeout) def finish_request(self, fs): - # the slow request is done (whatever the outcome): free its slow slot - if self.routing_enabled and fs.slow: - self.nr_slow -= 1 - if fs.cancelled(): self.nr_conns -= 1 fs.conn.close()