From 90e87c6f0d1bee5173ab998491b5314c394028e9 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Wed, 2 Dec 2009 17:39:16 +0100 Subject: [PATCH] fix bug, now cores/cpus are used correctly --- gunicorn/httprequest.py | 12 ++-- gunicorn/httpserver.py | 146 ++++++++++++++++++++++----------------- gunicorn/socketserver.py | 17 +++-- test.py | 3 +- 4 files changed, 99 insertions(+), 79 deletions(-) diff --git a/gunicorn/httprequest.py b/gunicorn/httprequest.py index 8ace551d..c6c8cca5 100644 --- a/gunicorn/httprequest.py +++ b/gunicorn/httprequest.py @@ -153,11 +153,11 @@ class HTTPRequest(object): return data, str(length) or "" def write(self, data): - self.f.write(data) + self.fp.write(data) - def close(self, data): + def close(self): self.fp.close() - self.conn.close() + self.socket.close() def first_line(self, line): method, path, version = line.split(" ") @@ -173,7 +173,7 @@ class HTTPRequest(object): -class InputFile(object): +class FileInput(object): def __init__(self, req): self.length = req.body_length() @@ -244,9 +244,9 @@ class InputFile(object): """ s = [] while amt > 0: - chunk = self.fp.read(min(amt, MAXAMOUNT)) + chunk = self.fp.read(amt) if not chunk: - raise IncompleteRead(s) + raise RequestError(500, "Incomplete read %s" % s) s.append(chunk) amt -= len(chunk) return ''.join(s) diff --git a/gunicorn/httpserver.py b/gunicorn/httpserver.py index 05d546bc..b4a5b2f0 100644 --- a/gunicorn/httpserver.py +++ b/gunicorn/httpserver.py @@ -22,7 +22,6 @@ import select import signal import socket import sys -import tempfile import time from gunicorn.httprequest import HTTPRequest @@ -40,7 +39,7 @@ class Worker(object): class HTTPServer(object): - LISTENERS = [] + LISTENERS = {} PIPE = [] @@ -63,13 +62,18 @@ class HTTPServer(object): # start to listen self.init_listeners = init_listeners if not self.init_listeners: - self.init_listeners = [(('localhost', 8000), {})] + self.init_listeners = [(('localhost', 8000), {})] + for address, opts in self.init_listeners: self.listen(address, opts) self.master_pid = os.getpid() self.maintain_worker_count() + self.join() + + #def start(self): + def listen(self, addr, opts): @@ -79,7 +83,7 @@ class HTTPServer(object): for i in range(tries): try: sock = socketserver.TCPServer(addr, **opts) - self.LISTENERS.append(sock) + self.LISTENERS[sock.fileno()] = sock except socket.error, e: if e[0] == errno.EADDRINUSE: self.logger.error("adding listener failed address: %s" % str(addr)) @@ -91,15 +95,13 @@ class HTTPServer(object): def join(self): # this pipe will be used to wake up the master when signal occurs self.init_pipe() - respawn = True while True: try: - #if respawn: - #self.maintain_worker_count() - os.waitpid(-1, os.WNOHANG) + self.reap_workers() self.master_sleep() except Exception, e: self.logger.error("Unhandled exception [%s]" % str(e)) + sys.exit() except KeyboardInterrupt: self.kill_workers(signal.SIGQUIT) sys.exit() @@ -108,7 +110,7 @@ class HTTPServer(object): def master_sleep(self): while True: - ready = select.select([self.PIPE[0]], [], [], 1) + ready = select.select([self.PIPE[0]], [], [], 1.0) if ready and ready[0]: break try: while True: @@ -116,17 +118,31 @@ class HTTPServer(object): if len(data) < 4096: return except errno.EAGAIN, errno.EINTR: pass + + def reap_workers(self): + try: + while True: + wpid, status = os.waitpid(-1, os.WNOHANG) + if not wpid: + break + try: + self.WORKERS[wpid].tmp.close() + except: + pass + del self.WORKERS[wpid] + except errno.ECHILD: + pass def init_worker_process(self, worker): - for w in self.WORKERS: - if w != worker: + self.init_pipe() + for w in self.WORKERS.values(): + if w.nr != worker.nr: try: w.tmp.close() except: - continue - else: - continue - [fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC) for sock in self.LISTENERS] + pass + self.WORKERS = {} + [fcntl.fcntl(fileno, fcntl.F_SETFD, fcntl.FD_CLOEXEC) for fileno in self.LISTENERS.keys()] fcntl.fcntl(worker.tmp.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC) @@ -134,53 +150,51 @@ class HTTPServer(object): """ do nothing just echo message""" req = HTTPRequest(conn, addr, listener.getsockname()) environ = req.read() - req.write(str(environ)) req.close() - def worker_loop(self, worker): - pid = os.fork() - - if pid == 0: - worker_pid = os.getpid() - yield worker_pid - self.init_worker_process(worker) - alive = worker.tmp.fileno() - m = 0 - ready = self.LISTENERS - try: - while alive: + def worker_loop(self, worker_pid, worker): + ppid = self.master_pid + self.init_worker_process(worker) + alive = worker.tmp.fileno() + m = 0 + ready = self.LISTENERS.keys() + try: + while alive: + m = 0 if m == 1 else 1 + os.fchmod(alive, m) + + for fileno in ready: + sock = self.LISTENERS[fileno] + try: + self.process_client(sock, *sock.accept()) + except errno.EAGAIN, errno.ECONNABORTED: + pass + m = 0 if m == 1 else 1 os.fchmod(alive, m) - - for sock in ready: - try: - self.process_client(sock, *sock.accept_nonblock()) - except errno.EAGAIN, errno.ECONNABORTED: - pass - - m = 0 if m == 1 else 1 - os.fchmod(alive, m) - - m = 0 if m == 1 else 1 - os.fchmod(alive, m) - while True: - try: - fd_sets = select.select(self.LISTENERS, [], self.PIPE, self.timeout) - if fd_sets and fd_sets[0]: - ready = [fd_sets[0]] - break - except errno.EINTR: - ready = self.LISTENERS - except Exception, e: - self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, e)) - pass - - except KeyboardInterrupt: - sys.exit() - except Exception, e: - self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, e)) + if ppid != os.getppid(): return + m = 0 if m == 1 else 1 + os.fchmod(alive, m) + + while True: + try: + fd_sets = select.select(self.LISTENERS.keys(), [], self.PIPE, 0.2) + if fd_sets and fd_sets[0]: + ready = [fd_sets[0]] + break + except errno.EINTR: + print >>sys.stderr, "mmm" + ready = self.LISTENERS.keys() + except Exception, e: + self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, str(e))) + pass + + except KeyboardInterrupt: + sys.exit() + except Exception, e: + self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, e)) def kill_workers(self, sig): """kill all workers with signal sig """ @@ -198,24 +212,30 @@ class HTTPServer(object): def spawn_missing_workers(self): + workers_nr = [w.nr for w in self.WORKERS.values()] for i in range(self.worker_processes): - if i in self.WORKERS.values(): + if i in workers_nr: continue - - worker = Worker(i, os.tmpfile()) - for worker_pid in self.worker_loop(worker): - self.WORKERS[worker_pid] = worker + else: + pid = os.fork() + if pid == 0: + worker_pid = os.getpid() + worker = Worker(i, os.tmpfile()) + self.WORKERS[worker_pid] = worker + self.worker_loop(worker_pid, worker) + else: + continue def maintain_worker_count(self): if (len(self.WORKERS.keys()) - self.worker_processes) < 0: self.spawn_missing_workers() - + for pid, w in self.WORKERS.items(): if w.nr >= self.worker_processes: self.kill_worker(pid, signal.SIGQUIT) def init_pipe(self): if self.PIPE: - [io.close() for io in self.PIPE] + [os.close(fileno) for fileno in self.PIPE] self.PIPE = os.pipe() [fcntl.fcntl(io, fcntl.F_SETFD, fcntl.FD_CLOEXEC) for io in self.PIPE] \ No newline at end of file diff --git a/gunicorn/socketserver.py b/gunicorn/socketserver.py index a6838a88..7c0f955c 100644 --- a/gunicorn/socketserver.py +++ b/gunicorn/socketserver.py @@ -16,7 +16,13 @@ import socket -class TCPServer(socket.socket): +class Socket(socket.socket): + def accept_nonblock(self): + self.setblocking(0) + return self.accept() + + +class TCPServer(Socket): """class for server-side TCP sockets. This is wrapper around socket.socket class""" @@ -45,11 +51,4 @@ class TCPServer(socket.socket): self.listen() def listen(self): - super(TCPServer, self).listen(self.backlog) - - def accept(self): - return super(TCPServer, self).accept() - - def accept_nonblock(self): - self.setblocking(0) - return self.accept() \ No newline at end of file + super(TCPServer, self).listen(self.backlog) \ No newline at end of file diff --git a/test.py b/test.py index 40e9433d..7ab67bcb 100644 --- a/test.py +++ b/test.py @@ -1,4 +1,5 @@ from gunicorn.httpserver import HTTPServer if __name__ == '__main__': - server = HTTPServer(None, 2).join() \ No newline at end of file + server = HTTPServer(None, 4) + server.join() \ No newline at end of file