Playing with simplehttp.

This commit is contained in:
Paul J. Davis 2010-04-27 15:00:37 -04:00
parent 4a519f9d61
commit f02cbc10ed
8 changed files with 224 additions and 372 deletions

View File

@ -1,18 +0,0 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from gunicorn.http.parser import Parser
from gunicorn.http.request import Request, KeepAliveRequest, RequestError
from gunicorn.http.response import Response, KeepAliveResponse
# ``__all__`` must contain the *names* of the public objects as strings.
# Listing the objects themselves makes ``from gunicorn.http import *`` fail,
# because the import machinery looks each entry up by name.
__all__ = [
    "Parser",
    "Request",
    "KeepAliveRequest",
    "RequestError",
    "Response",
    "KeepAliveResponse",
]

View File

@ -1,239 +0,0 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import urlparse
class BadStatusLine(Exception):
    """Raised when the first line of an HTTP message cannot be parsed."""
class ParserError(Exception):
    """Raised when a message body cannot be decoded by the parser."""
class Parser(object):
    """Incremental parser for HTTP/1.0 and HTTP/1.1 messages.

    A single instance parses either a request or a response, selected by
    ``ptype``. Data is handed to the parser as ``StringIO`` buffers; each
    ``filter_*`` method consumes what it can and returns a buffer holding
    the bytes that are still unparsed.
    """

    def __init__(self, ptype='request', should_close=False):
        """Create a parser.

        ptype -- ``'request'`` or ``'response'``.
        should_close -- when true, ``should_close`` always reports True
        regardless of the parsed headers.
        """
        self.status_line = ""
        self.status_int = None
        self.reason = ""
        self.status = ""
        self.headers = []
        self.headers_dict = {}
        self.raw_version = "HTTP/1.0"
        self.raw_path = ""
        self.version = (1, 0)
        self.method = ""
        self.path = ""
        self.query_string = ""
        self.fragment = ""
        self._content_len = None
        self.start_offset = 0
        self.chunk_size = 0
        self._chunk_eof = False
        self.type = ptype
        self._should_close = should_close

    @classmethod
    def parse_response(cls, should_close=False):
        """Return a parser configured for HTTP responses."""
        return cls(ptype='response', should_close=should_close)

    @classmethod
    def parse_request(cls):
        """Return a parser configured for HTTP requests."""
        return cls(ptype='request')

    def filter_headers(self, headers, buf):
        """Parse the header section held in ``buf`` once it is complete.

        headers -- list that receives the parsed ``(name, value)`` pairs.
        buf -- ``StringIO`` holding everything received so far.

        Returns ``False`` while the blank line that terminates the headers
        has not been received yet. Otherwise returns a new ``StringIO``
        containing the data that follows the headers (the start of the
        body).
        """
        data = buf.getvalue()
        end = data.find("\r\n\r\n")
        if end == -1:
            return False
        rest = StringIO()
        # Skip the 4-character CRLFCRLF separator.
        rest.write(data[end + 4:])
        return self.finalize_headers(headers, data[:end], rest)

    def finalize_headers(self, headers, headers_str, buf2):
        """Parse the first line and the header lines in ``headers_str``.

        The parsed headers are appended to ``headers`` and stored on the
        instance (``headers``, ``headers_dict``). Malformed header lines are
        silently ignored. Returns ``buf2`` unchanged.

        Raises ``BadStatusLine`` when the first line is malformed and
        ``ValueError`` when ``Content-Length`` is not an integer.
        """
        lines = headers_str.split("\r\n")

        # parse first line of headers
        self._first_line(lines.pop(0))

        # parse headers. We silently ignore bad headers' lines
        parsed = {}
        hname = ""
        for line in lines:
            if line.startswith('\t') or line.startswith(' '):
                # Folded continuation line: it belongs to the previously
                # parsed header and is joined with a single space. It is
                # dropped when there is no valid header to attach it to.
                if hname in parsed:
                    parsed[hname] = "%s %s" % (parsed[hname], line.strip())
            else:
                try:
                    hname = self._parse_headerl(parsed, line)
                except ValueError:
                    # Bad header: forget the previous name so a following
                    # continuation line is not glued onto the wrong header.
                    hname = ""

        self.headers_dict = parsed
        headers.extend(parsed.items())
        self.headers = headers
        self._content_len = int(parsed.get('Content-Length', 0))

        if self.type == 'request':
            (_, _, self.path, self.query_string, self.fragment) = \
                urlparse.urlsplit(self.raw_path)
        return buf2

    def _parse_version(self, version):
        """Store the raw version string and its ``(major, minor)`` tuple."""
        self.raw_version = version.strip()
        try:
            major, minor = self.raw_version.split("HTTP/")[1].split(".")
            self.version = (int(major), int(minor))
        except IndexError:
            # No "HTTP/" marker at all: fall back to HTTP/1.0.
            self.version = (1, 0)

    def _first_line(self, line):
        """Parse the request line or status line.

        Raises ``BadStatusLine`` when the line does not have the expected
        shape.
        """
        self.status_line = status_line = line.strip()
        try:
            if self.type == 'response':
                version, self.status = status_line.split(None, 1)
                self._parse_version(version)
                try:
                    self.status_int, self.reason = self.status.split(None, 1)
                except ValueError:
                    # Status code without a reason phrase.
                    self.status_int = self.status
                self.status_int = int(self.status_int)
            else:
                method, path, version = status_line.split(None, 2)
                self._parse_version(version)
                self.method = method.upper()
                self.raw_path = path
        except ValueError:
            raise BadStatusLine(line)

    def _parse_headerl(self, hdrs, line):
        """Parse one ``Name: value`` line into ``hdrs`` and return the name.

        Names are normalised with ``str.title``. A repeated header is merged
        into a single comma separated value. Raises ``ValueError`` when the
        line contains no colon.
        """
        name, value = line.split(":", 1)
        name = name.strip().title()
        value = value.rsplit("\r\n", 1)[0].strip()
        if name in hdrs:
            hdrs[name] = "%s, %s" % (hdrs[name], value)
        else:
            hdrs[name] = value
        return name

    @property
    def should_close(self):
        """True when the connection must be closed after this message."""
        # Connection options are case-insensitive tokens; clients commonly
        # send "keep-alive" in lower case.
        connection = self.headers_dict.get("Connection", "").strip().lower()
        if self._should_close or connection == "close":
            return True
        if connection == "keep-alive":
            return False
        # Without an explicit header, HTTP/1.0 closes and HTTP/1.1 persists.
        return self.version <= (1, 0)

    @property
    def is_chunked(self):
        """True when the message uses ``Transfer-Encoding: chunked``."""
        encoding = self.headers_dict.get('Transfer-Encoding', '')
        return encoding.strip().lower() == "chunked"

    @property
    def content_len(self):
        """Return the declared content length as an integer.

        Returns ``0`` when no ``Content-Length`` header is present and
        ``None`` for chunked messages, whose length is not known upfront.
        """
        if self.is_chunked:
            return None
        content_length = self.headers_dict.get('Content-Length')
        if content_length is None:
            return 0
        return int(content_length)

    def body_eof(self):
        """Return True once the whole body has been consumed."""
        if self.is_chunked:
            return bool(self._chunk_eof)
        return self._content_len == 0

    def read_chunk(self, buf):
        """Try to extract one chunk of a chunked body from ``buf``.

        Returns ``(chunk, new_buffer)``. ``chunk`` is ``''`` when the data
        for the current chunk is still incomplete (``buf`` itself is then
        returned) or when the terminating zero-size chunk has been read.
        Raises ``ValueError`` when the chunk size is not valid hexadecimal.
        """
        line = buf.getvalue()
        buf2 = StringIO()

        if not self.start_offset:
            i = line.find("\r\n")
            if i != -1:
                # "<hex size>[;extensions]" -- extensions are ignored.
                chunk = line[:i].strip().split(";", 1)
                chunk_size = int(chunk.pop(0), 16)
                self.start_offset = i + 2
                self.chunk_size = chunk_size

        if self.start_offset:
            if self.chunk_size == 0:
                self._chunk_eof = True
                buf2.write(line[:self.start_offset])
                return '', buf2
            else:
                chunk = line[self.start_offset:self.start_offset + self.chunk_size]
                end_offset = self.start_offset + self.chunk_size + 2
                # Only hand the chunk out once its trailing CRLF is here.
                if len(buf.getvalue()) >= end_offset:
                    buf2.write(line[end_offset:])
                    self.chunk_size = 0
                    return chunk, buf2
        return '', buf

    def trailing_header(self, buf):
        """Return True when ``buf`` contains a blank-line terminator."""
        line = buf.getvalue()
        i = line.find("\r\n\r\n")
        return (i != -1)

    def filter_body(self, buf):
        """Extract the next piece of body data from ``buf``.

        Returns a tuple ``(body_chunk, new_buffer)``. ``body_chunk`` is
        ``''`` when no body data is available yet. Raises ``ParserError``
        when chunked decoding fails.
        """
        dlen = len(buf.getvalue())
        chunk = ''

        if self.is_chunked:
            try:
                chunk, buf2 = self.read_chunk(buf)
            except Exception as e:
                raise ParserError("chunked decoding error [%s]" % str(e))

            if not chunk:
                return '', buf
        else:
            buf2 = StringIO()
            if self._content_len > 0:
                nr = min(dlen, self._content_len)
                chunk = buf.getvalue()[:nr]
                self._content_len -= nr

        self.start_offset = 0
        # Leave the write position at the end so new data gets appended.
        buf2.seek(0, 2)
        return (chunk, buf2)

