Merge pull request #3440 from benoitc/gthread-improvements

gthread: Lock-free refactoring with PollableMethodQueue
This commit is contained in:
Benoit Chesneau 2026-01-22 09:56:45 +01:00 committed by GitHub
commit 6df99ce99b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 1510 additions and 138 deletions

3
THANKS
View File

@ -58,6 +58,7 @@ Diego Oliveira <contact@diegoholiveira.com>
Dima Barsky <github@kappa.ac93.org>
Djoume Salvetti <djoume@freshbooks.com>
Dmitry Medvinsky <me@dmedvinsky.name>
Dominik Działak <ddzialak@users.noreply.github.com>
Dustin Ingram <di@users.noreply.github.com>
Ed Morley <edmorley@users.noreply.github.com>
Eric Florenzano <floguy@gmail.com>
@ -136,6 +137,7 @@ Neil Williams <neil@reddit.com>
Nick Pillitteri <nick@tshlabs.org>
Nik Nyby <nnyby@columbia.edu>
Nikolay Kim <fafhrd91@gmail.com>
Oliver Allen <oallenj@users.noreply.github.com>
Oliver Bristow <evilumbrella+github@gmail.com>
Oliver Tonnhofer <olt@bogosoft.com>
Omer Katz <omer.drow@gmail.com>
@ -168,6 +170,7 @@ Stephane Wirtel <stephane@wirtel.be>
Stephen DiCato <Locker537@gmail.com>
Stephen Holsapple <sholsapp@gmail.com>
Steven Cummings <estebistec@gmail.com>
sylt <sylt@users.noreply.github.com>
Sébastien Fievet <zyegfryed@gmail.com>
Tal Einat <532281+taleinat@users.noreply.github.com>
Talha Malik <talham7391@hotmail.com>

View File

@ -25,16 +25,25 @@ class Parser:
def __iter__(self):
return self
def finish_body(self):
"""Discard any unread body of the current message.
This should be called before returning a keepalive connection to
the poller to ensure the socket doesn't appear readable due to
leftover body bytes.
"""
if self.mesg:
data = self.mesg.body.read(8192)
while data:
data = self.mesg.body.read(8192)
def __next__(self):
# Stop if HTTP dictates a stop.
if self.mesg and self.mesg.should_close():
raise StopIteration()
# Discard any unread body of the previous message
if self.mesg:
data = self.mesg.body.read(8192)
while data:
data = self.mesg.body.read(8192)
self.finish_body()
# Parse the next request
self.req_count += 1

View File

@ -13,6 +13,7 @@
from concurrent import futures
import errno
import os
import queue
import selectors
import socket
import ssl
@ -21,7 +22,6 @@ import time
from collections import deque
from datetime import datetime
from functools import partial
from threading import RLock
from . import base
from .. import http
@ -46,6 +46,9 @@ class TConn:
self.sock.setblocking(False)
def init(self):
# Guard against double initialization
if self.initialized:
return
self.initialized = True
self.sock.setblocking(True)
@ -58,26 +61,110 @@ class TConn:
self.parser = http.RequestParser(self.cfg, self.sock, self.client)
def set_timeout(self):
# set the timeout
self.timeout = time.time() + self.cfg.keepalive
# Use monotonic clock for reliability (time.time() can jump due to NTP)
self.timeout = time.monotonic() + self.cfg.keepalive
def close(self):
util.close(self.sock)
class PollableMethodQueue:
"""Thread-safe queue that can wake up a selector.
Uses a pipe to allow worker threads to signal the main thread
when work is ready, enabling lock-free coordination.
This approach is compatible with all POSIX systems including
Linux, macOS, FreeBSD, OpenBSD, and NetBSD. The pipe is set to
non-blocking mode to prevent worker threads from blocking if
the pipe buffer fills up under extreme load.
"""
def __init__(self):
self._read_fd = None
self._write_fd = None
self._queue = None
def init(self):
"""Initialize the pipe and queue."""
self._read_fd, self._write_fd = os.pipe()
# Set both ends to non-blocking:
# - Write: prevents worker threads from blocking if buffer is full
# - Read: allows run_callbacks to drain without blocking
os.set_blocking(self._read_fd, False)
os.set_blocking(self._write_fd, False)
self._queue = queue.SimpleQueue()
def close(self):
"""Close the pipe file descriptors."""
if self._read_fd is not None:
try:
os.close(self._read_fd)
except OSError:
pass
if self._write_fd is not None:
try:
os.close(self._write_fd)
except OSError:
pass
def fileno(self):
"""Return the readable file descriptor for selector registration."""
return self._read_fd
def defer(self, callback, *args):
"""Queue a callback to be run on the main thread.
The callback is added to the queue first, then a wake-up byte
is written to the pipe. If the pipe write fails (buffer full),
it's safe to ignore because the main thread will eventually
drain the queue when it reads other wake-up bytes.
"""
self._queue.put(partial(callback, *args))
try:
os.write(self._write_fd, b'\x00')
except OSError:
# Pipe buffer full (EAGAIN/EWOULDBLOCK) - safe to ignore
# The main thread will still process the queue
pass
def run_callbacks(self, _fileobj, max_callbacks=50):
"""Run queued callbacks. Called when the pipe is readable.
Drains all available wake-up bytes and runs corresponding callbacks.
The max_callbacks limit prevents starvation of other event sources.
"""
# Read all available wake-up bytes (up to limit)
try:
data = os.read(self._read_fd, max_callbacks)
except OSError:
return
# Run callbacks for each byte read, plus any extras in queue
# (extras can accumulate if pipe writes were dropped)
callbacks_run = 0
while callbacks_run < len(data) + 10: # +10 to drain dropped writes
try:
callback = self._queue.get_nowait()
callback()
callbacks_run += 1
except queue.Empty:
break
class ThreadWorker(base.Worker):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.worker_connections = self.cfg.worker_connections
self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
# initialise the pool
self.tpool = None
self.poller = None
self._lock = None
self.futures = deque()
self._keep = deque()
self.method_queue = PollableMethodQueue()
self.keepalived_conns = deque()
self.nr_conns = 0
self._accepting = False
@classmethod
def check_config(cls, cfg, log):
@ -90,98 +177,85 @@ class ThreadWorker(base.Worker):
def init_process(self):
self.tpool = self.get_thread_pool()
self.poller = selectors.DefaultSelector()
self._lock = RLock()
self.method_queue.init()
super().init_process()
def get_thread_pool(self):
"""Override this method to customize how the thread pool is created"""
return futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
def handle_quit(self, sig, frame):
self.alive = False
# worker_int callback
self.cfg.worker_int(self)
self.tpool.shutdown(False)
time.sleep(0.1)
sys.exit(0)
def handle_exit(self, sig, frame):
"""Handle SIGTERM - begin graceful shutdown."""
if self.alive:
self.alive = False
# Wake up the poller so it can start shutdown
self.method_queue.defer(lambda: None)
def _wrap_future(self, fs, conn):
fs.conn = conn
self.futures.append(fs)
fs.add_done_callback(self.finish_request)
def handle_quit(self, sig, frame):
"""Handle SIGQUIT - immediate shutdown."""
self.tpool.shutdown(wait=False)
super().handle_quit(sig, frame)
def set_accept_enabled(self, enabled):
"""Enable or disable accepting new connections."""
if enabled == self._accepting:
return
for sock in self.sockets:
if enabled:
sock.setblocking(False)
self.poller.register(sock, selectors.EVENT_READ, self.accept)
else:
self.poller.unregister(sock)
self._accepting = enabled
def enqueue_req(self, conn):
conn.init()
# submit the connection to a worker
"""Submit connection to thread pool for processing."""
fs = self.tpool.submit(self.handle, conn)
self._wrap_future(fs, conn)
fs.add_done_callback(
lambda fut: self.method_queue.defer(self.finish_request, conn, fut))
def accept(self, server, listener):
def accept(self, listener):
"""Accept a new connection from a listener socket."""
try:
sock, client = listener.accept()
# initialize the connection object
conn = TConn(self.cfg, sock, client, server)
client_sock, client_addr = listener.accept()
self.nr_conns += 1
# wait until socket is readable
with self._lock:
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_client_socket_readable, conn))
client_sock.setblocking(True)
conn = TConn(self.cfg, client_sock, client_addr, listener.getsockname())
# Submit directly to thread pool for processing
self.enqueue_req(conn)
except OSError as e:
if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
errno.EWOULDBLOCK):
if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK):
raise
def on_client_socket_readable(self, conn, client):
with self._lock:
# unregister the client from the poller
self.poller.unregister(client)
"""Handle a keepalive connection becoming readable."""
self.poller.unregister(client)
self.keepalived_conns.remove(conn)
if conn.initialized:
# remove the connection from keepalive
try:
self._keep.remove(conn)
except ValueError:
# race condition
return
# submit the connection to a worker
# Submit to thread pool for processing
self.enqueue_req(conn)
def murder_keepalived(self):
now = time.time()
while True:
with self._lock:
try:
# remove the connection from the queue
conn = self._keep.popleft()
except IndexError:
break
"""Close expired keepalive connections."""
now = time.monotonic()
while self.keepalived_conns:
conn = self.keepalived_conns[0]
delta = conn.timeout - now
if delta > 0:
# add the connection back to the queue
with self._lock:
self._keep.appendleft(conn)
break
else:
self.nr_conns -= 1
# remove the socket from the poller
with self._lock:
try:
self.poller.unregister(conn.sock)
except OSError as e:
if e.errno != errno.EBADF:
raise
except KeyError:
# already removed by the system, continue
pass
except ValueError:
# already removed by the system continue
pass
# close the socket
conn.close()
# Connection has timed out
self.keepalived_conns.popleft()
try:
self.poller.unregister(conn.sock)
except (OSError, KeyError, ValueError):
pass # Already unregistered
self.nr_conns -= 1
conn.close()
def is_parent_alive(self):
# If our parent changed then we shut down.
@ -190,100 +264,103 @@ class ThreadWorker(base.Worker):
return False
return True
def wait_for_and_dispatch_events(self, timeout):
"""Wait for events and dispatch callbacks."""
try:
events = self.poller.select(timeout)
for key, _ in events:
callback = key.data
callback(key.fileobj)
except OSError as e:
if e.errno != errno.EINTR:
raise
def run(self):
# init listeners, add them to the event loop
for sock in self.sockets:
sock.setblocking(False)
# a race condition during graceful shutdown may make the listener
# name unavailable in the request handler so capture it once here
server = sock.getsockname()
acceptor = partial(self.accept, server)
self.poller.register(sock, selectors.EVENT_READ, acceptor)
# Register the method queue with the poller
self.poller.register(self.method_queue.fileno(),
selectors.EVENT_READ,
self.method_queue.run_callbacks)
# Start accepting connections
self.set_accept_enabled(True)
while self.alive:
# notify the arbiter we are alive
# Notify the arbiter we are alive
self.notify()
# can we accept more connections?
if self.nr_conns < self.worker_connections:
# wait for an event
events = self.poller.select(1.0)
for key, _ in events:
callback = key.data
callback(key.fileobj)
# Check if we can accept more connections
can_accept = self.nr_conns < self.worker_connections
if can_accept != self._accepting:
self.set_accept_enabled(can_accept)
# check (but do not wait) for finished requests
result = futures.wait(self.futures, timeout=0,
return_when=futures.FIRST_COMPLETED)
else:
# wait for a request to finish
result = futures.wait(self.futures, timeout=1.0,
return_when=futures.FIRST_COMPLETED)
# clean up finished requests
for fut in result.done:
self.futures.remove(fut)
# Wait for events (unified event loop - no futures.wait())
self.wait_for_and_dispatch_events(timeout=1.0)
if not self.is_parent_alive():
break
# handle keepalive timeouts
# Handle keepalive timeouts
self.murder_keepalived()
self.tpool.shutdown(False)
# Graceful shutdown: stop accepting but handle existing connections
self.set_accept_enabled(False)
# Wait for in-flight connections within grace period
graceful_timeout = time.monotonic() + self.cfg.graceful_timeout
while self.nr_conns > 0:
time_remaining = max(graceful_timeout - time.monotonic(), 0)
if time_remaining == 0:
break
self.wait_for_and_dispatch_events(timeout=time_remaining)
self.murder_keepalived()
# Cleanup
self.tpool.shutdown(wait=False)
self.poller.close()
self.method_queue.close()
for s in self.sockets:
s.close()
futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
def finish_request(self, fs):
if fs.cancelled():
self.nr_conns -= 1
fs.conn.close()
return
def finish_request(self, conn, fs):
"""Handle completion of a request (called via method_queue on main thread)."""
try:
(keepalive, conn) = fs.result()
# if the connection should be kept alived add it
# to the eventloop and record it
keepalive = not fs.cancelled() and fs.result()
if keepalive and self.alive:
# flag the socket as non blocked
# Put connection back in the poller for keepalive
conn.sock.setblocking(False)
# register the connection
conn.set_timeout()
with self._lock:
self._keep.append(conn)
# add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_client_socket_readable, conn))
self.keepalived_conns.append(conn)
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_client_socket_readable, conn))
else:
self.nr_conns -= 1
conn.close()
except Exception:
# an exception happened, make sure to close the
# socket.
self.nr_conns -= 1
fs.conn.close()
conn.close()
def handle(self, conn):
keepalive = False
"""Handle a request on a connection. Runs in a worker thread."""
req = None
try:
# Initialize connection in worker thread to handle SSL errors gracefully
# (ENOTCONN from ssl_wrap_socket would crash main thread otherwise)
conn.init()
req = next(conn.parser)
if not req:
return (False, conn)
return False
# handle the request
# Handle the request
keepalive = self.handle_request(req, conn)
if keepalive:
return (keepalive, conn)
# Discard any unread request body before keepalive
# to prevent socket appearing readable due to leftover bytes
conn.parser.finish_body()
return True
except http.errors.NoMoreData as e:
self.log.debug("Ignored premature client disconnection. %s", e)
except StopIteration as e:
self.log.debug("Closing connection. %s", e)
except ssl.SSLError as e:
@ -293,7 +370,6 @@ class ThreadWorker(base.Worker):
else:
self.log.debug("Error processing SSL request.")
self.handle_error(req, conn.sock, conn.client, e)
except OSError as e:
if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
self.log.exception("Socket error processing request.")
@ -307,7 +383,7 @@ class ThreadWorker(base.Worker):
except Exception as e:
self.handle_error(req, conn.sock, conn.client, e)
return (False, conn)
return False
def handle_request(self, req, conn):
environ = {}
@ -327,7 +403,7 @@ class ThreadWorker(base.Worker):
if not self.alive or not self.cfg.keepalive:
resp.force_close()
elif len(self._keep) >= self.max_keepalived:
elif len(self.keepalived_conns) >= self.max_keepalived:
resp.force_close()
respiter = self.wsgi(environ, resp.start_response)

1284
tests/test_gthread.py Normal file

File diff suppressed because it is too large Load Diff