working concurrent version. But I think I will rewrite the worker loop

to react on events write/read for sockets so I could  have better
concurrency. Also i'm not sure that the sleep() is enough.

http://www.friendpaste.com/1KVIOzMJGxm0yjv1qprkbf
This commit is contained in:
Benoit Chesneau 2009-12-03 03:03:47 +01:00
parent 9b3704e395
commit 21d3f7ccd9
5 changed files with 57 additions and 97 deletions

View File

@ -129,6 +129,11 @@ class HTTPRequest(object):
return "chunked"
else:
return None
def should_close(self):
return self.version < 10 or self.headers.get('CONNECTION') == "close" \
or (self.version == 10 and self.headers.get('CONNECTION') != "Keep-Alive")
def decode_chunked(self):
"""Decode the 'chunked' transfer coding."""
@ -168,7 +173,8 @@ class HTTPRequest(object):
def close(self):
self.fp.close()
self.socket.close()
if self.should_close():
self.socket.close()
def first_line(self, line):
method, path, version = line.split(" ")

View File

@ -30,17 +30,15 @@ from gunicorn import socketserver
from gunicorn.util import NullHandler
class Worker(object):
def __init__(self, nr, tmp):
self.nr = nr
self.tmp = tmp
def __eq__(self, v):
return self.nr == v
class HTTPServer(object):
LISTENERS = {}
LISTENERS = []
PIPE = []
@ -64,27 +62,23 @@ class HTTPServer(object):
self.init_listeners = init_listeners
if not self.init_listeners:
self.init_listeners = [(('localhost', 8000), {})]
for address, opts in self.init_listeners:
self.listen(address, opts)
self.master_pid = os.getpid()
self.maintain_worker_count()
self.join()
#def start(self):
def listen(self, addr, opts):
"""start to listen"""
tries = self.opts.get('tries', 5)
delay = self.opts.get('delay', 0.5)
for i in range(tries):
try:
sock = socketserver.TCPServer(addr, **opts)
self.LISTENERS[sock.fileno()] = sock
self.LISTENERS.append(sock)
except socket.error, e:
if e[0] == errno.EADDRINUSE:
self.logger.error("adding listener failed address: %s" % str(addr))
@ -93,13 +87,18 @@ class HTTPServer(object):
time.sleep(delay)
break
def join(self):
def run(self):
# this pipe will be used to wake up the master when signal occurs
self.init_pipe()
respawn = True
while True:
try:
self.reap_workers()
self.master_sleep()
while True:
ready = select.select([self.PIPE[0]], [], [], 1.0)
if ready[0]: break
if respawn: self.maintain_worker_count()
except Exception, e:
self.logger.error("Unhandled exception [%s]" % str(e))
sys.exit()
@ -107,49 +106,19 @@ class HTTPServer(object):
self.kill_workers(signal.SIGQUIT)
sys.exit()
def master_sleep(self):
while True:
ready = select.select([self.PIPE[0]], [], [], 1.0)
if ready and ready[0]: break
try:
while True:
data = os.read(self.PIPE[0], 4096)
if len(data) < 4096: return
except errno.EAGAIN, errno.EINTR:
pass
def reap_workers(self):
try:
while True:
wpid, status = os.waitpid(-1, os.WNOHANG)
if not wpid:
break
try:
if not wpid: break
if wpid in self.WORKERS:
self.WORKERS[wpid].tmp.close()
except:
pass
del self.WORKERS[wpid]
del self.WORKERS[wpid]
except errno.ECHILD:
pass
def init_worker_process(self, worker):
self.init_pipe()
for w in self.WORKERS.values():
if w.nr != worker.nr:
try:
w.tmp.close()
except:
pass
self.WORKERS = {}
[fcntl.fcntl(fileno, fcntl.F_SETFD, fcntl.FD_CLOEXEC) for fileno in self.LISTENERS.keys()]
fcntl.fcntl(worker.tmp.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
def process_client(self, listener, conn, addr):
""" do nothing just echo message"""
fcntl.fcntl(conn.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
""" do nothing just echo message"""
req = HTTPRequest(conn, addr, listener.getsockname())
result = self.app(req.read(), req.start_response)
response = HTTPResponse(req, result)
@ -158,48 +127,40 @@ class HTTPServer(object):
def worker_loop(self, worker_pid, worker):
ppid = self.master_pid
self.init_worker_process(worker)
alive = worker.tmp.fileno()
m = 0
ready = self.LISTENERS.keys()
try:
while alive:
ready = self.LISTENERS
while alive:
try:
m = 0 if m == 1 else 1
os.fchmod(alive, m)
for fileno in ready:
sock = self.LISTENERS[fileno]
for sock in ready:
try:
self.process_client(sock, *sock.accept())
self.process_client(sock, *sock.accept_nonblock())
time.sleep(0.1)
m = 0 if m == 1 else 1
os.fchmod(alive, m)
except errno.EAGAIN, errno.ECONNABORTED:
pass
except Exception, e:
print >>sys.stderr, str(e)
if ppid != os.getppid(): return
m = 0 if m == 1 else 1
os.fchmod(alive, m)
while True:
try:
fd_sets = select.select(self.LISTENERS.keys(), [], self.PIPE, 0.2)
if fd_sets and fd_sets[0]:
ready = [fd_sets[0]]
break
except errno.EINTR:
print >>sys.stderr, "mmm"
ready = self.LISTENERS.keys()
except Exception, e:
self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, str(e)))
pass
except KeyboardInterrupt:
sys.exit()
except Exception, e:
self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, e))
ret = select.select(self.LISTENERS, [], [], 2.0)
if ret[0]:
ready = ret[0]
break
except KeyboardInterrupt:
sys.exit()
except Exception, e:
self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, e))
def kill_workers(self, sig):
"""kill all workers with signal sig """
@ -228,11 +189,11 @@ class HTTPServer(object):
worker = Worker(i, os.tmpfile())
self.WORKERS[worker_pid] = worker
self.worker_loop(worker_pid, worker)
else:
continue
def maintain_worker_count(self):
if (len(self.WORKERS.keys()) - self.worker_processes) < 0:
if len(self.WORKERS.keys()) < self.worker_processes:
self.spawn_missing_workers()
for pid, w in self.WORKERS.items():

View File

@ -18,9 +18,9 @@ import socket
class Socket(socket.socket):
def accept_nonblock(self):
sock, addr = self.accept()
sock.setblocking(0)
return (sock, addr)
conn, addr = self.accept()
conn.setblocking(0)
return (conn, addr)
class TCPServer(Socket):
@ -28,28 +28,21 @@ class TCPServer(Socket):
This is wrapper around socket.socket class"""
def __init__(self, address, **opts):
self.address = address
self.backlog = opts.get('backlog', 1024)
self.timeout = opts.get('timeout', 300)
self.reuseaddr = opts.get('reuseaddr', True)
self.nodelay = opts.get('nodelay', True)
self.recbuf = opts.get('recbuf', 8192)
socket.socket.__init__(self, socket.AF_INET, socket.SOCK_STREAM)
self.bind(address)
self.listen(self.backlog)
# set options
self.settimeout(self.timeout)
self.setblocking(0)
if self.reuseaddr:
self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if self.nodelay:
self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if self.recbuf:
self.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
self.recbuf)
self.settimeout(self.timeout)
self.bind(address)
self.listen()
def listen(self):
super(TCPServer, self).listen(self.backlog)

View File

@ -19,4 +19,4 @@ import logging
class NullHandler(logging.Handler):
""" null log handler """
def emit(self, record):
pass
pass

View File

@ -10,4 +10,4 @@ def simple_app(environ, start_response):
if __name__ == '__main__':
server = HTTPServer(simple_app, 4)
server.join()
server.run()