mirror of
https://github.com/frappe/gunicorn.git
synced 2026-07-01 10:11:30 +08:00
fix: Avoid queue time in slow req prediction (#13)
* fix: Avoid queue time in slow req prediction Else we punish requests that were blocked by slow requests in queue. * fix: add basic logging for adaptive queueing
This commit is contained in:
parent
f61e5845f3
commit
54b59ca884
@ -48,6 +48,8 @@ class TConn:
|
||||
# route key (method + path), set by the worker when request routing is
|
||||
# enabled; used to predict and learn slow routes
|
||||
self.route_key = None
|
||||
# set by ``handle`` when a pool thread picks this request up
|
||||
self.exec_start_time = None
|
||||
|
||||
# set the socket to non blocking
|
||||
self.sock.setblocking(False)
|
||||
@ -115,6 +117,8 @@ class ThreadWorker(base.Worker):
|
||||
self.tpool = futures.ThreadPoolExecutor(max_workers=fast)
|
||||
self.slow_pool = futures.ThreadPoolExecutor(max_workers=slow)
|
||||
self.predictor = SlowRoutePredictor(self.slow_threshold)
|
||||
self.log.debug("adaptive queueing enabled: fast=%d slow=%d "
|
||||
"threshold=%.1fs", fast, slow, self.slow_threshold)
|
||||
else:
|
||||
self.tpool = self.get_thread_pool()
|
||||
self.poller = selectors.DefaultSelector()
|
||||
@ -145,8 +149,11 @@ class ThreadWorker(base.Worker):
|
||||
def _wrap_future(self, fs, conn, slow=False):
|
||||
fs.conn = conn
|
||||
fs.slow = slow
|
||||
fs._start_time = time.monotonic()
|
||||
fs._request_timeout = fs._start_time + self.cfg.timeout
|
||||
# ``handle`` records the actual execution start time on ``conn`` when
|
||||
# the pool thread picks the request up; until then it stays None so
|
||||
# the slow-route checks below skip still-queued requests.
|
||||
conn.exec_start_time = None
|
||||
fs._request_timeout = time.monotonic() + self.cfg.timeout
|
||||
fs._observed_slow = False
|
||||
self.futures.append(fs)
|
||||
fs.add_done_callback(self.finish_request)
|
||||
@ -215,6 +222,8 @@ class ThreadWorker(base.Worker):
|
||||
|
||||
conn.route_key = self._route_key(line)
|
||||
slow = self.predictor.is_slow(conn.route_key)
|
||||
self.log.debug("routing %r to %s lane", conn.route_key,
|
||||
"slow" if slow else "fast")
|
||||
self.enqueue_req(conn, slow=slow)
|
||||
|
||||
def _peek_request_line(self, conn):
|
||||
@ -375,12 +384,15 @@ class ThreadWorker(base.Worker):
|
||||
faulthandler.dump_traceback()
|
||||
elif (self.routing_enabled and not fut._observed_slow
|
||||
and not fut.slow
|
||||
and current_time - fut._start_time > self.slow_threshold):
|
||||
and fut.conn.exec_start_time is not None
|
||||
and current_time - fut.conn.exec_start_time > self.slow_threshold):
|
||||
# an in-flight fast-lane request crossed the threshold; learn
|
||||
# the route as slow now so the rest of a burst is rerouted
|
||||
# without waiting for this request to finish
|
||||
self.predictor.observe_slow(fut.conn.route_key)
|
||||
fut._observed_slow = True
|
||||
self.log.debug("in-flight request %r crossed threshold; "
|
||||
"marking route slow", fut.conn.route_key)
|
||||
|
||||
self._shutdown_pools(False)
|
||||
self.poller.close()
|
||||
@ -398,9 +410,12 @@ class ThreadWorker(base.Worker):
|
||||
|
||||
# feed the observed processing time back to the predictor so the route
|
||||
# is learned (or unlearned) as slow
|
||||
if self.routing_enabled and fs.conn.route_key:
|
||||
self.predictor.update(fs.conn.route_key,
|
||||
time.monotonic() - fs._start_time)
|
||||
if (self.routing_enabled and fs.conn.route_key
|
||||
and fs.conn.exec_start_time is not None):
|
||||
duration = time.monotonic() - fs.conn.exec_start_time
|
||||
self.predictor.update(fs.conn.route_key, duration)
|
||||
self.log.debug("observed %r took %.3fs", fs.conn.route_key,
|
||||
duration)
|
||||
|
||||
try:
|
||||
(keepalive, conn) = fs.result()
|
||||
@ -432,6 +447,7 @@ class ThreadWorker(base.Worker):
|
||||
fs.conn.close()
|
||||
|
||||
def handle(self, conn):
|
||||
conn.exec_start_time = time.monotonic()
|
||||
keepalive = False
|
||||
req = None
|
||||
try:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user