From ee9bf1e950092d048058d0f1ffe60a7579ceee51 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Wed, 27 May 2026 11:30:02 +0530 Subject: [PATCH 1/7] feat: Adaptive queueing of slow/fast requests --- docs/design/gthread-slow-request-isolation.md | 278 ++++++++++++++++++ gunicorn/config.py | 129 ++++++++ gunicorn/workers/gthread.py | 217 ++++++++++++-- gunicorn/workers/gthread_routing.py | 83 ++++++ tests/workers/test_gthread.py | 97 ++++++ tests/workers/test_gthread_routing.py | 53 ++++ 6 files changed, 836 insertions(+), 21 deletions(-) create mode 100644 docs/design/gthread-slow-request-isolation.md create mode 100644 gunicorn/workers/gthread_routing.py create mode 100644 tests/workers/test_gthread.py create mode 100644 tests/workers/test_gthread_routing.py diff --git a/docs/design/gthread-slow-request-isolation.md b/docs/design/gthread-slow-request-isolation.md new file mode 100644 index 00000000..c674a294 --- /dev/null +++ b/docs/design/gthread-slow-request-isolation.md @@ -0,0 +1,278 @@ +# Design: Slow-request isolation for the gthread worker (predictive dual-queue) + +Status: proposal / draft +Author: (ankush) +Scope: `gunicorn/workers/gthread.py`, `gunicorn/config.py` + +## 1. Problem + +The `gthread` worker runs synchronous WSGI applications on a single +`ThreadPoolExecutor` sized to `cfg.threads` (`gthread.py:95-97`). Every accepted +connection is submitted to that one pool (`enqueue_req`, `gthread.py:117-121`). +Because the pool has a fixed number of threads and an unbounded work queue, a +flood of slow requests occupies every thread and all fast requests starve behind +them in the queue — head-of-line blocking. + +Goal: **route requests that are predicted to be slow into a separate, dedicated +lane so they can never occupy the threads reserved for fast requests, even under +a flood.** Fast requests go to a fast lane; slow requests go to a slow lane. 
The +slow lane stays idle when it has no slow work (no work stealing, see §3), and +the fast lane never touches slow work. + +This supersedes the earlier "demotion-only" proposal, which could not stop slow +work from entering the fast pool and therefore could not survive a flood. + +## 2. Why prediction is required (and its hard limit) + +You cannot preempt a running Python thread executing WSGI code (`gthread.py:352`): +once a slow request is on a thread, that thread is committed until the app +returns. So isolation has to happen **before** a request is handed to a worker — +i.e. at routing time. That means we must decide "fast or slow" from the request +*before* running it. + +The only information available pre-execution is the request itself (method, +path, headers) plus what we have learned from prior requests. So the design is: + +1. A **predictor** that, given a request's route, answers "slow?" using learned + per-route timing statistics (plus optional operator-seeded patterns). +2. **Routing at accept time** based on that prediction, into one of two pools. +3. **Learning**: every completed request — and any request that crosses the + slow threshold mid-flight — updates the predictor, so a slow route is + recognized after its first occurrence(s) and all subsequent traffic to it is + routed to the slow lane. + +Hard limit to state up front: a route that has **never been seen** cannot be +predicted slow on its very first request(s); those first occurrences run in the +fast lane until learning kicks in. We minimize this window (§5.4) and let +operators pre-seed known-slow routes (§5.1). For repeated/flooding slow routes — +the actual failure mode — prediction is effective after the first sample. + +## 3. Architecture overview + +``` + ┌─────────────────────────────────────────┐ + listener ──accept──▶ │ main loop: poller-driven classification │ + │ peek request line ▶ predictor.is_slow? 
│ + └───────────────┬───────────────┬───────────┘ + │ fast │ slow + ▼ ▼ + ┌───────────┐ ┌───────────┐ + │ fast_pool │ │ slow_pool │ (bounded, 503 on full) + │ F threads │ │ S threads │ + └─────┬─────┘ └─────┬─────┘ + └───────┬───────┘ + on completion: predictor.update(route, duration) +``` + +- **Fast lane**: a `ThreadPoolExecutor` of `F = cfg.threads` threads. Only ever + runs fast-classified work. +- **Slow lane**: a separate `ThreadPoolExecutor` of `S = cfg.slow_threads` + threads (default 1). Only ever runs slow-classified work. +- Total OS threads per worker = `F + S`. +- The slow lane is bounded: a counter (`nr_slow`) tracks slow requests + submitted-but-not-finished; once it reaches `S + cfg.slow_queue_maxsize` (i.e. + running plus queued), further slow requests are rejected with `503` instead of + growing the executor's unbounded internal queue. The fast lane is governed by + the existing `worker_connections` admission like today. + +### Why two plain pools (and not a custom dual-queue scheduler) + +An earlier revision used a single custom scheduler with two queues and +one-directional **work stealing** (idle slow threads draining the fast queue). +Two independent `ThreadPoolExecutor`s are dramatically simpler and rely on +well-tested stdlib machinery. The one capability given up is work stealing: the +`S` slow threads sit idle when there is no slow work, even if fast work is +queued. For the common case (`S` small, e.g. 1) this is a negligible amount of +parked capacity, and the simplicity is worth it. If maximizing throughput under +pure-fast load ever matters more than simplicity, the custom scheduler can be +reintroduced behind the same `enqueue_req` interface without touching routing. + +## 4. Routing point: classify before threading + +Today, parsing happens inside the worker thread (`handle` → `next(conn.parser)`, +`gthread.py:295`), which is too late — the request is already on a thread. 
We +move *classification only* (not full parsing) into the main loop. + +### 4.1 Restructured connection lifecycle + +Both freshly accepted connections and keepalive connections flow through one +poller-driven classification step (this unifies `accept`/`reuse_connection` and +also moves slow-client header reads off the worker threads — a side benefit +against slowloris): + +1. `accept` (`gthread.py:123`): accept socket, create `TConn`, set non-blocking, + register it in the poller for `EVENT_READ` with a `classify_and_dispatch` + callback. **Do not submit to any pool yet.** `nr_conns += 1`. +2. When the socket becomes readable, `classify_and_dispatch(conn)`: + - **Peek** the buffered bytes with `recv(n, socket.MSG_PEEK)` (plaintext) — + this reads without consuming, so the worker's parser still sees the full + byte stream unchanged. No parser changes required. + - Parse just the request line (`METHOD SP PATH SP VERSION CRLF`) from the + peeked buffer. If the line has not fully arrived yet, return and wait for + the next readable event (bounded by the existing keepalive/header timeout so + a stalled client is eventually closed, not left forever). + + > **Why peek the request line, not fully read/parse the request here?** + > Classification only needs method + path. Doing a *full* read/parse of the + > request in the main loop is actively harmful: the main loop is a single + > thread serving every connection (accepts, keepalive, the poller). A blocking + > full read lets one slow client — slowloris, slow network, or a large/chunked + > body — stall the **entire worker**, which is strictly worse than the + > thread-pool starvation we are fixing (there is no pool to absorb it). + > Peeking only inspects already-buffered bytes and defers to the poller if the + > line is incomplete, so it never blocks. 
It also avoids having to read the + > body in the main loop (WSGI streams `wsgi.input` lazily) and keeps header + > parsing, parse-error responses (400/414), and `wsgi.input` wiring in the + > worker where they already live. + - Compute `route_key` (default: `METHOD + " " + path`, query string stripped; + overridable via hook, §5.1). + - `slow = predictor.is_slow(route_key)` (or matches a seeded slow pattern). + - Unregister the socket from the poller and submit the connection to the + **slow** pool if `slow` else the **fast** pool. +3. The worker's `handle`/`handle_request` run unchanged. On completion, the + measured `request_time` (already computed at `gthread.py:362`) is fed to + `predictor.update(route_key, duration)`. +4. Keepalive: after a kept-alive request, re-register the connection in the + poller with the same `classify_and_dispatch` callback (instead of the old + `reuse_connection`), so the *next* request on the connection is re-classified + independently (it may hit a different route). + +### 4.2 SSL connections + +Plaintext peek does not work through TLS — the request line is encrypted until +the handshake completes. For SSL connections in this first cut: + +- They cannot be pre-classified at the socket level, so they default to the + **fast** lane and rely on mid-flight + completion learning (§5.4) — meaning an + SSL-only deployment does not get full flood protection. +- Note in docs that the common production layout terminates TLS upstream (e.g. + nginx) so gunicorn sees plaintext and gets full protection. +- **Phase 2** (deferred): drive a non-blocking TLS handshake from the poller and + buffer the decrypted request line (feeding it back via `Unreader.unread`, + `unreader.py:51`) to classify SSL the same way. + +## 5. Components + +### 5.1 Config (`gunicorn/config.py`) + +New settings, mirroring `WorkerThreads` (`config.py:697`): + +- `slow_request_threshold` — float seconds; a route whose learned timing meets/ + exceeds this is "slow". 
Default `0` (a typical enabled value is `1.0`). **`0` disables the whole feature** + and restores today's single-pool behavior exactly. +- `slow_threads` — `S`, slow-lane worker count. Default `1`. +- `slow_queue_maxsize` — bound on the number of slow requests waiting for a slow-lane thread; overflow ⇒ `503`. Default e.g. `100` + (`0` = unbounded). +- `slow_routes` — optional list of regex route patterns (matched with + `re.search` against the route key) operators know are slow, seeded into the + predictor so even the *first* request routes correctly. +- `slow_lane_retry_after` — seconds for the `Retry-After` header on 503. + +A `slow_route_key` hook to customize the route key (e.g. collapse +`/users/<id>`) is a possible future addition; the default key is method + path +with the query string stripped. + +### 5.2 Two thread pools + +`init_process` builds two plain `ThreadPoolExecutor`s when routing is enabled — +`fast_pool` (`F = cfg.threads`) and `slow_pool` (`S = cfg.slow_threads`) — and +falls back to the single `get_thread_pool()` executor when it is disabled. +`enqueue_req(conn, slow)` submits to the matching pool; both produce ordinary +`concurrent.futures.Future`s, so `_wrap_future`, `add_done_callback`, +`self.futures` tracking, and `futures.wait` all keep working unchanged. + +- Bounding the slow lane: `nr_slow` counts slow requests submitted-but-not-yet- + finished. `enqueue_req` rejects (503) when `nr_slow >= S + slow_queue_maxsize`; + `finish_request` decrements it. This caps the slow executor's otherwise + unbounded internal queue. +- Shutdown drains both pools via a `_shutdown_pools` helper, replacing the + single `tpool.shutdown` calls; the `graceful_timeout` `futures.wait` is + unchanged. + +### 5.3 Predictor + +A small, self-contained, thread-safe object: + +- State: bounded LRU map `route_key -> {ewma_seconds, samples, last_seen}`. + Bounding caps memory under high route cardinality. 
+- `update(route_key, duration)`: EWMA with decay so a route that becomes fast + again eventually returns to the fast lane (avoids permanent misclassification + after a one-off slow spike). Called on every completion. +- `is_slow(route_key)`: `True` if the route matches a seeded `slow_routes` + pattern, or its `ewma_seconds >= slow_request_threshold`. Unknown routes ⇒ + `False` (fast) by default. +- Optional hysteresis (separate promote/demote thresholds) to avoid flapping + around the boundary. + +### 5.4 Learning signals + +1. **Completion (primary)**: feed `request_time` (`gthread.py:362`) into + `predictor.update`. After a slow route's first request completes, it is known. +2. **Mid-flight observation (catches simultaneous first-bursts)**: the main loop + already sweeps `self.futures` for the hard timeout (`gthread.py:245-250`). In + that sweep, for any in-flight request whose elapsed time exceeds + `slow_request_threshold`, call `predictor.update` with that elapsed time + *immediately* (do not wait for completion, and do not move the running + request — we can't). This shortens the learning window when many requests to a + brand-new slow route arrive at once: subsequent ones in the burst route to the + slow lane after one threshold interval instead of after a full slow request. +3. **Seeding (eliminates the first-occurrence window for known offenders)**: + `slow_routes` patterns mark routes slow from the very first request. + +## 6. Behavior under load (the cases that matter) + +- **Flood of a known/seeded or previously-seen slow route**: every such request + is routed to the slow pool. The `F` fast threads are never given this work and + keep serving fast traffic at full capacity. When the slow lane reaches + `S + slow_queue_maxsize`, further slow requests get a fast `503` — backpressure + is contained to the slow lane. 
+- **Flood of a never-seen slow route**: the first occurrence(s) run in the fast + lane; mid-flight learning (§5.4.2) flips the route to slow after one threshold + interval, so the flood is contained quickly. Seeding avoids even this window. +- **Mixed fast traffic, idle slow lane**: the `S` slow threads stay parked (no + work stealing in this design — see §3), so fast throughput is `F`, not `F + S`. +- **Misprediction (route marked slow but now fast)**: handled gracefully — it + runs in the slow lane, and EWMA decay restores it to the fast lane over time. + +## 7. Implementation checklist (touch points) + +Implemented: + +- `config.py` — `slow_request_threshold`, `slow_threads`, `slow_queue_maxsize`, + `slow_routes`, `slow_lane_retry_after`, plus `validate_pos_float`. +- `gthread.py` `init_process`/`get_thread_pool` — build `fast_pool` and + `slow_pool` (or the single legacy pool when disabled); `_shutdown_pools`. +- `gthread.py` `enqueue_req` — route to the matching pool; `nr_slow` bound + + `reject_overloaded` (503). +- `gthread.py` `accept`/`park_for_request`/`classify_and_dispatch`/ + `_peek_request_line`/`_route_key` — poller-driven request-line peek + routing. +- `gthread.py` `finish_request` — `predictor.update`, `nr_slow` decrement, + routing-aware keepalive re-park. +- `gthread.py` run-loop sweep — mid-flight learning. +- `gthread_routing.py` — `SlowRoutePredictor`. + +## 8. Backward compatibility + +- `slow_request_threshold = 0` ⇒ feature off: single pool, no classification, no + rejection — byte-for-byte current behavior. +- Hard per-request timeout (`gthread.py:243-250`) preserved unchanged; this adds + a softer, non-fatal classification on top. +- Worker `handle`/`handle_request`, keepalive semantics, and the + future/`finish_request` contract are preserved (MSG_PEEK leaves the byte + stream intact, so the parser is untouched). + +## 9. 
Test plan + +- **Predictor unit**: unknown ⇒ fast; after `update` with a slow duration ⇒ + slow; EWMA decay restores fast; seeded patterns are slow from first call; LRU + bound holds under many keys. +- **Routing unit**: `classify_and_dispatch` extracts the right `route_key` from + partial vs complete peeked buffers; incomplete line defers; complete line + dispatches to the expected lane. +- **Integration — flood isolation**: app with a known-slow route flooded + concurrently; assert fast-route latency stays low and slow requests never + occupy fast workers; assert 503 once the slow lane is full. +- **Integration — cold start**: never-seen slow route burst ⇒ confirm the lane + flips to slow within ~one threshold interval via mid-flight learning. +- **Regression**: `slow_request_threshold = 0` ⇒ current behavior; keepalive, + SSL, and graceful shutdown paths still pass existing tests. diff --git a/gunicorn/config.py b/gunicorn/config.py index 07c5aab3..a5cfa4b9 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -365,6 +365,13 @@ def validate_pos_int(val): return val +def validate_pos_float(val): + val = float(val) + if val < 0: + raise ValueError("Value must be positive: %s" % val) + return val + + def validate_ssl_version(val): if val != SSLVersion.default: sys.stderr.write("Warning: option `ssl_version` is deprecated and it is ignored. Use ssl_context instead.\n") @@ -737,6 +744,128 @@ class WorkerConnections(Setting): """ +class SlowRequestThreshold(Setting): + name = "slow_request_threshold" + section = "Worker Processes" + cli = ["--slow-request-threshold"] + meta = "FLOAT" + validator = validate_pos_float + type = float + default = 0.0 + desc = """\ + Processing time (in seconds) above which a request route is treated as + "slow" by the ``gthread`` worker. + + When set to a positive value, the ``gthread`` worker runs two separate + lanes of threads: a *fast* lane (sized by :ref:`threads`) and a *slow* + lane (sized by :ref:`slow-threads`). 
Requests are routed by predicting, + from previously observed timings of the same route (method + path), + whether they will take longer than this threshold. Slow-predicted + requests go to the slow lane so they can never starve the fast lane, + even under a flood. + + A route is learned as slow once it has been observed exceeding this + threshold (either on completion or while still running); its timing + decays back below the threshold if it becomes fast again. + + The default of ``0`` disables the feature and restores the single + thread-pool behaviour. + + This setting only affects the ``gthread`` worker type. + + .. versionadded:: 23.1.0 + """ + + +class SlowThreads(Setting): + name = "slow_threads" + section = "Worker Processes" + cli = ["--slow-threads"] + meta = "INT" + validator = validate_pos_int + type = int + default = 1 + desc = """\ + The number of worker threads dedicated to the slow lane. + + These threads only ever run requests predicted to be slow. When the slow + lane has no pending work they stay idle: they never run fast-lane work, + even while fast requests are waiting. Total threads per worker is + ``threads + slow_threads``. + + Only used by the ``gthread`` worker when + :ref:`slow-request-threshold` is set. + + .. versionadded:: 23.1.0 + """ + + +class SlowQueueMaxsize(Setting): + name = "slow_queue_maxsize" + section = "Worker Processes" + cli = ["--slow-queue-maxsize"] + meta = "INT" + validator = validate_pos_int + type = int + default = 100 + desc = """\ + Maximum number of requests allowed to wait in the slow lane queue. + + When the slow lane queue is full, additional slow-predicted requests are + rejected immediately with a ``503 Service Unavailable`` response instead + of being queued, keeping the slow lane's backpressure contained. A value + of ``0`` means the slow queue is unbounded. + + Only used by the ``gthread`` worker when + :ref:`slow-request-threshold` is set. + + .. 
versionadded:: 23.1.0 + """ + + +class SlowRoutes(Setting): + name = "slow_routes" + section = "Worker Processes" + cli = ["--slow-route"] + action = "append" + meta = "PATTERN" + validator = validate_list_string + default = [] + desc = """\ + Regular expression(s) matching routes that should always be treated as + slow, regardless of observed timings. + + Each pattern is matched (using ``re.search``) against the route key, + which is the request method and path joined by a space, e.g. + ``"POST /reports/generate"``. Seeding known-slow routes avoids the brief + window where a never-before-seen slow route is learned. + + Only used by the ``gthread`` worker when + :ref:`slow-request-threshold` is set. + + .. versionadded:: 23.1.0 + """ + + +class SlowLaneRetryAfter(Setting): + name = "slow_lane_retry_after" + section = "Worker Processes" + cli = ["--slow-lane-retry-after"] + meta = "INT" + validator = validate_pos_int + type = int + default = 1 + desc = """\ + Value (in seconds) of the ``Retry-After`` header sent with the ``503`` + response when the slow lane queue is full. + + Only used by the ``gthread`` worker when + :ref:`slow-request-threshold` is set. + + .. versionadded:: 23.1.0 + """ + + class MaxRequests(Setting): name = "max_requests" section = "Worker Processes" diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 903a581b..f666d4de 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -25,11 +25,15 @@ from functools import partial from threading import RLock from . import base +from .gthread_routing import SlowRoutePredictor from .. import http from .. import util from .. 
import sock from ..http import wsgi +# how many bytes to peek when classifying a request by its request line +REQUEST_LINE_PEEK = 65536 + class TConn: @@ -41,6 +45,9 @@ class TConn: self.timeout = None self.parser = None + # route key (method + path), set by the worker when request routing is + # enabled; used to predict and learn slow routes + self.route_key = None # set the socket to non blocking self.sock.setblocking(False) @@ -69,8 +76,14 @@ class ThreadWorker(base.Worker): super().__init__(*args, **kwargs) self.worker_connections = self.cfg.worker_connections self.max_keepalived = self.cfg.worker_connections - self.cfg.threads - # initialise the pool + # initialise the pool(s): a single pool when routing is disabled, or a + # separate fast and slow pool when it is enabled self.tpool = None + self.fast_pool = None + self.slow_pool = None + # number of slow requests submitted but not yet finished, used to bound + # the slow lane (running + queued) and shed load with 503 + self.nr_slow = 0 self.poller = None self.shutdown_event = os.eventfd(0) self._lock = None @@ -78,6 +91,12 @@ class ThreadWorker(base.Worker): self._keep = deque() self.nr_conns = 0 + # request routing: when a slow-request threshold is configured, slow + # requests are routed to a separate lane so they cannot starve fast ones + self.slow_threshold = self.cfg.slow_request_threshold + self.routing_enabled = self.slow_threshold > 0 + self.predictor = None + @classmethod def check_config(cls, cfg, log): max_keepalived = cfg.worker_connections - cfg.threads @@ -87,7 +106,19 @@ class ThreadWorker(base.Worker): "Check the number of worker connections and threads.") def init_process(self): - self.tpool = self.get_thread_pool() + if self.routing_enabled: + self.predictor = SlowRoutePredictor( + self.slow_threshold, + seed_patterns=self.cfg.slow_routes, + ) + # a dedicated pool per lane: slow requests can never occupy the + # fast pool's threads + self.fast_pool = futures.ThreadPoolExecutor( + 
max_workers=self.cfg.threads) + self.slow_pool = futures.ThreadPoolExecutor( + max_workers=self.cfg.slow_threads) + else: + self.tpool = self.get_thread_pool() self.poller = selectors.DefaultSelector() self._lock = RLock() super().init_process() @@ -96,6 +127,11 @@ class ThreadWorker(base.Worker): """Override this method to customize how the thread pool is created""" return futures.ThreadPoolExecutor(max_workers=self.cfg.threads) + def _shutdown_pools(self, wait): + for pool in (self.tpool, self.fast_pool, self.slow_pool): + if pool is not None: + pool.shutdown(wait) + def handle_exit(self, sig, frame): self.alive = False os.eventfd_write(self.shutdown_event, 1) @@ -104,21 +140,36 @@ class ThreadWorker(base.Worker): self.alive = False # worker_int callback self.cfg.worker_int(self) - self.tpool.shutdown(False) + self._shutdown_pools(False) time.sleep(0.1) sys.exit(0) - def _wrap_future(self, fs, conn): + def _wrap_future(self, fs, conn, slow=False): fs.conn = conn - fs._request_timeout = time.monotonic() + self.cfg.timeout + fs.slow = slow + fs._start_time = time.monotonic() + fs._request_timeout = fs._start_time + self.cfg.timeout + fs._observed_slow = False self.futures.append(fs) fs.add_done_callback(self.finish_request) - def enqueue_req(self, conn): + def enqueue_req(self, conn, slow=False): conn.init() - # submit the connection to a worker - fs = self.tpool.submit(self.handle, conn) - self._wrap_future(fs, conn) + # submit the connection to the appropriate pool + if not self.routing_enabled: + fs = self.tpool.submit(self.handle, conn) + elif slow: + cap = self.cfg.slow_queue_maxsize + if cap and self.nr_slow >= self.cfg.slow_threads + cap: + # slow lane (running + queued) is full; shed load with 503 + # instead of letting the slow queue grow unbounded + self.reject_overloaded(conn) + return + self.nr_slow += 1 + fs = self.slow_pool.submit(self.handle, conn) + else: + fs = self.fast_pool.submit(self.handle, conn) + self._wrap_future(fs, conn, slow=slow) def 
accept(self, server, listener): try: @@ -127,13 +178,115 @@ class ThreadWorker(base.Worker): conn = TConn(self.cfg, sock, client, server) self.nr_conns += 1 - # enqueue the job - self.enqueue_req(conn) + if self.routing_enabled and not self.cfg.is_ssl: + # park the connection until its request line is readable, then + # classify and route it to the fast or slow lane + self.park_for_request(conn) + else: + # legacy single-lane path (also used for SSL, whose request + # line cannot be peeked before the TLS handshake) + self.enqueue_req(conn) except OSError as e: if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise + def park_for_request(self, conn): + """Register a connection in the poller until its request line arrives.""" + conn.sock.setblocking(False) + conn.set_timeout() + with self._lock: + self._keep.append(conn) + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.classify_and_dispatch, conn)) + + def classify_and_dispatch(self, conn, client=None): + """Peek the request line, predict the lane, and enqueue the request.""" + line, closed, complete = self._peek_request_line(conn) + if not closed and not complete: + # request line has not fully arrived yet; keep waiting. Stalled + # clients are reaped by murder_keepalived via the connection timeout. + return + + with self._lock: + try: + # remove the connection from the parked set + self._keep.remove(conn) + except ValueError: + # already handled (e.g. by murder_keepalived); nothing to do + return + try: + self.poller.unregister(conn.sock) + except (KeyError, OSError, ValueError): + pass + + if closed: + self.nr_conns -= 1 + conn.close() + return + + conn.route_key = self._route_key(line) + slow = self.predictor.is_slow(conn.route_key) + self.enqueue_req(conn, slow=slow) + + def _peek_request_line(self, conn): + """Return ``(line, closed, complete)`` for the connection's request line. 
+ + ``line`` is the request line bytes (without CRLF) once available, + ``closed`` is True if the peer closed the connection, and ``complete`` + is True once we should stop waiting for more data. + """ + try: + data = conn.sock.recv(REQUEST_LINE_PEEK, socket.MSG_PEEK) + except (BlockingIOError, InterruptedError): + return None, False, False + except OSError: + return None, True, False + + if data == b"": + # peer closed the connection before sending a request + return None, True, False + + idx = data.find(b"\r\n") + if idx == -1: + if len(data) >= REQUEST_LINE_PEEK: + # request line longer than our peek window; stop classifying and + # let the worker's parser deal with (or reject) it + return None, False, True + return None, False, False + return data[:idx], False, True + + @staticmethod + def _route_key(line): + """Build a route key (``"METHOD /path"``) from a raw request line.""" + if not line: + return None + parts = line.split(b" ") + if len(parts) < 2: + return None + try: + method = parts[0].decode("latin1") + path = parts[1].split(b"?", 1)[0].decode("latin1") + except UnicodeDecodeError: + return None + return method + " " + path + + def reject_overloaded(self, conn): + """Reject a connection with 503 because the slow lane is saturated.""" + self.nr_conns -= 1 + try: + conn.sock.setblocking(True) + conn.sock.sendall( + b"HTTP/1.1 503 Service Unavailable\r\n" + b"Connection: close\r\n" + b"Content-Length: 0\r\n" + b"Retry-After: %d\r\n\r\n" + % int(self.cfg.slow_lane_retry_after)) + except OSError: + pass + finally: + conn.close() + def reuse_connection(self, conn, client): with self._lock: # unregister the client from the poller @@ -248,8 +401,16 @@ class ThreadWorker(base.Worker): self.alive = False self.log.error("A request timed out. 
Exiting.") faulthandler.dump_traceback() + elif (self.routing_enabled and not fut._observed_slow + and not fut.slow + and current_time - fut._start_time > self.slow_threshold): + # an in-flight fast-lane request crossed the threshold; learn + # the route as slow now so the rest of a burst is rerouted + # without waiting for this request to finish + self.predictor.observe_slow(fut.conn.route_key) + fut._observed_slow = True - self.tpool.shutdown(False) + self._shutdown_pools(False) self.poller.close() for s in self.sockets: @@ -258,27 +419,41 @@ class ThreadWorker(base.Worker): futures.wait(self.futures, timeout=self.cfg.graceful_timeout) def finish_request(self, fs): + # the slow request is done (whatever the outcome): free its slow slot + if self.routing_enabled and fs.slow: + self.nr_slow -= 1 + if fs.cancelled(): self.nr_conns -= 1 fs.conn.close() return + # feed the observed processing time back to the predictor so the route + # is learned (or unlearned) as slow + if self.routing_enabled and fs.conn.route_key: + self.predictor.update(fs.conn.route_key, + time.monotonic() - fs._start_time) + try: (keepalive, conn) = fs.result() # if the connection should be kept alived add it # to the eventloop and record it if keepalive and self.alive: - # flag the socket as non blocked - conn.sock.setblocking(False) + if self.routing_enabled and not self.cfg.is_ssl: + # re-classify the next request on this connection + self.park_for_request(conn) + else: + # flag the socket as non blocked + conn.sock.setblocking(False) - # register the connection - conn.set_timeout() - with self._lock: - self._keep.append(conn) + # register the connection + conn.set_timeout() + with self._lock: + self._keep.append(conn) - # add the socket to the event loop - self.poller.register(conn.sock, selectors.EVENT_READ, - partial(self.reuse_connection, conn)) + # add the socket to the event loop + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.reuse_connection, conn)) else: 
self.nr_conns -= 1 conn.close() diff --git a/gunicorn/workers/gthread_routing.py b/gunicorn/workers/gthread_routing.py new file mode 100644 index 00000000..885a29f7 --- /dev/null +++ b/gunicorn/workers/gthread_routing.py @@ -0,0 +1,83 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +"""Slow-route prediction for the gthread worker. + +The :class:`SlowRoutePredictor` decides, before a request is handed to a +worker, whether its route is expected to be slow, based on previously observed +timings of the same route (method + path) plus operator-seeded patterns. The +gthread worker uses this to route slow requests to a dedicated thread pool so +they cannot starve fast requests. +""" + +import re +import threading +from collections import OrderedDict + + +class SlowRoutePredictor: + """Predicts whether a route (method + path) is slow. + + Timings are tracked per route as an exponentially weighted moving average + (EWMA) so that a route which becomes fast again decays back below the + threshold. The table is bounded (LRU) to cap memory under high route + cardinality. Operator-seeded regex patterns always classify as slow. 
+ """ + + def __init__(self, threshold, max_entries=1024, alpha=0.3, + seed_patterns=None): + self.threshold = threshold + self.alpha = alpha + self.max_entries = max_entries + self._stats = OrderedDict() + self._lock = threading.Lock() + self._seed = [re.compile(p) for p in (seed_patterns or [])] + + def _seeded(self, key): + return any(p.search(key) for p in self._seed) + + def is_slow(self, key): + if not key: + return False + if self._seeded(key): + return True + with self._lock: + ewma = self._stats.get(key) + if ewma is None: + return False + self._stats.move_to_end(key) + return ewma >= self.threshold + + def update(self, key, duration): + """Record an observed processing ``duration`` (seconds) for ``key``.""" + if not key: + return + with self._lock: + ewma = self._stats.get(key) + if ewma is None: + ewma = duration + else: + ewma = (1 - self.alpha) * ewma + self.alpha * duration + self._stats[key] = ewma + self._stats.move_to_end(key) + self._evict() + + def observe_slow(self, key): + """Mark ``key`` slow now, before its request has finished. + + Used when an in-flight request crosses the threshold, so the rest of a + simultaneous burst to a never-seen slow route is routed to the slow lane + without waiting for the first request to complete. + """ + if not key: + return + with self._lock: + cur = self._stats.get(key, 0.0) + self._stats[key] = max(cur, self.threshold) + self._stats.move_to_end(key) + self._evict() + + def _evict(self): + while len(self._stats) > self.max_entries: + self._stats.popitem(last=False) diff --git a/tests/workers/test_gthread.py b/tests/workers/test_gthread.py new file mode 100644 index 00000000..c5db96b2 --- /dev/null +++ b/tests/workers/test_gthread.py @@ -0,0 +1,97 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. 
+ +import socket +import types + +from gunicorn.workers.gthread import ThreadWorker, REQUEST_LINE_PEEK + + +# _route_key is a staticmethod, so it can be exercised directly. + +def test_route_key_basic(): + assert ThreadWorker._route_key(b"GET /index HTTP/1.1") == "GET /index" + + +def test_route_key_strips_query_string(): + assert ThreadWorker._route_key( + b"GET /search?q=hello&p=2 HTTP/1.1") == "GET /search" + + +def test_route_key_post(): + assert ThreadWorker._route_key( + b"POST /reports/generate HTTP/1.0") == "POST /reports/generate" + + +def test_route_key_malformed(): + assert ThreadWorker._route_key(b"") is None + assert ThreadWorker._route_key(b"GARBAGE") is None + assert ThreadWorker._route_key(None) is None + + +def _peek(conn): + # _peek_request_line only touches conn.sock, so we can bind it to a stub + return ThreadWorker._peek_request_line(object(), conn) + + +def test_peek_complete_request_line(): + a, b = socket.socketpair() + try: + a.setblocking(False) + b.sendall(b"GET /x HTTP/1.1\r\nHost: y\r\n\r\n") + conn = types.SimpleNamespace(sock=a) + line, closed, complete = _peek(conn) + assert line == b"GET /x HTTP/1.1" + assert closed is False + assert complete is True + # MSG_PEEK must leave the bytes in the buffer for the parser + assert a.recv(5) == b"GET /" + finally: + a.close() + b.close() + + +def test_peek_incomplete_request_line_waits(): + a, b = socket.socketpair() + try: + a.setblocking(False) + b.sendall(b"GET /x HTT") # no CRLF yet + conn = types.SimpleNamespace(sock=a) + line, closed, complete = _peek(conn) + assert line is None + assert closed is False + assert complete is False + finally: + a.close() + b.close() + + +def test_peek_no_data_yet(): + a, b = socket.socketpair() + try: + a.setblocking(False) + conn = types.SimpleNamespace(sock=a) + line, closed, complete = _peek(conn) + # nothing buffered: not closed, not complete -> keep waiting + assert (line, closed, complete) == (None, False, False) + finally: + a.close() + b.close() + 
+ +def test_peek_peer_closed(): + a, b = socket.socketpair() + a.setblocking(False) + b.close() + try: + conn = types.SimpleNamespace(sock=a) + line, closed, complete = _peek(conn) + assert closed is True + finally: + a.close() + + +def test_peek_window_constant_is_reasonable(): + # a sanity bound so request lines fit comfortably in one peek + assert REQUEST_LINE_PEEK >= 8192 diff --git a/tests/workers/test_gthread_routing.py b/tests/workers/test_gthread_routing.py new file mode 100644 index 00000000..d14ccfae --- /dev/null +++ b/tests/workers/test_gthread_routing.py @@ -0,0 +1,53 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +from gunicorn.workers.gthread_routing import SlowRoutePredictor + + +def test_predictor_unknown_route_is_fast(): + p = SlowRoutePredictor(threshold=1.0) + assert p.is_slow("GET /") is False + + +def test_predictor_learns_slow_route_on_update(): + p = SlowRoutePredictor(threshold=1.0, alpha=1.0) + p.update("GET /slow", 5.0) + assert p.is_slow("GET /slow") is True + assert p.is_slow("GET /fast") is False + + +def test_predictor_ewma_decays_back_to_fast(): + p = SlowRoutePredictor(threshold=1.0, alpha=0.5) + p.update("GET /x", 5.0) + assert p.is_slow("GET /x") is True + # repeated fast samples should pull the EWMA back under the threshold + for _ in range(20): + p.update("GET /x", 0.01) + assert p.is_slow("GET /x") is False + + +def test_predictor_observe_slow_marks_immediately(): + p = SlowRoutePredictor(threshold=2.0) + p.observe_slow("POST /report") + assert p.is_slow("POST /report") is True + + +def test_predictor_seed_patterns(): + p = SlowRoutePredictor(threshold=1.0, seed_patterns=[r"^POST /reports/"]) + assert p.is_slow("POST /reports/generate") is True + assert p.is_slow("GET /reports/generate") is False + + +def test_predictor_lru_bound(): + p = SlowRoutePredictor(threshold=1.0, max_entries=10) + for i in range(50): + p.update("GET /%d" % i, 0.01) + assert len(p._stats) 
<= 10 + + +def test_predictor_empty_key_is_fast(): + p = SlowRoutePredictor(threshold=1.0) + assert p.is_slow(None) is False + p.update(None, 5.0) # must not raise + assert p.is_slow(None) is False From 48260712ea2550792553d756e049b5888b03a185 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 28 May 2026 15:53:39 +0530 Subject: [PATCH 2/7] fix: Limit peek to 8KB --- gunicorn/workers/gthread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index f666d4de..ee9c0f0d 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -32,7 +32,7 @@ from .. import sock from ..http import wsgi # how many bytes to peek when classifying a request by its request line -REQUEST_LINE_PEEK = 65536 +REQUEST_LINE_PEEK = 8192 class TConn: From 2471050b3ac43fac72b99b1762ef4f4435914b37 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 28 May 2026 15:54:33 +0530 Subject: [PATCH 3/7] refactor: Use only 2 pool variables. 
--- gunicorn/workers/gthread.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index ee9c0f0d..a1ce27b6 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -77,9 +77,8 @@ class ThreadWorker(base.Worker): self.worker_connections = self.cfg.worker_connections self.max_keepalived = self.cfg.worker_connections - self.cfg.threads # initialise the pool(s): a single pool when routing is disabled, or a - # separate fast and slow pool when it is enabled + # separate fast (``self.tpool``) and slow pool when it is enabled self.tpool = None - self.fast_pool = None self.slow_pool = None # number of slow requests submitted but not yet finished, used to bound # the slow lane (running + queued) and shed load with 503 @@ -106,19 +105,16 @@ class ThreadWorker(base.Worker): "Check the number of worker connections and threads.") def init_process(self): + self.tpool = self.get_thread_pool() if self.routing_enabled: self.predictor = SlowRoutePredictor( self.slow_threshold, seed_patterns=self.cfg.slow_routes, ) - # a dedicated pool per lane: slow requests can never occupy the - # fast pool's threads - self.fast_pool = futures.ThreadPoolExecutor( - max_workers=self.cfg.threads) + # a dedicated pool for the slow lane: slow requests can never + # occupy the fast pool's (``self.tpool``) threads self.slow_pool = futures.ThreadPoolExecutor( max_workers=self.cfg.slow_threads) - else: - self.tpool = self.get_thread_pool() self.poller = selectors.DefaultSelector() self._lock = RLock() super().init_process() @@ -128,7 +124,7 @@ class ThreadWorker(base.Worker): return futures.ThreadPoolExecutor(max_workers=self.cfg.threads) def _shutdown_pools(self, wait): - for pool in (self.tpool, self.fast_pool, self.slow_pool): + for pool in (self.tpool, self.slow_pool): if pool is not None: pool.shutdown(wait) @@ -156,9 +152,7 @@ class ThreadWorker(base.Worker): def enqueue_req(self, 
conn, slow=False): conn.init() # submit the connection to the appropriate pool - if not self.routing_enabled: - fs = self.tpool.submit(self.handle, conn) - elif slow: + if self.routing_enabled and slow: cap = self.cfg.slow_queue_maxsize if cap and self.nr_slow >= self.cfg.slow_threads + cap: # slow lane (running + queued) is full; shed load with 503 @@ -168,7 +162,7 @@ class ThreadWorker(base.Worker): self.nr_slow += 1 fs = self.slow_pool.submit(self.handle, conn) else: - fs = self.fast_pool.submit(self.handle, conn) + fs = self.tpool.submit(self.handle, conn) self._wrap_future(fs, conn, slow=slow) def accept(self, server, listener): From ec6af68013d75c1ca2aa56df7b0719b236831045 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 28 May 2026 16:01:02 +0530 Subject: [PATCH 4/7] fix: Remove hardcoded paths for slow prediction --- docs/design/gthread-slow-request-isolation.md | 16 ++++--------- gunicorn/config.py | 24 ------------------- gunicorn/workers/gthread.py | 5 +--- gunicorn/workers/gthread_routing.py | 18 ++++---------- tests/workers/test_gthread_routing.py | 6 ----- 5 files changed, 11 insertions(+), 58 deletions(-) diff --git a/docs/design/gthread-slow-request-isolation.md b/docs/design/gthread-slow-request-isolation.md index c674a294..cc62b8f7 100644 --- a/docs/design/gthread-slow-request-isolation.md +++ b/docs/design/gthread-slow-request-isolation.md @@ -163,9 +163,6 @@ New settings, mirroring `WorkerThreads` (`config.py:697`): - `slow_threads` — `S`, slow-lane worker count. Default `1`. - `slow_queue_maxsize` — bound on `slow_q`; overflow ⇒ `503`. Default e.g. `100` (`0` = unbounded). -- `slow_routes` — optional list of regex route patterns (matched with - `re.search` against the route key) operators know are slow, seeded into the - predictor so even the *first* request routes correctly. - `slow_lane_retry_after` — seconds for the `Retry-After` header on 503. A `slow_route_key` hook to customize the route key (e.g. 
collapse @@ -198,9 +195,8 @@ A small, self-contained, thread-safe object: - `update(route_key, duration)`: EWMA with decay so a route that becomes fast again eventually returns to the fast lane (avoids permanent misclassification after a one-off slow spike). Called on every completion. -- `is_slow(route_key)`: `True` if the route matches a seeded `slow_routes` - pattern, or its `ewma_seconds >= slow_request_threshold`. Unknown routes ⇒ - `False` (fast) by default. +- `is_slow(route_key)`: `True` if its `ewma_seconds >= slow_request_threshold`. + Unknown routes ⇒ `False` (fast) by default. - Optional hysteresis (separate promote/demote thresholds) to avoid flapping around the boundary. @@ -216,19 +212,17 @@ A small, self-contained, thread-safe object: request — we can't). This shortens the learning window when many requests to a brand-new slow route arrive at once: subsequent ones in the burst route to the slow lane after one threshold interval instead of after a full slow request. -3. **Seeding (eliminates the first-occurrence window for known offenders)**: - `slow_routes` patterns mark routes slow from the very first request. ## 6. Behavior under load (the cases that matter) -- **Flood of a known/seeded or previously-seen slow route**: every such request +- **Flood of a previously-seen slow route**: every such request is routed to the slow pool. The `F` fast threads are never given this work and keep serving fast traffic at full capacity. When the slow lane reaches `S + slow_queue_maxsize`, further slow requests get a fast `503` — backpressure is contained to the slow lane. - **Flood of a never-seen slow route**: the first occurrence(s) run in the fast lane; mid-flight learning (§5.4.2) flips the route to slow after one threshold - interval, so the flood is contained quickly. Seeding avoids even this window. + interval, so the flood is contained quickly. 
- **Mixed fast traffic, idle slow lane**: the `S` slow threads stay parked (no work stealing in this design — see §3), so fast throughput is `F`, not `F + S`. - **Misprediction (route marked slow but now fast)**: handled gracefully — it @@ -239,7 +233,7 @@ A small, self-contained, thread-safe object: Implemented: - `config.py` — `slow_request_threshold`, `slow_threads`, `slow_queue_maxsize`, - `slow_routes`, `slow_lane_retry_after`, plus `validate_pos_float`. + `slow_lane_retry_after`, plus `validate_pos_float`. - `gthread.py` `init_process`/`get_thread_pool` — build `fast_pool` and `slow_pool` (or the single legacy pool when disabled); `_shutdown_pools`. - `gthread.py` `enqueue_req` — route to the matching pool; `nr_slow` bound + diff --git a/gunicorn/config.py b/gunicorn/config.py index a5cfa4b9..0bbfd3fc 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -823,30 +823,6 @@ class SlowQueueMaxsize(Setting): """ -class SlowRoutes(Setting): - name = "slow_routes" - section = "Worker Processes" - cli = ["--slow-route"] - action = "append" - meta = "PATTERN" - validator = validate_list_string - default = [] - desc = """\ - Regular expression(s) matching routes that should always be treated as - slow, regardless of observed timings. - - Each pattern is matched (using ``re.search``) against the route key, - which is the request method and path joined by a space, e.g. - ``"POST /reports/generate"``. Seeding known-slow routes avoids the brief - window where a never-before-seen slow route is learned. - - Only used by the ``gthread`` worker when - :ref:`slow-request-threshold` is set. - - .. 
versionadded:: 23.1.0 - """ - - class SlowLaneRetryAfter(Setting): name = "slow_lane_retry_after" section = "Worker Processes" diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index a1ce27b6..c74e62f2 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -107,10 +107,7 @@ class ThreadWorker(base.Worker): def init_process(self): self.tpool = self.get_thread_pool() if self.routing_enabled: - self.predictor = SlowRoutePredictor( - self.slow_threshold, - seed_patterns=self.cfg.slow_routes, - ) + self.predictor = SlowRoutePredictor(self.slow_threshold) # a dedicated pool for the slow lane: slow requests can never # occupy the fast pool's (``self.tpool``) threads self.slow_pool = futures.ThreadPoolExecutor( diff --git a/gunicorn/workers/gthread_routing.py b/gunicorn/workers/gthread_routing.py index 885a29f7..b2ebf26a 100644 --- a/gunicorn/workers/gthread_routing.py +++ b/gunicorn/workers/gthread_routing.py @@ -6,12 +6,11 @@ The :class:`SlowRoutePredictor` decides, before a request is handed to a worker, whether its route is expected to be slow, based on previously observed -timings of the same route (method + path) plus operator-seeded patterns. The -gthread worker uses this to route slow requests to a dedicated thread pool so -they cannot starve fast requests. +timings of the same route (method + path). The gthread worker uses this to +route slow requests to a dedicated thread pool so they cannot starve fast +requests. """ -import re import threading from collections import OrderedDict @@ -22,26 +21,19 @@ class SlowRoutePredictor: Timings are tracked per route as an exponentially weighted moving average (EWMA) so that a route which becomes fast again decays back below the threshold. The table is bounded (LRU) to cap memory under high route - cardinality. Operator-seeded regex patterns always classify as slow. + cardinality. 
""" - def __init__(self, threshold, max_entries=1024, alpha=0.3, - seed_patterns=None): + def __init__(self, threshold, max_entries=1024, alpha=0.3): self.threshold = threshold self.alpha = alpha self.max_entries = max_entries self._stats = OrderedDict() self._lock = threading.Lock() - self._seed = [re.compile(p) for p in (seed_patterns or [])] - - def _seeded(self, key): - return any(p.search(key) for p in self._seed) def is_slow(self, key): if not key: return False - if self._seeded(key): - return True with self._lock: ewma = self._stats.get(key) if ewma is None: diff --git a/tests/workers/test_gthread_routing.py b/tests/workers/test_gthread_routing.py index d14ccfae..af933cc3 100644 --- a/tests/workers/test_gthread_routing.py +++ b/tests/workers/test_gthread_routing.py @@ -33,12 +33,6 @@ def test_predictor_observe_slow_marks_immediately(): assert p.is_slow("POST /report") is True -def test_predictor_seed_patterns(): - p = SlowRoutePredictor(threshold=1.0, seed_patterns=[r"^POST /reports/"]) - assert p.is_slow("POST /reports/generate") is True - assert p.is_slow("GET /reports/generate") is False - - def test_predictor_lru_bound(): p = SlowRoutePredictor(threshold=1.0, max_entries=10) for i in range(50): From 7dc4d184bee02b7a3233e4c964385e78b96720bd Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 28 May 2026 16:33:54 +0530 Subject: [PATCH 5/7] refactor: Simpler implementation Just do multiple queues, nothing else. --- docs/design/gthread-slow-request-isolation.md | 86 +++++++------- gunicorn/config.py | 109 +++++------------- gunicorn/workers/gthread.py | 59 +++------- 3 files changed, 92 insertions(+), 162 deletions(-) diff --git a/docs/design/gthread-slow-request-isolation.md b/docs/design/gthread-slow-request-isolation.md index cc62b8f7..18ed8797 100644 --- a/docs/design/gthread-slow-request-isolation.md +++ b/docs/design/gthread-slow-request-isolation.md @@ -57,23 +57,23 @@ the actual failure mode — prediction is effective after the first sample. 
│ fast │ slow ▼ ▼ ┌───────────┐ ┌───────────┐ - │ fast_pool │ │ slow_pool │ (bounded, 503 on full) + │ fast_pool │ │ slow_pool │ │ F threads │ │ S threads │ └─────┬─────┘ └─────┬─────┘ └───────┬───────┘ on completion: predictor.update(route, duration) ``` -- **Fast lane**: a `ThreadPoolExecutor` of `F = cfg.threads` threads. Only ever - runs fast-classified work. -- **Slow lane**: a separate `ThreadPoolExecutor` of `S = cfg.slow_threads` - threads (default 1). Only ever runs slow-classified work. -- Total OS threads per worker = `F + S`. -- The slow lane is bounded: a counter (`nr_slow`) tracks slow requests - submitted-but-not-finished; once it reaches `S + cfg.slow_queue_maxsize` (i.e. - running plus queued), further slow requests are rejected with `503` instead of - growing the executor's unbounded internal queue. The fast lane is governed by - the existing `worker_connections` admission like today. +- **Fast lane**: a `ThreadPoolExecutor` of `F = ceil(cfg.threads / 2)` threads. + Only ever runs fast-classified work. +- **Slow lane**: a separate `ThreadPoolExecutor` of `S = cfg.threads // 2` + threads. Only ever runs slow-classified work. +- Total OS threads per worker stays at `cfg.threads` — adaptive-queueing mode splits + the existing budget, it does not expand it. `cfg.threads` must be at least 2 + for the split to be meaningful; otherwise the worker logs a warning and runs + with a single pool. +- Both lanes share the existing `worker_connections` admission like today; the + slow lane is not separately bounded. ### Why two plain pools (and not a custom dual-queue scheduler) @@ -157,13 +157,13 @@ the handshake completes. For SSL connections in this first cut: New settings, mirroring `WorkerThreads` (`config.py:697`): +- `enable_adaptive_queueing` — boolean; when true, the `gthread` worker splits its + `cfg.threads` budget between a fast and a slow lane and routes by prediction. + Default `False` (single pool, today's behavior). 
Requires `cfg.threads >= 2`; + otherwise the worker logs a warning and falls back to the single pool. - `slow_request_threshold` — float seconds; a route whose learned timing meets/ - exceeds this is "slow". Default e.g. `1.0`. **`0` disables the whole feature** - and restores today's single-pool behavior exactly. -- `slow_threads` — `S`, slow-lane worker count. Default `1`. -- `slow_queue_maxsize` — bound on `slow_q`; overflow ⇒ `503`. Default e.g. `100` - (`0` = unbounded). -- `slow_lane_retry_after` — seconds for the `Retry-After` header on 503. + exceeds this is "slow". Default `1.0`. Only consulted when `enable_adaptive_queueing` is + enabled. A `slow_route_key` hook to customize the route key (e.g. collapse `/users/<id>`) is a possible future addition; the default key is method + path @@ -171,17 +171,17 @@ with the query string stripped. ### 5.2 Two thread pools -`init_process` builds two plain `ThreadPoolExecutor`s when routing is enabled — -`fast_pool` (`F = cfg.threads`) and `slow_pool` (`S = cfg.slow_threads`) — and -falls back to the single `get_thread_pool()` executor when it is disabled. -`enqueue_req(conn, slow)` submits to the matching pool; both produce ordinary +`init_process` builds two plain `ThreadPoolExecutor`s when `enable_adaptive_queueing` is +enabled — `fast_pool` with `F = ceil(cfg.threads / 2)` workers and `slow_pool` +with `S = cfg.threads // 2` workers — and falls back to the single +`get_thread_pool()` executor when it is disabled. `enqueue_req(conn, slow)` +submits to the matching pool; both produce ordinary `concurrent.futures.Future`s, so `_wrap_future`, `add_done_callback`, `self.futures` tracking, and `futures.wait` all keep working unchanged. -- Bounding the slow lane: `nr_slow` counts slow requests submitted-but-not-yet- - finished. `enqueue_req` rejects (503) when `nr_slow >= S + slow_queue_maxsize`; - `finish_request` decrements it. This caps the slow executor's otherwise - unbounded internal queue.
+- The slow lane is not separately bounded; its executor's internal queue is + capped indirectly by the worker's existing `worker_connections` admission, + the same as the single-pool path today. - Shutdown drains both pools via a `_shutdown_pools` helper, replacing the single `tpool.shutdown` calls; the `graceful_timeout` `futures.wait` is unchanged. @@ -215,16 +215,16 @@ A small, self-contained, thread-safe object: ## 6. Behavior under load (the cases that matter) -- **Flood of a previously-seen slow route**: every such request - is routed to the slow pool. The `F` fast threads are never given this work and - keep serving fast traffic at full capacity. When the slow lane reaches - `S + slow_queue_maxsize`, further slow requests get a fast `503` — backpressure - is contained to the slow lane. +- **Flood of a previously-seen slow route**: every such request is routed to + the slow pool. The `F` fast threads are never given this work and keep + serving fast traffic at full capacity. Excess slow requests sit in the slow + pool's queue, gated overall by `worker_connections`. - **Flood of a never-seen slow route**: the first occurrence(s) run in the fast lane; mid-flight learning (§5.4.2) flips the route to slow after one threshold interval, so the flood is contained quickly. - **Mixed fast traffic, idle slow lane**: the `S` slow threads stay parked (no - work stealing in this design — see §3), so fast throughput is `F`, not `F + S`. + work stealing in this design — see §3), so fast throughput is `F`, not + `F + S`. This is the cost of splitting a fixed `cfg.threads` budget. - **Misprediction (route marked slow but now fast)**: handled gracefully — it runs in the slow lane, and EWMA decay restores it to the fast lane over time. @@ -232,23 +232,23 @@ A small, self-contained, thread-safe object: Implemented: -- `config.py` — `slow_request_threshold`, `slow_threads`, `slow_queue_maxsize`, - `slow_lane_retry_after`, plus `validate_pos_float`. 
+- `config.py` — `enable_adaptive_queueing`, `slow_request_threshold`, plus + `validate_pos_float`. - `gthread.py` `init_process`/`get_thread_pool` — build `fast_pool` and - `slow_pool` (or the single legacy pool when disabled); `_shutdown_pools`. -- `gthread.py` `enqueue_req` — route to the matching pool; `nr_slow` bound + - `reject_overloaded` (503). + `slow_pool` (split from `cfg.threads`) when `enable_adaptive_queueing` is on, or the + single legacy pool when off; `_shutdown_pools`. +- `gthread.py` `enqueue_req` — route to the matching pool. - `gthread.py` `accept`/`park_for_request`/`classify_and_dispatch`/ `_peek_request_line`/`_route_key` — poller-driven request-line peek + routing. -- `gthread.py` `finish_request` — `predictor.update`, `nr_slow` decrement, - routing-aware keepalive re-park. +- `gthread.py` `finish_request` — `predictor.update`, routing-aware keepalive + re-park. - `gthread.py` run-loop sweep — mid-flight learning. - `gthread_routing.py` — `SlowRoutePredictor`. ## 8. Backward compatibility -- `slow_request_threshold = 0` ⇒ feature off: single pool, no classification, no - rejection — byte-for-byte current behavior. +- `enable_adaptive_queueing = False` (the default) ⇒ feature off: single pool, no + classification — byte-for-byte current behavior. - Hard per-request timeout (`gthread.py:243-250`) preserved unchanged; this adds a softer, non-fatal classification on top. - Worker `handle`/`handle_request`, keepalive semantics, and the @@ -265,8 +265,8 @@ Implemented: dispatches to the expected lane. - **Integration — flood isolation**: app with a known-slow route flooded concurrently; assert fast-route latency stays low and slow requests never - occupy fast workers; assert 503 once the slow lane is full. + occupy fast workers. - **Integration — cold start**: never-seen slow route burst ⇒ confirm the lane flips to slow within ~one threshold interval via mid-flight learning. 
-- **Regression**: `slow_request_threshold = 0` ⇒ current behavior; keepalive, - SSL, and graceful shutdown paths still pass existing tests. +- **Regression**: `enable_adaptive_queueing = False` ⇒ current behavior; keepalive, SSL, and + graceful shutdown paths still pass existing tests. diff --git a/gunicorn/config.py b/gunicorn/config.py index 0bbfd3fc..7681b043 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -744,32 +744,26 @@ class WorkerConnections(Setting): """ -class SlowRequestThreshold(Setting): - name = "slow_request_threshold" +class EnableAdaptiveQueueing(Setting): + name = "enable_adaptive_queueing" section = "Worker Processes" - cli = ["--slow-request-threshold"] - meta = "FLOAT" - validator = validate_pos_float - type = float - default = 0.0 + cli = ["--enable-adaptive-queueing"] + validator = validate_bool + action = "store_true" + type = bool + default = False desc = """\ - Processing time (in seconds) above which a request route is treated as - "slow" by the ``gthread`` worker. + Enable adaptive multi-queue routing in the ``gthread`` worker. - When set to a positive value, the ``gthread`` worker runs two separate - lanes of threads: a *fast* lane (sized by :ref:`threads`) and a *slow* - lane (sized by :ref:`slow-threads`). Requests are routed by predicting, - from previously observed timings of the same route (method + path), - whether they will take longer than this threshold. Slow-predicted - requests go to the slow lane so they can never starve the fast lane, - even under a flood. + When enabled, the worker splits its :ref:`threads` roughly evenly into + two lanes — a *fast* lane and a *slow* lane — and routes each request + to one of them by predicting, from previously observed timings of the + same route (method + path), whether it will exceed + :ref:`slow-request-threshold`. Slow-predicted requests go to the slow + lane so they can never starve the fast lane, even under a flood of + slow requests. 
- A route is learned as slow once it has been observed exceeding this - threshold (either on completion or while still running); its timing - decays back below the threshold if it becomes fast again. - - The default of ``0`` disables the feature and restores the single - thread-pool behaviour. + Requires :ref:`threads` to be at least 2. This setting only affects the ``gthread`` worker type. @@ -777,66 +771,25 @@ class SlowRequestThreshold(Setting): """ -class SlowThreads(Setting): - name = "slow_threads" +class SlowRequestThreshold(Setting): + name = "slow_request_threshold" section = "Worker Processes" - cli = ["--slow-threads"] - meta = "INT" - validator = validate_pos_int - type = int - default = 1 + cli = ["--slow-request-threshold"] + meta = "FLOAT" + validator = validate_pos_float + type = float + default = 1.0 desc = """\ - The number of worker threads dedicated to the slow lane. + Processing time (in seconds) above which a request route is treated as + "slow" by the ``gthread`` worker when :ref:`enable-adaptive-queueing` + is enabled. - These threads only ever run requests predicted to be slow. When the slow - lane has no pending work, they help drain the fast lane, so they are - never idle while fast work is waiting. Total threads per worker is - ``threads + slow_threads``. + A route is learned as slow once it has been observed exceeding this + threshold (either on completion or while still running); its timing + decays back below the threshold if it becomes fast again. - Only used by the ``gthread`` worker when - :ref:`slow-request-threshold` is set. - - .. versionadded:: 23.1.0 - """ - - -class SlowQueueMaxsize(Setting): - name = "slow_queue_maxsize" - section = "Worker Processes" - cli = ["--slow-queue-maxsize"] - meta = "INT" - validator = validate_pos_int - type = int - default = 100 - desc = """\ - Maximum number of requests allowed to wait in the slow lane queue. 
- - When the slow lane queue is full, additional slow-predicted requests are - rejected immediately with a ``503 Service Unavailable`` response instead - of being queued, keeping the slow lane's backpressure contained. A value - of ``0`` means the slow queue is unbounded. - - Only used by the ``gthread`` worker when - :ref:`slow-request-threshold` is set. - - .. versionadded:: 23.1.0 - """ - - -class SlowLaneRetryAfter(Setting): - name = "slow_lane_retry_after" - section = "Worker Processes" - cli = ["--slow-lane-retry-after"] - meta = "INT" - validator = validate_pos_int - type = int - default = 1 - desc = """\ - Value (in seconds) of the ``Retry-After`` header sent with the ``503`` - response when the slow lane queue is full. - - Only used by the ``gthread`` worker when - :ref:`slow-request-threshold` is set. + Only used by the ``gthread`` worker when :ref:`enable-adaptive-queueing` + is enabled. .. versionadded:: 23.1.0 """ diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index c74e62f2..9bb0e1f1 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -76,24 +76,22 @@ class ThreadWorker(base.Worker): super().__init__(*args, **kwargs) self.worker_connections = self.cfg.worker_connections self.max_keepalived = self.cfg.worker_connections - self.cfg.threads + # request routing: when adaptive queueing is enabled, the configured + # threads are split into a fast lane and a slow lane so slow requests + # cannot starve fast ones + self.routing_enabled = ( + self.cfg.enable_adaptive_queueing and self.cfg.threads >= 2) + self.slow_threshold = self.cfg.slow_request_threshold # initialise the pool(s): a single pool when routing is disabled, or a # separate fast (``self.tpool``) and slow pool when it is enabled self.tpool = None self.slow_pool = None - # number of slow requests submitted but not yet finished, used to bound - # the slow lane (running + queued) and shed load with 503 - self.nr_slow = 0 self.poller = None 
self.shutdown_event = os.eventfd(0) self._lock = None self.futures = deque() self._keep = deque() self.nr_conns = 0 - - # request routing: when a slow-request threshold is configured, slow - # requests are routed to a separate lane so they cannot starve fast ones - self.slow_threshold = self.cfg.slow_request_threshold - self.routing_enabled = self.slow_threshold > 0 self.predictor = None @classmethod @@ -104,14 +102,21 @@ class ThreadWorker(base.Worker): log.warning("No keepalived connections can be handled. " + "Check the number of worker connections and threads.") + if cfg.enable_adaptive_queueing and cfg.threads < 2: + log.warning("enable_adaptive_queueing requires at least 2 threads; " + "running with a single pool.") + def init_process(self): - self.tpool = self.get_thread_pool() if self.routing_enabled: + # split the configured threads roughly evenly between the two + # lanes; the fast lane gets the extra thread when threads is odd + slow = self.cfg.threads // 2 + fast = self.cfg.threads - slow + self.tpool = futures.ThreadPoolExecutor(max_workers=fast) + self.slow_pool = futures.ThreadPoolExecutor(max_workers=slow) self.predictor = SlowRoutePredictor(self.slow_threshold) - # a dedicated pool for the slow lane: slow requests can never - # occupy the fast pool's (``self.tpool``) threads - self.slow_pool = futures.ThreadPoolExecutor( - max_workers=self.cfg.slow_threads) + else: + self.tpool = self.get_thread_pool() self.poller = selectors.DefaultSelector() self._lock = RLock() super().init_process() @@ -148,15 +153,7 @@ class ThreadWorker(base.Worker): def enqueue_req(self, conn, slow=False): conn.init() - # submit the connection to the appropriate pool if self.routing_enabled and slow: - cap = self.cfg.slow_queue_maxsize - if cap and self.nr_slow >= self.cfg.slow_threads + cap: - # slow lane (running + queued) is full; shed load with 503 - # instead of letting the slow queue grow unbounded - self.reject_overloaded(conn) - return - self.nr_slow += 1 fs = 
self.slow_pool.submit(self.handle, conn) else: fs = self.tpool.submit(self.handle, conn) @@ -262,22 +259,6 @@ class ThreadWorker(base.Worker): return None return method + " " + path - def reject_overloaded(self, conn): - """Reject a connection with 503 because the slow lane is saturated.""" - self.nr_conns -= 1 - try: - conn.sock.setblocking(True) - conn.sock.sendall( - b"HTTP/1.1 503 Service Unavailable\r\n" - b"Connection: close\r\n" - b"Content-Length: 0\r\n" - b"Retry-After: %d\r\n\r\n" - % int(self.cfg.slow_lane_retry_after)) - except OSError: - pass - finally: - conn.close() - def reuse_connection(self, conn, client): with self._lock: # unregister the client from the poller @@ -410,10 +391,6 @@ class ThreadWorker(base.Worker): futures.wait(self.futures, timeout=self.cfg.graceful_timeout) def finish_request(self, fs): - # the slow request is done (whatever the outcome): free its slow slot - if self.routing_enabled and fs.slow: - self.nr_slow -= 1 - if fs.cancelled(): self.nr_conns -= 1 fs.conn.close() From c24711b795482e6b6b03853d90f35454816ff856 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 28 May 2026 16:36:49 +0530 Subject: [PATCH 6/7] fix: Increase default slow request threshold --- gunicorn/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gunicorn/config.py b/gunicorn/config.py index 7681b043..6d8809e1 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -778,7 +778,7 @@ class SlowRequestThreshold(Setting): meta = "FLOAT" validator = validate_pos_float type = float - default = 1.0 + default = 5.0 desc = """\ Processing time (in seconds) above which a request route is treated as "slow" by the ``gthread`` worker when :ref:`enable-adaptive-queueing` From c20f3a378479a0acd4729f089ec517c873d21ac8 Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Thu, 28 May 2026 16:37:55 +0530 Subject: [PATCH 7/7] fix: Support configuring via envvar --- gunicorn/config.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff 
--git a/gunicorn/config.py b/gunicorn/config.py index 6d8809e1..f6c69a5f 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -751,10 +751,14 @@ class EnableAdaptiveQueueing(Setting): validator = validate_bool action = "store_true" type = bool - default = False + default = validate_bool( + os.environ.get("GUNICORN_ENABLE_ADAPTIVE_QUEUEING", "false")) desc = """\ Enable adaptive multi-queue routing in the ``gthread`` worker. + Can also be enabled by setting the ``GUNICORN_ENABLE_ADAPTIVE_QUEUEING`` + environment variable to ``true``. + When enabled, the worker splits its :ref:`threads` roughly evenly into two lanes — a *fast* lane and a *slow* lane — and routes each request to one of them by predicting, from previously observed timings of the