fix bug, now cores/cpus are used correctly

This commit is contained in:
Benoit Chesneau 2009-12-02 17:39:16 +01:00
parent b9468919e6
commit 90e87c6f0d
4 changed files with 99 additions and 79 deletions

View File

@ -153,11 +153,11 @@ class HTTPRequest(object):
return data, str(length) or "" return data, str(length) or ""
def write(self, data): def write(self, data):
self.f.write(data) self.fp.write(data)
def close(self, data): def close(self):
self.fp.close() self.fp.close()
self.conn.close() self.socket.close()
def first_line(self, line): def first_line(self, line):
method, path, version = line.split(" ") method, path, version = line.split(" ")
@ -173,7 +173,7 @@ class HTTPRequest(object):
class InputFile(object): class FileInput(object):
def __init__(self, req): def __init__(self, req):
self.length = req.body_length() self.length = req.body_length()
@ -244,9 +244,9 @@ class InputFile(object):
""" """
s = [] s = []
while amt > 0: while amt > 0:
chunk = self.fp.read(min(amt, MAXAMOUNT)) chunk = self.fp.read(amt)
if not chunk: if not chunk:
raise IncompleteRead(s) raise RequestError(500, "Incomplete read %s" % s)
s.append(chunk) s.append(chunk)
amt -= len(chunk) amt -= len(chunk)
return ''.join(s) return ''.join(s)

View File

