feat: Adaptive queueing of slow/fast requests

This commit is contained in:
Ankush Menat 2026-05-27 11:30:02 +05:30
parent bb554053bb
commit ee9bf1e950
6 changed files with 836 additions and 21 deletions

View File

@ -0,0 +1,278 @@
# Design: Slow-request isolation for the gthread worker (predictive dual-queue)
Status: proposal / draft
Author: (ankush)
Scope: `gunicorn/workers/gthread.py`, `gunicorn/config.py`
## 1. Problem
The `gthread` worker runs synchronous WSGI applications on a single
`ThreadPoolExecutor` sized to `cfg.threads` (`gthread.py:95-97`). Every accepted
connection is submitted to that one pool (`enqueue_req`, `gthread.py:117-121`).
Because the pool has a fixed number of threads and an unbounded work queue, a
flood of slow requests occupies every thread and all fast requests starve behind
them in the queue — head-of-line blocking.
Goal: **route requests that are predicted to be slow into a separate, dedicated
lane so they can never occupy the threads reserved for fast requests, even under
a flood.** Fast requests go to a fast lane; slow requests go to a slow lane. The
slow lane may help drain fast work when its own queue is empty, but the fast
lane never touches slow work.
This supersedes the earlier "demotion-only" proposal, which could not stop slow
work from entering the fast pool and therefore could not survive a flood.
## 2. Why prediction is required (and its hard limit)
You cannot preempt a running Python thread executing WSGI code (`gthread.py:352`):
once a slow request is on a thread, that thread is committed until the app
returns. So isolation has to happen **before** a request is handed to a worker —
i.e. at routing time. That means we must decide "fast or slow" from the request
*before* running it.
The only information available pre-execution is the request itself (method,
path, headers) plus what we have learned from prior requests. So the design is:
1. A **predictor** that, given a request's route, answers "slow?" using learned
per-route timing statistics (plus optional operator-seeded patterns).
2. **Routing at accept time** based on that prediction, into one of two pools.
3. **Learning**: every completed request — and any request that crosses the
slow threshold mid-flight — updates the predictor, so a slow route is
recognized after its first occurrence(s) and all subsequent traffic to it is
routed to the slow lane.
Hard limit to state up front: a route that has **never been seen** cannot be
predicted slow on its very first request(s); those first occurrences run in the
fast lane until learning kicks in. We minimize this window (§5.4) and let
operators pre-seed known-slow routes (§5.1). For repeated/flooding slow routes —
the actual failure mode — prediction is effective after the first sample.
## 3. Architecture overview
```
┌─────────────────────────────────────────┐
listener ──accept──▶ │ main loop: poller-driven classification │
│ peek request line ▶ predictor.is_slow? │
└───────────────┬───────────────┬───────────┘
│ fast │ slow
▼ ▼
┌───────────┐ ┌───────────┐
│ fast_pool │ │ slow_pool │ (bounded, 503 on full)
│ F threads │ │ S threads │
└─────┬─────┘ └─────┬─────┘
└───────┬───────┘
on completion: predictor.update(route, duration)
```
- **Fast lane**: a `ThreadPoolExecutor` of `F = cfg.threads` threads. Only ever
runs fast-classified work.
- **Slow lane**: a separate `ThreadPoolExecutor` of `S = cfg.slow_threads`
threads (default 1). Only ever runs slow-classified work.
- Total OS threads per worker = `F + S`.
- The slow lane is bounded: a counter (`nr_slow`) tracks slow requests
submitted-but-not-finished; once it reaches `S + cfg.slow_queue_maxsize` (i.e.
running plus queued), further slow requests are rejected with `503` instead of
growing the executor's unbounded internal queue. The fast lane is governed by
the existing `worker_connections` admission like today.
### Why two plain pools (and not a custom dual-queue scheduler)
An earlier revision used a single custom scheduler with two queues and
one-directional **work stealing** (idle slow threads draining the fast queue).
Two independent `ThreadPoolExecutor`s are dramatically simpler and rely on
well-tested stdlib machinery. The one capability given up is work stealing: the
`S` slow threads sit idle when there is no slow work, even if fast work is
queued. For the common case (`S` small, e.g. 1) this is a negligible amount of
parked capacity, and the simplicity is worth it. If maximizing throughput under
pure-fast load ever matters more than simplicity, the custom scheduler can be
reintroduced behind the same `enqueue_req` interface without touching routing.
## 4. Routing point: classify before threading
Today, parsing happens inside the worker thread (`handle``next(conn.parser)`,
`gthread.py:295`), which is too late — the request is already on a thread. We
move *classification only* (not full parsing) into the main loop.
### 4.1 Restructured connection lifecycle
Both freshly accepted connections and keepalive connections flow through one
poller-driven classification step (this unifies `accept`/`reuse_connection` and
also moves slow-client header reads off the worker threads — a side benefit
against slowloris):
1. `accept` (`gthread.py:123`): accept socket, create `TConn`, set non-blocking,
register it in the poller for `EVENT_READ` with a `classify_and_dispatch`
callback. **Do not submit to any pool yet.** `nr_conns += 1`.
2. When the socket becomes readable, `classify_and_dispatch(conn)`:
- **Peek** the buffered bytes with `recv(n, socket.MSG_PEEK)` (plaintext) —
this reads without consuming, so the worker's parser still sees the full
byte stream unchanged. No parser changes required.
- Parse just the request line (`METHOD SP PATH SP VERSION CRLF`) from the
peeked buffer. If the line has not fully arrived yet, return and wait for
the next readable event (bounded by the existing keepalive/header timeout so
a stalled client is eventually closed, not left forever).
> **Why peek the request line, not fully read/parse the request here?**
> Classification only needs method + path. Doing a *full* read/parse of the
> request in the main loop is actively harmful: the main loop is a single
> thread serving every connection (accepts, keepalive, the poller). A blocking
> full read lets one slow client — slowloris, slow network, or a large/chunked
> body — stall the **entire worker**, which is strictly worse than the
> thread-pool starvation we are fixing (there is no pool to absorb it).
> Peeking only inspects already-buffered bytes and defers to the poller if the
> line is incomplete, so it never blocks. It also avoids having to read the
> body in the main loop (WSGI streams `wsgi.input` lazily) and keeps header
> parsing, parse-error responses (400/414), and `wsgi.input` wiring in the
> worker where they already live.
- Compute `route_key` (default: `METHOD + " " + path`, query string stripped;
overridable via hook, §5.1).
- `slow = predictor.is_slow(route_key)` (or matches a seeded slow pattern).
- Unregister the socket from the poller and submit the connection to the
**slow** pool if `slow` else the **fast** pool.
3. The worker's `handle`/`handle_request` run unchanged. On completion, the
measured `request_time` (already computed at `gthread.py:362`) is fed to
`predictor.update(route_key, duration)`.
4. Keepalive: after a kept-alive request, re-register the connection in the
poller with the same `classify_and_dispatch` callback (instead of the old
`reuse_connection`), so the *next* request on the connection is re-classified
independently (it may hit a different route).
### 4.2 SSL connections
Plaintext peek does not work through TLS — the request line is encrypted until
the handshake completes. For SSL connections in this first cut:
- They cannot be pre-classified at the socket level, so they default to the
**fast** lane and rely on mid-flight + completion learning (§5.4) — meaning an
SSL-only deployment does not get full flood protection.
- Note in docs that the common production layout terminates TLS upstream (e.g.
nginx) so gunicorn sees plaintext and gets full protection.
- **Phase 2** (deferred): drive a non-blocking TLS handshake from the poller and
buffer the decrypted request line (feeding it back via `Unreader.unread`,
`unreader.py:51`) to classify SSL the same way.
## 5. Components
### 5.1 Config (`gunicorn/config.py`)
New settings, mirroring `WorkerThreads` (`config.py:697`):
- `slow_request_threshold` — float seconds; a route whose learned timing meets/
exceeds this is "slow". Default e.g. `1.0`. **`0` disables the whole feature**
and restores today's single-pool behavior exactly.
- `slow_threads``S`, slow-lane worker count. Default `1`.
- `slow_queue_maxsize` — bound on `slow_q`; overflow ⇒ `503`. Default e.g. `100`
(`0` = unbounded).
- `slow_routes` — optional list of regex route patterns (matched with
`re.search` against the route key) operators know are slow, seeded into the
predictor so even the *first* request routes correctly.
- `slow_lane_retry_after` — seconds for the `Retry-After` header on 503.
A `slow_route_key` hook to customize the route key (e.g. collapse
`/users/<id>`) is a possible future addition; the default key is method + path
with the query string stripped.
### 5.2 Two thread pools
`init_process` builds two plain `ThreadPoolExecutor`s when routing is enabled —
`fast_pool` (`F = cfg.threads`) and `slow_pool` (`S = cfg.slow_threads`) — and
falls back to the single `get_thread_pool()` executor when it is disabled.
`enqueue_req(conn, slow)` submits to the matching pool; both produce ordinary
`concurrent.futures.Future`s, so `_wrap_future`, `add_done_callback`,
`self.futures` tracking, and `futures.wait` all keep working unchanged.
- Bounding the slow lane: `nr_slow` counts slow requests submitted-but-not-yet-
finished. `enqueue_req` rejects (503) when `nr_slow >= S + slow_queue_maxsize`;
`finish_request` decrements it. This caps the slow executor's otherwise
unbounded internal queue.
- Shutdown drains both pools via a `_shutdown_pools` helper, replacing the
single `tpool.shutdown` calls; the `graceful_timeout` `futures.wait` is
unchanged.
### 5.3 Predictor
A small, self-contained, thread-safe object:
- State: bounded LRU map `route_key -> {ewma_seconds, samples, last_seen}`.
Bounding caps memory under high route cardinality.
- `update(route_key, duration)`: EWMA with decay so a route that becomes fast
again eventually returns to the fast lane (avoids permanent misclassification
after a one-off slow spike). Called on every completion.
- `is_slow(route_key)`: `True` if the route matches a seeded `slow_routes`
pattern, or its `ewma_seconds >= slow_request_threshold`. Unknown routes ⇒
`False` (fast) by default.
- Optional hysteresis (separate promote/demote thresholds) to avoid flapping
around the boundary.
### 5.4 Learning signals
1. **Completion (primary)**: feed `request_time` (`gthread.py:362`) into
`predictor.update`. After a slow route's first request completes, it is known.
2. **Mid-flight observation (catches simultaneous first-bursts)**: the main loop
already sweeps `self.futures` for the hard timeout (`gthread.py:245-250`). In
that sweep, for any in-flight request whose elapsed time exceeds
`slow_request_threshold`, call `predictor.update` with that elapsed time
*immediately* (do not wait for completion, and do not move the running
request — we can't). This shortens the learning window when many requests to a
brand-new slow route arrive at once: subsequent ones in the burst route to the
slow lane after one threshold interval instead of after a full slow request.
3. **Seeding (eliminates the first-occurrence window for known offenders)**:
`slow_routes` patterns mark routes slow from the very first request.
## 6. Behavior under load (the cases that matter)
- **Flood of a known/seeded or previously-seen slow route**: every such request
is routed to the slow pool. The `F` fast threads are never given this work and
keep serving fast traffic at full capacity. When the slow lane reaches
`S + slow_queue_maxsize`, further slow requests get a fast `503` — backpressure
is contained to the slow lane.
- **Flood of a never-seen slow route**: the first occurrence(s) run in the fast
lane; mid-flight learning (§5.4.2) flips the route to slow after one threshold
interval, so the flood is contained quickly. Seeding avoids even this window.
- **Mixed fast traffic, idle slow lane**: the `S` slow threads stay parked (no
work stealing in this design — see §3), so fast throughput is `F`, not `F + S`.
- **Misprediction (route marked slow but now fast)**: handled gracefully — it
runs in the slow lane, and EWMA decay restores it to the fast lane over time.
## 7. Implementation checklist (touch points)
Implemented:
- `config.py``slow_request_threshold`, `slow_threads`, `slow_queue_maxsize`,
`slow_routes`, `slow_lane_retry_after`, plus `validate_pos_float`.
- `gthread.py` `init_process`/`get_thread_pool` — build `fast_pool` and
`slow_pool` (or the single legacy pool when disabled); `_shutdown_pools`.
- `gthread.py` `enqueue_req` — route to the matching pool; `nr_slow` bound +
`reject_overloaded` (503).
- `gthread.py` `accept`/`park_for_request`/`classify_and_dispatch`/
`_peek_request_line`/`_route_key` — poller-driven request-line peek + routing.
- `gthread.py` `finish_request``predictor.update`, `nr_slow` decrement,
routing-aware keepalive re-park.
- `gthread.py` run-loop sweep — mid-flight learning.
- `gthread_routing.py``SlowRoutePredictor`.
## 8. Backward compatibility
- `slow_request_threshold = 0` ⇒ feature off: single pool, no classification, no
rejection — byte-for-byte current behavior.
- Hard per-request timeout (`gthread.py:243-250`) preserved unchanged; this adds
a softer, non-fatal classification on top.
- Worker `handle`/`handle_request`, keepalive semantics, and the
future/`finish_request` contract are preserved (MSG_PEEK leaves the byte
stream intact, so the parser is untouched).
## 9. Test plan
- **Predictor unit**: unknown ⇒ fast; after `update` with a slow duration ⇒
slow; EWMA decay restores fast; seeded patterns are slow from first call; LRU
bound holds under many keys.
- **Routing unit**: `classify_and_dispatch` extracts the right `route_key` from
partial vs complete peeked buffers; incomplete line defers; complete line
dispatches to the expected lane.
- **Integration — flood isolation**: app with a known-slow route flooded
concurrently; assert fast-route latency stays low and slow requests never
occupy fast workers; assert 503 once the slow lane is full.
- **Integration — cold start**: never-seen slow route burst ⇒ confirm the lane
flips to slow within ~one threshold interval via mid-flight learning.
- **Regression**: `slow_request_threshold = 0` ⇒ current behavior; keepalive,
SSL, and graceful shutdown paths still pass existing tests.

View File

@ -365,6 +365,13 @@ def validate_pos_int(val):
return val
def validate_pos_float(val):
val = float(val)
if val < 0:
raise ValueError("Value must be positive: %s" % val)
return val
def validate_ssl_version(val):
if val != SSLVersion.default:
sys.stderr.write("Warning: option `ssl_version` is deprecated and it is ignored. Use ssl_context instead.\n")
@ -737,6 +744,128 @@ class WorkerConnections(Setting):
"""
class SlowRequestThreshold(Setting):
name = "slow_request_threshold"
section = "Worker Processes"
cli = ["--slow-request-threshold"]
meta = "FLOAT"
validator = validate_pos_float
type = float
default = 0.0
desc = """\
Processing time (in seconds) above which a request route is treated as
"slow" by the ``gthread`` worker.
When set to a positive value, the ``gthread`` worker runs two separate
lanes of threads: a *fast* lane (sized by :ref:`threads`) and a *slow*
lane (sized by :ref:`slow-threads`). Requests are routed by predicting,
from previously observed timings of the same route (method + path),
whether they will take longer than this threshold. Slow-predicted
requests go to the slow lane so they can never starve the fast lane,
even under a flood.
A route is learned as slow once it has been observed exceeding this
threshold (either on completion or while still running); its timing
decays back below the threshold if it becomes fast again.
The default of ``0`` disables the feature and restores the single
thread-pool behaviour.
This setting only affects the ``gthread`` worker type.
.. versionadded:: 23.1.0
"""
class SlowThreads(Setting):
name = "slow_threads"
section = "Worker Processes"
cli = ["--slow-threads"]
meta = "INT"
validator = validate_pos_int
type = int
default = 1
desc = """\
The number of worker threads dedicated to the slow lane.
These threads only ever run requests predicted to be slow. When the slow
lane has no pending work, they help drain the fast lane, so they are
never idle while fast work is waiting. Total threads per worker is
``threads + slow_threads``.
Only used by the ``gthread`` worker when
:ref:`slow-request-threshold` is set.
.. versionadded:: 23.1.0
"""
class SlowQueueMaxsize(Setting):
name = "slow_queue_maxsize"
section = "Worker Processes"
cli = ["--slow-queue-maxsize"]
meta = "INT"
validator = validate_pos_int
type = int
default = 100
desc = """\
Maximum number of requests allowed to wait in the slow lane queue.
When the slow lane queue is full, additional slow-predicted requests are
rejected immediately with a ``503 Service Unavailable`` response instead
of being queued, keeping the slow lane's backpressure contained. A value
of ``0`` means the slow queue is unbounded.
Only used by the ``gthread`` worker when
:ref:`slow-request-threshold` is set.
.. versionadded:: 23.1.0
"""
class SlowRoutes(Setting):
name = "slow_routes"
section = "Worker Processes"
cli = ["--slow-route"]
action = "append"
meta = "PATTERN"
validator = validate_list_string
default = []
desc = """\
Regular expression(s) matching routes that should always be treated as
slow, regardless of observed timings.
Each pattern is matched (using ``re.search``) against the route key,
which is the request method and path joined by a space, e.g.
``"POST /reports/generate"``. Seeding known-slow routes avoids the brief
window where a never-before-seen slow route is learned.
Only used by the ``gthread`` worker when
:ref:`slow-request-threshold` is set.
.. versionadded:: 23.1.0
"""
class SlowLaneRetryAfter(Setting):
name = "slow_lane_retry_after"
section = "Worker Processes"
cli = ["--slow-lane-retry-after"]
meta = "INT"
validator = validate_pos_int
type = int
default = 1
desc = """\
Value (in seconds) of the ``Retry-After`` header sent with the ``503``
response when the slow lane queue is full.
Only used by the ``gthread`` worker when
:ref:`slow-request-threshold` is set.
.. versionadded:: 23.1.0
"""
class MaxRequests(Setting):
name = "max_requests"
section = "Worker Processes"

View File

@ -25,11 +25,15 @@ from functools import partial
from threading import RLock
from . import base
from .gthread_routing import SlowRoutePredictor
from .. import http
from .. import util
from .. import sock
from ..http import wsgi
# how many bytes to peek when classifying a request by its request line
REQUEST_LINE_PEEK = 65536
class TConn:
@ -41,6 +45,9 @@ class TConn:
self.timeout = None
self.parser = None
# route key (method + path), set by the worker when request routing is
# enabled; used to predict and learn slow routes
self.route_key = None
# set the socket to non blocking
self.sock.setblocking(False)
@ -69,8 +76,14 @@ class ThreadWorker(base.Worker):
super().__init__(*args, **kwargs)
self.worker_connections = self.cfg.worker_connections
self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
# initialise the pool
# initialise the pool(s): a single pool when routing is disabled, or a
# separate fast and slow pool when it is enabled
self.tpool = None
self.fast_pool = None
self.slow_pool = None
# number of slow requests submitted but not yet finished, used to bound
# the slow lane (running + queued) and shed load with 503
self.nr_slow = 0
self.poller = None
self.shutdown_event = os.eventfd(0)
self._lock = None
@ -78,6 +91,12 @@ class ThreadWorker(base.Worker):
self._keep = deque()
self.nr_conns = 0
# request routing: when a slow-request threshold is configured, slow
# requests are routed to a separate lane so they cannot starve fast ones
self.slow_threshold = self.cfg.slow_request_threshold
self.routing_enabled = self.slow_threshold > 0
self.predictor = None
@classmethod
def check_config(cls, cfg, log):
max_keepalived = cfg.worker_connections - cfg.threads
@ -87,7 +106,19 @@ class ThreadWorker(base.Worker):
"Check the number of worker connections and threads.")
def init_process(self):
self.tpool = self.get_thread_pool()
if self.routing_enabled:
self.predictor = SlowRoutePredictor(
self.slow_threshold,
seed_patterns=self.cfg.slow_routes,
)
# a dedicated pool per lane: slow requests can never occupy the
# fast pool's threads
self.fast_pool = futures.ThreadPoolExecutor(
max_workers=self.cfg.threads)
self.slow_pool = futures.ThreadPoolExecutor(
max_workers=self.cfg.slow_threads)
else:
self.tpool = self.get_thread_pool()
self.poller = selectors.DefaultSelector()
self._lock = RLock()
super().init_process()
@ -96,6 +127,11 @@ class ThreadWorker(base.Worker):
"""Override this method to customize how the thread pool is created"""
return futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
def _shutdown_pools(self, wait):
for pool in (self.tpool, self.fast_pool, self.slow_pool):
if pool is not None:
pool.shutdown(wait)
def handle_exit(self, sig, frame):
self.alive = False
os.eventfd_write(self.shutdown_event, 1)
@ -104,21 +140,36 @@ class ThreadWorker(base.Worker):
self.alive = False
# worker_int callback
self.cfg.worker_int(self)
self.tpool.shutdown(False)
self._shutdown_pools(False)
time.sleep(0.1)
sys.exit(0)
def _wrap_future(self, fs, conn):
def _wrap_future(self, fs, conn, slow=False):
fs.conn = conn
fs._request_timeout = time.monotonic() + self.cfg.timeout
fs.slow = slow
fs._start_time = time.monotonic()
fs._request_timeout = fs._start_time + self.cfg.timeout
fs._observed_slow = False
self.futures.append(fs)
fs.add_done_callback(self.finish_request)
def enqueue_req(self, conn):
def enqueue_req(self, conn, slow=False):
conn.init()
# submit the connection to a worker
fs = self.tpool.submit(self.handle, conn)
self._wrap_future(fs, conn)
# submit the connection to the appropriate pool
if not self.routing_enabled:
fs = self.tpool.submit(self.handle, conn)
elif slow:
cap = self.cfg.slow_queue_maxsize
if cap and self.nr_slow >= self.cfg.slow_threads + cap:
# slow lane (running + queued) is full; shed load with 503
# instead of letting the slow queue grow unbounded
self.reject_overloaded(conn)
return
self.nr_slow += 1
fs = self.slow_pool.submit(self.handle, conn)
else:
fs = self.fast_pool.submit(self.handle, conn)
self._wrap_future(fs, conn, slow=slow)
def accept(self, server, listener):
try:
@ -127,13 +178,115 @@ class ThreadWorker(base.Worker):
conn = TConn(self.cfg, sock, client, server)
self.nr_conns += 1
# enqueue the job
self.enqueue_req(conn)
if self.routing_enabled and not self.cfg.is_ssl:
# park the connection until its request line is readable, then
# classify and route it to the fast or slow lane
self.park_for_request(conn)
else:
# legacy single-lane path (also used for SSL, whose request
# line cannot be peeked before the TLS handshake)
self.enqueue_req(conn)
except OSError as e:
if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
errno.EWOULDBLOCK):
raise
def park_for_request(self, conn):
"""Register a connection in the poller until its request line arrives."""
conn.sock.setblocking(False)
conn.set_timeout()
with self._lock:
self._keep.append(conn)
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.classify_and_dispatch, conn))
def classify_and_dispatch(self, conn, client=None):
"""Peek the request line, predict the lane, and enqueue the request."""
line, closed, complete = self._peek_request_line(conn)
if not closed and not complete:
# request line has not fully arrived yet; keep waiting. Stalled
# clients are reaped by murder_keepalived via the connection timeout.
return
with self._lock:
try:
# remove the connection from the parked set
self._keep.remove(conn)
except ValueError:
# already handled (e.g. by murder_keepalived); nothing to do
return
try:
self.poller.unregister(conn.sock)
except (KeyError, OSError, ValueError):
pass
if closed:
self.nr_conns -= 1
conn.close()
return
conn.route_key = self._route_key(line)
slow = self.predictor.is_slow(conn.route_key)
self.enqueue_req(conn, slow=slow)
def _peek_request_line(self, conn):
"""Return ``(line, closed, complete)`` for the connection's request line.
``line`` is the request line bytes (without CRLF) once available,
``closed`` is True if the peer closed the connection, and ``complete``
is True once we should stop waiting for more data.
"""
try:
data = conn.sock.recv(REQUEST_LINE_PEEK, socket.MSG_PEEK)
except (BlockingIOError, InterruptedError):
return None, False, False
except OSError:
return None, True, False
if data == b"":
# peer closed the connection before sending a request
return None, True, False
idx = data.find(b"\r\n")
if idx == -1:
if len(data) >= REQUEST_LINE_PEEK:
# request line longer than our peek window; stop classifying and
# let the worker's parser deal with (or reject) it
return None, False, True
return None, False, False
return data[:idx], False, True
@staticmethod
def _route_key(line):
"""Build a route key (``"METHOD /path"``) from a raw request line."""
if not line:
return None
parts = line.split(b" ")
if len(parts) < 2:
return None
try:
method = parts[0].decode("latin1")
path = parts[1].split(b"?", 1)[0].decode("latin1")
except UnicodeDecodeError:
return None
return method + " " + path
def reject_overloaded(self, conn):
"""Reject a connection with 503 because the slow lane is saturated."""
self.nr_conns -= 1
try:
conn.sock.setblocking(True)
conn.sock.sendall(
b"HTTP/1.1 503 Service Unavailable\r\n"
b"Connection: close\r\n"
b"Content-Length: 0\r\n"
b"Retry-After: %d\r\n\r\n"
% int(self.cfg.slow_lane_retry_after))
except OSError:
pass
finally:
conn.close()
def reuse_connection(self, conn, client):
with self._lock:
# unregister the client from the poller
@ -248,8 +401,16 @@ class ThreadWorker(base.Worker):
self.alive = False
self.log.error("A request timed out. Exiting.")
faulthandler.dump_traceback()
elif (self.routing_enabled and not fut._observed_slow
and not fut.slow
and current_time - fut._start_time > self.slow_threshold):
# an in-flight fast-lane request crossed the threshold; learn
# the route as slow now so the rest of a burst is rerouted
# without waiting for this request to finish
self.predictor.observe_slow(fut.conn.route_key)
fut._observed_slow = True
self.tpool.shutdown(False)
self._shutdown_pools(False)
self.poller.close()
for s in self.sockets:
@ -258,27 +419,41 @@ class ThreadWorker(base.Worker):
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def finish_request(self, fs):
# the slow request is done (whatever the outcome): free its slow slot
if self.routing_enabled and fs.slow:
self.nr_slow -= 1
if fs.cancelled():
self.nr_conns -= 1
fs.conn.close()
return
# feed the observed processing time back to the predictor so the route
# is learned (or unlearned) as slow
if self.routing_enabled and fs.conn.route_key:
self.predictor.update(fs.conn.route_key,
time.monotonic() - fs._start_time)
try:
(keepalive, conn) = fs.result()
# if the connection should be kept alived add it
# to the eventloop and record it
if keepalive and self.alive:
# flag the socket as non blocked
conn.sock.setblocking(False)
if self.routing_enabled and not self.cfg.is_ssl:
# re-classify the next request on this connection
self.park_for_request(conn)
else:
# flag the socket as non blocked
conn.sock.setblocking(False)
# register the connection
conn.set_timeout()
with self._lock:
self._keep.append(conn)
# register the connection
conn.set_timeout()
with self._lock:
self._keep.append(conn)
# add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.reuse_connection, conn))
# add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.reuse_connection, conn))
else:
self.nr_conns -= 1
conn.close()

View File

@ -0,0 +1,83 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Slow-route prediction for the gthread worker.
The :class:`SlowRoutePredictor` decides, before a request is handed to a
worker, whether its route is expected to be slow, based on previously observed
timings of the same route (method + path) plus operator-seeded patterns. The
gthread worker uses this to route slow requests to a dedicated thread pool so
they cannot starve fast requests.
"""
import re
import threading
from collections import OrderedDict
class SlowRoutePredictor:
"""Predicts whether a route (method + path) is slow.
Timings are tracked per route as an exponentially weighted moving average
(EWMA) so that a route which becomes fast again decays back below the
threshold. The table is bounded (LRU) to cap memory under high route
cardinality. Operator-seeded regex patterns always classify as slow.
"""
def __init__(self, threshold, max_entries=1024, alpha=0.3,
seed_patterns=None):
self.threshold = threshold
self.alpha = alpha
self.max_entries = max_entries
self._stats = OrderedDict()
self._lock = threading.Lock()
self._seed = [re.compile(p) for p in (seed_patterns or [])]
def _seeded(self, key):
return any(p.search(key) for p in self._seed)
def is_slow(self, key):
if not key:
return False
if self._seeded(key):
return True
with self._lock:
ewma = self._stats.get(key)
if ewma is None:
return False
self._stats.move_to_end(key)
return ewma >= self.threshold
def update(self, key, duration):
"""Record an observed processing ``duration`` (seconds) for ``key``."""
if not key:
return
with self._lock:
ewma = self._stats.get(key)
if ewma is None:
ewma = duration
else:
ewma = (1 - self.alpha) * ewma + self.alpha * duration
self._stats[key] = ewma
self._stats.move_to_end(key)
self._evict()
def observe_slow(self, key):
"""Mark ``key`` slow now, before its request has finished.
Used when an in-flight request crosses the threshold, so the rest of a
simultaneous burst to a never-seen slow route is routed to the slow lane
without waiting for the first request to complete.
"""
if not key:
return
with self._lock:
cur = self._stats.get(key, 0.0)
self._stats[key] = max(cur, self.threshold)
self._stats.move_to_end(key)
self._evict()
def _evict(self):
while len(self._stats) > self.max_entries:
self._stats.popitem(last=False)

View File

@ -0,0 +1,97 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import socket
import types
from gunicorn.workers.gthread import ThreadWorker, REQUEST_LINE_PEEK
# _route_key is a staticmethod, so it can be exercised directly.
def test_route_key_basic():
assert ThreadWorker._route_key(b"GET /index HTTP/1.1") == "GET /index"
def test_route_key_strips_query_string():
assert ThreadWorker._route_key(
b"GET /search?q=hello&p=2 HTTP/1.1") == "GET /search"
def test_route_key_post():
assert ThreadWorker._route_key(
b"POST /reports/generate HTTP/1.0") == "POST /reports/generate"
def test_route_key_malformed():
assert ThreadWorker._route_key(b"") is None
assert ThreadWorker._route_key(b"GARBAGE") is None
assert ThreadWorker._route_key(None) is None
def _peek(conn):
# _peek_request_line only touches conn.sock, so we can bind it to a stub
return ThreadWorker._peek_request_line(object(), conn)
def test_peek_complete_request_line():
a, b = socket.socketpair()
try:
a.setblocking(False)
b.sendall(b"GET /x HTTP/1.1\r\nHost: y\r\n\r\n")
conn = types.SimpleNamespace(sock=a)
line, closed, complete = _peek(conn)
assert line == b"GET /x HTTP/1.1"
assert closed is False
assert complete is True
# MSG_PEEK must leave the bytes in the buffer for the parser
assert a.recv(5) == b"GET /"
finally:
a.close()
b.close()
def test_peek_incomplete_request_line_waits():
a, b = socket.socketpair()
try:
a.setblocking(False)
b.sendall(b"GET /x HTT") # no CRLF yet
conn = types.SimpleNamespace(sock=a)
line, closed, complete = _peek(conn)
assert line is None
assert closed is False
assert complete is False
finally:
a.close()
b.close()
def test_peek_no_data_yet():
a, b = socket.socketpair()
try:
a.setblocking(False)
conn = types.SimpleNamespace(sock=a)
line, closed, complete = _peek(conn)
# nothing buffered: not closed, not complete -> keep waiting
assert (line, closed, complete) == (None, False, False)
finally:
a.close()
b.close()
def test_peek_peer_closed():
a, b = socket.socketpair()
a.setblocking(False)
b.close()
try:
conn = types.SimpleNamespace(sock=a)
line, closed, complete = _peek(conn)
assert closed is True
finally:
a.close()
def test_peek_window_constant_is_reasonable():
# a sanity bound so request lines fit comfortably in one peek
assert REQUEST_LINE_PEEK >= 8192

View File

@ -0,0 +1,53 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from gunicorn.workers.gthread_routing import SlowRoutePredictor
def test_predictor_unknown_route_is_fast():
p = SlowRoutePredictor(threshold=1.0)
assert p.is_slow("GET /") is False
def test_predictor_learns_slow_route_on_update():
p = SlowRoutePredictor(threshold=1.0, alpha=1.0)
p.update("GET /slow", 5.0)
assert p.is_slow("GET /slow") is True
assert p.is_slow("GET /fast") is False
def test_predictor_ewma_decays_back_to_fast():
p = SlowRoutePredictor(threshold=1.0, alpha=0.5)
p.update("GET /x", 5.0)
assert p.is_slow("GET /x") is True
# repeated fast samples should pull the EWMA back under the threshold
for _ in range(20):
p.update("GET /x", 0.01)
assert p.is_slow("GET /x") is False
def test_predictor_observe_slow_marks_immediately():
p = SlowRoutePredictor(threshold=2.0)
p.observe_slow("POST /report")
assert p.is_slow("POST /report") is True
def test_predictor_seed_patterns():
p = SlowRoutePredictor(threshold=1.0, seed_patterns=[r"^POST /reports/"])
assert p.is_slow("POST /reports/generate") is True
assert p.is_slow("GET /reports/generate") is False
def test_predictor_lru_bound():
p = SlowRoutePredictor(threshold=1.0, max_entries=10)
for i in range(50):
p.update("GET /%d" % i, 0.01)
assert len(p._stats) <= 10
def test_predictor_empty_key_is_fast():
p = SlowRoutePredictor(threshold=1.0)
assert p.is_slow(None) is False
p.update(None, 5.0) # must not raise
assert p.is_slow(None) is False