View File

@ -1,73 +0,0 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from gunicorn.util import http_date, write, write_chunk, is_hoppish
class Response(object):
    """HTTP response writer bound to a request object.

    Headers are filtered at construction time and sent lazily on the first
    ``write`` (or on ``close`` when nothing was written).
    """

    def __init__(self, req, status, headers):
        """Store the status and keep the headers that may be forwarded.

        req -- request object; its ``SERVER_VERSION`` and ``socket``
        attributes are used.
        status -- status string placed on the status line.
        headers -- iterable of ``(name, value)`` pairs.
        """
        self.req = req
        self.version = req.SERVER_VERSION
        self.status = status
        self.chunked = False
        self.headers = []
        self.headers_sent = False

        for name, value in headers:
            assert isinstance(name, basestring), "%r is not a string" % name
            if is_hoppish(name):
                key = name.lower().strip()
                if key == "transfer-encoding":
                    # Kept in the output; "chunked" switches on chunked writes.
                    if value.lower().strip() == "chunked":
                        self.chunked = True
                elif key != "connection" or value.lower().strip() != "upgrade":
                    # Drop every other hop-by-hop header; only
                    # "Connection: upgrade" (websocket) is let through.
                    continue
            self.headers.append((name.strip(), str(value).strip()))

    def default_headers(self):
        """Return the status line and the headers added to every response."""
        status_line = "HTTP/1.1 %s\r\n" % self.status
        server_line = "Server: %s\r\n" % self.version
        date_line = "Date: %s\r\n" % http_date()
        return [status_line, server_line, date_line, "Connection: close\r\n"]

    def send_headers(self):
        """Write the header section to the socket, at most once."""
        if not self.headers_sent:
            lines = self.default_headers()
            for n, v in self.headers:
                lines.append("%s: %s\r\n" % (n, v))
            write(self.req.socket, "%s\r\n" % "".join(lines))
            self.headers_sent = True

    def write(self, arg):
        """Send ``arg`` as body data, emitting the headers first if needed."""
        self.send_headers()
        assert isinstance(arg, basestring), "%r is not a string." % arg
        write(self.req.socket, arg, self.chunked)

    def close(self):
        """Finish the response; sends headers and the final chunk if due."""
        if not self.headers_sent:
            self.send_headers()
        if self.chunked:
            # An empty chunk terminates a chunked body.
            write_chunk(self.req.socket, "")
