New parser using StringIO, faster than concatenating strings. Lots of fixes in TeeInput.
benoitc 2010-03-06 20:47:39 +01:00
parent 54d1a8a5dc
commit c785be0780
7 changed files with 245 additions and 187 deletions

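The commit message's claim, that accumulating socket reads in a StringIO is cheaper than rebuilding a string on every recv(), can be probed with a small standalone timing sketch (Python 2; chunk size and count are made up here, and exact numbers vary by interpreter):

    # Rough probe of the two accumulation strategies the commit message compares.
    from StringIO import StringIO
    from timeit import timeit

    CHUNKS = ["x" * 1024] * 2000   # stand-ins for successive sock.recv() results

    def concat():
        buf = ""
        for chunk in CHUNKS:
            buf = buf + chunk      # old read_partial pattern: a new string per recv
        return buf

    def accumulate():
        buf = StringIO()
        for chunk in CHUNKS:
            buf.write(chunk)       # new pattern: append in place, join once at the end
        return buf.getvalue()

    if __name__ == "__main__":
        print "concat   %.3fs" % timeit(concat, number=20)
        print "StringIO %.3fs" % timeit(accumulate, number=20)
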
View File

@ -3,18 +3,24 @@
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from StringIO import StringIO
import urlparse
from gunicorn.util import normalize_name
class BadStatusLine(Exception):
pass
class ParserError(Exception):
pass
class Parser(object):
_should_close = False
def __init__(self):
""" HTTP Parser compatible 1.0 & 1.1
This parser can parse HTTP requests and response.
"""
def __init__(self, ptype='request', should_close=False):
self.status_line = ""
self.status_int = None
self.reason = ""
self.status = ""
self.headers = []
self.headers_dict = {}
@ -28,7 +34,19 @@ class Parser(object):
self._content_len = None
self.start_offset = 0
self.chunk_size = 0
self._chunk_eof = False
self.type = ptype
self._should_close = should_close
@classmethod
def parse_response(cls, should_close=False):
""" Return parser object for response"""
return cls(ptype='response', should_close=should_close)
@classmethod
def parse_request(cls):
""" return parser object for requests """
return cls(ptype='request')
def filter_headers(self, headers, buf):
""" take a string as buffer and an header dict
@ -36,14 +54,17 @@ class Parser(object):
if parsing isn't done. headers dict is updated
with new headers.
"""
i = buf.find("\r\n\r\n")
line = buf.getvalue()
i = line.find("\r\n\r\n")
if i != -1:
r = buf[:i]
r = line[:i]
pos = i+4
return self.finalize_headers(headers, r, pos)
return -1
buf2 = StringIO()
buf2.write(line[pos:])
return self.finalize_headers(headers, r, buf2)
return False
def finalize_headers(self, headers, headers_str, pos):
def finalize_headers(self, headers, headers_str, buf2):
""" parse the headers """
lines = headers_str.split("\r\n")
@ -56,7 +77,7 @@ class Parser(object):
_headers = {}
hname = ""
for line in lines:
if line.startswith("\t") or line.startswith(" "):
if line.startswith('\t') or line.startswith(' '):
headers[hname] += line.strip()
else:
try:
@ -68,31 +89,45 @@ class Parser(object):
headers.extend(list(_headers.items()))
self.headers = headers
self._content_len = int(_headers.get('Content-Length',0))
(_, _, self.path, self.query_string, self.fragment) = \
if self.type == 'request':
(_, _, self.path, self.query_string, self.fragment) = \
urlparse.urlsplit(self.raw_path)
return pos
return buf2
def _parse_version(self, version):
self.raw_version = version.strip()
try:
major, minor = self.raw_version.split("HTTP/")[1].split(".")
self.version = (int(major), int(minor))
except IndexError:
self.version = (1, 0)
def _first_line(self, line):
""" parse first line """
self.status = status = line.strip()
method, path, version = status.split(" ")
version = version.strip()
self.raw_version = version
self.status_line = status_line = line.strip()
try:
major, minor = version.split("HTTP/")[1].split(".")
version = (int(major), int(minor))
except IndexError:
version = (1, 0)
self.version = version
self.method = method.upper()
self.raw_path = path
if self.type == 'response':
version, self.status = status_line.split(None, 1)
self._parse_version(version)
try:
self.status_int, self.reason = self.status.split(None, 1)
except ValueError:
self.status_int = self.status
self.status_int = int(self.status_int)
else:
method, path, version = status_line.split(None, 2)
self._parse_version(version)
self.method = method.upper()
self.raw_path = path
except ValueError:
raise BadStatusLine(line)
def _parse_headerl(self, hdrs, line):
""" parse header line"""
name, value = line.split(":", 1)
name = normalize_name(name.strip())
name = name.strip().title()
value = value.rsplit("\r\n",1)[0].strip()
if name in hdrs:
hdrs[name] = "%s, %s" % (hdrs[name], value)
@ -108,7 +143,7 @@ class Parser(object):
return True
elif self.headers_dict.get("Connection") == "Keep-Alive":
return False
elif self.version < (1,0):
elif self.version <= (1, 0):
return True
return False
@ -139,11 +174,14 @@ class Parser(object):
return True
return False
def read_chunk(self, data):
def read_chunk(self, buf):
line = buf.getvalue()
buf2 = StringIO()
if not self.start_offset:
i = data.find("\r\n")
i = line.find("\r\n")
if i != -1:
chunk = data[:i].strip().split(";", 1)
chunk = line[:i].strip().split(";", 1)
chunk_size = int(chunk.pop(0), 16)
self.start_offset = i+2
self.chunk_size = chunk_size
@ -151,39 +189,47 @@ class Parser(object):
if self.start_offset:
if self.chunk_size == 0:
self._chunk_eof = True
ret = '', data[:self.start_offset]
return ret
buf2.write(line[:self.start_offset])
return '', buf2
else:
chunk = data[self.start_offset:self.start_offset+self.chunk_size]
chunk = line[self.start_offset:self.start_offset+self.chunk_size]
end_offset = self.start_offset + self.chunk_size + 2
# wait for the trailing CRLF, else return an empty chunk
if len(data) >= end_offset:
ret = chunk, data[end_offset:]
if buf.len >= end_offset:
buf2.write(line[end_offset:])
self.chunk_size = 0
return ret
return '', data
return chunk, buf2
return '', buf
def trailing_header(self, data):
i = data.find("\r\n\r\n")
def trailing_header(self, buf):
line = buf.getvalue()
i = line.find("\r\n\r\n")
return (i != -1)
def filter_body(self, data):
def filter_body(self, buf):
"""\
Filter body and return a tuple: (body_chunk, new_buffer)
Both can be None, and new_buffer is always None if it's empty.
"""
dlen = len(data)
dlen = buf.len
chunk = ''
if self.is_chunked:
chunk, data = self.read_chunk(data)
try:
chunk, buf2 = self.read_chunk(buf)
except Exception, e:
raise ParserError("chunked decoding error [%s]" % str(e))
if not chunk:
return '', data
return '', buf
else:
buf2 = StringIO()
if self._content_len > 0:
nr = min(dlen, self._content_len)
chunk = data[:nr]
chunk = buf.getvalue()[:nr]
self._content_len -= nr
data = []
self.start_offset = 0
return (chunk, data)
buf2.seek(0, 2)
return (chunk, buf2)

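Taken together, the new parser is driven buffer-in, buffer-out: filter_headers() returns False until the header terminator arrives, then returns a fresh StringIO holding whatever followed it, which filter_body() consumes in turn. A minimal usage sketch (hypothetical request bytes, using only the methods shown in this diff):

    # Sketch of the new StringIO-based parsing flow, request side.
    from StringIO import StringIO
    from gunicorn.http.parser import Parser

    raw = "GET /hello?x=1 HTTP/1.1\r\nHost: example.com\r\nContent-Length: 2\r\n\r\nhi"

    p = Parser.parse_request()
    headers = []
    buf = StringIO()
    buf.write(raw)

    rest = p.filter_headers(headers, buf)   # False until "\r\n\r\n" has been seen
    if rest:
        print p.method, p.path, p.version   # GET /hello (1, 1)
        body, rest = p.filter_body(rest)    # drains up to Content-Length bytes
        print repr(body)                    # 'hi'
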
View File

@ -6,14 +6,14 @@
import logging
import os
import re
import StringIO
from StringIO import StringIO
import sys
from urllib import unquote
from gunicorn import __version__
from gunicorn.http.parser import Parser
from gunicorn.http.tee import TeeInput
from gunicorn.util import CHUNK_SIZE, read_partial
from gunicorn.util import CHUNK_SIZE
NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')
@ -26,7 +26,7 @@ class Request(object):
DEFAULTS = {
"wsgi.url_scheme": 'http',
"wsgi.input": StringIO.StringIO(),
"wsgi.input": StringIO(),
"wsgi.errors": sys.stderr,
"wsgi.version": (1, 0),
"wsgi.multithread": False,
@ -39,14 +39,14 @@ class Request(object):
def __init__(self, socket, client_address, server_address, conf):
self.debug = conf['debug']
self.conf = conf
self.socket = socket
self._sock = socket
self.client_address = client_address
self.server_address = server_address
self.response_status = None
self.response_headers = []
self._version = 11
self.parser = Parser()
self.parser = Parser.parse_request()
self.start_response_called = False
self.log = logging.getLogger(__name__)
self.response_chunked = False
@ -54,27 +54,30 @@ class Request(object):
def read(self):
environ = {}
headers = []
buf = read_partial(self.socket, CHUNK_SIZE)
i = self.parser.filter_headers(headers, buf)
if i == -1 and buf:
buf = StringIO()
data = self._sock.recv(CHUNK_SIZE)
buf.write(data)
buf2 = self.parser.filter_headers(headers, buf)
if not buf2:
while True:
data = read_partial(self.socket, CHUNK_SIZE)
if not data: break
buf += data
i = self.parser.filter_headers(headers, buf)
if i != -1:
break
data = self._sock.recv(CHUNK_SIZE)
if not data:
break
buf.write(data)
buf2 = self.parser.filter_headers(headers, buf)
if buf2:
break
self.log.debug("%s", self.parser.status)
self.log.debug("Headers:\n%s" % headers)
if self.parser.headers_dict.get('Expect','').lower() == "100-continue":
self.socket.send("HTTP/1.1 100 Continue\r\n\r\n")
self._sock.send("HTTP/1.1 100 Continue\r\n\r\n")
if not self.parser.content_len and not self.parser.is_chunked:
wsgi_input = StringIO.StringIO()
wsgi_input = StringIO()
else:
wsgi_input = TeeInput(self.socket, self.parser, buf[i:], self.conf)
wsgi_input = TeeInput(self._sock, self.parser, buf2, self.conf)
# This value should evaluate true if an equivalent application
# object may be simultaneously invoked by another process, and

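The header-read loop above is the heart of the request-side change; stripped of the Request plumbing it is roughly the following pattern (hypothetical helper name; the CHUNK_SIZE value is an assumption):

    # Sketch of the recv/parse loop used by Request.read().
    from StringIO import StringIO

    CHUNK_SIZE = 8192   # assumption: stand-in for gunicorn.util.CHUNK_SIZE

    def read_headers(sock, parser):
        headers = []
        buf = StringIO()
        while True:
            data = sock.recv(CHUNK_SIZE)
            if not data:                # peer closed before the headers completed
                break
            buf.write(data)
            rest = parser.filter_headers(headers, buf)
            if rest:                    # a StringIO holding the bytes after "\r\n\r\n"
                return headers, rest
        return headers, None
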
View File

@ -9,7 +9,7 @@ class Response(object):
def __init__(self, sock, response, req):
self.req = req
self.sock = sock
self._sock = sock
self.data = response
self.headers = req.response_headers or []
self.status = req.response_status
@ -18,21 +18,18 @@ class Response(object):
def send(self):
# send headers
resp_head = []
resp_head.append("HTTP/1.1 %s\r\n" % (self.status))
resp_head.append("Server: %s\r\n" % self.SERVER_VERSION)
resp_head.append("Date: %s\r\n" % http_date())
# always close the connection
resp_head.append("Connection: close\r\n")
for name, value in self.headers:
resp_head.append("%s: %s\r\n" % (name, value))
write(self.sock, "%s\r\n" % "".join(resp_head))
resp_head = [
"HTTP/1.1 %s\r\n" % self.status,
"Server: %s\r\n" % self.SERVER_VERSION,
"Date: %s\r\n" % http_date(),
"Connection: close\r\n"
]
resp_head.extend(["%s: %s\r\n" % (n, v) for n, v in self.headers])
write(self._sock, "%s\r\n" % "".join(resp_head))
last_chunk = None
for chunk in list(self.data):
write(self.sock, chunk, self.chunked)
write(self._sock, chunk, self.chunked)
last_chunk = chunk
if self.chunked:
@ -40,7 +37,7 @@ class Response(object):
# send last chunk
write_chunk("")
close(self.sock)
close(self._sock)
if hasattr(self.data, "close"):
self.data.close()

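For orientation, with a typical 200 response and one application header, the joined resp_head plus the final CRLF added by the write() call produces a head block like this (all values illustrative):

    # Illustration only: the head block written by Response.send().
    head = "".join([
        "HTTP/1.1 200 OK\r\n",
        "Server: gunicorn/0.6\r\n",                 # SERVER_VERSION stand-in
        "Date: Sat, 06 Mar 2010 19:47:39 GMT\r\n",
        "Connection: close\r\n",
        "Content-Type: text/plain\r\n",             # from req.response_headers
    ])
    wire = "%s\r\n" % head   # the extra CRLF is the blank line ending the headers
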
View File

@ -1,63 +1,68 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# This file is part of restkit released under the MIT license.
# See the NOTICE for more information.
"""
TeeInput replaces the old FileInput. It uses a file
if size > MAX_BODY, or memory otherwise. It's now possible to rewind,
read or restart etc ... It's based on TeeInput from unicorn.
read or restart etc ... It's based on TeeInput from Gunicorn.
"""
import os
import StringIO
from StringIO import StringIO
import tempfile
from gunicorn.util import MAX_BODY, CHUNK_SIZE, read_partial
from gunicorn import util
class TeeInput(object):
CHUNK_SIZE = util.CHUNK_SIZE
def __init__(self, socket, parser, buf, conf):
self.conf = conf
self.buf = buf
self.buf = StringIO()
self.parser = parser
self.socket = socket
self._sock = socket
self._is_socket = True
self._len = parser.content_len
if self._len and self._len < MAX_BODY:
self.tmp = StringIO.StringIO()
if self._len and self._len < util.MAX_BODY:
self.tmp = StringIO()
else:
self.tmp = tempfile.TemporaryFile(
dir=self.conf['tmp_upload_dir'])
if len(buf) > 0:
self.tmp = tempfile.TemporaryFile(dir=self.conf['tmp_upload_dir'])
if buf.len > 0:
chunk, self.buf = parser.filter_body(buf)
if chunk:
self.tmp.write(chunk)
self.tmp.flush()
self._finalize()
self.tmp.seek(0)
del buf
@property
def len(self):
if self._len: return self._len
if self._is_socket:
pos = self.tmp.tell()
self.tmp.seek(0, 2)
while True:
self.tmp.seek(self._tmp_size())
if not self._tee(CHUNK_SIZE):
break
if not self._tee(self.CHUNK_SIZE):
break
self.tmp.seek(pos)
self._len = self._tmp_size()
return self._len
def seek(self, offset, whence=0):
""" naive implementation of seek """
if self._is_socket:
self.tmp.seek(0, 2)
while True:
if not self._tee(CHUNK_SIZE):
if not self._tee(self.CHUNK_SIZE):
break
self.tmp.seek(offset, whence)
@ -68,47 +73,54 @@ class TeeInput(object):
""" read """
if not self._is_socket:
return self.tmp.read(length)
if length < 0:
r = self.tmp.read() or ""
buf = StringIO()
buf.write(self.tmp.read())
while True:
chunk = self._tee(CHUNK_SIZE)
if not chunk: break
r += chunk
return r
chunk = self._tee(self.CHUNK_SIZE)
if not chunk:
break
buf.write(chunk)
return buf.getvalue()
else:
dest = StringIO()
diff = self._tmp_size() - self.tmp.tell()
if not diff:
return self._ensure_length(self._tee(length), length)
dest.write(self._tee(length))
return self._ensure_length(dest, length)
else:
l = min(diff, length)
return self._ensure_length(self.tmp.read(l), length)
dest.write(self.tmp.read(l))
return self._ensure_length(dest, length)
def readline(self, size=-1):
if not self._is_socket:
return self.tmp.readline(size)
return self.tmp.readline()
orig_size = self._tmp_size()
if self.tmp.tell() == orig_size:
if not self._tee(CHUNK_SIZE):
if not self._tee(self.CHUNK_SIZE):
return ''
self.tmp.seek(orig_size)
# now we can get line
line = self.tmp.readline()
i = line.find("\n")
if i == -1:
while True:
orig_size = self.tmp.tell()
if not self._tee(CHUNK_SIZE):
break
self.tmp.seek(orig_size)
line += self.tmp.readline()
i = line.find("\n")
if i != -1:
break
return line
if line.find("\n") >=0:
return line
buf = StringIO()
buf.write(line)
while True:
orig_size = self.tmp.tell()
data = self._tee(self.CHUNK_SIZE)
if not data:
break
self.tmp.seek(orig_size)
buf.write(self.tmp.readline())
if data.find("\n") >= 0:
break
return buf.getvalue()
def readlines(self, sizehint=0):
total = 0
@ -134,41 +146,49 @@ class TeeInput(object):
def _tee(self, length):
""" fetch partial body"""
buf2 = self.buf
buf2.seek(0, 2)
while True:
chunk, self.buf = self.parser.filter_body(self.buf)
chunk, buf2 = self.parser.filter_body(buf2)
if chunk:
self.tmp.write(chunk)
self.tmp.flush()
self.tmp.seek(0, os.SEEK_END)
self.tmp.seek(0, 2)
self.buf = StringIO()
self.buf.write(buf2.getvalue())
return chunk
if self.parser.body_eof():
break
self.buf = read_partial(self.socket, length, self.buf)
data = self._sock.recv(length)
buf2.write(data)
self._finalize()
return ""
def _finalize(self):
""" here we wil fetch final trailers
if any."""
if self.parser.body_eof():
del self.buf
self._is_socket = False
def _tmp_size(self):
if isinstance(self.tmp, StringIO.StringIO):
if isinstance(self.tmp, StringIO):
return self.tmp.len
else:
return int(os.fstat(self.tmp.fileno())[6])
def _ensure_length(self, buf, length):
if not buf or not self._len:
return buf
def _ensure_length(self, dest, length):
if not dest.len or not self._len:
return dest.getvalue()
while True:
if len(buf) >= length:
if dest.len >= length:
break
data = self._tee(length - len(buf))
if not data: break
buf += data
return buf
data = self._tee(length - dest.len)
if not data:
break
dest.write(data)
return dest.getvalue()

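The practical effect of the TeeInput changes is that wsgi.input stays rewindable whether the body was spooled to a StringIO (small bodies) or to a temporary file (bodies larger than MAX_BODY). A hedged sketch of that contract from the application side (hypothetical app; assumes read() is called with an explicit length):

    # Sketch of consuming a rewindable wsgi.input backed by TeeInput.
    def app(environ, start_response):
        body = environ["wsgi.input"]
        clen = int(environ.get("CONTENT_LENGTH") or 0)
        first = body.read(clen)    # tees socket data into the spool as it reads
        body.seek(0)               # rewind: the next read is served from the spool
        second = body.read(clen)
        assert first == second
        start_response("200 OK", [("Content-Type", "text/plain"),
                                  ("Content-Length", str(len(first)))])
        return [first]
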
View File

@ -88,13 +88,6 @@ def close(sock):
sock.close()
except socket.error:
pass
def read_partial(sock, length, buf=None):
tmp_buf = sock.recv(length)
if not buf:
return tmp_buf
return buf + tmp_buf
def write_chunk(sock, data):
chunk = "".join(("%X\r\n" % len(data), data, "\r\n"))

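For reference, the join in write_chunk builds the standard Transfer-Encoding: chunked frame: a hex size line, the payload, then CRLF. A quick check:

    # The chunk frame produced by the join shown in write_chunk.
    data = "hello world"
    frame = "".join(("%X\r\n" % len(data), data, "\r\n"))
    assert frame == "B\r\nhello world\r\n"   # 0xB == 11 payload bytes
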
View File

@ -8,8 +8,8 @@ import t
@t.request("001.http")
def test_001(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "PUT")
t.eq(p.version, (1,0))
@ -20,15 +20,15 @@ def test_001(buf, p):
('Content-Type', 'application/json'),
('Server', 'http://127.0.0.1:5984')
])
body, tr = p.filter_body(buf[i:])
body, tr = p.filter_body(buf2)
t.eq(body, '{"nom": "nom"}')
t.eq(p.body_eof(), True)
@t.request("002.http")
def test_002(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "GET")
t.eq(p.version, (1, 1))
@ -39,14 +39,14 @@ def test_002(buf, p):
("Host", "0.0.0.0=5000"),
("User-Agent", "curl/7.18.0 (i486-pc-linux-gnu) libcurl/7.18.0 OpenSSL/0.9.8g zlib/1.2.3.3 libidn/1.1")
])
body, tr = p.filter_body(buf[i:])
body, tr = p.filter_body(buf2)
t.eq(body, "")
@t.request("003.http")
def test_003(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "GET")
t.eq(p.version, (1, 1))
t.eq(p.path, "/favicon.ico")
@ -61,28 +61,28 @@ def test_003(buf, p):
("Keep-Alive", "300"),
("User-Agent", "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9) Gecko/2008061015 Firefox/3.0"),
])
body, tr = p.filter_body(buf[i:])
body, tr = p.filter_body(buf2)
t.eq(body, "")
@t.request("004.http")
def test_004(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "GET")
t.eq(p.version, (1, 1))
t.eq(p.path, "/dumbfuck")
t.eq(p.query_string, "")
t.eq(p.headers, [("Aaaaaaaaaaaaa", "++++++++++")])
body, tr = p.filter_body(buf[i:])
body, tr = p.filter_body(buf2)
t.eq(body, "")
@t.request("005.http")
def test_005(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "GET")
t.eq(p.version, (1, 1))
t.eq(p.path, "/forums/1/topics/2375")
@ -90,55 +90,55 @@ def test_005(buf, p):
t.eq(p.fragment, "posts-17408")
body, tr = p.filter_body(buf[i:])
body, tr = p.filter_body(buf2)
t.eq(body, "")
@t.request("006.http")
def test_006(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "GET")
t.eq(p.version, (1, 1))
t.eq(p.path, "/get_no_headers_no_body/world")
t.eq(p.query_string, "")
t.eq(p.fragment, "")
body, tr = p.filter_body(buf[i:])
body, tr = p.filter_body(buf2)
t.eq(body, "")
@t.request("007.http")
def test_007(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "GET")
t.eq(p.version, (1, 1))
t.eq(p.path, "/get_one_header_no_body")
t.eq(p.query_string, "")
t.eq(p.fragment, "")
t.eq(p.headers, [('Accept', '*/*')])
body, tr = p.filter_body(buf[i:])
body, tr = p.filter_body(buf2)
t.eq(body, "")
@t.request("008.http")
def test_008(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "GET")
t.eq(p.version, (1, 0))
t.eq(p.path, "/get_funky_content_length_body_hello")
t.eq(p.query_string, "")
t.eq(p.fragment, "")
t.eq(p.headers, [('Content-Length', '5')])
body, tr = p.filter_body(buf[i:])
body, tr = p.filter_body(buf2)
t.eq(body, "HELLO")
@t.request("009.http")
def test_009(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "POST")
t.eq(p.version, (1, 1))
t.eq(p.path, "/post_identity_body_world")
@ -149,14 +149,14 @@ def test_009(buf, p):
('Content-Length', '5'),
('Transfer-Encoding', 'identity')
])
body, tr = p.filter_body(buf[i:])
body, tr = p.filter_body(buf2)
t.eq(body, "World")
@t.request("010.http")
def test_010(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "POST")
t.eq(p.version, (1, 1))
t.eq(p.path, "/post_chunked_all_your_base")
@ -165,9 +165,8 @@ def test_010(buf, p):
t.eq(p._chunk_eof, False)
t.ne(p.body_eof(), True)
body = ""
buf = buf[i:]
while not p.body_eof():
chunk, buf = p.filter_body(buf)
chunk, buf2 = p.filter_body(buf2)
print chunk
if chunk:
body += chunk
@ -176,8 +175,8 @@ def test_010(buf, p):
@t.request("011.http")
def test_011(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "POST")
t.eq(p.version, (1, 1))
t.eq(p.path, "/two_chunks_mult_zero_end")
@ -186,18 +185,17 @@ def test_011(buf, p):
t.eq(p._chunk_eof, False)
t.ne(p.body_eof(), True)
body = ""
buf = buf[i:]
while not p.body_eof():
chunk, buf = p.filter_body(buf)
chunk, buf2 = p.filter_body(buf2)
if chunk:
body += chunk
t.eq(body, "hello world")
@t.request("017.http")
def test_013(buf, p):
def test_012(buf, p):
headers = []
i = p.filter_headers(headers, buf)
t.ne(i, -1)
buf2 = p.filter_headers(headers, buf)
t.ne(buf2, False)
t.eq(p.method, "GET")
t.eq(p.version, (1, 0))
t.eq(p.path, "/stuff/here")

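The two chunked tests above reduce to one draining loop; here is a self-contained sketch of the same flow against an inline, made-up chunked request:

    # Sketch of draining a chunked body with the buffer-passing API.
    from StringIO import StringIO
    from gunicorn.http.parser import Parser

    raw = ("POST /x HTTP/1.1\r\n"
           "Transfer-Encoding: chunked\r\n\r\n"
           "5\r\nhello\r\n"
           "6\r\n world\r\n"
           "0\r\n\r\n")

    p = Parser.parse_request()
    headers = []
    buf = StringIO()
    buf.write(raw)
    rest = p.filter_headers(headers, buf)

    body = ""
    while not p.body_eof():
        chunk, rest = p.filter_body(rest)
        if chunk:
            body += chunk
    print repr(body)   # 'hello world'
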
View File

@ -6,6 +6,7 @@
import array
import os
from StringIO import StringIO
import tempfile
dirname = os.path.dirname(__file__)
@ -15,12 +16,12 @@ from gunicorn.http.request import Request
from gunicorn.config import Config
def data_source(fname):
buf = StringIO()
with open(fname) as handle:
lines = []
for line in handle:
line = line.rstrip("\n").replace("\\r\\n", "\r\n")
lines.append(line)
return "".join(lines)
buf.write(line)
return buf
class request(object):
def __init__(self, name):
@ -36,10 +37,10 @@ class request(object):
class FakeSocket(object):
def __init__(self, data=""):
def __init__(self, data):
self.tmp = tempfile.TemporaryFile()
if data:
self.tmp.write(data)
self.tmp.write(data.getvalue())
self.tmp.flush()
self.tmp.seek(0)
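A FakeSocket built this way presumably only needs a recv() that reads back from the spooled temp file; the following is a hedged sketch of that idea under an illustrative name, not the real test helper:

    # Hedged sketch: a minimal recv() over a temp-file spool, mirroring the setup above.
    import tempfile
    from StringIO import StringIO

    class SpoolSocket(object):                # illustrative name only
        def __init__(self, data):
            self.tmp = tempfile.TemporaryFile()
            self.tmp.write(data.getvalue())   # data arrives as a StringIO, as above
            self.tmp.flush()
            self.tmp.seek(0)

        def recv(self, length):
            return self.tmp.read(length)      # returns "" once the spool is drained

    buf = StringIO()
    buf.write("GET / HTTP/1.0\r\n\r\n")
    sock = SpoolSocket(buf)
    print repr(sock.recv(8))   # 'GET / HT'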