@ -22,7 +22,6 @@ import select
import signal import signal
import socket import socket
import sys import sys
import tempfile
import time import time
from gunicorn.httprequest import HTTPRequest from gunicorn.httprequest import HTTPRequest
@ -40,7 +39,7 @@ class Worker(object):
class HTTPServer(object): class HTTPServer(object):
LISTENERS = [] LISTENERS = {}
PIPE = [] PIPE = []
@ -63,13 +62,18 @@ class HTTPServer(object):
# start to listen # start to listen
self.init_listeners = init_listeners self.init_listeners = init_listeners
if not self.init_listeners: if not self.init_listeners:
self.init_listeners = [(('localhost', 8000), {})] self.init_listeners = [(('localhost', 8000), {})]
for address, opts in self.init_listeners: for address, opts in self.init_listeners:
self.listen(address, opts) self.listen(address, opts)
self.master_pid = os.getpid() self.master_pid = os.getpid()
self.maintain_worker_count() self.maintain_worker_count()
self.join()
#def start(self):
def listen(self, addr, opts): def listen(self, addr, opts):
@ -79,7 +83,7 @@ class HTTPServer(object):
for i in range(tries): for i in range(tries):
try: try:
sock = socketserver.TCPServer(addr, **opts) sock = socketserver.TCPServer(addr, **opts)
self.LISTENERS.append(sock) self.LISTENERS[sock.fileno()] = sock
except socket.error, e: except socket.error, e:
if e[0] == errno.EADDRINUSE: if e[0] == errno.EADDRINUSE:
self.logger.error("adding listener failed address: %s" % str(addr)) self.logger.error("adding listener failed address: %s" % str(addr))
@ -91,15 +95,13 @@ class HTTPServer(object):
def join(self): def join(self):
# this pipe will be used to wake up the master when signal occurs # this pipe will be used to wake up the master when signal occurs
self.init_pipe() self.init_pipe()
respawn = True
while True: while True:
try: try:
#if respawn: self.reap_workers()
#self.maintain_worker_count()
os.waitpid(-1, os.WNOHANG)
self.master_sleep() self.master_sleep()
except Exception, e: except Exception, e:
self.logger.error("Unhandled exception [%s]" % str(e)) self.logger.error("Unhandled exception [%s]" % str(e))
sys.exit()
except KeyboardInterrupt: except KeyboardInterrupt:
self.kill_workers(signal.SIGQUIT) self.kill_workers(signal.SIGQUIT)
sys.exit() sys.exit()
@ -108,7 +110,7 @@ class HTTPServer(object):
def master_sleep(self): def master_sleep(self):
while True: while True:
ready = select.select([self.PIPE[0]], [], [], 1) ready = select.select([self.PIPE[0]], [], [], 1.0)
if ready and ready[0]: break if ready and ready[0]: break
try: try:
while True: while True:
@ -117,16 +119,30 @@ class HTTPServer(object):
except errno.EAGAIN, errno.EINTR: except errno.EAGAIN, errno.EINTR:
pass pass
def reap_workers(self):
try:
while True:
wpid, status = os.waitpid(-1, os.WNOHANG)
if not wpid:
break
try:
self.WORKERS[wpid].tmp.close()
except:
pass
del self.WORKERS[wpid]
except errno.ECHILD:
pass
def init_worker_process(self, worker): def init_worker_process(self, worker):
for w in self.WORKERS: self.init_pipe()
if w != worker: for w in self.WORKERS.values():
if w.nr != worker.nr:
try: try:
w.tmp.close() w.tmp.close()
except: except:
continue pass
else: self.WORKERS = {}
continue [fcntl.fcntl(fileno, fcntl.F_SETFD, fcntl.FD_CLOEXEC) for fileno in self.LISTENERS.keys()]
[fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC) for sock in self.LISTENERS]
fcntl.fcntl(worker.tmp.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC) fcntl.fcntl(worker.tmp.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
@ -134,53 +150,51 @@ class HTTPServer(object):
""" do nothing just echo message""" """ do nothing just echo message"""
req = HTTPRequest(conn, addr, listener.getsockname()) req = HTTPRequest(conn, addr, listener.getsockname())
environ = req.read() environ = req.read()
req.write(str(environ)) req.write(str(environ))
req.close() req.close()
def worker_loop(self, worker): def worker_loop(self, worker_pid, worker):
pid = os.fork() ppid = self.master_pid
self.init_worker_process(worker)
alive = worker.tmp.fileno()
m = 0
ready = self.LISTENERS.keys()
try:
while alive:
m = 0 if m == 1 else 1
os.fchmod(alive, m)
if pid == 0: for fileno in ready:
worker_pid = os.getpid() sock = self.LISTENERS[fileno]
yield worker_pid try:
self.init_worker_process(worker) self.process_client(sock, *sock.accept())
alive = worker.tmp.fileno() except errno.EAGAIN, errno.ECONNABORTED:
m = 0 pass
ready = self.LISTENERS
try:
while alive:
m = 0 if m == 1 else 1
os.fchmod(alive, m)
for sock in ready:
try:
self.process_client(sock, *sock.accept_nonblock())
except errno.EAGAIN, errno.ECONNABORTED:
pass
m = 0 if m == 1 else 1
os.fchmod(alive, m)
m = 0 if m == 1 else 1 m = 0 if m == 1 else 1
os.fchmod(alive, m) os.fchmod(alive, m)
while True: if ppid != os.getppid(): return
try: m = 0 if m == 1 else 1
fd_sets = select.select(self.LISTENERS, [], self.PIPE, self.timeout) os.fchmod(alive, m)
if fd_sets and fd_sets[0]:
ready = [fd_sets[0]]
break
except errno.EINTR:
ready = self.LISTENERS
except Exception, e:
self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, e))
pass
except KeyboardInterrupt: while True:
sys.exit() try:
except Exception, e: fd_sets = select.select(self.LISTENERS.keys(), [], self.PIPE, 0.2)
self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, e)) if fd_sets and fd_sets[0]:
ready = [fd_sets[0]]
break
except errno.EINTR:
print >>sys.stderr, "mmm"
ready = self.LISTENERS.keys()
except Exception, e:
self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, str(e)))
pass
except KeyboardInterrupt:
sys.exit()
except Exception, e:
self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, e))
def kill_workers(self, sig): def kill_workers(self, sig):
"""kill all workers with signal sig """ """kill all workers with signal sig """
@ -198,13 +212,19 @@ class HTTPServer(object):
def spawn_missing_workers(self): def spawn_missing_workers(self):
workers_nr = [w.nr for w in self.WORKERS.values()]
for i in range(self.worker_processes): for i in range(self.worker_processes):
if i in self.WORKERS.values(): if i in workers_nr:
continue continue
else:
worker = Worker(i, os.tmpfile()) pid = os.fork()
for worker_pid in self.worker_loop(worker): if pid == 0:
self.WORKERS[worker_pid] = worker worker_pid = os.getpid()
worker = Worker(i, os.tmpfile())
self.WORKERS[worker_pid] = worker
self.worker_loop(worker_pid, worker)
else:
continue
def maintain_worker_count(self): def maintain_worker_count(self):
if (len(self.WORKERS.keys()) - self.worker_processes) < 0: if (len(self.WORKERS.keys()) - self.worker_processes) < 0:
@ -216,6 +236,6 @@ class HTTPServer(object):
def init_pipe(self): def init_pipe(self):
if self.PIPE: if self.PIPE:
[io.close() for io in self.PIPE] [os.close(fileno) for fileno in self.PIPE]
self.PIPE = os.pipe() self.PIPE = os.pipe()
[fcntl.fcntl(io, fcntl.F_SETFD, fcntl.FD_CLOEXEC) for io in self.PIPE] [fcntl.fcntl(io, fcntl.F_SETFD, fcntl.FD_CLOEXEC) for io in self.PIPE]

View File

@ -16,7 +16,13 @@
import socket import socket
class TCPServer(socket.socket): class Socket(socket.socket):
def accept_nonblock(self):
self.setblocking(0)
return self.accept()
class TCPServer(Socket):
"""class for server-side TCP sockets. """class for server-side TCP sockets.
This is wrapper around socket.socket class""" This is wrapper around socket.socket class"""
@ -46,10 +52,3 @@ class TCPServer(socket.socket):
def listen(self): def listen(self):
super(TCPServer, self).listen(self.backlog) super(TCPServer, self).listen(self.backlog)
def accept(self):
return super(TCPServer, self).accept()
def accept_nonblock(self):
self.setblocking(0)
return self.accept()

View File

@ -1,4 +1,5 @@
from gunicorn.httpserver import HTTPServer from gunicorn.httpserver import HTTPServer
if __name__ == '__main__': if __name__ == '__main__':
server = HTTPServer(None, 2).join() server = HTTPServer(None, 4)
server.join()