Implementation of the max-requests feature.

Works on sync and eventlet works. Doesn't work on gevent_pywsig or
gevent_wsgi workers as we don't control their main loops. Tornado
workers appear to be broken.

Worst of all, this causes vanilla gevent workers to segfault. I'm
waiting to see if there's a known issue before considering what to
do next. Worst case we could refuse to run with the bad combination
of settings.
This commit is contained in:
Paul J. Davis 2010-08-27 21:24:59 -04:00 committed by benoitc
parent 8d089f95f9
commit 7e4ca4b809
5 changed files with 43 additions and 16 deletions

View File

@ -321,6 +321,25 @@ class WorkerConnections(Setting):
This setting only affects the Eventlet and Gevent worker types. This setting only affects the Eventlet and Gevent worker types.
""" """
class MaxRequests(Setting):
name = "max_requests"
section = "Worker Processes"
cli = ["--max-requests"]
meta = "INT"
validator = validate_pos_int
type = "int"
default = 0
desc = """\
The maximum number of requests a worker will process before restarting.
Any value greater than zero will limit the number of requests a work
will process before automatically restarting. This is a simple method
to help limit the damage of memory leaks.
If this is set to zero (the default) then the automatic worker
restarts are disabled.
"""
class Timeout(Setting): class Timeout(Setting):
name = "timeout" name = "timeout"
section = "Worker Processes" section = "Worker Processes"

View File

@ -115,9 +115,13 @@ class Response(object):
self.version = SERVER_VERSION self.version = SERVER_VERSION
self.status = None self.status = None
self.chunked = False self.chunked = False
self.should_close = req.should_close()
self.headers = [] self.headers = []
self.headers_sent = False self.headers_sent = False
def force_close(self):
self.should_close = True
def start_response(self, status, headers, exc_info=None): def start_response(self, status, headers, exc_info=None):
if exc_info: if exc_info:
try: try:
@ -151,7 +155,7 @@ class Response(object):
def default_headers(self): def default_headers(self):
connection = "keep-alive" connection = "keep-alive"
if self.req.should_close(): if self.should_close:
connection = "close" connection = "close"
return [ return [

View File

@ -29,15 +29,13 @@ class AsyncWorker(base.Worker):
try: try:
parser = http.RequestParser(client) parser = http.RequestParser(client)
try: try:
while True: while self.alive:
req = None req = None
with self.timeout_ctx(): with self.timeout_ctx():
req = parser.next() req = parser.next()
if not req: if not req:
break break
self.handle_request(req, client, addr) self.handle_request(req, client, addr)
except StopIteration: except StopIteration:
pass pass
except socket.error, e: except socket.error, e:
@ -64,8 +62,12 @@ class AsyncWorker(base.Worker):
try: try:
debug = self.cfg.debug or False debug = self.cfg.debug or False
self.cfg.pre_request(self, req) self.cfg.pre_request(self, req)
resp, environ = wsgi.create(req, sock, addr, self.address, resp, environ = wsgi.create(req, sock, addr, self.address, self.cfg)
self.cfg) self.nr += 1
if self.nr >= self.max_requests:
self.log.info("Autorestarting worker after current request.")
resp.force_close()
self.alive = False
respiter = self.wsgi(environ, resp.start_response) respiter = self.wsgi(environ, resp.start_response)
if respiter == ALREADY_HANDLED: if respiter == ALREADY_HANDLED:
return False return False

View File

@ -37,6 +37,7 @@ class Worker(object):
self.booted = False self.booted = False
self.nr = 0 self.nr = 0
self.max_requests = cfg.max_requests or sys.maxint
self.alive = True self.alive = True
self.spinner = 0 self.spinner = 0
self.log = logging.getLogger(__name__) self.log = logging.getLogger(__name__)

View File

@ -18,14 +18,11 @@ import gunicorn.workers.base as base
class SyncWorker(base.Worker): class SyncWorker(base.Worker):
def run(self): def run(self):
self.nr = 0
# self.socket appears to lose its blocking status after # self.socket appears to lose its blocking status after
# we fork in the arbiter. Reset it here. # we fork in the arbiter. Reset it here.
self.socket.setblocking(0) self.socket.setblocking(0)
while self.alive: while self.alive:
self.nr = 0
self.notify() self.notify()
# Accept a connection. If we get an error telling us # Accept a connection. If we get an error telling us
@ -37,17 +34,16 @@ class SyncWorker(base.Worker):
client.setblocking(1) client.setblocking(1)
util.close_on_exec(client) util.close_on_exec(client)
self.handle(client, addr) self.handle(client, addr)
self.nr += 1
# Keep processing clients until no one is waiting. This
# prevents the need to select() for every client that we
# process.
continue
except socket.error, e: except socket.error, e:
if e[0] not in (errno.EAGAIN, errno.ECONNABORTED): if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
raise raise
# Keep processing clients until no one is waiting. This
# prevents the need to select() for every client that we
# process.
if self.nr > 0:
continue
# If our parent changed then we shut down. # If our parent changed then we shut down.
if self.ppid != os.getppid(): if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self) self.log.info("Parent changed, shutting down: %s" % self)
@ -97,6 +93,11 @@ class SyncWorker(base.Worker):
self.cfg.pre_request(self, req) self.cfg.pre_request(self, req)
resp, environ = wsgi.create(req, client, addr, resp, environ = wsgi.create(req, client, addr,
self.address, self.cfg) self.address, self.cfg)
self.nr += 1
if self.nr >= self.max_requests:
self.log.info("Autorestarting worker after current request.")
resp.force_close()
self.alive = False
respiter = self.wsgi(environ, resp.start_response) respiter = self.wsgi(environ, resp.start_response)
for item in respiter: for item in respiter:
resp.write(item) resp.write(item)