optimize the sync worker

This change optimize the sync worker when we only have to listen on one
interface. While I'm here, I fixed a long and unnoticed outstanding issue when
we were accepting on multiple interfaces (wonder if someone really use it), at
some point soe interfaces were skipped.
This commit is contained in:
benoitc 2014-10-25 11:58:28 +02:00
parent c152ce0dd0
commit 4c601ce447

View File

@ -21,13 +21,38 @@ from gunicorn import six
class SyncWorker(base.Worker): class SyncWorker(base.Worker):
def run(self): def accept(self, listener):
# self.socket appears to lose its blocking status after client, addr = listener.accept()
# we fork in the arbiter. Reset it here. client.setblocking(1)
for s in self.sockets: util.close_on_exec(client)
s.setblocking(0) self.handle(listener, client, addr)
ready = self.sockets def wait(self, timeout):
try:
self.notify()
ret = select.select(self.sockets, [], self.PIPE, timeout)
if ret[0]:
return ret[0]
except select.error as e:
if e.args[0] == errno.EINTR:
return self.sockets
if e.args[0] == errno.EBADF:
if self.nr < 0:
return self.sockets
else:
return False
raise
def is_parent_alive(self):
# If our parent changed then we shut down.
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s", self)
return False
return True
def run_for_one(self, timeout):
listener = self.sockets[0]
while self.alive: while self.alive:
self.notify() self.notify()
@ -35,51 +60,57 @@ class SyncWorker(base.Worker):
# that no connection is waiting we fall down to the # that no connection is waiting we fall down to the
# select which is where we'll wait for a bit for new # select which is where we'll wait for a bit for new
# workers to come give us some love. # workers to come give us some love.
try:
self.accept(listener)
# Keep processing clients until no one is waiting. This
# prevents the need to select() for every client that we
# process.
continue
for sock in ready: except socket.error as e:
if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED,
errno.EWOULDBLOCK):
raise
if not self.is_parent_alive():
return
if not self.wait(timeout):
return
def run_for_multiple(self, timeout):
while self.alive:
self.notify()
ready = self.wait(timeout)
if not ready:
return
for listener in ready:
try: try:
client, addr = sock.accept() self.accept(listener)
client.setblocking(1)
util.close_on_exec(client)
self.handle(sock, client, addr)
# Keep processing clients until no one is waiting. This
# prevents the need to select() for every client that we
# process.
continue
except socket.error as e: except socket.error as e:
if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED,
errno.EWOULDBLOCK): errno.EWOULDBLOCK):
raise raise
# If our parent changed then we shut down. if not self.is_parent_alive():
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s", self)
return return
try: def run(self):
self.notify() # if no timeout is given the worker will never wait and will
# use the CPU for nothing. This minimal timeout prevent it.
timeout = self.timeout or 0.5
# if no timeout is given the worker will never wait and will # self.socket appears to lose its blocking status after
# use the CPU for nothing. This minimal timeout prevent it. # we fork in the arbiter. Reset it here.
timeout = self.timeout or 0.5 for s in self.sockets:
s.setblocking(0)
ret = select.select(self.sockets, [], self.PIPE, timeout) if len(self.sockets) > 1:
if ret[0]: self.run_for_multiple(timeout)
ready = ret[0] else:
continue self.run_for_one(timeout)
except select.error as e:
if e.args[0] == errno.EINTR:
ready = self.sockets
continue
if e.args[0] == errno.EBADF:
if self.nr < 0:
ready = self.sockets
continue
else:
return
raise
def handle(self, listener, client, addr): def handle(self, listener, client, addr):
req = None req = None