From 54b59ca884c79dcf04d4882658a32bbaf17e999f Mon Sep 17 00:00:00 2001 From: Ankush Menat Date: Fri, 29 May 2026 10:47:16 +0530 Subject: [PATCH] fix: Avoid queue time in slow req prediction (#13) * fix: Avoid queue time in slow req prediction Else we punish requests that were blocked by slow requests in queue. * fix: add basic logging for adaptive queueing --- gunicorn/workers/gthread.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 9bb0e1f1..23fa8272 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -48,6 +48,8 @@ class TConn: # route key (method + path), set by the worker when request routing is # enabled; used to predict and learn slow routes self.route_key = None + # set by ``handle`` when a pool thread picks this request up + self.exec_start_time = None # set the socket to non blocking self.sock.setblocking(False) @@ -115,6 +117,8 @@ class ThreadWorker(base.Worker): self.tpool = futures.ThreadPoolExecutor(max_workers=fast) self.slow_pool = futures.ThreadPoolExecutor(max_workers=slow) self.predictor = SlowRoutePredictor(self.slow_threshold) + self.log.debug("adaptive queueing enabled: fast=%d slow=%d " + "threshold=%.1fs", fast, slow, self.slow_threshold) else: self.tpool = self.get_thread_pool() self.poller = selectors.DefaultSelector() @@ -145,8 +149,11 @@ class ThreadWorker(base.Worker): def _wrap_future(self, fs, conn, slow=False): fs.conn = conn fs.slow = slow - fs._start_time = time.monotonic() - fs._request_timeout = fs._start_time + self.cfg.timeout + # ``handle`` records the actual execution start time on ``conn`` when + # the pool thread picks the request up; until then it stays None so + # the slow-route checks below skip still-queued requests. + conn.exec_start_time = None + fs._request_timeout = time.monotonic() + self.cfg.timeout fs._observed_slow = False self.futures.append(fs) fs.add_done_callback(self.finish_request) @@ -215,6 +222,8 @@ class ThreadWorker(base.Worker): conn.route_key = self._route_key(line) slow = self.predictor.is_slow(conn.route_key) + self.log.debug("routing %r to %s lane", conn.route_key, + "slow" if slow else "fast") self.enqueue_req(conn, slow=slow) def _peek_request_line(self, conn): @@ -375,12 +384,15 @@ class ThreadWorker(base.Worker): faulthandler.dump_traceback() elif (self.routing_enabled and not fut._observed_slow and not fut.slow - and current_time - fut._start_time > self.slow_threshold): + and fut.conn.exec_start_time is not None + and current_time - fut.conn.exec_start_time > self.slow_threshold): # an in-flight fast-lane request crossed the threshold; learn # the route as slow now so the rest of a burst is rerouted # without waiting for this request to finish self.predictor.observe_slow(fut.conn.route_key) fut._observed_slow = True + self.log.debug("in-flight request %r crossed threshold; " + "marking route slow", fut.conn.route_key) self._shutdown_pools(False) self.poller.close() @@ -398,9 +410,12 @@ class ThreadWorker(base.Worker): # feed the observed processing time back to the predictor so the route # is learned (or unlearned) as slow - if self.routing_enabled and fs.conn.route_key: - self.predictor.update(fs.conn.route_key, - time.monotonic() - fs._start_time) + if (self.routing_enabled and fs.conn.route_key + and fs.conn.exec_start_time is not None): + duration = time.monotonic() - fs.conn.exec_start_time + self.predictor.update(fs.conn.route_key, duration) + self.log.debug("observed %r took %.3fs", fs.conn.route_key, + duration) try: (keepalive, conn) = fs.result() @@ -432,6 +447,7 @@ class ThreadWorker(base.Worker): fs.conn.close() def handle(self, conn): + conn.exec_start_time = time.monotonic() keepalive = False req = None try: