From 9e717b8f9c51c8d7a7a2c656a70512829c7b6b20 Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Sun, 31 Jan 2010 17:45:26 -0500 Subject: [PATCH] Refactoring the worker loop. Accidentally got a bit carried away. --- gunicorn/arbiter.py | 11 ++-- gunicorn/http/__init__.py | 8 +-- gunicorn/http/parser.py | 16 +++--- gunicorn/http/request.py | 42 +++++--------- gunicorn/http/response.py | 2 +- gunicorn/util.py | 43 ++++++++++---- gunicorn/worker.py | 118 ++++++++++++++++++-------------------- tests/t.py | 13 ++--- 8 files changed, 122 insertions(+), 131 deletions(-) diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 6075ec22..a2b0d029 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -110,8 +110,7 @@ class Arbiter(object): if e[0] == errno.ENOENT: return raise - - + def init_signals(self): if self.PIPE: map(lambda p: p.close(), self.PIPE) @@ -127,7 +126,6 @@ class Arbiter(object): self.wakeup() else: self.log.warn("Ignoring rapid signaling: %s" % sig) - def listen(self, addr): if 'GUNICORN_FD' in os.environ: @@ -294,8 +292,7 @@ class Arbiter(object): time.sleep(0.1) self.reap_workers() self.kill_workers(signal.SIGKILL) - - + def reexec(self): self.reexec_pid = os.fork() if self.reexec_pid == 0: @@ -307,7 +304,7 @@ class Arbiter(object): diff = time.time() - os.fstat(worker.tmp.fileno()).st_ctime if diff <= self.timeout: continue - self.log.error("worker %s PID %s timeout killing." % (str(worker.id), pid)) + self.log.error("%s (pid:%s) timed out." % (worker, pid)) self.kill_worker(pid, signal.SIGKILL) def reap_workers(self): @@ -341,7 +338,7 @@ class Arbiter(object): continue worker = Worker(i, self.pid, self.LISTENER, self.modname, - self.timeout, self.PIPE, self.debug) + self.timeout/2, self.PIPE, self.debug) pid = os.fork() if pid != 0: self.WORKERS[pid] = worker diff --git a/gunicorn/http/__init__.py b/gunicorn/http/__init__.py index 4a6585a0..eac09ac3 100644 --- a/gunicorn/http/__init__.py +++ b/gunicorn/http/__init__.py @@ -3,8 +3,8 @@ # This file is part of gunicorn released under the MIT license. # See the NOTICE for more information. -from gunicorn.http.parser import HttpParser -from gunicorn.http.request import HttpRequest, RequestError -from gunicorn.http.response import HttpResponse +from gunicorn.http.parser import Parser +from gunicorn.http.request import Request, RequestError +from gunicorn.http.response import Response -__all__ = [HttpParser, HttpRequest, RequestError, HttpResponse] \ No newline at end of file +__all__ = [Parser, Request, RequestError, Response] \ No newline at end of file diff --git a/gunicorn/http/parser.py b/gunicorn/http/parser.py index af237cd9..aa934d04 100644 --- a/gunicorn/http/parser.py +++ b/gunicorn/http/parser.py @@ -7,10 +7,10 @@ import urlparse from gunicorn.util import normalize_name -class HttpParserError(Exception): - """ error raised when parsing fail""" +class ParserError(Exception): + pass -class HttpParser(object): +class Parser(object): def __init__(self): self.status = "" @@ -71,7 +71,8 @@ class HttpParser(object): headers.extend(list(_headers.items())) self.headers = headers self._content_len = int(_headers.get('Content-Length',0)) - (_, _, self.path, self.query_string, self.fragment) = urlparse.urlsplit(self.raw_path) + (_, _, self.path, self.query_string, self.fragment) = \ + urlparse.urlsplit(self.raw_path) return pos def _first_line(self, line): @@ -163,10 +164,9 @@ class HttpParser(object): return (i != -1) def filter_body(self, data): - """ filter body and return a tuple: - body_chunk, new_buffer. They could be None. - new_fubber is always None if it's empty. - + """\ + Filter body and return a tuple: (body_chunk, new_buffer) + Both can be None, and new_buffer is always None if its empty. """ dlen = len(data) chunk = '' diff --git a/gunicorn/http/request.py b/gunicorn/http/request.py index 9721a5b1..f50cb9f4 100644 --- a/gunicorn/http/request.py +++ b/gunicorn/http/request.py @@ -3,7 +3,6 @@ # This file is part of gunicorn released under the MIT license. # See the NOTICE for more information. - import re import StringIO import sys @@ -11,22 +10,16 @@ from urllib import unquote import logging from gunicorn import __version__ -from gunicorn.http.parser import HttpParser +from gunicorn.http.parser import Parser from gunicorn.http.tee import TeeInput -from gunicorn.util import CHUNK_SIZE, read_partial, \ -normalize_name - +from gunicorn.util import CHUNK_SIZE, read_partial, normalize_name NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+') - - - class RequestError(Exception): - """ raised when something wrong happend""" + pass - -class HttpRequest(object): +class Request(object): SERVER_VERSION = "gunicorn/%s" % __version__ @@ -42,9 +35,7 @@ class HttpRequest(object): "SERVER_SOFTWARE": "gunicorn/%s" % __version__ } - - def __init__(self, socket, client_address, server_address, - debug=False): + def __init__(self, socket, client_address, server_address, debug=False): self.debug = debug self.socket = socket self.client_address = client_address @@ -52,11 +43,10 @@ class HttpRequest(object): self.response_status = None self.response_headers = {} self._version = 11 - self.parser = HttpParser() + self.parser = Parser() self.start_response_called = False self.log = logging.getLogger(__name__) - - + def read(self): environ = {} headers = [] @@ -71,12 +61,10 @@ class HttpRequest(object): i = self.parser.filter_headers(headers, buf) if i != -1: break - self.log.debug("%s", self.parser.status) - - self.log.debug("Got headers:\n%s" % headers) + self.log.debug("Headers:\n%s" % headers) - if self.parser.headers_dict.get('Except', '').lower() == "100-continue": + if self.parser.headers_dict.get('Expect', '').lower() == "100-continue": self.socket.send("100 Continue\n") if not self.parser.content_len and not self.parser.is_chunked: @@ -86,11 +74,10 @@ class HttpRequest(object): if self.debug: - # according to the doc - # This value should evaluate true if an equivalent application object - # may be simultaneously invoked by another process, and should evaluate - # false otherwise. In debug mode we fall to one worker - # so we comply to pylons and other paster app. + # This value should evaluate true if an equivalent application + # object may be simultaneously invoked by another process, and + # should evaluate false otherwise. In debug mode we fall to one + # worker so we comply to pylons and other paster app. wsgi_multiprocess = False else: wsgi_multiprocess = True @@ -134,8 +121,7 @@ class HttpRequest(object): exc_info = None elif self.start_response_called: raise AssertionError("Response headers already set!") - - + self.response_status = status for name, value in response_headers: name = normalize_name(name) diff --git a/gunicorn/http/response.py b/gunicorn/http/response.py index 001b05f8..b29adeb3 100644 --- a/gunicorn/http/response.py +++ b/gunicorn/http/response.py @@ -5,7 +5,7 @@ from gunicorn.util import http_date, write, close -class HttpResponse(object): +class Response(object): def __init__(self, sock, response, req): self.req = req diff --git a/gunicorn/util.py b/gunicorn/util.py index 87584fd7..fdf95cb8 100644 --- a/gunicorn/util.py +++ b/gunicorn/util.py @@ -9,6 +9,7 @@ import os import resource import select import socket +import textwrap import time MAXFD = 1024 @@ -17,8 +18,6 @@ if (hasattr(os, "devnull")): else: REDIRECT_TO = "/dev/null" - - timeout_default = object() CHUNK_SIZE = (16 * 1024) @@ -62,7 +61,6 @@ def read_partial(sock, length): data = sock.recv(length) return data - def write(sock, data): buf = "" buf += data @@ -82,18 +80,42 @@ def write(sock, data): def write_nonblock(sock, data): timeout = sock.gettimeout() - if timeout != "0.0": - sock.setblockin(0) - ret = write(sock, data) - sock.setblocking(1) - return ret + if sock.gettimeout() > 0.0: + try: + sock.setblocking(0) + return write(sock, data) + finally: + sock.setblocking(1) else: return write(sock, data) def writelines(sock, lines): for line in list(lines): write(sock, line) - + +def write_error(sock, mesg): + html = textwrap.dedent("""\ + + + Internal Server Error + + +

