Some refactoring work.

Remoed alot of code before starting to add more. Trimmed as much as
possible to get to the point that I understood what was going on in each
place.

Split the server code into multiple objects to help my sanity. Arbiter
is now the main class in the master process that keeps track of child
processes and so on and such forth.

The Worker class is responsible for handling incoming requests from the
server socket passed to its constructor.
This commit is contained in:
Paul J. Davis 2009-12-03 01:13:38 -05:00
parent 701c88c019
commit 8be0226763
11 changed files with 305 additions and 619 deletions

34
bin/gunicorn Executable file
View File

@ -0,0 +1,34 @@
#! /usr/bin/env python
import optparse as op
import os
import sys
sys.path.insert(0, os.getcwd())
import gunicorn
__usage__ = "%prog [OPTIONS] APP_MODULE"
def options():
return [
op.make_option('--host', dest='host', default='127.0.0.1',
help='Host to listen on. [%default]'),
op.make_option('--port', dest='port', default=8000, type='int',
help='Port to listen on. [%default]'),
op.make_option('--workers', dest='workers', default=1, type='int',
help='Number of workers to spawn. [%default]')
]
def main():
parser = op.OptionParser(usage=__usage__, option_list=options())
opts, args = parser.parse_args()
if len(args) != 1:
parser.error("No application module specified.")
arbiter = gunicorn.Arbiter((opts.host, opts.port), opts.workers, args[0])
arbiter.run()
if __name__ == '__main__':
main()

View File

@ -15,4 +15,6 @@
# limitations under the License.
__version__ = "0.1"
__version__ = "0.1"
from arbiter import Arbiter

190
gunicorn/arbiter.py Normal file
View File

