refactor: Simpler implementation

Just do multiple queues, nothing else.
Ankush Menat 2026-05-28 16:33:54 +05:30
parent ec6af68013
commit 7dc4d184be
3 changed files with 92 additions and 162 deletions


@@ -57,23 +57,23 @@ the actual failure mode — prediction is effective after the first sample.
│ fast │ slow │ fast │ slow
▼ ▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ fast_pool │ │ slow_pool │ (bounded, 503 on full) │ fast_pool │ │ slow_pool │
│ F threads │ │ S threads │ │ F threads │ │ S threads │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘
└───────┬───────┘ └───────┬───────┘
on completion: predictor.update(route, duration) on completion: predictor.update(route, duration)
``` ```
- **Fast lane**: a `ThreadPoolExecutor` of `F = cfg.threads` threads. Only ever - **Fast lane**: a `ThreadPoolExecutor` of `F = ceil(cfg.threads / 2)` threads.
runs fast-classified work. Only ever runs fast-classified work.
- **Slow lane**: a separate `ThreadPoolExecutor` of `S = cfg.slow_threads` - **Slow lane**: a separate `ThreadPoolExecutor` of `S = cfg.threads // 2`
threads (default 1). Only ever runs slow-classified work. threads. Only ever runs slow-classified work.
- Total OS threads per worker = `F + S`. - Total OS threads per worker stays at `cfg.threads` — adaptive-queueing mode splits
- The slow lane is bounded: a counter (`nr_slow`) tracks slow requests the existing budget, it does not expand it. `cfg.threads` must be at least 2
submitted-but-not-finished; once it reaches `S + cfg.slow_queue_maxsize` (i.e. for the split to be meaningful; otherwise the worker logs a warning and runs
running plus queued), further slow requests are rejected with `503` instead of with a single pool.
growing the executor's unbounded internal queue. The fast lane is governed by - Both lanes share the existing `worker_connections` admission like today; the
the existing `worker_connections` admission like today. slow lane is not separately bounded.
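The lane sizing described in the new column can be sketched as a small helper. This is only an illustration of the arithmetic: `split_threads` is a hypothetical name, and returning `None` stands in for the worker's fallback to a single pool.

```python
import math


def split_threads(threads):
    """Return (fast, slow) lane sizes, or None when the budget cannot be split.

    Hypothetical helper: the name and the None return value are assumptions
    made for this sketch, not part of the worker's API.
    """
    if threads < 2:
        return None  # caller would log a warning and keep a single pool
    slow = threads // 2
    fast = threads - slow  # same value as ceil(threads / 2)
    return fast, slow


for n in (1, 2, 5, 8):
    print(n, split_threads(n))
```

For an odd budget the fast lane receives the extra thread, and `fast + slow` always equals the configured budget.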
### Why two plain pools (and not a custom dual-queue scheduler) ### Why two plain pools (and not a custom dual-queue scheduler)
@@ -157,13 +157,13 @@ the handshake completes. For SSL connections in this first cut:
New settings, mirroring `WorkerThreads` (`config.py:697`): New settings, mirroring `WorkerThreads` (`config.py:697`):
- `enable_adaptive_queueing` — boolean; when true, the `gthread` worker splits its
`cfg.threads` budget between a fast and a slow lane and routes by prediction.
Default `False` (single pool, today's behavior). Requires `cfg.threads >= 2`;
otherwise the worker logs a warning and falls back to the single pool.
- `slow_request_threshold` — float seconds; a route whose learned timing meets/ - `slow_request_threshold` — float seconds; a route whose learned timing meets/
exceeds this is "slow". Default e.g. `1.0`. **`0` disables the whole feature** exceeds this is "slow". Default `1.0`. Only consulted when `enable_adaptive_queueing` is
and restores today's single-pool behavior exactly. enabled.
- `slow_threads`: `S`, the slow-lane worker count. Default `1`.
- `slow_queue_maxsize` — bound on `slow_q`; overflow ⇒ `503`. Default e.g. `100`
(`0` = unbounded).
- `slow_lane_retry_after` — seconds for the `Retry-After` header on 503.
A `slow_route_key` hook to customize the route key (e.g. collapse A `slow_route_key` hook to customize the route key (e.g. collapse
`/users/<id>`) is a possible future addition; the default key is method + path `/users/<id>`) is a possible future addition; the default key is method + path
@@ -171,17 +171,17 @@ with the query string stripped.
### 5.2 Two thread pools ### 5.2 Two thread pools
`init_process` builds two plain `ThreadPoolExecutor`s when routing is enabled — `init_process` builds two plain `ThreadPoolExecutor`s when `enable_adaptive_queueing` is
`fast_pool` (`F = cfg.threads`) and `slow_pool` (`S = cfg.slow_threads`) — and enabled — `fast_pool` with `F = ceil(cfg.threads / 2)` workers and `slow_pool`
falls back to the single `get_thread_pool()` executor when it is disabled. with `S = cfg.threads // 2` workers — and falls back to the single
`enqueue_req(conn, slow)` submits to the matching pool; both produce ordinary `get_thread_pool()` executor when it is disabled. `enqueue_req(conn, slow)`
submits to the matching pool; both produce ordinary
`concurrent.futures.Future`s, so `_wrap_future`, `add_done_callback`, `concurrent.futures.Future`s, so `_wrap_future`, `add_done_callback`,
`self.futures` tracking, and `futures.wait` all keep working unchanged. `self.futures` tracking, and `futures.wait` all keep working unchanged.
- Bounding the slow lane: `nr_slow` counts slow requests submitted-but-not-yet- - The slow lane is not separately bounded; its executor's internal queue is
finished. `enqueue_req` rejects (503) when `nr_slow >= S + slow_queue_maxsize`; capped indirectly by the worker's existing `worker_connections` admission,
`finish_request` decrements it. This caps the slow executor's otherwise the same as the single-pool path today.
unbounded internal queue.
- Shutdown drains both pools via a `_shutdown_pools` helper, replacing the - Shutdown drains both pools via a `_shutdown_pools` helper, replacing the
single `tpool.shutdown` calls; the `graceful_timeout` `futures.wait` is single `tpool.shutdown` calls; the `graceful_timeout` `futures.wait` is
unchanged. unchanged.
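A minimal standalone sketch of the two-pool arrangement in section 5.2, assuming a hypothetical `LanePools` wrapper rather than the worker's own `init_process`, `enqueue_req`, and `_shutdown_pools`. The `thread_name_prefix` values exist only so the demo can report which lane ran a task.

```python
import threading
from concurrent import futures


class LanePools:
    """Hypothetical wrapper around a fast and a slow ThreadPoolExecutor."""

    def __init__(self, threads):
        # threads must be >= 2 here; ThreadPoolExecutor rejects max_workers=0
        slow = threads // 2
        fast = threads - slow
        self.fast_pool = futures.ThreadPoolExecutor(
            max_workers=fast, thread_name_prefix="fast")
        self.slow_pool = futures.ThreadPoolExecutor(
            max_workers=slow, thread_name_prefix="slow")

    def enqueue(self, fn, *args, slow=False):
        # both pools return ordinary concurrent.futures.Future objects
        pool = self.slow_pool if slow else self.fast_pool
        return pool.submit(fn, *args)

    def shutdown(self, wait=True):
        # drain both lanes, mirroring the idea of a shared shutdown helper
        for pool in (self.fast_pool, self.slow_pool):
            pool.shutdown(wait=wait)


def current_lane():
    # executor threads are named "<prefix>_<n>"
    return threading.current_thread().name.split("_")[0]


pools = LanePools(threads=4)
fast_lane = pools.enqueue(current_lane).result()
slow_lane = pools.enqueue(current_lane, slow=True).result()
pools.shutdown()
print(fast_lane, slow_lane)
```

Because both lanes hand back plain `Future` objects, code that already tracks and waits on futures does not need to know which pool produced them.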
@@ -215,16 +215,16 @@ A small, self-contained, thread-safe object:
## 6. Behavior under load (the cases that matter) ## 6. Behavior under load (the cases that matter)
- **Flood of a previously-seen slow route**: every such request - **Flood of a previously-seen slow route**: every such request is routed to
is routed to the slow pool. The `F` fast threads are never given this work and the slow pool. The `F` fast threads are never given this work and keep
keep serving fast traffic at full capacity. When the slow lane reaches serving fast traffic at full capacity. Excess slow requests sit in the slow
`S + slow_queue_maxsize`, further slow requests get a fast `503` — backpressure pool's queue, gated overall by `worker_connections`.
is contained to the slow lane.
- **Flood of a never-seen slow route**: the first occurrence(s) run in the fast - **Flood of a never-seen slow route**: the first occurrence(s) run in the fast
lane; mid-flight learning (§5.4.2) flips the route to slow after one threshold lane; mid-flight learning (§5.4.2) flips the route to slow after one threshold
interval, so the flood is contained quickly. interval, so the flood is contained quickly.
- **Mixed fast traffic, idle slow lane**: the `S` slow threads stay parked (no - **Mixed fast traffic, idle slow lane**: the `S` slow threads stay parked (no
work stealing in this design — see §3), so fast throughput is `F`, not `F + S`. work stealing in this design — see §3), so fast throughput is `F`, not
`F + S`. This is the cost of splitting a fixed `cfg.threads` budget.
- **Misprediction (route marked slow but now fast)**: handled gracefully — it - **Misprediction (route marked slow but now fast)**: handled gracefully — it
runs in the slow lane, and EWMA decay restores it to the fast lane over time. runs in the slow lane, and EWMA decay restores it to the fast lane over time.
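The misprediction case relies on EWMA decay. The sketch below shows how a per-route moving average can cross the threshold in both directions. It is not the `SlowRoutePredictor` from `gthread_routing.py`, which this diff does not show: the class name `EwmaRoutePredictor`, the `is_slow` method, and the smoothing factor `alpha=0.3` are all assumptions made for illustration.

```python
import threading


class EwmaRoutePredictor:
    """Hypothetical per-route EWMA of request durations (illustration only)."""

    def __init__(self, threshold, alpha=0.3):
        self.threshold = threshold
        self.alpha = alpha  # assumed smoothing factor
        self._ewma = {}
        self._lock = threading.Lock()

    def update(self, route, duration):
        with self._lock:
            prev = self._ewma.get(route)
            if prev is None:
                # first sample seeds the average directly
                self._ewma[route] = duration
            else:
                self._ewma[route] = (
                    self.alpha * duration + (1 - self.alpha) * prev)

    def is_slow(self, route):
        # never-seen routes are treated as fast
        with self._lock:
            return self._ewma.get(route, 0.0) >= self.threshold


predictor = EwmaRoutePredictor(threshold=1.0)
key = "GET /report"
before = predictor.is_slow(key)
predictor.update(key, 3.0)          # one slow observation
after_slow = predictor.is_slow(key)
for _ in range(10):                 # route becomes fast again
    predictor.update(key, 0.05)
after_decay = predictor.is_slow(key)
print(before, after_slow, after_decay)
```

With these assumed numbers the route is classified slow after a single 3-second sample and drops back below the 1-second threshold after a handful of fast samples.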
@@ -232,23 +232,23 @@ A small, self-contained, thread-safe object:
Implemented: Implemented:
- `config.py`: `slow_request_threshold`, `slow_threads`, `slow_queue_maxsize`, - `config.py`: `enable_adaptive_queueing`, `slow_request_threshold`, plus
`slow_lane_retry_after`, plus `validate_pos_float`. `validate_pos_float`.
- `gthread.py` `init_process`/`get_thread_pool` — build `fast_pool` and - `gthread.py` `init_process`/`get_thread_pool` — build `fast_pool` and
`slow_pool` (or the single legacy pool when disabled); `_shutdown_pools`. `slow_pool` (split from `cfg.threads`) when `enable_adaptive_queueing` is on, or the
- `gthread.py` `enqueue_req` — route to the matching pool; `nr_slow` bound + single legacy pool when off; `_shutdown_pools`.
`reject_overloaded` (503). - `gthread.py` `enqueue_req` — route to the matching pool.
- `gthread.py` `accept`/`park_for_request`/`classify_and_dispatch`/ - `gthread.py` `accept`/`park_for_request`/`classify_and_dispatch`/
`_peek_request_line`/`_route_key` — poller-driven request-line peek + routing. `_peek_request_line`/`_route_key` — poller-driven request-line peek + routing.
- `gthread.py` `finish_request`: `predictor.update`, `nr_slow` decrement, - `gthread.py` `finish_request`: `predictor.update`, routing-aware keepalive
routing-aware keepalive re-park. re-park.
- `gthread.py` run-loop sweep — mid-flight learning. - `gthread.py` run-loop sweep — mid-flight learning.
- `gthread_routing.py`: `SlowRoutePredictor`. - `gthread_routing.py`: `SlowRoutePredictor`.
## 8. Backward compatibility ## 8. Backward compatibility
- `slow_request_threshold = 0` ⇒ feature off: single pool, no classification, no - `enable_adaptive_queueing = False` (the default) ⇒ feature off: single pool, no
rejection — byte-for-byte current behavior. classification — byte-for-byte current behavior.
- Hard per-request timeout (`gthread.py:243-250`) preserved unchanged; this adds - Hard per-request timeout (`gthread.py:243-250`) preserved unchanged; this adds
a softer, non-fatal classification on top. a softer, non-fatal classification on top.
- Worker `handle`/`handle_request`, keepalive semantics, and the - Worker `handle`/`handle_request`, keepalive semantics, and the
@@ -265,8 +265,8 @@ Implemented:
dispatches to the expected lane. dispatches to the expected lane.
- **Integration — flood isolation**: app with a known-slow route flooded - **Integration — flood isolation**: app with a known-slow route flooded
concurrently; assert fast-route latency stays low and slow requests never concurrently; assert fast-route latency stays low and slow requests never
occupy fast workers; assert 503 once the slow lane is full. occupy fast workers.
- **Integration — cold start**: never-seen slow route burst ⇒ confirm the lane - **Integration — cold start**: never-seen slow route burst ⇒ confirm the lane
flips to slow within ~one threshold interval via mid-flight learning. flips to slow within ~one threshold interval via mid-flight learning.
- **Regression**: `slow_request_threshold = 0` ⇒ current behavior; keepalive, - **Regression**: `enable_adaptive_queueing = False` ⇒ current behavior; keepalive, SSL, and
SSL, and graceful shutdown paths still pass existing tests. graceful shutdown paths still pass existing tests.
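The flood-isolation property in the test plan can be checked without timing assumptions by holding the slow handlers on an event. The sketch below uses two bare `ThreadPoolExecutor`s rather than the real worker; the handler names, pool sizes, and flood size are hypothetical.

```python
import threading
from concurrent import futures

fast_pool = futures.ThreadPoolExecutor(max_workers=2)
slow_pool = futures.ThreadPoolExecutor(max_workers=2)
release = threading.Event()


def slow_handler():
    # blocks until the demo releases it, standing in for a slow route
    release.wait(timeout=10)
    return "slow done"


def fast_handler():
    return "fast done"


# flood the slow lane: 2 tasks run, 18 wait in the executor's queue
slow_futures = [slow_pool.submit(slow_handler) for _ in range(20)]

# the fast lane still answers while every slow task is blocked or queued
fast_result = fast_pool.submit(fast_handler).result(timeout=5)
slow_still_pending = not any(f.done() for f in slow_futures)

release.set()
slow_results = [f.result(timeout=10) for f in slow_futures]
fast_pool.shutdown()
slow_pool.shutdown()
print(fast_result, slow_still_pending, len(slow_results))
```

An integration test against the real worker would additionally need to drive HTTP requests and let the predictor classify the route first; this sketch only demonstrates that separate pools keep the fast lane responsive.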


@@ -744,32 +744,26 @@ class WorkerConnections(Setting):
""" """
class SlowRequestThreshold(Setting): class EnableAdaptiveQueueing(Setting):
name = "slow_request_threshold" name = "enable_adaptive_queueing"
section = "Worker Processes" section = "Worker Processes"
cli = ["--slow-request-threshold"] cli = ["--enable-adaptive-queueing"]
meta = "FLOAT" validator = validate_bool
validator = validate_pos_float action = "store_true"
type = float type = bool
default = 0.0 default = False
desc = """\ desc = """\
Processing time (in seconds) above which a request route is treated as Enable adaptive multi-queue routing in the ``gthread`` worker.
"slow" by the ``gthread`` worker.
When set to a positive value, the ``gthread`` worker runs two separate When enabled, the worker splits its :ref:`threads` roughly evenly into
lanes of threads: a *fast* lane (sized by :ref:`threads`) and a *slow* two lanes, a *fast* lane and a *slow* lane, and routes each request
lane (sized by :ref:`slow-threads`). Requests are routed by predicting, to one of them by predicting, from previously observed timings of the
from previously observed timings of the same route (method + path), same route (method + path), whether it will exceed
whether they will take longer than this threshold. Slow-predicted :ref:`slow-request-threshold`. Slow-predicted requests go to the slow
requests go to the slow lane so they can never starve the fast lane, lane so they can never starve the fast lane, even under a flood of
even under a flood. slow requests.
A route is learned as slow once it has been observed exceeding this Requires :ref:`threads` to be at least 2.
threshold (either on completion or while still running); its timing
decays back below the threshold if it becomes fast again.
The default of ``0`` disables the feature and restores the single
thread-pool behaviour.
This setting only affects the ``gthread`` worker type. This setting only affects the ``gthread`` worker type.
@@ -777,66 +771,25 @@ class SlowRequestThreshold(Setting):
""" """
class SlowThreads(Setting): class SlowRequestThreshold(Setting):
name = "slow_threads" name = "slow_request_threshold"
section = "Worker Processes" section = "Worker Processes"
cli = ["--slow-threads"] cli = ["--slow-request-threshold"]
meta = "INT" meta = "FLOAT"
validator = validate_pos_int validator = validate_pos_float
type = int type = float
default = 1 default = 1.0
desc = """\ desc = """\
The number of worker threads dedicated to the slow lane. Processing time (in seconds) above which a request route is treated as
"slow" by the ``gthread`` worker when :ref:`enable-adaptive-queueing`
is enabled.
These threads only ever run requests predicted to be slow. When the slow A route is learned as slow once it has been observed exceeding this
lane has no pending work, they help drain the fast lane, so they are threshold (either on completion or while still running); its timing
never idle while fast work is waiting. Total threads per worker is decays back below the threshold if it becomes fast again.
``threads + slow_threads``.
Only used by the ``gthread`` worker when Only used by the ``gthread`` worker when :ref:`enable-adaptive-queueing`
:ref:`slow-request-threshold` is set. is enabled.
.. versionadded:: 23.1.0
"""
class SlowQueueMaxsize(Setting):
name = "slow_queue_maxsize"
section = "Worker Processes"
cli = ["--slow-queue-maxsize"]
meta = "INT"
validator = validate_pos_int
type = int
default = 100
desc = """\
Maximum number of requests allowed to wait in the slow lane queue.
When the slow lane queue is full, additional slow-predicted requests are
rejected immediately with a ``503 Service Unavailable`` response instead
of being queued, keeping the slow lane's backpressure contained. A value
of ``0`` means the slow queue is unbounded.
Only used by the ``gthread`` worker when
:ref:`slow-request-threshold` is set.
.. versionadded:: 23.1.0
"""
class SlowLaneRetryAfter(Setting):
name = "slow_lane_retry_after"
section = "Worker Processes"
cli = ["--slow-lane-retry-after"]
meta = "INT"
validator = validate_pos_int
type = int
default = 1
desc = """\
Value (in seconds) of the ``Retry-After`` header sent with the ``503``
response when the slow lane queue is full.
Only used by the ``gthread`` worker when
:ref:`slow-request-threshold` is set.
.. versionadded:: 23.1.0 .. versionadded:: 23.1.0
""" """


@@ -76,24 +76,22 @@ class ThreadWorker(base.Worker):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.worker_connections = self.cfg.worker_connections self.worker_connections = self.cfg.worker_connections
self.max_keepalived = self.cfg.worker_connections - self.cfg.threads self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
# request routing: when adaptive queueing is enabled, the configured
# threads are split into a fast lane and a slow lane so slow requests
# cannot starve fast ones
self.routing_enabled = (
self.cfg.enable_adaptive_queueing and self.cfg.threads >= 2)
self.slow_threshold = self.cfg.slow_request_threshold
# initialise the pool(s): a single pool when routing is disabled, or a # initialise the pool(s): a single pool when routing is disabled, or a
# separate fast (``self.tpool``) and slow pool when it is enabled # separate fast (``self.tpool``) and slow pool when it is enabled
self.tpool = None self.tpool = None
self.slow_pool = None self.slow_pool = None
# number of slow requests submitted but not yet finished, used to bound
# the slow lane (running + queued) and shed load with 503
self.nr_slow = 0
self.poller = None self.poller = None
self.shutdown_event = os.eventfd(0) self.shutdown_event = os.eventfd(0)
self._lock = None self._lock = None
self.futures = deque() self.futures = deque()
self._keep = deque() self._keep = deque()
self.nr_conns = 0 self.nr_conns = 0
# request routing: when a slow-request threshold is configured, slow
# requests are routed to a separate lane so they cannot starve fast ones
self.slow_threshold = self.cfg.slow_request_threshold
self.routing_enabled = self.slow_threshold > 0
self.predictor = None self.predictor = None
@classmethod @classmethod
@@ -104,14 +102,21 @@ class ThreadWorker(base.Worker):
log.warning("No keepalived connections can be handled. " + log.warning("No keepalived connections can be handled. " +
"Check the number of worker connections and threads.") "Check the number of worker connections and threads.")
if cfg.enable_adaptive_queueing and cfg.threads < 2:
log.warning("enable_adaptive_queueing requires at least 2 threads; "
"running with a single pool.")
def init_process(self): def init_process(self):
self.tpool = self.get_thread_pool()
if self.routing_enabled: if self.routing_enabled:
# split the configured threads roughly evenly between the two
# lanes; the fast lane gets the extra thread when threads is odd
slow = self.cfg.threads // 2
fast = self.cfg.threads - slow
self.tpool = futures.ThreadPoolExecutor(max_workers=fast)
self.slow_pool = futures.ThreadPoolExecutor(max_workers=slow)
self.predictor = SlowRoutePredictor(self.slow_threshold) self.predictor = SlowRoutePredictor(self.slow_threshold)
# a dedicated pool for the slow lane: slow requests can never else:
# occupy the fast pool's (``self.tpool``) threads self.tpool = self.get_thread_pool()
self.slow_pool = futures.ThreadPoolExecutor(
max_workers=self.cfg.slow_threads)
self.poller = selectors.DefaultSelector() self.poller = selectors.DefaultSelector()
self._lock = RLock() self._lock = RLock()
super().init_process() super().init_process()
@@ -148,15 +153,7 @@ class ThreadWorker(base.Worker):
def enqueue_req(self, conn, slow=False): def enqueue_req(self, conn, slow=False):
conn.init() conn.init()
# submit the connection to the appropriate pool
if self.routing_enabled and slow: if self.routing_enabled and slow:
cap = self.cfg.slow_queue_maxsize
if cap and self.nr_slow >= self.cfg.slow_threads + cap:
# slow lane (running + queued) is full; shed load with 503
# instead of letting the slow queue grow unbounded
self.reject_overloaded(conn)
return
self.nr_slow += 1
fs = self.slow_pool.submit(self.handle, conn) fs = self.slow_pool.submit(self.handle, conn)
else: else:
fs = self.tpool.submit(self.handle, conn) fs = self.tpool.submit(self.handle, conn)
@@ -262,22 +259,6 @@ class ThreadWorker(base.Worker):
return None return None
return method + " " + path return method + " " + path
def reject_overloaded(self, conn):
"""Reject a connection with 503 because the slow lane is saturated."""
self.nr_conns -= 1
try:
conn.sock.setblocking(True)
conn.sock.sendall(
b"HTTP/1.1 503 Service Unavailable\r\n"
b"Connection: close\r\n"
b"Content-Length: 0\r\n"
b"Retry-After: %d\r\n\r\n"
% int(self.cfg.slow_lane_retry_after))
except OSError:
pass
finally:
conn.close()
def reuse_connection(self, conn, client): def reuse_connection(self, conn, client):
with self._lock: with self._lock:
# unregister the client from the poller # unregister the client from the poller
@@ -410,10 +391,6 @@ class ThreadWorker(base.Worker):
futures.wait(self.futures, timeout=self.cfg.graceful_timeout) futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def finish_request(self, fs): def finish_request(self, fs):
# the slow request is done (whatever the outcome): free its slow slot
if self.routing_enabled and fs.slow:
self.nr_slow -= 1
if fs.cancelled(): if fs.cancelled():
self.nr_conns -= 1 self.nr_conns -= 1
fs.conn.close() fs.conn.close()