Internal Server Error

+

WSGI Error Report:

+
%s
+ + + """) % mesg + http = textwrap.dedent("""\ + HTTP/1.0 500 Internal Server Error\r + Connection: close\r + Content-Type: text/html\r + Content-Length: %d\r + \r + %s + """) % (len(http), http) + write_nonblock(sock, http) + def normalize_name(name): return "-".join([w.lower().capitalize() for w in name.split("-")]) @@ -115,8 +137,7 @@ def import_app(module): if not callable(app): raise TypeError("Application object must be callable.") return app - - + def http_date(timestamp=None): """Return the current date and time formatted for a message header.""" if timestamp is None: diff --git a/gunicorn/worker.py b/gunicorn/worker.py index bd06da97..ea5d7924 100644 --- a/gunicorn/worker.py +++ b/gunicorn/worker.py @@ -27,22 +27,22 @@ class Worker(object): PIPE = [] - def __init__(self, workerid, ppid, socket, app, timeout, - pipe, debug=False): + def __init__(self, workerid, ppid, socket, app, + timeout, pipe, debug=False): self.nr = 0 self.id = workerid self.ppid = ppid self.debug = debug self.socket = socket - self.timeout = timeout / 2.0 + self.timeout = timeout fd, tmpname = tempfile.mkstemp() self.tmp = os.fdopen(fd, "r+b") self.tmpname = tmpname self.app = app self.alive = True self.log = logging.getLogger(__name__) - - + self.spinner = 0 + # init pipe self.PIPE = pipe map(util.set_non_blocking, pipe) @@ -54,6 +54,9 @@ class Worker(object): self.address = self.socket.getsockname() + def __str__(self): + return "" % self.id + def init_signals(self): map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS) signal.signal(signal.SIGQUIT, self.handle_quit) @@ -74,88 +77,77 @@ class Worker(object): def handle_exit(self, sig, frame): sys.exit(0) - def _fchmod(self, mode): + def notify(self): + """\ + Notify our parent process that we're still alive. + """ + self.spinner = (self.spinner+1) % 2 if getattr(os, 'fchmod', None): - os.fchmod(self.tmp.fileno(), mode) + os.fchmod(self.tmp.fileno(), self.spinner) else: - os.chmod(self.tmpname, mode) + os.chmod(self.tmpname, self.spinner) def run(self): self.init_signals() - spinner = 0 self.nr = 0 + + # self.socket appears to lose its blocking status after + # we fork in the arbiter. Reset it here. + self.socket.setblocking(0) + while self.alive: - self.nr = 0 + self.notify() + try: + client, addr = self.socket.accept() + self.handle(client, addr) + self.nr += 1 + except socket.error, e: + if e[0] not in (errno.EAGAIN, errno.ECONNABORTED): + raise + # Accept until we hit EAGAIN. We're betting that when we're # processing clients that more clients are waiting. When # there's no more clients waiting we go back to the select() # loop and wait for some lovin. - while self.alive: - self.nr = 0 - try: - client, addr = self.socket.accept() - - # handle connection - self.handle(client, addr) - - # Update the fd mtime on each client completion - # to signal that this worker process is alive. - spinner = (spinner+1) % 2 - self._fchmod(spinner) - self.nr += 1 - except socket.error, e: - if e[0] in (errno.EAGAIN, errno.ECONNABORTED): - break # Uh oh! - raise - if self.nr == 0: break - + if self.nr > 0: + continue + + # If our parent changed then we shut down. if self.ppid != os.getppid(): + self.log.info("Parent process changed. Closing %s" % self) return - - while self.alive: - spinner = (spinner+1) % 2 - self._fchmod(spinner) - try: - ret = select.select([self.socket], [], self.PIPE, - self.timeout) - if ret[0]: break - except select.error, e: - if e[0] == errno.EINTR: - break - if e[0] == errno.EBADF: - if nr >= 0: - break - raise - - spinner = (spinner+1) % 2 - self._fchmod(spinner) + + try: + self.notify() + ret = select.select([self.socket], [], self.PIPE, self.timeout) + if ret[0]: + break + except select.error, e: + if e[0] == errno.EINTR: + break + if e[0] == errno.EBADF and self.nr < 0: + break + raise def handle(self, client, addr): util.close_on_exec(client) try: - req = http.HttpRequest(client, addr, self.address, self.debug) + req = http.Request(client, addr, self.address, self.debug) + try: response = self.app(req.read(), req.start_response) except Exception, e: - exc = ''.join(traceback.format_exception(*sys.exc_info())) - msg = "

