Refactoring the worker loop.

Accidentally got a bit carried away.
This commit is contained in:
Paul J. Davis 2010-01-31 17:45:26 -05:00
parent ae0416619f
commit 9e717b8f9c
8 changed files with 122 additions and 131 deletions

View File

@ -111,7 +111,6 @@ class Arbiter(object):
return return
raise raise
def init_signals(self): def init_signals(self):
if self.PIPE: if self.PIPE:
map(lambda p: p.close(), self.PIPE) map(lambda p: p.close(), self.PIPE)
@ -128,7 +127,6 @@ class Arbiter(object):
else: else:
self.log.warn("Ignoring rapid signaling: %s" % sig) self.log.warn("Ignoring rapid signaling: %s" % sig)
def listen(self, addr): def listen(self, addr):
if 'GUNICORN_FD' in os.environ: if 'GUNICORN_FD' in os.environ:
fd = int(os.environ['GUNICORN_FD']) fd = int(os.environ['GUNICORN_FD'])
@ -295,7 +293,6 @@ class Arbiter(object):
self.reap_workers() self.reap_workers()
self.kill_workers(signal.SIGKILL) self.kill_workers(signal.SIGKILL)
def reexec(self): def reexec(self):
self.reexec_pid = os.fork() self.reexec_pid = os.fork()
if self.reexec_pid == 0: if self.reexec_pid == 0:
@ -307,7 +304,7 @@ class Arbiter(object):
diff = time.time() - os.fstat(worker.tmp.fileno()).st_ctime diff = time.time() - os.fstat(worker.tmp.fileno()).st_ctime
if diff <= self.timeout: if diff <= self.timeout:
continue continue
self.log.error("worker %s PID %s timeout killing." % (str(worker.id), pid)) self.log.error("%s (pid:%s) timed out." % (worker, pid))
self.kill_worker(pid, signal.SIGKILL) self.kill_worker(pid, signal.SIGKILL)
def reap_workers(self): def reap_workers(self):
@ -341,7 +338,7 @@ class Arbiter(object):
continue continue
worker = Worker(i, self.pid, self.LISTENER, self.modname, worker = Worker(i, self.pid, self.LISTENER, self.modname,
self.timeout, self.PIPE, self.debug) self.timeout/2, self.PIPE, self.debug)
pid = os.fork() pid = os.fork()
if pid != 0: if pid != 0:
self.WORKERS[pid] = worker self.WORKERS[pid] = worker

View File

@ -3,8 +3,8 @@
# This file is part of gunicorn released under the MIT license. # This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information. # See the NOTICE for more information.
from gunicorn.http.parser import HttpParser from gunicorn.http.parser import Parser
from gunicorn.http.request import HttpRequest, RequestError from gunicorn.http.request import Request, RequestError
from gunicorn.http.response import HttpResponse from gunicorn.http.response import Response
__all__ = [HttpParser, HttpRequest, RequestError, HttpResponse] __all__ = [Parser, Request, RequestError, Response]

View File

@ -7,10 +7,10 @@ import urlparse
from gunicorn.util import normalize_name from gunicorn.util import normalize_name
class HttpParserError(Exception): class ParserError(Exception):
""" error raised when parsing fail""" pass
class HttpParser(object): class Parser(object):
def __init__(self): def __init__(self):
self.status = "" self.status = ""
@ -71,7 +71,8 @@ class HttpParser(object):
headers.extend(list(_headers.items())) headers.extend(list(_headers.items()))
self.headers = headers self.headers = headers
self._content_len = int(_headers.get('Content-Length',0)) self._content_len = int(_headers.get('Content-Length',0))
(_, _, self.path, self.query_string, self.fragment) = urlparse.urlsplit(self.raw_path) (_, _, self.path, self.query_string, self.fragment) = \
urlparse.urlsplit(self.raw_path)
return pos return pos
def _first_line(self, line): def _first_line(self, line):
@ -163,10 +164,9 @@ class HttpParser(object):
return (i != -1) return (i != -1)
def filter_body(self, data): def filter_body(self, data):
""" filter body and return a tuple: """\
body_chunk, new_buffer. They could be None. Filter body and return a tuple: (body_chunk, new_buffer)
new_fubber is always None if it's empty. Both can be None, and new_buffer is always None if its empty.
""" """
dlen = len(data) dlen = len(data)
chunk = '' chunk = ''

View File