class KeepAliveResponse(Response):
    """Response whose ``Connection`` header follows the request's wishes."""

    def default_headers(self):
        """Return the default headers with a computed ``Connection`` value.

        ``Connection`` is ``close`` when ``self.req.req.should_close()`` is
        true and ``keep-alive`` otherwise.
        """
        if self.req.req.should_close():
            connection = "close"
        else:
            connection = "keep-alive"
        status_line = "HTTP/1.1 %s\r\n" % self.status
        server_line = "Server: %s\r\n" % self.version
        date_line = "Date: %s\r\n" % http_date()
        return [
            status_line,
            server_line,
            date_line,
            "Connection: %s\r\n" % connection,
        ]

View File

@ -7,11 +7,12 @@ import errno
import socket
import traceback
from gunicorn import http
from gunicorn.http.tee import UnexpectedEOF
from gunicorn import util
import gunicorn.util as util
import gunicorn.wsgi as wsgi
from gunicorn.workers.base import Worker
from simplehttp import RequestParser
ALREADY_HANDLED = object()
class AsyncWorker(Worker):
@ -20,12 +21,21 @@ class AsyncWorker(Worker):
Worker.__init__(self, *args, **kwargs)
self.worker_connections = self.cfg.worker_connections
def keepalive_request(self, client, addr):
return http.KeepAliveRequest(self.cfg, client, addr, self.address)
def timeout(self):
    """Return the context manager that bounds the wait for a request.

    ``handle`` enters the returned object in a ``with`` statement around
    ``parser.next()``. The base class provides no implementation, so
    concrete workers have to override this method.
    """
    raise NotImplementedError()