Internal Server Error

wsgi error:

%s
" % exc - util.writelines(client, - ["HTTP/1.0 500 Internal Server Error\r\n", - "Connection: close\r\n", - "Content-type: text/html\r\n", - "Content-length: %s\r\n" % str(len(msg)), - "\r\n", - msg]) + util.write_error(client, traceback.format_exc()) return - http.HttpResponse(client, response, req).send() + + http.Response(client, response, req).send() except Exception, e: self.log.exception("Error processing request. [%s]" % str(e)) - - # try to send a response even if something happend - try: - write_nonblock(sock, - "HTTP/1.0 500 Internal Server Error\r\n\r\n") + try: + # Last ditch attempt to notify the client of an error. + mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n" + write_nonblock(sock, mesg) except: pass finally: diff --git a/tests/t.py b/tests/t.py index 4529da44..d2269c70 100644 --- a/tests/t.py +++ b/tests/t.py @@ -9,8 +9,8 @@ import tempfile dirname = os.path.dirname(__file__) -from gunicorn.http.parser import HttpParser -from gunicorn.http.request import HttpRequest +from gunicorn.http.parser import Parser +from gunicorn.http.request import Request def data_source(fname): with open(fname) as handle: @@ -27,7 +27,7 @@ class request(object): def __call__(self, func): def run(): src = data_source(self.fname) - func(src, HttpParser()) + func(src, Parser()) run.func_name = func.func_name return run @@ -65,16 +65,11 @@ class http_request(object): def __call__(self, func): def run(): fsock = FakeSocket(data_source(self.fname)) - req = HttpRequest(fsock, ('127.0.0.1', 6000), - ('127.0.0.1', 8000)) + req = Request(fsock, ('127.0.0.1', 6000), ('127.0.0.1', 8000)) func(req) run.func_name = func.func_name return run - - - - def eq(a, b): assert a == b, "%r != %r" % (a, b)