diff --git a/gunicorn/workers/sync.py b/gunicorn/workers/sync.py index 12a50f87..42f0c68e 100644 --- a/gunicorn/workers/sync.py +++ b/gunicorn/workers/sync.py @@ -21,13 +21,38 @@ from gunicorn import six class SyncWorker(base.Worker): - def run(self): - # self.socket appears to lose its blocking status after - # we fork in the arbiter. Reset it here. - for s in self.sockets: - s.setblocking(0) + def accept(self, listener): + client, addr = listener.accept() + client.setblocking(1) + util.close_on_exec(client) + self.handle(listener, client, addr) - ready = self.sockets + def wait(self, timeout): + try: + self.notify() + ret = select.select(self.sockets, [], self.PIPE, timeout) + if ret[0]: + return ret[0] + + except select.error as e: + if e.args[0] == errno.EINTR: + return self.sockets + if e.args[0] == errno.EBADF: + if self.nr < 0: + return self.sockets + else: + return False + raise + + def is_parent_alive(self): + # If our parent changed then we shut down. + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s", self) + return False + return True + + def run_for_one(self, timeout): + listener = self.sockets[0] while self.alive: self.notify() @@ -35,51 +60,57 @@ class SyncWorker(base.Worker): # that no connection is waiting we fall down to the # select which is where we'll wait for a bit for new # workers to come give us some love. + try: + self.accept(listener) + # Keep processing clients until no one is waiting. This + # prevents the need to select() for every client that we + # process. + continue - for sock in ready: + except socket.error as e: + if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, + errno.EWOULDBLOCK): + raise + + if not self.is_parent_alive(): + return + + if not self.wait(timeout): + return + + def run_for_multiple(self, timeout): + while self.alive: + self.notify() + + ready = self.wait(timeout) + if not ready: + return + + for listener in ready: try: - client, addr = sock.accept() - client.setblocking(1) - util.close_on_exec(client) - self.handle(sock, client, addr) - - # Keep processing clients until no one is waiting. This - # prevents the need to select() for every client that we - # process. - continue - + self.accept(listener) except socket.error as e: if e.args[0] not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise - # If our parent changed then we shut down. - if self.ppid != os.getppid(): - self.log.info("Parent changed, shutting down: %s", self) + if not self.is_parent_alive(): return - try: - self.notify() + def run(self): + # if no timeout is given the worker will never wait and will + # use the CPU for nothing. This minimal timeout prevent it. + timeout = self.timeout or 0.5 - # if no timeout is given the worker will never wait and will - # use the CPU for nothing. This minimal timeout prevent it. - timeout = self.timeout or 0.5 + # self.socket appears to lose its blocking status after + # we fork in the arbiter. Reset it here. + for s in self.sockets: + s.setblocking(0) - ret = select.select(self.sockets, [], self.PIPE, timeout) - if ret[0]: - ready = ret[0] - continue - except select.error as e: - if e.args[0] == errno.EINTR: - ready = self.sockets - continue - if e.args[0] == errno.EBADF: - if self.nr < 0: - ready = self.sockets - continue - else: - return - raise + if len(self.sockets) > 1: + self.run_for_multiple(timeout) + else: + self.run_for_one(timeout) def handle(self, listener, client, addr): req = None