def handle(self, client, addr):
try:
while self.handle_request(client, addr):
parser = RequestParser(client)
try:
while True:
req = None
with self.timeout():
req = parser.next()
if not req:
break
self.handle_request(req, client, addr)
except StopIteration:
pass
except socket.error, e:
if e[0] not in (errno.EPIPE, errno.ECONNRESET):
@ -49,24 +59,20 @@ class AsyncWorker(Worker):
finally:
util.close(client)
def handle_request(self, client, addr):
req = self.keepalive_request(client, addr)
if not req:
return False
def handle_request(self, req, sock, addr):
try:
environ = req.read()
if not environ:
return False
respiter = self.wsgi(environ, req.start_response)
debug = self.cfg.get("debug", False)
resp, environ = wsgi.create(req, sock, addr, self.address, debug)
respiter = self.app(environ, resp.start_response)
if respiter == ALREADY_HANDLED:
return False
for item in respiter:
req.response.write(item)
req.response.close()
resp.write(item)
resp.close()
if hasattr(respiter, "close"):
respiter.close()
if req.req.should_close():
return False
if req.should_close():
raise StopIteration()
except Exception, e:
#Only send back traceback in HTTP in debug mode.
if not self.debug:

View File

@ -32,11 +32,8 @@ class EventletWorker(AsyncWorker):
hubs.use_hub()
super(EventletWorker, self).init_process()
def keepalive_request(self, client, addr):
req = None
with eventlet.Timeout(self.cfg.keepalive, False):
req = super(EventletWorker, self).keepalive_request(client, addr)
return req
def timeout(self):
    """Return an ``eventlet.Timeout`` of ``self.cfg.keepalive`` seconds.

    NOTE(review): passing ``False`` as the second argument presumably makes
    the timeout expire silently when used in a ``with`` block -- confirm
    against the eventlet documentation.
    """
    return eventlet.Timeout(self.cfg.keepalive, False)
def run(self):
self.socket.setblocking(1)

View File

@ -22,11 +22,8 @@ class GEventWorker(AsyncWorker):
from gevent import monkey
monkey.patch_all(dns=False)
def keepalive_request(self, client, addr):
req = None
with gevent.Timeout(self.cfg.keepalive, False):
req = super(GEventWorker, self).keepalive_request(client, addr)
return req
def timeout(self):
    """Return a ``gevent.Timeout`` of ``self.cfg.keepalive`` seconds.

    NOTE(review): passing ``False`` as the second argument presumably makes
    the timeout expire silently when used in a ``with`` block -- confirm
    against the gevent documentation.
    """
    return gevent.Timeout(self.cfg.keepalive, False)
