From 21d3f7ccd9cddb3ae5d95501779ad18db2a4d523 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Thu, 3 Dec 2009 03:03:47 +0100 Subject: [PATCH] working concurrent version. But I think I will rewrite the worker loop to react on events write/read for sockets so I could have better concurrency. Also i'm not sure that the sleep() is enough. http://www.friendpaste.com/1KVIOzMJGxm0yjv1qprkbf --- gunicorn/httprequest.py | 8 ++- gunicorn/httpserver.py | 117 +++++++++++++-------------------------- gunicorn/socketserver.py | 25 +++------ gunicorn/util.py | 2 +- test.py | 2 +- 5 files changed, 57 insertions(+), 97 deletions(-) diff --git a/gunicorn/httprequest.py b/gunicorn/httprequest.py index 166d65be..e1d34913 100644 --- a/gunicorn/httprequest.py +++ b/gunicorn/httprequest.py @@ -129,6 +129,11 @@ class HTTPRequest(object): return "chunked" else: return None + + def should_close(self): + return self.version < 10 or self.headers.get('CONNECTION') == "close" \ + or (self.version == 10 and self.headers.get('CONNECTION') != "Keep-Alive") + def decode_chunked(self): """Decode the 'chunked' transfer coding.""" @@ -168,7 +173,8 @@ class HTTPRequest(object): def close(self): self.fp.close() - self.socket.close() + if self.should_close(): + self.socket.close() def first_line(self, line): method, path, version = line.split(" ") diff --git a/gunicorn/httpserver.py b/gunicorn/httpserver.py index df4597de..37f80cd4 100644 --- a/gunicorn/httpserver.py +++ b/gunicorn/httpserver.py @@ -30,17 +30,15 @@ from gunicorn import socketserver from gunicorn.util import NullHandler class Worker(object): - + def __init__(self, nr, tmp): self.nr = nr self.tmp = tmp - def __eq__(self, v): - return self.nr == v class HTTPServer(object): - LISTENERS = {} + LISTENERS = [] PIPE = [] @@ -64,27 +62,23 @@ class HTTPServer(object): self.init_listeners = init_listeners if not self.init_listeners: self.init_listeners = [(('localhost', 8000), {})] - + for address, opts in self.init_listeners: self.listen(address, opts) self.master_pid = os.getpid() self.maintain_worker_count() - self.join() - - #def start(self): - - def listen(self, addr, opts): + """start to listen""" tries = self.opts.get('tries', 5) delay = self.opts.get('delay', 0.5) for i in range(tries): try: sock = socketserver.TCPServer(addr, **opts) - self.LISTENERS[sock.fileno()] = sock + self.LISTENERS.append(sock) except socket.error, e: if e[0] == errno.EADDRINUSE: self.logger.error("adding listener failed address: %s" % str(addr)) @@ -93,13 +87,18 @@ class HTTPServer(object): time.sleep(delay) break - def join(self): + def run(self): # this pipe will be used to wake up the master when signal occurs self.init_pipe() + respawn = True while True: try: self.reap_workers() - self.master_sleep() + while True: + ready = select.select([self.PIPE[0]], [], [], 1.0) + if ready[0]: break + + if respawn: self.maintain_worker_count() except Exception, e: self.logger.error("Unhandled exception [%s]" % str(e)) sys.exit() @@ -107,49 +106,19 @@ class HTTPServer(object): self.kill_workers(signal.SIGQUIT) sys.exit() - - - def master_sleep(self): - while True: - ready = select.select([self.PIPE[0]], [], [], 1.0) - if ready and ready[0]: break - try: - while True: - data = os.read(self.PIPE[0], 4096) - if len(data) < 4096: return - except errno.EAGAIN, errno.EINTR: - pass - def reap_workers(self): try: while True: wpid, status = os.waitpid(-1, os.WNOHANG) - if not wpid: - break - try: + if not wpid: break + if wpid in self.WORKERS: self.WORKERS[wpid].tmp.close() - except: - pass - del self.WORKERS[wpid] + del self.WORKERS[wpid] except errno.ECHILD: pass - - def init_worker_process(self, worker): - self.init_pipe() - for w in self.WORKERS.values(): - if w.nr != worker.nr: - try: - w.tmp.close() - except: - pass - self.WORKERS = {} - [fcntl.fcntl(fileno, fcntl.F_SETFD, fcntl.FD_CLOEXEC) for fileno in self.LISTENERS.keys()] - fcntl.fcntl(worker.tmp.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC) - - + def process_client(self, listener, conn, addr): - """ do nothing just echo message""" - fcntl.fcntl(conn.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC) + """ do nothing just echo message""" req = HTTPRequest(conn, addr, listener.getsockname()) result = self.app(req.read(), req.start_response) response = HTTPResponse(req, result) @@ -158,48 +127,40 @@ class HTTPServer(object): def worker_loop(self, worker_pid, worker): ppid = self.master_pid - self.init_worker_process(worker) alive = worker.tmp.fileno() + m = 0 - ready = self.LISTENERS.keys() - try: - while alive: + ready = self.LISTENERS + + while alive: + try: m = 0 if m == 1 else 1 os.fchmod(alive, m) - - for fileno in ready: - sock = self.LISTENERS[fileno] + + for sock in ready: try: - self.process_client(sock, *sock.accept()) + self.process_client(sock, *sock.accept_nonblock()) + time.sleep(0.1) m = 0 if m == 1 else 1 os.fchmod(alive, m) except errno.EAGAIN, errno.ECONNABORTED: pass - except Exception, e: - print >>sys.stderr, str(e) - if ppid != os.getppid(): return m = 0 if m == 1 else 1 os.fchmod(alive, m) while True: - try: - fd_sets = select.select(self.LISTENERS.keys(), [], self.PIPE, 0.2) - if fd_sets and fd_sets[0]: - ready = [fd_sets[0]] - break - except errno.EINTR: - print >>sys.stderr, "mmm" - ready = self.LISTENERS.keys() - except Exception, e: - self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, str(e))) - pass - - except KeyboardInterrupt: - sys.exit() - except Exception, e: - self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, e)) + ret = select.select(self.LISTENERS, [], [], 2.0) + if ret[0]: + ready = ret[0] + break + + + except KeyboardInterrupt: + sys.exit() + except Exception, e: + self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, e)) def kill_workers(self, sig): """kill all workers with signal sig """ @@ -228,11 +189,11 @@ class HTTPServer(object): worker = Worker(i, os.tmpfile()) self.WORKERS[worker_pid] = worker self.worker_loop(worker_pid, worker) - else: - continue + + def maintain_worker_count(self): - if (len(self.WORKERS.keys()) - self.worker_processes) < 0: + if len(self.WORKERS.keys()) < self.worker_processes: self.spawn_missing_workers() for pid, w in self.WORKERS.items(): diff --git a/gunicorn/socketserver.py b/gunicorn/socketserver.py index cea4963b..9700848e 100644 --- a/gunicorn/socketserver.py +++ b/gunicorn/socketserver.py @@ -18,9 +18,9 @@ import socket class Socket(socket.socket): def accept_nonblock(self): - sock, addr = self.accept() - sock.setblocking(0) - return (sock, addr) + conn, addr = self.accept() + conn.setblocking(0) + return (conn, addr) class TCPServer(Socket): @@ -28,28 +28,21 @@ class TCPServer(Socket): This is wrapper around socket.socket class""" def __init__(self, address, **opts): - self.address = address self.backlog = opts.get('backlog', 1024) self.timeout = opts.get('timeout', 300) self.reuseaddr = opts.get('reuseaddr', True) self.nodelay = opts.get('nodelay', True) - self.recbuf = opts.get('recbuf', 8192) socket.socket.__init__(self, socket.AF_INET, socket.SOCK_STREAM) + + self.bind(address) + self.listen(self.backlog) + # set options + self.settimeout(self.timeout) + self.setblocking(0) if self.reuseaddr: self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if self.nodelay: self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - - if self.recbuf: - self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, - self.recbuf) - - self.settimeout(self.timeout) - self.bind(address) - self.listen() - - def listen(self): - super(TCPServer, self).listen(self.backlog) \ No newline at end of file diff --git a/gunicorn/util.py b/gunicorn/util.py index 08330017..f2b94dfa 100644 --- a/gunicorn/util.py +++ b/gunicorn/util.py @@ -19,4 +19,4 @@ import logging class NullHandler(logging.Handler): """ null log handler """ def emit(self, record): - pass + pass \ No newline at end of file diff --git a/test.py b/test.py index 492b1367..bd681cb1 100644 --- a/test.py +++ b/test.py @@ -10,4 +10,4 @@ def simple_app(environ, start_response): if __name__ == '__main__': server = HTTPServer(simple_app, 4) - server.join() \ No newline at end of file + server.run() \ No newline at end of file