@ -3,7 +3,6 @@
# This file is part of gunicorn released under the MIT license. # This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information. # See the NOTICE for more information.
import re import re
import StringIO import StringIO
import sys import sys
@ -11,22 +10,16 @@ from urllib import unquote
import logging import logging
from gunicorn import __version__ from gunicorn import __version__
from gunicorn.http.parser import HttpParser from gunicorn.http.parser import Parser
from gunicorn.http.tee import TeeInput from gunicorn.http.tee import TeeInput
from gunicorn.util import CHUNK_SIZE, read_partial, \ from gunicorn.util import CHUNK_SIZE, read_partial, normalize_name
normalize_name
NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+') NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')
class RequestError(Exception): class RequestError(Exception):
""" raised when something wrong happend""" pass
class Request(object):
class HttpRequest(object):
SERVER_VERSION = "gunicorn/%s" % __version__ SERVER_VERSION = "gunicorn/%s" % __version__
@ -42,9 +35,7 @@ class HttpRequest(object):
"SERVER_SOFTWARE": "gunicorn/%s" % __version__ "SERVER_SOFTWARE": "gunicorn/%s" % __version__
} }
def __init__(self, socket, client_address, server_address, debug=False):
def __init__(self, socket, client_address, server_address,
debug=False):
self.debug = debug self.debug = debug
self.socket = socket self.socket = socket
self.client_address = client_address self.client_address = client_address
@ -52,11 +43,10 @@ class HttpRequest(object):
self.response_status = None self.response_status = None
self.response_headers = {} self.response_headers = {}
self._version = 11 self._version = 11
self.parser = HttpParser() self.parser = Parser()
self.start_response_called = False self.start_response_called = False
self.log = logging.getLogger(__name__) self.log = logging.getLogger(__name__)
def read(self): def read(self):
environ = {} environ = {}
headers = [] headers = []
@ -71,12 +61,10 @@ class HttpRequest(object):
i = self.parser.filter_headers(headers, buf) i = self.parser.filter_headers(headers, buf)
if i != -1: break if i != -1: break
self.log.debug("%s", self.parser.status) self.log.debug("%s", self.parser.status)
self.log.debug("Headers:\n%s" % headers)
self.log.debug("Got headers:\n%s" % headers) if self.parser.headers_dict.get('Expect', '').lower() == "100-continue":
if self.parser.headers_dict.get('Except', '').lower() == "100-continue":
self.socket.send("100 Continue\n") self.socket.send("100 Continue\n")
if not self.parser.content_len and not self.parser.is_chunked: if not self.parser.content_len and not self.parser.is_chunked:
@ -86,11 +74,10 @@ class HttpRequest(object):
if self.debug: if self.debug:
# according to the doc # This value should evaluate true if an equivalent application
# This value should evaluate true if an equivalent application object # object may be simultaneously invoked by another process, and
# may be simultaneously invoked by another process, and should evaluate # should evaluate false otherwise. In debug mode we fall to one
# false otherwise. In debug mode we fall to one worker # worker so we comply to pylons and other paster app.
# so we comply to pylons and other paster app.
wsgi_multiprocess = False wsgi_multiprocess = False
else: else:
wsgi_multiprocess = True wsgi_multiprocess = True
@ -135,7 +122,6 @@ class HttpRequest(object):
elif self.start_response_called: elif self.start_response_called:
raise AssertionError("Response headers already set!") raise AssertionError("Response headers already set!")
self.response_status = status self.response_status = status
for name, value in response_headers: for name, value in response_headers:
name = normalize_name(name) name = normalize_name(name)

View File

@ -5,7 +5,7 @@
from gunicorn.util import http_date, write, close from gunicorn.util import http_date, write, close
class HttpResponse(object): class Response(object):
def __init__(self, sock, response, req): def __init__(self, sock, response, req):
self.req = req self.req = req

View File

