Merge pull request #12 from frappe/multi_queue

feat: Adaptive queueing of requests for gthread
This commit is contained in:
Ankush Menat 2026-05-28 17:07:53 +05:30 committed by GitHub
commit f61e5845f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 717 additions and 21 deletions

View File

@ -0,0 +1,272 @@
# Design: Slow-request isolation for the gthread worker (predictive dual-queue)
Status: proposal / draft
Author: (ankush)
Scope: `gunicorn/workers/gthread.py`, `gunicorn/config.py`
## 1. Problem
The `gthread` worker runs synchronous WSGI applications on a single
`ThreadPoolExecutor` sized to `cfg.threads` (`gthread.py:95-97`). Every accepted
connection is submitted to that one pool (`enqueue_req`, `gthread.py:117-121`).
Because the pool has a fixed number of threads and an unbounded work queue, a
flood of slow requests occupies every thread and all fast requests starve behind
them in the queue — head-of-line blocking.
Goal: **route requests that are predicted to be slow into a separate, dedicated
lane so they can never occupy the threads reserved for fast requests, even under
a flood.** Fast requests go to a fast lane; slow requests go to a slow lane. The
slow lane may help drain fast work when its own queue is empty, but the fast
lane never touches slow work.
This supersedes the earlier "demotion-only" proposal, which could not stop slow
work from entering the fast pool and therefore could not survive a flood.
## 2. Why prediction is required (and its hard limit)
You cannot preempt a running Python thread executing WSGI code (`gthread.py:352`):
once a slow request is on a thread, that thread is committed until the app
returns. So isolation has to happen **before** a request is handed to a worker —
i.e. at routing time. That means we must decide "fast or slow" from the request
*before* running it.
The only information available pre-execution is the request itself (method,
path, headers) plus what we have learned from prior requests. So the design is:
1. A **predictor** that, given a request's route, answers "slow?" using learned
per-route timing statistics (plus optional operator-seeded patterns).
2. **Routing at accept time** based on that prediction, into one of two pools.
3. **Learning**: every completed request — and any request that crosses the
slow threshold mid-flight — updates the predictor, so a slow route is
recognized after its first occurrence(s) and all subsequent traffic to it is
routed to the slow lane.
Hard limit to state up front: a route that has **never been seen** cannot be
predicted slow on its very first request(s); those first occurrences run in the
fast lane until learning kicks in. We minimize this window (§5.4) and let
operators pre-seed known-slow routes (§5.1). For repeated/flooding slow routes —
the actual failure mode — prediction is effective after the first sample.
## 3. Architecture overview
```
┌─────────────────────────────────────────┐
listener ──accept──▶ │ main loop: poller-driven classification │
│ peek request line ▶ predictor.is_slow? │
└───────────────┬───────────────┬───────────┘
│ fast │ slow
▼ ▼
┌───────────┐ ┌───────────┐
│ fast_pool │ │ slow_pool │
│ F threads │ │ S threads │
└─────┬─────┘ └─────┬─────┘
└───────┬───────┘
on completion: predictor.update(route, duration)
```
- **Fast lane**: a `ThreadPoolExecutor` of `F = ceil(cfg.threads / 2)` threads.
Only ever runs fast-classified work.
- **Slow lane**: a separate `ThreadPoolExecutor` of `S = cfg.threads // 2`
threads. Only ever runs slow-classified work.
- Total OS threads per worker stays at `cfg.threads` — adaptive-queueing mode splits
the existing budget, it does not expand it. `cfg.threads` must be at least 2
for the split to be meaningful; otherwise the worker logs a warning and runs
with a single pool.
- Both lanes share the existing `worker_connections` admission like today; the
slow lane is not separately bounded.
### Why two plain pools (and not a custom dual-queue scheduler)
An earlier revision used a single custom scheduler with two queues and
one-directional **work stealing** (idle slow threads draining the fast queue).
Two independent `ThreadPoolExecutor`s are dramatically simpler and rely on
well-tested stdlib machinery. The one capability given up is work stealing: the
`S` slow threads sit idle when there is no slow work, even if fast work is
queued. For the common case (`S` small, e.g. 1) this is a negligible amount of
parked capacity, and the simplicity is worth it. If maximizing throughput under
pure-fast load ever matters more than simplicity, the custom scheduler can be
reintroduced behind the same `enqueue_req` interface without touching routing.
## 4. Routing point: classify before threading
Today, parsing happens inside the worker thread (`handle``next(conn.parser)`,
`gthread.py:295`), which is too late — the request is already on a thread. We
move *classification only* (not full parsing) into the main loop.
### 4.1 Restructured connection lifecycle
Both freshly accepted connections and keepalive connections flow through one
poller-driven classification step (this unifies `accept`/`reuse_connection` and
also moves slow-client header reads off the worker threads — a side benefit
against slowloris):
1. `accept` (`gthread.py:123`): accept socket, create `TConn`, set non-blocking,
register it in the poller for `EVENT_READ` with a `classify_and_dispatch`
callback. **Do not submit to any pool yet.** `nr_conns += 1`.
2. When the socket becomes readable, `classify_and_dispatch(conn)`:
- **Peek** the buffered bytes with `recv(n, socket.MSG_PEEK)` (plaintext) —
this reads without consuming, so the worker's parser still sees the full
byte stream unchanged. No parser changes required.
- Parse just the request line (`METHOD SP PATH SP VERSION CRLF`) from the
peeked buffer. If the line has not fully arrived yet, return and wait for
the next readable event (bounded by the existing keepalive/header timeout so
a stalled client is eventually closed, not left forever).
> **Why peek the request line, not fully read/parse the request here?**
> Classification only needs method + path. Doing a *full* read/parse of the
> request in the main loop is actively harmful: the main loop is a single
> thread serving every connection (accepts, keepalive, the poller). A blocking
> full read lets one slow client — slowloris, slow network, or a large/chunked
> body — stall the **entire worker**, which is strictly worse than the
> thread-pool starvation we are fixing (there is no pool to absorb it).
> Peeking only inspects already-buffered bytes and defers to the poller if the
> line is incomplete, so it never blocks. It also avoids having to read the
> body in the main loop (WSGI streams `wsgi.input` lazily) and keeps header
> parsing, parse-error responses (400/414), and `wsgi.input` wiring in the
> worker where they already live.
- Compute `route_key` (default: `METHOD + " " + path`, query string stripped;
overridable via hook, §5.1).
- `slow = predictor.is_slow(route_key)` (or matches a seeded slow pattern).
- Unregister the socket from the poller and submit the connection to the
**slow** pool if `slow` else the **fast** pool.
3. The worker's `handle`/`handle_request` run unchanged. On completion, the
measured `request_time` (already computed at `gthread.py:362`) is fed to
`predictor.update(route_key, duration)`.
4. Keepalive: after a kept-alive request, re-register the connection in the
poller with the same `classify_and_dispatch` callback (instead of the old
`reuse_connection`), so the *next* request on the connection is re-classified
independently (it may hit a different route).
### 4.2 SSL connections
Plaintext peek does not work through TLS — the request line is encrypted until
the handshake completes. For SSL connections in this first cut:
- They cannot be pre-classified at the socket level, so they default to the
**fast** lane and rely on mid-flight + completion learning (§5.4) — meaning an
SSL-only deployment does not get full flood protection.
- Note in docs that the common production layout terminates TLS upstream (e.g.
nginx) so gunicorn sees plaintext and gets full protection.
- **Phase 2** (deferred): drive a non-blocking TLS handshake from the poller and
buffer the decrypted request line (feeding it back via `Unreader.unread`,
`unreader.py:51`) to classify SSL the same way.
## 5. Components
### 5.1 Config (`gunicorn/config.py`)
New settings, mirroring `WorkerThreads` (`config.py:697`):
- `enable_adaptive_queueing` — boolean; when true, the `gthread` worker splits its
`cfg.threads` budget between a fast and a slow lane and routes by prediction.
Default `False` (single pool, today's behavior). Requires `cfg.threads >= 2`;
otherwise the worker logs a warning and falls back to the single pool.
- `slow_request_threshold` — float seconds; a route whose learned timing meets/
exceeds this is "slow". Default `1.0`. Only consulted when `enable_adaptive_queueing` is
enabled.
A `slow_route_key` hook to customize the route key (e.g. collapse
`/users/<id>`) is a possible future addition; the default key is method + path
with the query string stripped.
### 5.2 Two thread pools
`init_process` builds two plain `ThreadPoolExecutor`s when `enable_adaptive_queueing` is
enabled — `fast_pool` with `F = ceil(cfg.threads / 2)` workers and `slow_pool`
with `S = cfg.threads // 2` workers — and falls back to the single
`get_thread_pool()` executor when it is disabled. `enqueue_req(conn, slow)`
submits to the matching pool; both produce ordinary
`concurrent.futures.Future`s, so `_wrap_future`, `add_done_callback`,
`self.futures` tracking, and `futures.wait` all keep working unchanged.
- The slow lane is not separately bounded; its executor's internal queue is
capped indirectly by the worker's existing `worker_connections` admission,
the same as the single-pool path today.
- Shutdown drains both pools via a `_shutdown_pools` helper, replacing the
single `tpool.shutdown` calls; the `graceful_timeout` `futures.wait` is
unchanged.
### 5.3 Predictor
A small, self-contained, thread-safe object:
- State: bounded LRU map `route_key -> {ewma_seconds, samples, last_seen}`.
Bounding caps memory under high route cardinality.
- `update(route_key, duration)`: EWMA with decay so a route that becomes fast
again eventually returns to the fast lane (avoids permanent misclassification
after a one-off slow spike). Called on every completion.
- `is_slow(route_key)`: `True` if its `ewma_seconds >= slow_request_threshold`.
Unknown routes ⇒ `False` (fast) by default.
- Optional hysteresis (separate promote/demote thresholds) to avoid flapping
around the boundary.
### 5.4 Learning signals
1. **Completion (primary)**: feed `request_time` (`gthread.py:362`) into
`predictor.update`. After a slow route's first request completes, it is known.
2. **Mid-flight observation (catches simultaneous first-bursts)**: the main loop
already sweeps `self.futures` for the hard timeout (`gthread.py:245-250`). In
that sweep, for any in-flight request whose elapsed time exceeds
`slow_request_threshold`, call `predictor.update` with that elapsed time
*immediately* (do not wait for completion, and do not move the running
request — we can't). This shortens the learning window when many requests to a
brand-new slow route arrive at once: subsequent ones in the burst route to the
slow lane after one threshold interval instead of after a full slow request.
## 6. Behavior under load (the cases that matter)
- **Flood of a previously-seen slow route**: every such request is routed to
the slow pool. The `F` fast threads are never given this work and keep
serving fast traffic at full capacity. Excess slow requests sit in the slow
pool's queue, gated overall by `worker_connections`.
- **Flood of a never-seen slow route**: the first occurrence(s) run in the fast
lane; mid-flight learning (§5.4.2) flips the route to slow after one threshold
interval, so the flood is contained quickly.
- **Mixed fast traffic, idle slow lane**: the `S` slow threads stay parked (no
work stealing in this design — see §3), so fast throughput is `F`, not
`F + S`. This is the cost of splitting a fixed `cfg.threads` budget.
- **Misprediction (route marked slow but now fast)**: handled gracefully — it
runs in the slow lane, and EWMA decay restores it to the fast lane over time.
## 7. Implementation checklist (touch points)
Implemented:
- `config.py``enable_adaptive_queueing`, `slow_request_threshold`, plus
`validate_pos_float`.
- `gthread.py` `init_process`/`get_thread_pool` — build `fast_pool` and
`slow_pool` (split from `cfg.threads`) when `enable_adaptive_queueing` is on, or the
single legacy pool when off; `_shutdown_pools`.
- `gthread.py` `enqueue_req` — route to the matching pool.
- `gthread.py` `accept`/`park_for_request`/`classify_and_dispatch`/
`_peek_request_line`/`_route_key` — poller-driven request-line peek + routing.
- `gthread.py` `finish_request``predictor.update`, routing-aware keepalive
re-park.
- `gthread.py` run-loop sweep — mid-flight learning.
- `gthread_routing.py``SlowRoutePredictor`.
## 8. Backward compatibility
- `enable_adaptive_queueing = False` (the default) ⇒ feature off: single pool, no
classification — byte-for-byte current behavior.
- Hard per-request timeout (`gthread.py:243-250`) preserved unchanged; this adds
a softer, non-fatal classification on top.
- Worker `handle`/`handle_request`, keepalive semantics, and the
future/`finish_request` contract are preserved (MSG_PEEK leaves the byte
stream intact, so the parser is untouched).
## 9. Test plan
- **Predictor unit**: unknown ⇒ fast; after `update` with a slow duration ⇒
slow; EWMA decay restores fast; seeded patterns are slow from first call; LRU
bound holds under many keys.
- **Routing unit**: `classify_and_dispatch` extracts the right `route_key` from
partial vs complete peeked buffers; incomplete line defers; complete line
dispatches to the expected lane.
- **Integration — flood isolation**: app with a known-slow route flooded
concurrently; assert fast-route latency stays low and slow requests never
occupy fast workers.
- **Integration — cold start**: never-seen slow route burst ⇒ confirm the lane
flips to slow within ~one threshold interval via mid-flight learning.
- **Regression**: `enable_adaptive_queueing = False` ⇒ current behavior; keepalive, SSL, and
graceful shutdown paths still pass existing tests.

View File

@ -365,6 +365,13 @@ def validate_pos_int(val):
return val
def validate_pos_float(val):
val = float(val)
if val < 0:
raise ValueError("Value must be positive: %s" % val)
return val
def validate_ssl_version(val):
if val != SSLVersion.default:
sys.stderr.write("Warning: option `ssl_version` is deprecated and it is ignored. Use ssl_context instead.\n")
@ -737,6 +744,61 @@ class WorkerConnections(Setting):
"""
class EnableAdaptiveQueueing(Setting):
name = "enable_adaptive_queueing"
section = "Worker Processes"
cli = ["--enable-adaptive-queueing"]
validator = validate_bool
action = "store_true"
type = bool
default = validate_bool(
os.environ.get("GUNICORN_ENABLE_ADAPTIVE_QUEUEING", "false"))
desc = """\
Enable adaptive multi-queue routing in the ``gthread`` worker.
Can also be enabled by setting the ``GUNICORN_ENABLE_ADAPTIVE_QUEUEING``
environment variable to ``true``.
When enabled, the worker splits its :ref:`threads` roughly evenly into
two lanes a *fast* lane and a *slow* lane and routes each request
to one of them by predicting, from previously observed timings of the
same route (method + path), whether it will exceed
:ref:`slow-request-threshold`. Slow-predicted requests go to the slow
lane so they can never starve the fast lane, even under a flood of
slow requests.
Requires :ref:`threads` to be at least 2.
This setting only affects the ``gthread`` worker type.
.. versionadded:: 23.1.0
"""
class SlowRequestThreshold(Setting):
name = "slow_request_threshold"
section = "Worker Processes"
cli = ["--slow-request-threshold"]
meta = "FLOAT"
validator = validate_pos_float
type = float
default = 5.0
desc = """\
Processing time (in seconds) above which a request route is treated as
"slow" by the ``gthread`` worker when :ref:`enable-adaptive-queueing`
is enabled.
A route is learned as slow once it has been observed exceeding this
threshold (either on completion or while still running); its timing
decays back below the threshold if it becomes fast again.
Only used by the ``gthread`` worker when :ref:`enable-adaptive-queueing`
is enabled.
.. versionadded:: 23.1.0
"""
class MaxRequests(Setting):
name = "max_requests"
section = "Worker Processes"

View File

@ -25,11 +25,15 @@ from functools import partial
from threading import RLock
from . import base
from .gthread_routing import SlowRoutePredictor
from .. import http
from .. import util
from .. import sock
from ..http import wsgi
# how many bytes to peek when classifying a request by its request line
REQUEST_LINE_PEEK = 8192
class TConn:
@ -41,6 +45,9 @@ class TConn:
self.timeout = None
self.parser = None
# route key (method + path), set by the worker when request routing is
# enabled; used to predict and learn slow routes
self.route_key = None
# set the socket to non blocking
self.sock.setblocking(False)
@ -69,14 +76,23 @@ class ThreadWorker(base.Worker):
super().__init__(*args, **kwargs)
self.worker_connections = self.cfg.worker_connections
self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
# initialise the pool
# request routing: when adaptive queueing is enabled, the configured
# threads are split into a fast lane and a slow lane so slow requests
# cannot starve fast ones
self.routing_enabled = (
self.cfg.enable_adaptive_queueing and self.cfg.threads >= 2)
self.slow_threshold = self.cfg.slow_request_threshold
# initialise the pool(s): a single pool when routing is disabled, or a
# separate fast (``self.tpool``) and slow pool when it is enabled
self.tpool = None
self.slow_pool = None
self.poller = None
self.shutdown_event = os.eventfd(0)
self._lock = None
self.futures = deque()
self._keep = deque()
self.nr_conns = 0
self.predictor = None
@classmethod
def check_config(cls, cfg, log):
@ -86,8 +102,21 @@ class ThreadWorker(base.Worker):
log.warning("No keepalived connections can be handled. " +
"Check the number of worker connections and threads.")
if cfg.enable_adaptive_queueing and cfg.threads < 2:
log.warning("enable_adaptive_queueing requires at least 2 threads; "
"running with a single pool.")
def init_process(self):
self.tpool = self.get_thread_pool()
if self.routing_enabled:
# split the configured threads roughly evenly between the two
# lanes; the fast lane gets the extra thread when threads is odd
slow = self.cfg.threads // 2
fast = self.cfg.threads - slow
self.tpool = futures.ThreadPoolExecutor(max_workers=fast)
self.slow_pool = futures.ThreadPoolExecutor(max_workers=slow)
self.predictor = SlowRoutePredictor(self.slow_threshold)
else:
self.tpool = self.get_thread_pool()
self.poller = selectors.DefaultSelector()
self._lock = RLock()
super().init_process()
@ -96,6 +125,11 @@ class ThreadWorker(base.Worker):
"""Override this method to customize how the thread pool is created"""
return futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
def _shutdown_pools(self, wait):
for pool in (self.tpool, self.slow_pool):
if pool is not None:
pool.shutdown(wait)
def handle_exit(self, sig, frame):
self.alive = False
os.eventfd_write(self.shutdown_event, 1)
@ -104,21 +138,26 @@ class ThreadWorker(base.Worker):
self.alive = False
# worker_int callback
self.cfg.worker_int(self)
self.tpool.shutdown(False)
self._shutdown_pools(False)
time.sleep(0.1)
sys.exit(0)
def _wrap_future(self, fs, conn):
def _wrap_future(self, fs, conn, slow=False):
fs.conn = conn
fs._request_timeout = time.monotonic() + self.cfg.timeout
fs.slow = slow
fs._start_time = time.monotonic()
fs._request_timeout = fs._start_time + self.cfg.timeout
fs._observed_slow = False
self.futures.append(fs)
fs.add_done_callback(self.finish_request)
def enqueue_req(self, conn):
def enqueue_req(self, conn, slow=False):
conn.init()
# submit the connection to a worker
fs = self.tpool.submit(self.handle, conn)
self._wrap_future(fs, conn)
if self.routing_enabled and slow:
fs = self.slow_pool.submit(self.handle, conn)
else:
fs = self.tpool.submit(self.handle, conn)
self._wrap_future(fs, conn, slow=slow)
def accept(self, server, listener):
try:
@ -127,13 +166,99 @@ class ThreadWorker(base.Worker):
conn = TConn(self.cfg, sock, client, server)
self.nr_conns += 1
# enqueue the job
self.enqueue_req(conn)
if self.routing_enabled and not self.cfg.is_ssl:
# park the connection until its request line is readable, then
# classify and route it to the fast or slow lane
self.park_for_request(conn)
else:
# legacy single-lane path (also used for SSL, whose request
# line cannot be peeked before the TLS handshake)
self.enqueue_req(conn)
except OSError as e:
if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
errno.EWOULDBLOCK):
raise
def park_for_request(self, conn):
"""Register a connection in the poller until its request line arrives."""
conn.sock.setblocking(False)
conn.set_timeout()
with self._lock:
self._keep.append(conn)
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.classify_and_dispatch, conn))
def classify_and_dispatch(self, conn, client=None):
"""Peek the request line, predict the lane, and enqueue the request."""
line, closed, complete = self._peek_request_line(conn)
if not closed and not complete:
# request line has not fully arrived yet; keep waiting. Stalled
# clients are reaped by murder_keepalived via the connection timeout.
return
with self._lock:
try:
# remove the connection from the parked set
self._keep.remove(conn)
except ValueError:
# already handled (e.g. by murder_keepalived); nothing to do
return
try:
self.poller.unregister(conn.sock)
except (KeyError, OSError, ValueError):
pass
if closed:
self.nr_conns -= 1
conn.close()
return
conn.route_key = self._route_key(line)
slow = self.predictor.is_slow(conn.route_key)
self.enqueue_req(conn, slow=slow)
def _peek_request_line(self, conn):
"""Return ``(line, closed, complete)`` for the connection's request line.
``line`` is the request line bytes (without CRLF) once available,
``closed`` is True if the peer closed the connection, and ``complete``
is True once we should stop waiting for more data.
"""
try:
data = conn.sock.recv(REQUEST_LINE_PEEK, socket.MSG_PEEK)
except (BlockingIOError, InterruptedError):
return None, False, False
except OSError:
return None, True, False
if data == b"":
# peer closed the connection before sending a request
return None, True, False
idx = data.find(b"\r\n")
if idx == -1:
if len(data) >= REQUEST_LINE_PEEK:
# request line longer than our peek window; stop classifying and
# let the worker's parser deal with (or reject) it
return None, False, True
return None, False, False
return data[:idx], False, True
@staticmethod
def _route_key(line):
"""Build a route key (``"METHOD /path"``) from a raw request line."""
if not line:
return None
parts = line.split(b" ")
if len(parts) < 2:
return None
try:
method = parts[0].decode("latin1")
path = parts[1].split(b"?", 1)[0].decode("latin1")
except UnicodeDecodeError:
return None
return method + " " + path
def reuse_connection(self, conn, client):
with self._lock:
# unregister the client from the poller
@ -248,8 +373,16 @@ class ThreadWorker(base.Worker):
self.alive = False
self.log.error("A request timed out. Exiting.")
faulthandler.dump_traceback()
elif (self.routing_enabled and not fut._observed_slow
and not fut.slow
and current_time - fut._start_time > self.slow_threshold):
# an in-flight fast-lane request crossed the threshold; learn
# the route as slow now so the rest of a burst is rerouted
# without waiting for this request to finish
self.predictor.observe_slow(fut.conn.route_key)
fut._observed_slow = True
self.tpool.shutdown(False)
self._shutdown_pools(False)
self.poller.close()
for s in self.sockets:
@ -263,22 +396,32 @@ class ThreadWorker(base.Worker):
fs.conn.close()
return
# feed the observed processing time back to the predictor so the route
# is learned (or unlearned) as slow
if self.routing_enabled and fs.conn.route_key:
self.predictor.update(fs.conn.route_key,
time.monotonic() - fs._start_time)
try:
(keepalive, conn) = fs.result()
# if the connection should be kept alived add it
# to the eventloop and record it
if keepalive and self.alive:
# flag the socket as non blocked
conn.sock.setblocking(False)
if self.routing_enabled and not self.cfg.is_ssl:
# re-classify the next request on this connection
self.park_for_request(conn)
else:
# flag the socket as non blocked
conn.sock.setblocking(False)
# register the connection
conn.set_timeout()
with self._lock:
self._keep.append(conn)
# register the connection
conn.set_timeout()
with self._lock:
self._keep.append(conn)
# add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.reuse_connection, conn))
# add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.reuse_connection, conn))
else:
self.nr_conns -= 1
conn.close()

View File

@ -0,0 +1,75 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
"""Slow-route prediction for the gthread worker.
The :class:`SlowRoutePredictor` decides, before a request is handed to a
worker, whether its route is expected to be slow, based on previously observed
timings of the same route (method + path). The gthread worker uses this to
route slow requests to a dedicated thread pool so they cannot starve fast
requests.
"""
import threading
from collections import OrderedDict
class SlowRoutePredictor:
"""Predicts whether a route (method + path) is slow.
Timings are tracked per route as an exponentially weighted moving average
(EWMA) so that a route which becomes fast again decays back below the
threshold. The table is bounded (LRU) to cap memory under high route
cardinality.
"""
def __init__(self, threshold, max_entries=1024, alpha=0.3):
self.threshold = threshold
self.alpha = alpha
self.max_entries = max_entries
self._stats = OrderedDict()
self._lock = threading.Lock()
def is_slow(self, key):
if not key:
return False
with self._lock:
ewma = self._stats.get(key)
if ewma is None:
return False
self._stats.move_to_end(key)
return ewma >= self.threshold
def update(self, key, duration):
"""Record an observed processing ``duration`` (seconds) for ``key``."""
if not key:
return
with self._lock:
ewma = self._stats.get(key)
if ewma is None:
ewma = duration
else:
ewma = (1 - self.alpha) * ewma + self.alpha * duration
self._stats[key] = ewma
self._stats.move_to_end(key)
self._evict()
def observe_slow(self, key):
"""Mark ``key`` slow now, before its request has finished.
Used when an in-flight request crosses the threshold, so the rest of a
simultaneous burst to a never-seen slow route is routed to the slow lane
without waiting for the first request to complete.
"""
if not key:
return
with self._lock:
cur = self._stats.get(key, 0.0)
self._stats[key] = max(cur, self.threshold)
self._stats.move_to_end(key)
self._evict()
def _evict(self):
while len(self._stats) > self.max_entries:
self._stats.popitem(last=False)

View File

@ -0,0 +1,97 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import socket
import types
from gunicorn.workers.gthread import ThreadWorker, REQUEST_LINE_PEEK
# _route_key is a staticmethod, so it can be exercised directly.
def test_route_key_basic():
assert ThreadWorker._route_key(b"GET /index HTTP/1.1") == "GET /index"
def test_route_key_strips_query_string():
assert ThreadWorker._route_key(
b"GET /search?q=hello&p=2 HTTP/1.1") == "GET /search"
def test_route_key_post():
assert ThreadWorker._route_key(
b"POST /reports/generate HTTP/1.0") == "POST /reports/generate"
def test_route_key_malformed():
assert ThreadWorker._route_key(b"") is None
assert ThreadWorker._route_key(b"GARBAGE") is None
assert ThreadWorker._route_key(None) is None
def _peek(conn):
# _peek_request_line only touches conn.sock, so we can bind it to a stub
return ThreadWorker._peek_request_line(object(), conn)
def test_peek_complete_request_line():
a, b = socket.socketpair()
try:
a.setblocking(False)
b.sendall(b"GET /x HTTP/1.1\r\nHost: y\r\n\r\n")
conn = types.SimpleNamespace(sock=a)
line, closed, complete = _peek(conn)
assert line == b"GET /x HTTP/1.1"
assert closed is False
assert complete is True
# MSG_PEEK must leave the bytes in the buffer for the parser
assert a.recv(5) == b"GET /"
finally:
a.close()
b.close()
def test_peek_incomplete_request_line_waits():
a, b = socket.socketpair()
try:
a.setblocking(False)
b.sendall(b"GET /x HTT") # no CRLF yet
conn = types.SimpleNamespace(sock=a)
line, closed, complete = _peek(conn)
assert line is None
assert closed is False
assert complete is False
finally:
a.close()
b.close()
def test_peek_no_data_yet():
a, b = socket.socketpair()
try:
a.setblocking(False)
conn = types.SimpleNamespace(sock=a)
line, closed, complete = _peek(conn)
# nothing buffered: not closed, not complete -> keep waiting
assert (line, closed, complete) == (None, False, False)
finally:
a.close()
b.close()
def test_peek_peer_closed():
a, b = socket.socketpair()
a.setblocking(False)
b.close()
try:
conn = types.SimpleNamespace(sock=a)
line, closed, complete = _peek(conn)
assert closed is True
finally:
a.close()
def test_peek_window_constant_is_reasonable():
# a sanity bound so request lines fit comfortably in one peek
assert REQUEST_LINE_PEEK >= 8192

View File

@ -0,0 +1,47 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from gunicorn.workers.gthread_routing import SlowRoutePredictor
def test_predictor_unknown_route_is_fast():
p = SlowRoutePredictor(threshold=1.0)
assert p.is_slow("GET /") is False
def test_predictor_learns_slow_route_on_update():
p = SlowRoutePredictor(threshold=1.0, alpha=1.0)
p.update("GET /slow", 5.0)
assert p.is_slow("GET /slow") is True
assert p.is_slow("GET /fast") is False
def test_predictor_ewma_decays_back_to_fast():
p = SlowRoutePredictor(threshold=1.0, alpha=0.5)
p.update("GET /x", 5.0)
assert p.is_slow("GET /x") is True
# repeated fast samples should pull the EWMA back under the threshold
for _ in range(20):
p.update("GET /x", 0.01)
assert p.is_slow("GET /x") is False
def test_predictor_observe_slow_marks_immediately():
p = SlowRoutePredictor(threshold=2.0)
p.observe_slow("POST /report")
assert p.is_slow("POST /report") is True
def test_predictor_lru_bound():
p = SlowRoutePredictor(threshold=1.0, max_entries=10)
for i in range(50):
p.update("GET /%d" % i, 0.01)
assert len(p._stats) <= 10
def test_predictor_empty_key_is_fast():
p = SlowRoutePredictor(threshold=1.0)
assert p.is_slow(None) is False
p.update(None, 5.0) # must not raise
assert p.is_slow(None) is False