def run(self):
self.socket.setblocking(1)

View File

@ -10,8 +10,10 @@ import select
import socket
import traceback
from gunicorn import http, util
from gunicorn.http.tee import UnexpectedEOF
from simplehttp import RequestParser
import gunicorn.util as util
import gunicorn.wsgi as wsgi
from gunicorn.workers.base import Worker
class SyncWorker(Worker):
@ -69,35 +71,33 @@ class SyncWorker(Worker):
def handle(self, client, addr):
try:
self.handle_request(client, addr)
parser = RequestParser(client)
req = parser.next()
self.handle_request(req, client, addr)
except socket.error, e:
if e[0] != errno.EPIPE:
self.log.exception("Error processing request.")
else:
self.log.warn("Ignoring EPIPE")
except UnexpectedEOF:
self.log.exception("Client closed the connection unexpectedly.")
except Exception, e:
self.log.exception("Error processing request.")
try:
# Last ditch attempt to notify the client of an error.
mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n"
mesg = "HTTP/1.1 500 Internal Server Error\r\n\r\n"
util.write_nonblock(client, mesg)
except:
pass
finally:
util.close(client)
def handle_request(self, client, addr):
req = http.Request(self.cfg, client, addr, self.address)
def handle_request(self, req, client, addr):
try:
environ = req.read()
if not environ:
return
respiter = self.wsgi(environ, req.start_response)
debug = self.cfg.get("debug", False)
resp, environ = wsgi.create(req, client, addr, self.address, debug)
respiter = self.app(environ, resp.start_response)
for item in respiter:
req.response.write(item)
req.response.close()
resp.write(item)
resp.close()
if hasattr(respiter, "close"):
respiter.close()
except socket.error:

182
gunicorn/wsgi.py Normal file
View File