@ -9,6 +9,7 @@ import os
import resource import resource
import select import select
import socket import socket
import textwrap
import time import time
MAXFD = 1024 MAXFD = 1024
@ -17,8 +18,6 @@ if (hasattr(os, "devnull")):
else: else:
REDIRECT_TO = "/dev/null" REDIRECT_TO = "/dev/null"
timeout_default = object() timeout_default = object()
CHUNK_SIZE = (16 * 1024) CHUNK_SIZE = (16 * 1024)
@ -62,7 +61,6 @@ def read_partial(sock, length):
data = sock.recv(length) data = sock.recv(length)
return data return data
def write(sock, data): def write(sock, data):
buf = "" buf = ""
buf += data buf += data
@ -82,11 +80,12 @@ def write(sock, data):
def write_nonblock(sock, data): def write_nonblock(sock, data):
timeout = sock.gettimeout() timeout = sock.gettimeout()
if timeout != "0.0": if sock.gettimeout() > 0.0:
sock.setblockin(0) try:
ret = write(sock, data) sock.setblocking(0)
sock.setblocking(1) return write(sock, data)
return ret finally:
sock.setblocking(1)
else: else:
return write(sock, data) return write(sock, data)
@ -94,6 +93,29 @@ def writelines(sock, lines):
for line in list(lines): for line in list(lines):
write(sock, line) write(sock, line)
def write_error(sock, mesg):
html = textwrap.dedent("""\
<html>
<head>
<title>Internal Server Error</title>
</head>
<body>
<h1>Internal Server Error</h1>
<h2>WSGI Error Report:</h2>
<pre>%s</pre>
</body>
</html>
""") % mesg
http = textwrap.dedent("""\
HTTP/1.0 500 Internal Server Error\r
Connection: close\r
Content-Type: text/html\r
Content-Length: %d\r
\r
%s
""") % (len(http), http)
write_nonblock(sock, http)
def normalize_name(name): def normalize_name(name):
return "-".join([w.lower().capitalize() for w in name.split("-")]) return "-".join([w.lower().capitalize() for w in name.split("-")])
@ -116,7 +138,6 @@ def import_app(module):
raise TypeError("Application object must be callable.") raise TypeError("Application object must be callable.")
return app return app
def http_date(timestamp=None): def http_date(timestamp=None):
"""Return the current date and time formatted for a message header.""" """Return the current date and time formatted for a message header."""
if timestamp is None: if timestamp is None:

View File