@ -0,0 +1,190 @@
import errno
import fcntl
import logging
import os
import select
import signal
import socket
import sys
import time
from worker import Worker
logging.basicConfig(level=logging.DEBUG, format="%(levelname)s: %(message)s")
log = logging.getLogger(__name__)
class Arbiter(object):
LISTENER = None
WORKERS = {}
PIPE = []
# I love dyanmic languages
SIG_QUEUE = []
SIGNALS = map(
lambda x: getattr(signal, "SIG%s" % x),
"WINCH CHLD QUIT INT TERM USR1 USR2 HUP TTIN TTOU".split()
)
SIG_NAMES = dict(
(getattr(signal, name), name) for name in dir(signal)
if name[:3] == "SIG"
)
def __init__(self, address, num_workers, modname):
log.info("Booting Arbiter.")
self.address = address
self.num_workers = num_workers
self.modname = modname
self.pid = os.getpid()
self.init_signals()
self.listen(self.address)
def init_signals(self):
if self.PIPE:
map(lambda p: p.close(), self.PIPE)
self.PIPE = pair = os.pipe()
map(self.set_non_blocking, pair)
map(lambda p: fcntl.fcntl(p, fcntl.F_SETFD, fcntl.FD_CLOEXEC), pair)
map(lambda s: signal.signal(s, self.signal), self.SIGNALS)
def set_non_blocking(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
def signal(self, sig, frame):
if len(self.SIG_QUEUE) < 5:
self.SIG_QUEUE.append(sig)
else:
log.warn("Ignoring rapid signaling: %s" % sig)
# Wake up the arbiter
try:
os.write(self.PIPE[1], '.')
except IOError, e:
if e.errno not in [errno.EAGAIN, errno.EINTR]:
raise
def sleep(self):
try:
ready = select.select([self.PIPE[0]], [], [], 1)
if not ready[0]:
return
while os.read(self.PIPE[0], 1):
pass
except select.error, e:
if e[0] not in [errno.EAGAIN, errno.EINTR]:
raise
except OSError, e:
if e.errno not in [errno.EAGAIN, errno.EINTR]:
raise
def listen(self, addr):
for i in range(5):
try:
sock = self.init_socket(addr)
self.LISTENER = sock
break
except socket.error, e:
if e[0] == errno.EADDRINUSE:
log.error("Connection in use: %s" % str(addr))
if i < 5:
log.error("Retrying in 1 second.")
time.sleep(1)
def init_socket(self, address):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.bind(address)
sock.listen(64)
return sock
def run(self):
self.manage_workers()
while True:
try:
sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
if sig is not None:
log.info("SIGNAL: %s" % self.SIG_NAMES.get(sig, "Unknown"))
if sig is None:
self.sleep()
elif sig is signal.SIGINT:
self.kill_workers(signal.SIGINT)
sys.exit(1)
elif sig is signal.SIGQUIT:
self.kill_workers(signal.SIGTERM)
sys.exit(0)
else:
name = self.SIG_NAMES.get(sig, "UNKNOWN")
log.warn("IGNORED: %s" % name)
self.reap_workers()
self.manage_workers()
except KeyboardInterrupt:
self.kill_workers(signal.SIGTERM)
sys.exit()
except Exception, e:
self.kill_workers(signal.SIGTERM)
log.exception("Unhandled exception in main loop.")
sys.exit()
def manage_workers(self):
if len(self.WORKERS.keys()) < self.num_workers:
self.spawn_workers()
for pid, w in self.WORKERS.items():
if w.id >= self.num_workers:
self.kill_worker(pid, signal.SIGQUIT)
def spawn_workers(self):
workers = set(w.id for w in self.WORKERS.values())
for i in range(self.num_workers):
if i in workers:
continue
worker = Worker(i, self.LISTENER, self.modname)
pid = os.fork()
if pid != 0:
self.WORKERS[pid] = worker
continue
# Process Child
try:
log.info("Worker %s booting" % os.getpid())
worker.run()
log.info("Worker %s exiting" % os.getpid())
sys.exit(0)
except SystemExit:
pass
except:
log.exception("Exception in worker process.")
sys.exit(-1)
finally:
log.info("Done.")
def reap_workers(self):
try:
while True:
wpid, status = os.waitpid(-1, os.WNOHANG)
if not wpid:
break
worker = self.WORKERS.pop(wpid)
if not worker:
continue
worker.tmp.close()
except OSError, e:
if e.errno == errno.ECHILD:
pass
def kill_workers(self, sig):
for pid in self.WORKERS.keys():
self.kill_worker(pid, sig)
def kill_worker(self, pid, sig):
worker = self.WORKERS.pop(pid)
try:
os.kill(pid, sig)
finally:
worker.tmp.close()

View File

@ -1,24 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2008,2009 Benoit Chesneau <benoitc@e-engura.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class RequestError(Exception):
def __init__(self, status_code, reason):
self.status_code = status_code
self.reason = reason
Exception.__init__(self, (status_code, reason))

View File

@ -1,280 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2008,2009 Benoit Chesneau <benoitc@e-engura.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import StringIO
import sys
from urllib import unquote
from gunicorn import __version__
from gunicorn.errors import RequestError
NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')
def _normalize_name(name):
return ["-".join([w.capitalize() for w in name.split("-")])]
class HTTPRequest(object):
SERVER_VERSION = "gunicorn/%s" % __version__
CHUNK_SIZE = 4096
def __init__(self, socket, client_address, server_address):
self.socket = socket
self.client_address = client_address
self.server_address = server_address
self.version = None
self.method = None
self.path = None
self.headers = {}
self.response_status = None
self.response_headers = {}
self._version = 11
self.fp = socket.makefile("rw", self.CHUNK_SIZE)
def read(self):
# get status line
self.first_line(self.fp.readline())
# read headers
self.read_headers()
if "?" in self.path:
path_info, query = self.path.split('?', 1)
else:
path_info = self.path
query = ""
length = self.body_length()
if not length:
wsgi_input = StringIO.StringIO()
elif length == "chunked":
length, wsgi_input = self.decode_chunked()
else:
wsgi_input = FileInput(self)
environ = {
"wsgi.url_scheme": 'http',
"wsgi.input": wsgi_input,
"wsgi.errors": sys.stderr,
"wsgi.version": (1, 0),
"wsgi.multithread": False,
"wsgi.multiprocess": True,
"wsgi.run_once": False,
"SCRIPT_NAME": "",
"SERVER_SOFTWARE": self.SERVER_VERSION,
"REQUEST_METHOD": self.method,
"PATH_INFO": unquote(path_info),
"QUERY_STRING": query,
"RAW_URI": self.path,
"CONTENT_TYPE": self.headers.get('content-type', ''),
"CONTENT_LENGTH": length,
"REMOTE_ADDR": self.client_address[0],
"REMOTE_PORT": self.client_address[1],
"SERVER_NAME": self.server_address[0],
"SERVER_PORT": self.server_address[1],
"SERVER_PROTOCOL": self.version
}
for key, value in self.headers.items():
key = 'HTTP_' + key.replace('-', '_')
if key not in ('HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH'):
environ[key] = value
return environ
def read_headers(self):
hname = ""
while True:
line = self.fp.readline()
if line == "\r\n":
# end of headers
break
if line == "\t":
# It's a continuation line.
self.headers[hname] += line.strip()
else:
try:
hname =self.parse_header(line)
except ValueError:
# bad headers
pass
def body_length(self):
transfert_encoding = self.headers.get('TRANSFERT-ENCODING')
content_length = self.headers.get('CONTENT-LENGTH')
if transfert_encoding is None:
if content_length is None:
return None
return content_length
elif transfert_encoding == "chunked":
return "chunked"
else:
return None
def should_close(self):
return self.version < 10 or self.headers.get('CONNECTION') == "close" \
or (self.version == 10 and self.headers.get('CONNECTION') != "Keep-Alive")
def decode_chunked(self):
"""Decode the 'chunked' transfer coding."""
length = 0
data = StringIO.StringIO()
while True:
line = self.fp.readline().strip().split(";", 1)
chunk_size = int(line.pop(0), 16)
if chunk_size <= 0:
break
length += chunk_size
data.write(self.fp.read(chunk_size))
crlf = self.fp.read(2)
if crlf != "\r\n":
raise RequestError((400, "Bad chunked transfer coding "
"(expected '\\r\\n', got %r)" % crlf))
return
# Grab any trailer headers
self.read_headers()
data.seek(0)
return data, str(length) or ""
def start_response(self, status, response_headers):
resp_head = []
self.response_status = status
self.response_headers = {}
resp_head.append("%s %s" % (self.version, status))
for name, value in response_headers:
resp_head.append("%s: %s" % (name, value))
self.response_headers[name.lower()] = value
self.fp.write("%s\r\n\r\n" % "\r\n".join(resp_head))
def write(self, data):
self.fp.write(data)
def close(self):
self.fp.close()
if self.should_close():
self.socket.close()
def first_line(self, line):
method, path, version = line.split(" ")
self.version = version.strip()
self.method = method.upper()
self.path = path
def parse_header(self, line):
name, value = line.split(": ", 1)
name = name.upper()
self.headers[name] = value.strip()
return name
class FileInput(object):
def __init__(self, req):
self.length = req.body_length()
self.fp = req.fp
self.eof = False
def close(self):
self.eof = False
def read(self, amt=None):
if self.fp is None or self.eof:
return ''
if amt is None:
# unbounded read
s = self._safe_read(self.length)
self.close() # we read everything
return s
if amt > self.length:
amt = self.length
s = self.fp.read(amt)
self.length -= len(s)
if not self.length:
self.close()
return s
def readline(self, size=None):
if self.fp is None or self.eof:
return ''
if size is not None:
data = self.fp.readline(size)
else:
# User didn't specify a size ...
# We read the line in chunks to make sure it's not a 100MB line !
# cherrypy trick
res = []
while True:
data = self.fp.readline(256)
res.append(data)
if len(data) < 256 or data[-1:] == "\n":
data = ''.join(res)
break
self.length -= len(data)
if not self.length:
self.close()
return data
def readlines(self, sizehint=0):
# Shamelessly stolen from StringIO
total = 0
lines = []
line = self.readline()
while line:
lines.append(line)
total += len(line)
if 0 < sizehint <= total:
break
line = self.readline()
return lines
def _safe_read(self, amt):
"""Read the number of bytes requested, compensating for partial reads.
"""
s = []
while amt > 0:
chunk = self.fp.read(amt)
if not chunk:
raise RequestError(500, "Incomplete read %s" % s)
s.append(chunk)
amt -= len(chunk)
return ''.join(s)
def __iter__(self):
return self
def next(self):
if self.eof:
raise StopIteration()
return self.readline()

View File

@ -1,35 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2008,2009 Benoit Chesneau <benoitc@e-engura.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class HTTPResponse(object):
def __init__(self, req, data):
self.req = req
self.data = data
self.headers = self.req.response_headers or {}
self.fp = req.fp
def write(self, data):
self.fp.write(data)
def send(self):
if not self.data: return
for chunk in self.data:
self.write(chunk)

View File

@ -1,208 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2008,2009 Benoit Chesneau <benoitc@e-engura.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import fcntl
import logging
import os
import select
import signal
import socket
import sys
import time
from gunicorn.httprequest import HTTPRequest
from gunicorn.httpresponse import HTTPResponse
from gunicorn import socketserver
from gunicorn.util import NullHandler
class Worker(object):
def __init__(self, nr, tmp):
self.nr = nr
self.tmp = tmp
class HTTPServer(object):
LISTENERS = []
PIPE = []
WORKERS = {}
def __init__(self, app, worker_processes, timeout=60, init_listeners=[],
pidfile=None, logging_handler=None, **opts):
self.opts = opts
self.app = app
self.timeout = timeout
self.pidfile = pidfile
self.worker_processes = worker_processes
if logging_handler is None:
logging_handler = NullHandler()
self.logger = logging.getLogger("gunicorn")
self.logger.setLevel(logging.INFO)
self.logger.addHandler(logging_handler)
# start to listen
self.init_listeners = init_listeners
if not self.init_listeners:
self.init_listeners = [(('localhost', 8000), {})]
for address, opts in self.init_listeners:
self.listen(address, opts)
self.master_pid = os.getpid()
self.maintain_worker_count()
def listen(self, addr, opts):
"""start to listen"""
tries = self.opts.get('tries', 5)
delay = self.opts.get('delay', 0.5)
for i in range(tries):
try:
sock = socketserver.TCPServer(addr, **opts)
self.LISTENERS.append(sock)
except socket.error, e:
if e[0] == errno.EADDRINUSE:
self.logger.error("adding listener failed address: %s" % str(addr))
if i < tries:
self.logger.error("retrying in %s seconds." % str(delay))
time.sleep(delay)
break
def run(self):
# this pipe will be used to wake up the master when signal occurs
self.init_pipe()
respawn = True
while True:
try:
self.reap_workers()
while True:
ready = select.select([self.PIPE[0]], [], [], 1.0)
if ready[0]: break
if respawn: self.maintain_worker_count()
except Exception, e:
self.logger.error("Unhandled exception [%s]" % str(e))
sys.exit()
except KeyboardInterrupt:
self.kill_workers(signal.SIGQUIT)
sys.exit()
def reap_workers(self):
try:
while True:
wpid, status = os.waitpid(-1, os.WNOHANG)
if not wpid: break
if wpid in self.WORKERS:
self.WORKERS[wpid].tmp.close()
del self.WORKERS[wpid]
except errno.ECHILD:
pass
def process_client(self, listener, conn, addr):
""" do nothing just echo message"""
req = HTTPRequest(conn, addr, listener.getsockname())
result = self.app(req.read(), req.start_response)
response = HTTPResponse(req, result)
response.send()
req.close()
def worker_loop(self, worker_pid, worker):
ppid = self.master_pid
alive = worker.tmp.fileno()
m = 0
ready = self.LISTENERS
while alive:
try:
m = 0 if m == 1 else 1
os.fchmod(alive, m)
for sock in ready:
try:
self.process_client(sock, *sock.accept_nonblock())
time.sleep(0.1)
m = 0 if m == 1 else 1
os.fchmod(alive, m)
except IOError, e:
if e.errno == errno.EAGAIN or e.errno == errno.ECONNABORTED:
pass
if ppid != os.getppid(): return
m = 0 if m == 1 else 1
os.fchmod(alive, m)
while True:
ret = select.select(self.LISTENERS, [], [], 2.0)
if ret[0]:
ready = ret[0]
break
except KeyboardInterrupt:
sys.exit()
except Exception, e:
self.logger.error("Unhandled exception in worker %s [%s]" % (worker_pid, e))
def kill_workers(self, sig):
"""kill all workers with signal sig """
for pid in self.WORKERS.keys():
self.kill_worker(pid, sig)
def kill_worker(self, pid, sig):
""" kill one worker with signal """
worker = self.WORKERS[pid]
try:
os.kill(pid, sig)
finally:
worker.tmp.close()
del self.WORKERS[pid]
def spawn_missing_workers(self):
workers_nr = [w.nr for w in self.WORKERS.values()]
for i in range(self.worker_processes):
if i in workers_nr:
continue
else:
pid = os.fork()
if pid == 0:
worker_pid = os.getpid()
worker = Worker(i, os.tmpfile())
self.WORKERS[worker_pid] = worker
self.worker_loop(worker_pid, worker)
def maintain_worker_count(self):
if len(self.WORKERS.keys()) < self.worker_processes:
self.spawn_missing_workers()
for pid, w in self.WORKERS.items():
if w.nr >= self.worker_processes:
self.kill_worker(pid, signal.SIGQUIT)
def init_pipe(self):
if self.PIPE:
[os.close(fileno) for fileno in self.PIPE]
self.PIPE = os.pipe()
[fcntl.fcntl(io, fcntl.F_SETFD, fcntl.FD_CLOEXEC) for io in self.PIPE]

View File

@ -1,48 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2008,2009 Benoit Chesneau <benoitc@e-engura.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
class Socket(socket.socket):
def accept_nonblock(self):
conn, addr = self.accept()
conn.setblocking(0)
return (conn, addr)
class TCPServer(Socket):
"""class for server-side TCP sockets.
This is wrapper around socket.socket class"""
def __init__(self, address, **opts):
self.backlog = opts.get('backlog', 1024)
self.timeout = opts.get('timeout', 300)
self.reuseaddr = opts.get('reuseaddr', True)
self.nodelay = opts.get('nodelay', True)
socket.socket.__init__(self, socket.AF_INET, socket.SOCK_STREAM)
self.bind(address)
self.listen(self.backlog)
# set options
self.settimeout(self.timeout)
self.setblocking(0)
if self.reuseaddr:
self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if self.nodelay:
self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

View File

@ -1,22 +1,15 @@
# -*- coding: utf-8 -*-
#
# Copyright 2008,2009 Benoit Chesneau <benoitc@e-engura.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
class NullHandler(logging.Handler):
""" null log handler """
def emit(self, record):
pass
def import_mod(module):
parts = module.rsplit(":", 1)
if len(parts) == 1:
module, obj = module, "application"
else:
module, obj = parts[0], parts[1]
mod = __import__(module)
parts = module.split(".")
for p in parts[1:]:
mod = getattr(mod, p, None)
if mod is None:
raise ImportError("Failed to import: %s" % module)
return mod

59
gunicorn/worker.py Normal file
View File

@ -0,0 +1,59 @@
import logging
import os
import select
import signal
import util
log = logging.getLogger(__name__)
class Worker(object):
SIGNALS = map(
lambda x: getattr(signal, "SIG%s" % x),
"WINCH QUIT INT TERM USR1 USR2 HUP TTIN TTOU".split()
)
def __init__(self, workerid, socket, module):
self.id = workerid
self.socket = socket
self.tmp = os.tmpfile()
self.module = util.import_mod(module)
def init_signals(self):
map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS)
def run(self):
self.init_signals()
while True:
# Wait for a request to handle.
while True:
ret = select.select([self.socket], [], [], 2.0)
if ret[0]:
break
(conn, addr) = self.socket.accept()
log.info("Client connected: %s:%s" % addr)
conn.setblocking(1)
if not self.handle(conn, addr):
log.info("Client requested process recycle.")
return
def handle(self, conn, client):
fp = conn.makefile()
line = fp.readline()
while line:
log.info("Received: %s" % line.strip())
if line.strip().startswith("q"):
log.info("Client disconnected.")
conn.close()
return True
elif line.strip().startswith("k"):
log.info("Client disconnected.")
conn.close()
return False
else:
fp.write(line)
fp.flush()
line = fp.readline()

View File

@ -1,4 +1,7 @@
from gunicorn.httpserver import HTTPServer
from gunicorn.httpserver import WSGIServer
def simple_app(environ, start_response):
@ -9,5 +12,5 @@ def simple_app(environ, start_response):
return ['Hello world!\n']
if __name__ == '__main__':
server = HTTPServer(simple_app, 4)
server = WSGIServer(("127.0.0.1", 8000), 1, simple_app)
server.run()