@ -0,0 +1,182 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import errno
import logging
import os
import re
import socket
import sys
from urllib import unquote
from gunicorn import __version__
import gunicorn.util as util
# Product token used for the "Server" response header and for the
# SERVER_SOFTWARE environ key.
SERVER_VERSION = "gunicorn/%s" % __version__
# Matches a run of spaces/tabs, optionally preceded by a CRLF.
# NOTE(review): not referenced by the code visible here; presumably meant for
# normalising folded header values -- confirm before relying on it.
NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def create(req, sock, client, server, debug=False):
    """Build the response object and the WSGI environ for a parsed request.

    req -- parsed request; its ``headers``, ``path``, ``query``, ``method``,
    ``version`` and ``body`` attributes are read.
    sock -- client socket, used for the response and for the interim
    "100 Continue" reply.
    client -- address of the peer, either a ``(host, port)`` sequence or a
    ``"host:port"`` string; defaults to ``"127.0.0.1"`` when empty.
    server -- address the server listens on, in the same two forms.
    debug -- when true, ``wsgi.multiprocess`` is reported as False.

    Returns a ``(response, environ)`` tuple.
    """
    resp = Response(req, sock)

    # authors should be aware that REMOTE_HOST and REMOTE_ADDR
    # may not qualify the remote addr:
    # http://www.ietf.org/rfc/rfc3875
    client = client or "127.0.0.1"
    forward = client
    script_name = os.environ.get("SCRIPT_NAME", "")
    content_type = ""
    content_length = ""
    for hdr_name, hdr_value in req.headers:
        name = hdr_name.lower()
        if name == "expect":
            # handle expect
            if hdr_value.lower() == "100-continue":
                sock.send("HTTP/1.1 100 Continue\r\n\r\n")
        elif name == "x-forwarded-for":
            # NOTE: this header is supplied by the client; the resulting
            # REMOTE_ADDR is only trustworthy behind a proxy that sets it.
            forward = hdr_value
        elif name == "script_name":
            script_name = hdr_value
        elif name == "content-type":
            content_type = hdr_value
        elif name == "content-length":
            content_length = hdr_value

    # This value should evaluate true if an equivalent application
    # object may be simultaneously invoked by another process, and
    # should evaluate false otherwise. In debug mode we fall to one
    # worker so we comply to pylons and other paster app.
    wsgi_multiprocess = (debug == False)

    if isinstance(forward, basestring):
        # we only took the last one
        # http://en.wikipedia.org/wiki/X-Forwarded-For
        if forward.find(",") >= 0:
            forward = forward.rsplit(",", 1)[1].strip()
        remote = forward.split(":")
        if len(remote) == 1:
            # No port given: report an empty REMOTE_PORT.
            remote.append('')
    else:
        remote = forward

    if isinstance(server, basestring):
        server = server.split(":")
        if len(server) == 1:
            server.append('')

    path_info = req.path
    if script_name:
        path_info = path_info.split(script_name, 1)[1]

    environ = {
        "wsgi.url_scheme": 'http',
        "wsgi.input": req.body,
        "wsgi.errors": sys.stderr,
        "wsgi.version": (1, 0),
        "wsgi.multithread": False,
        "wsgi.multiprocess": wsgi_multiprocess,
        "wsgi.run_once": False,
        "SCRIPT_NAME": script_name,
        "SERVER_SOFTWARE": SERVER_VERSION,
        "REQUEST_METHOD": req.method,
        "PATH_INFO": unquote(path_info),
        "QUERY_STRING": req.query,
        "RAW_URI": req.path,
        "CONTENT_TYPE": content_type,
        "CONTENT_LENGTH": content_length,
        "REMOTE_ADDR": remote[0],
        "REMOTE_PORT": str(remote[1]),
        "SERVER_NAME": server[0],
        "SERVER_PORT": str(server[1]),
        "SERVER_PROTOCOL": req.version
    }

    # Expose the remaining request headers as HTTP_* variables; content type
    # and length already have their own, unprefixed keys.
    for key, value in req.headers:
        key = 'HTTP_' + key.upper().replace('-', '_')
        if key not in ('HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH'):
            environ[key] = value

    return resp, environ
class Response(object):
def __init__(self, req, sock):
self.req = req
self.sock = sock
self.version = SERVER_VERSION
self.status = None
self.chunked = False
self.headers = []
self.headers_sent = False
def start_response(self, status, headers, exc_info=None):
if exc_info:
try:
if self.status and self.headers_sent:
raise exc_info[0], exc_info[1], exc_info[2]
finally:
exc_info = None
elif self.status is not None:
raise AssertionError("Response headers already set!")
self.status = status
self.process_headers(headers)
return self.write
def process_headers(self, headers):
for name, value in headers:
assert isinstance(name, basestring), "%r is not a string" % name
if util.is_hoppish(name):
lname = name.lower().strip()
if lname == "transfer-encoding":
if value.lower().strip() == "chunked":
self.chunked = True
elif lname == "connection":
# handle websocket
if value.lower().strip() != "upgrade":
continue
else:
# ignore hopbyhop headers
continue
self.headers.append((name.strip(), str(value).strip()))
def default_headers(self):
connection = "keep-alive"
if self.req.should_close():
connection = "close"
return [
"HTTP/1.1 %s\r\n" % self.status,
"Server: %s\r\n" % self.version,
"Date: %s\r\n" % util.http_date(),
"Connection: %s\r\n" % connection
]
def send_headers(self):
if self.headers_sent:
return
tosend = self.default_headers()
tosend.extend(["%s: %s\r\n" % (n, v) for n, v in self.headers])
util.write(self.sock, "%s\r\n" % "".join(tosend))
self.headers_sent = True
def write(self, arg):
self.send_headers()
assert isinstance(arg, basestring), "%r is not a string." % arg
util.write(self.sock, arg, self.chunked)
def close(self):
if not self.headers_sent:
self.send_headers()
if self.chunked:
util.write_chunk(self.sock, "")