@ -27,21 +27,21 @@ class Worker(object):
PIPE = [] PIPE = []
def __init__(self, workerid, ppid, socket, app, timeout, def __init__(self, workerid, ppid, socket, app,
pipe, debug=False): timeout, pipe, debug=False):
self.nr = 0 self.nr = 0
self.id = workerid self.id = workerid
self.ppid = ppid self.ppid = ppid
self.debug = debug self.debug = debug
self.socket = socket self.socket = socket
self.timeout = timeout / 2.0 self.timeout = timeout
fd, tmpname = tempfile.mkstemp() fd, tmpname = tempfile.mkstemp()
self.tmp = os.fdopen(fd, "r+b") self.tmp = os.fdopen(fd, "r+b")
self.tmpname = tmpname self.tmpname = tmpname
self.app = app self.app = app
self.alive = True self.alive = True
self.log = logging.getLogger(__name__) self.log = logging.getLogger(__name__)
self.spinner = 0
# init pipe # init pipe
self.PIPE = pipe self.PIPE = pipe
@ -54,6 +54,9 @@ class Worker(object):
self.address = self.socket.getsockname() self.address = self.socket.getsockname()
def __str__(self):
return "<Worker %s>" % self.id
def init_signals(self): def init_signals(self):
map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS) map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS)
signal.signal(signal.SIGQUIT, self.handle_quit) signal.signal(signal.SIGQUIT, self.handle_quit)
@ -74,88 +77,77 @@ class Worker(object):
def handle_exit(self, sig, frame): def handle_exit(self, sig, frame):
sys.exit(0) sys.exit(0)
def _fchmod(self, mode): def notify(self):
"""\
Notify our parent process that we're still alive.
"""
self.spinner = (self.spinner+1) % 2
if getattr(os, 'fchmod', None): if getattr(os, 'fchmod', None):
os.fchmod(self.tmp.fileno(), mode) os.fchmod(self.tmp.fileno(), self.spinner)
else: else:
os.chmod(self.tmpname, mode) os.chmod(self.tmpname, self.spinner)
def run(self): def run(self):
self.init_signals() self.init_signals()
spinner = 0
self.nr = 0 self.nr = 0
while self.alive:
# self.socket appears to lose its blocking status after
# we fork in the arbiter. Reset it here.
self.socket.setblocking(0)
while self.alive:
self.nr = 0 self.nr = 0
self.notify()
try:
client, addr = self.socket.accept()
self.handle(client, addr)
self.nr += 1
except socket.error, e:
if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
raise
# Accept until we hit EAGAIN. We're betting that when we're # Accept until we hit EAGAIN. We're betting that when we're
# processing clients that more clients are waiting. When # processing clients that more clients are waiting. When
# there's no more clients waiting we go back to the select() # there's no more clients waiting we go back to the select()
# loop and wait for some lovin. # loop and wait for some lovin.
while self.alive: if self.nr > 0:
self.nr = 0 continue
try:
client, addr = self.socket.accept()
# handle connection
self.handle(client, addr)
# Update the fd mtime on each client completion
# to signal that this worker process is alive.
spinner = (spinner+1) % 2
self._fchmod(spinner)
self.nr += 1
except socket.error, e:
if e[0] in (errno.EAGAIN, errno.ECONNABORTED):
break # Uh oh!
raise
if self.nr == 0: break
# If our parent changed then we shut down.
if self.ppid != os.getppid(): if self.ppid != os.getppid():
self.log.info("Parent process changed. Closing %s" % self)
return return
while self.alive: try:
spinner = (spinner+1) % 2 self.notify()
self._fchmod(spinner) ret = select.select([self.socket], [], self.PIPE, self.timeout)
try: if ret[0]:
ret = select.select([self.socket], [], self.PIPE, break
self.timeout) except select.error, e:
if ret[0]: break if e[0] == errno.EINTR:
except select.error, e: break
if e[0] == errno.EINTR: if e[0] == errno.EBADF and self.nr < 0:
break break
if e[0] == errno.EBADF: raise
if nr >= 0:
break
raise
spinner = (spinner+1) % 2
self._fchmod(spinner)
def handle(self, client, addr): def handle(self, client, addr):
util.close_on_exec(client) util.close_on_exec(client)
try: try:
req = http.HttpRequest(client, addr, self.address, self.debug) req = http.Request(client, addr, self.address, self.debug)
try: try:
response = self.app(req.read(), req.start_response) response = self.app(req.read(), req.start_response)
except Exception, e: except Exception, e:
exc = ''.join(traceback.format_exception(*sys.exc_info())) util.write_error(client, traceback.format_exc())
msg = "<h1>Internal Server Error</h1><h2>wsgi error:</h2><pre>%s</pre>" % exc
util.writelines(client,
["HTTP/1.0 500 Internal Server Error\r\n",
"Connection: close\r\n",
"Content-type: text/html\r\n",
"Content-length: %s\r\n" % str(len(msg)),
"\r\n",
msg])
return return
http.HttpResponse(client, response, req).send()
http.Response(client, response, req).send()
except Exception, e: except Exception, e:
self.log.exception("Error processing request. [%s]" % str(e)) self.log.exception("Error processing request. [%s]" % str(e))
# try to send a response even if something happend
try: try:
write_nonblock(sock, # Last ditch attempt to notify the client of an error.
"HTTP/1.0 500 Internal Server Error\r\n\r\n") mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n"
write_nonblock(sock, mesg)
except: except:
pass pass
finally: finally:

View File

@ -9,8 +9,8 @@ import tempfile
dirname = os.path.dirname(__file__) dirname = os.path.dirname(__file__)
from gunicorn.http.parser import HttpParser from gunicorn.http.parser import Parser
from gunicorn.http.request import HttpRequest from gunicorn.http.request import Request
def data_source(fname): def data_source(fname):
with open(fname) as handle: with open(fname) as handle:
@ -27,7 +27,7 @@ class request(object):
def __call__(self, func): def __call__(self, func):
def run(): def run():
src = data_source(self.fname) src = data_source(self.fname)
func(src, HttpParser()) func(src, Parser())
run.func_name = func.func_name run.func_name = func.func_name
return run return run
@ -65,16 +65,11 @@ class http_request(object):
def __call__(self, func): def __call__(self, func):
def run(): def run():
fsock = FakeSocket(data_source(self.fname)) fsock = FakeSocket(data_source(self.fname))
req = HttpRequest(fsock, ('127.0.0.1', 6000), req = Request(fsock, ('127.0.0.1', 6000), ('127.0.0.1', 8000))
('127.0.0.1', 8000))
func(req) func(req)
run.func_name = func.func_name run.func_name = func.func_name
return run return run
def eq(a, b): def eq(a, b):
assert a == b, "%r != %r" % (a, b) assert a == b, "%r != %r" % (a, b)