diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index b5210fc4..11546619 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -78,13 +78,14 @@ class Arbiter(object): def set_non_blocking(self, fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK fcntl.fcntl(fd, fcntl.F_SETFL, flags) - + def signal(self, sig, frame): if len(self.SIG_QUEUE) < 5: self.SIG_QUEUE.append(sig) + self.wakeup() else: log.warn("Ignoring rapid signaling: %s" % sig) - self.wakeup() + def listen(self, addr): if 'GUNICORN_FD' in os.environ: diff --git a/gunicorn/http/http_parser.py b/gunicorn/http/http_parser.py new file mode 100644 index 00000000..6424dddc --- /dev/null +++ b/gunicorn/http/http_parser.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 - +# +# 2010 (c) Benoit Chesneau +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +from ctypes import * + +class HttpParser(object): + + def __init__(self): + self.headers = {} + self.version = None + self.method None + self.path = None + self._content_len = None + + def header(self, headers, buf): + """ take a string buff. It return + environ or None if parsing isn't done. + """ + + # wee could be smarter here + # by just reading the array, but converting + # is enough for now + cs = "".join(buf) + ld = len("\r\n\r\n") + i = cs.find("\r\n\r\n") + if i != -1: + if i > 0: + r = cs[:i] + buf = create_string_buffer(cs[i+ ld:]) + return self.finalize_headers(headers, r) + return None + + def finalize_headers(self, headers, headers_str): + lines = headers_str.split("\r\n") + + # parse first line of headers + self._first_line(lines.pop(0)) + + # parse headers. We silently ignore + # bad headers' lines + hname = "" + for line in lines: + if line == "\t": + self.headers[hname] += line.strip() + else: + try: + hname =self._parse_headerl(line) + except ValueError: + # bad headers + pass + headers = self.headers + self._content_len = int(self._headers.get('Content-Length')) + return headers + + def _first_line(self, line): + method, path, version = line.strip().split(" ") + self.version = version.strip() + self.method = method.upper() + self.path = path + + def _parse_headerl(self, line): + name, value = line.split(": ", 1) + name = name.strip() + self.headers[name] = value.strip() + return name + + @property + def should_close(self): + if self._should_close: + return True + if self.headers.get("Connection") == "close": + return True + if self.headers.get("Connection") == "Keep-Alive": + return False + if self.version < "HTTP/1.1": + return True + + @property + def is_chunked(self): + transfert_encoding = self._headers.get('Transfer-Encoding', False) + return (transfert_encoding == "chunked") + + @property + def content_length(self): + transfert_encoding = self._headers.get('Transfer-Encoding') + content_length = self._headers.get('Content-Length') + if transfert_encoding is None: + if content_length is None: + return 0 + return int(content_length) + else: + return None + + def body_eof(self): + #TODO : add chunk + if self._len_content == 0: + return True + return False + + def fetch_body(self, buf, data): + dlen = len(data) + resize(buf, sizeof(data)) + if self.is_chunked: + # do chunk + else: + if self.content_len > 0: + nr = min(len(data), self._content_len) + # addessof may be not needed here + memmove(addressof(buf), addressof(data), nr) + self._content_len -= nr + data.value = None + resize(buf, nr) + self.start_offset = 0 + return data \ No newline at end of file diff --git a/gunicorn/http/iostream.py b/gunicorn/http/iostream.py deleted file mode 100644 index 87c6058c..00000000 --- a/gunicorn/http/iostream.py +++ /dev/null @@ -1,85 +0,0 @@ -# -*- coding: utf-8 - -# -# 2009 (c) Benoit Chesneau -# 2009 (c) Paul J. Davis -# -# Permission is hereby granted, free of charge, to any person -# obtaining a copy of this software and associated documentation -# files (the "Software"), to deal in the Software without -# restriction, including without limitation the rights to use, -# copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the -# Software is furnished to do so, subject to the following -# conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES -# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT -# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR -# OTHER DEALINGS IN THE SOFTWARE. - - - -from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \ - ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode - -import socket - -SOCKET_CLOSED = (ECONNRESET, ENOTCONN, ESHUTDOWN) - -class IOStream(object): - - chunk_size = 4096 - - def __init__(self, sock): - self.sock = sock - self.buf = "" - - def recv(self, buffer_size): - - buffer_size = buffer_size or 0 - if self.buf: - l = len(self.buf) - if buffer_size > l: - buffer_size -= l - else: - s = self.buf[:buffer_size] - self.buf = self.buf[buffer_size:] - return s - try: - data = self.sock.recv(buffer_size) - s = self.buf + data - self.buf = '' - return s - except socket.error, e: - if e[0] == EWOULDBLOCK: - return None - if e[0] in SOCKET_CLOSED: - return '' - raise - - def send(self, data): - return self.sock.send(data) - - def read_until(self, delimiter): - while True: - try: - data = self.recv(self.chunk_size) - except socket.error, e: - return - self.buf = self.buf + data - - lb = len(self.buf) - ld = len(delimiter) - i = self.buf.find(delimiter) - if i != -1: - if i > 0: - r = self.buf[:i] - self.buf = self.buf[i+ ld:] - return r diff --git a/gunicorn/http/request.py b/gunicorn/http/request.py index a5d82d15..33883665 100644 --- a/gunicorn/http/request.py +++ b/gunicorn/http/request.py @@ -24,6 +24,7 @@ # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # OTHER DEALINGS IN THE SOFTWARE. +from ctypes import create_string_buffer import re import StringIO import sys @@ -31,8 +32,9 @@ from urllib import unquote from gunicorn import __version__ -from gunicorn.http.iostream import IOStream - +from gunicorn.http.http_parser import HttpParser +from gunicorn.http.tee import TeeInput +from gunicorn.util import CHUNK_SIZE NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+') @@ -46,11 +48,11 @@ class RequestError(Exception): self.status_code = status_code self.reason = reason Exception.__init__(self, (status_code, reason)) + class HTTPRequest(object): SERVER_VERSION = "gunicorn/%s" % __version__ - CHUNK_SIZE = 4096 def __init__(self, socket, client_address, server_address): self.socket = socket @@ -63,31 +65,33 @@ class HTTPRequest(object): self.response_status = None self.response_headers = {} self._version = 11 - self.io = IOStream(socket) + self.parser = HttpParser() self.start_response_called = False - self._should_close = False def read(self): - # read headers - self.read_headers(first_line=True) - - if self.headers.get('ACCEPT', '').lower() == "100-continue": - self.io.send("100 Continue\n") + headers = {} + remain = CHUNK_SIZE + buf = create_string_buffer(remain) + remain -= self.socket.recv_into(buf, remain) + while not self.parser.headers(headers, buf): + data = create_string_buffer(remain) + remain -= self.socket.recv_into(data, remain) + buf = create_string_buffer(data.value + buf.value) + + if headers.get('Accept', '').lower() == "100-continue": + self.socket.send("100 Continue\n") - if "?" in self.path: - path_info, query = self.path.split('?', 1) + if "?" in parser.path: + path_info, query = parser.path.split('?', 1) else: - path_info = self.path + path_info = self.parser.path query = "" - length = self.body_length() if not length: - wsgi_input = StringIO.StringIO() - elif length == "chunked": - length, wsgi_input = self.decode_chunked() - else: - wsgi_input = FileInput(self) - + if not self.parser.content_length and not self.parser.is_chunked: + wsgi_input = StringIO.StringIO() + else: + wsgi_input = TeeInput(self.socket, parser, buf, remain) environ = { "wsgi.url_scheme": 'http', @@ -104,7 +108,7 @@ class HTTPRequest(object): "QUERY_STRING": query, "RAW_URI": self.path, "CONTENT_TYPE": self.headers.get('CONTENT-TYPE', ''), - "CONTENT_LENGTH": length, + "CONTENT_LENGTH": wsgi_input.len, "REMOTE_ADDR": self.client_address[0], "REMOTE_PORT": self.client_address[1], "SERVER_NAME": self.server_address[0], @@ -113,48 +117,12 @@ class HTTPRequest(object): } for key, value in self.headers.items(): - key = 'HTTP_' + key.replace('-', '_') + key = 'HTTP_' + key.upper().replace('-', '_') if key not in ('HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH'): environ[key] = value return environ - def read_headers(self, first_line=False): - headers_str = self.io.read_until("\r\n\r\n") - lines = headers_str.split("\r\n") - self.first_line(lines.pop(0)) - hname = "" - for line in lines: - if line == "\t": - self.headers[hname] += line.strip() - else: - try: - hname =self.parse_header(line) - except ValueError: - # bad headers - pass - - def body_length(self): - transfert_encoding = self.headers.get('TRANSFERT-ENCODING') - content_length = self.headers.get('CONTENT-LENGTH') - if transfert_encoding is None: - if content_length is None: - return None - return content_length - elif transfert_encoding == "chunked": - return "chunked" - else: - return None - - def should_close(self): - if self._should_close: - return True - if self.headers.get("CONNECTION") == "close": - return True - if self.headers.get("CONNECTION") == "Keep-Alive": - return False - if self.version < "HTTP/1.1": - return True - + def decode_chunked(self): """Decode the 'chunked' transfer coding.""" length = 0 @@ -190,96 +158,4 @@ class HTTPRequest(object): self.io.write(send) def close(self): - self.socket.close() - - def first_line(self, line): - method, path, version = line.strip().split(" ") - self.version = version.strip() - self.method = method.upper() - self.path = path - - def parse_header(self, line): - name, value = line.split(": ", 1) - name = name.strip().upper() - self.headers[name] = value.strip() - return name - -class FileInput(object): - - stream_size = 4096 - - def __init__(self, req): - self.req = req - self.length = int(req.body_length() or 0) - self.io = req.io - self._rbuf = "" - self.size = 0 - - def close(self): - self.eof = False - - def read(self, amt=None): - if self.length and self.size >= self.length: - return '' - - if self._rbuf and amt is not None: - L = len(self._rbuf) - print L - if amt > L: - amt -= L - else: - s = self._rbuf[:amt] - self._rbuf = self._rbuf[amt:] - self.size += len(s) - return s - - if amt is None: - amt = min(self. stream_size, self.length or 0) - - data = self.req.io.recv(amt) - s = self._rbuf + data - self._rbuf = '' - self.size += len(s) - return s - - def readline(self, amt=-1): - i = self._rbuf.find('\n') - while i < 0 and not (0 < amt <= len(self._rbuf)): - new = self.io.recv(self.stream_size) - if not new: break - i = new.find('\n') - if i >= 0: - i = i + len(self._rbuf) - self._rbuf = self._rbuf + new - if i < 0: - i = len(self._rbuf) - else: - i = i+1 - if 0 <= amt < len(self._rbuf): - i = amt - data, self._rbuf = self._rbuf[:i], self._rbuf[i:] - return data - - def readlines(self, sizehint=0): - total = 0 - lines = [] - line = self.readline() - while line: - lines.append(line) - total += len(line) - if 0 < sizehint <= total: - break - line = self.readline() - return lines - - def next(self): - r = self.readline() - if not r: - raise StopIteration - return r - - def __iter__(self): - return self - - - \ No newline at end of file + self.socket.close() \ No newline at end of file diff --git a/gunicorn/http/response.py b/gunicorn/http/response.py index 039ee5a3..81255308 100644 --- a/gunicorn/http/response.py +++ b/gunicorn/http/response.py @@ -32,10 +32,9 @@ class HTTPResponse(object): self.req = req self.data = data self.headers = self.req.response_headers or {} - self.io = req.io def write(self, data): - self.io.send(data) + self.req.socket.send(data) def send(self): # send headers @@ -45,13 +44,16 @@ class HTTPResponse(object): resp_head.append("Server: %s\r\n" % self.req.SERVER_VERSION) resp_head.append("Date: %s\r\n" % http_date()) # broken clients - resp_head.append("Status: %s\r\n" % str(self.req.response_status)) + resp_head.append("Status: %s\r\n" % str(self.req.response_status)) + # always close the conenction + resp_head.append("Connection: close\r\n") for name, value in self.req.response_headers.items(): resp_head.append("%s: %s\r\n" % (name, value)) - self.io.send("%s\r\n" % "".join(resp_head)) + self.write("%s\r\n" % "".join(resp_head)) for chunk in self.data: self.write(chunk) + self.req.close() if hasattr(self.data, "close"): diff --git a/gunicorn/http/tee.py b/gunicorn/http/tee.py new file mode 100644 index 00000000..1c853598 --- /dev/null +++ b/gunicorn/http/tee.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 - +# +# 2010 (c) Benoit Chesneau +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +""" +TeeInput replace old FileInput. It use a file +if size > MAX_BODY or memory. It's now possible to rewind +read or restart etc ... It's based on TeeInput from unicorn. + +""" + + +import os +import StringIO +import tempfile +from ctypes import create_string_buffer + +from gunicorn.util import MAX_BODY, CHUNK_SIZE + +class TeeInput(object): + + def __init__(self, socket, parser, buf, remain): + self.buf = buf + self.remain = remain + self.parser = parser + self.socket = socket + self._len = parser.content_length + if self._len and self._len < MAX_BODY: + self.tmp = StringIO.StringIO() + else: + self.tmp = new tempfile.TemporaryFile() + self.buf2 = create_string_buffer(tmp) + if len(buf) > 0: + parser.filter_body(self.buf2, buf) + self._finalize() + self.tmp.write(self.buf2) + self.tmp.seek(0) + + @property + def len(self): + if self._len return self._len + if self.remain: + pos = self.tmp.tell() + while True: + if not self._tee(self.remain, self.buf2): + break + self.tmp.seek(pos) + self._len = self._tmp_size() + return self._len + + + def read(self, length=None): + """ read """ + if not self.remain: + return self.tmp.read(length) + + if not length: + r = self.tmp.read() or || + while self._tee(self.remain, self.buf2): + r += self.buf2.value + return r + else: + r = self.buf2 + diff = self._tmp_size() - self.tmp.tell() + if not diff: + return self._ensure_length((self._tee(self.remain, r), self.remain) + else: + length = min(diff, self.remain) + return self._ensure_length(self._tee(length, r), length) + + def readline(self, amt=-1): + pass + + def readlines(self, sizehints=0): + pass + + def __next__(self): + r = self.readline() + if not r: + raise StopIteration + return r + next = __next__ + + def __iter__(self): + return self + + def _tee(self, length, dst): + """ fetch partial body""" + while not self.parser.body_eof() and self.remain: + data = create_string_buffer(length) + length -= self.socket.recv_into(data, length) + self.remain = length + if self.parser.filter_body(dst, data): + self.tmp.write(dst) + self.tmp.seek(0, os.SEEK_END) + return dst + self._finalize() + return "" + + def _finalize(self): + """ here we wil fetch final trailers + if any.""" + + + def _tmp_size(self): + if isinstance(self.tmp, StringIO.StringIO): + return self.tmp.len + else: + return int(os.fstat(self.tmp.fileno())[6]) + + def _ensure_length(buf, length): + if not buf or not self._len: + return buf + while len(buf) < length && self.len != self.tmp.pos(): + buf += self._tee(length - len(buf), self.buf2) + + return buf \ No newline at end of file diff --git a/gunicorn/util.py b/gunicorn/util.py index 8b136853..dffe850b 100644 --- a/gunicorn/util.py +++ b/gunicorn/util.py @@ -26,6 +26,10 @@ import time +CHUNK_SIZE = 16 * 1024 + +MAX_BODY = 1024 * (80 + 32) + weekdayname = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] monthname = [None, diff --git a/gunicorn/worker.py b/gunicorn/worker.py index 0b481e35..81fac28e 100644 --- a/gunicorn/worker.py +++ b/gunicorn/worker.py @@ -33,7 +33,7 @@ import signal import socket import sys import tempfile - +import time from gunicorn import http from gunicorn import util @@ -44,27 +44,36 @@ class Worker(object): SIGNALS = map( lambda x: getattr(signal, "SIG%s" % x), - "HUP QUIT INT TERM TTIN TTOU USR1 USR2".split() + "HUP QUIT INT TERM TTIN TTOU USR1".split() ) def __init__(self, workerid, ppid, socket, app): self.id = workerid self.ppid = ppid - self.socket = socket - self.address = socket.getsockname() fd, tmpname = tempfile.mkstemp() self.tmp = os.fdopen(fd, "r+b") self.tmpname = tmpname + + # prevent inherientence + self.close_on_exec(socket) + self.close_on_exec(fd) + + self.socket = socket + self.address = socket.getsockname() + self.app = app self.alive = True + def close_on_exec(self, fd): + flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + def init_signals(self): map(lambda s: signal.signal(s, signal.SIG_DFL), self.SIGNALS) signal.signal(signal.SIGQUIT, self.handle_quit) signal.signal(signal.SIGTERM, self.handle_exit) signal.signal(signal.SIGINT, self.handle_exit) signal.signal(signal.SIGUSR1, self.handle_quit) - signal.signal(signal.SIGUSR2, self.handle_quit) def handle_quit(self, sig, frame): self.alive = False @@ -88,38 +97,48 @@ class Worker(object): while self.alive: try: ret = select.select([self.socket], [], [], 2.0) + if ret[0]: + break except select.error, e: - if e[0] != errno.EINTR: - raise - if ret[0]: - break + if e[0] == errno.EINTR: + break + elif e[0] == errno.EBADF: + return + raise # Accept until we hit EAGAIN. We're betting that when we're # processing clients that more clients are waiting. When # there's no more clients waiting we go back to the select() # loop and wait for some lovin. while self.alive: + #time.sleep(0.01) try: - (conn, addr) = self.socket.accept() - except socket.error, e: - if e[0] in (errno.EAGAIN, errno.EINTR, - errno.ECONNABORTED): - break # Jump back to select - raise # Uh oh! - - conn.setblocking(1) - try: + conn, addr = self.socket.accept() + conn.setblocking(1) + + # Update the fd mtime on each client completion + # to signal that this worker process is alive. + spinner = (spinner+1) % 2 + self._fchmod(spinner) + + # handle connection self.handle(conn, addr) - except Exception, e: - log.exception("Error processing request. [%s]" % str(e)) - - # Update the fd mtime on each client completion - # to signal that this worker process is alive. - spinner = (spinner+1) % 2 - self._fchmod(spinner) + except socket.error, e: + if e[0] in [errno.EAGAIN, errno.ECONNABORTED]: + break # Uh oh! + raise + def handle(self, conn, client): - req = http.HTTPRequest(conn, client, self.address) - result = self.app(req.read(), req.start_response) - response = http.HTTPResponse(req, result) - response.send() + self.close_on_exec(conn) + try: + req = http.HTTPRequest(conn, client, self.address) + result = self.app(req.read(), req.start_response) + response = http.HTTPResponse(req, result) + response.send() + except Exception, e: + log.exception("Error processing request. [%s]" % str(e)) + if e[0] == 32: + raise + conn.send("HTTP/1.1 500 Internal Server Error\r\n\r\n") + conn.close()