merge grainbows & gunicorn

This commit is contained in:
benoitc 2010-03-17 00:18:16 +01:00
parent 06d58ae1c4
commit 0781caadab
18 changed files with 685 additions and 16 deletions

View File

@ -0,0 +1,29 @@
# -*- coding: utf-8 -
#
# This file is part of grainbows released under the MIT license.
# See the NOTICE for more information.
import eventlet
import time
eventlet.monkey_patch(all=False, os=True, select=True, socket=True)
class TestIter(object):
def __iter__(self):
lines = ['line 1\n', 'line 2\n']
for line in lines:
yield line
time.sleep(10)
def app(environ, start_response):
"""Application which cooperatively pauses 10 seconds before responding"""
data = 'Hello, World!\n'
status = '200 OK'
response_headers = [
('Content-type','text/plain'),
('Transfer-Encoding', "chunked"),
]
print 'request received'
start_response(status, response_headers)
return TestIter()

24
examples/eventlet_test.py Normal file
View File

@ -0,0 +1,24 @@
# -*- coding: utf-8 -
#
# This file is part of grainbows released under the MIT license.
# See the NOTICE for more information.
from wsgiref.validate import validator
import eventlet
eventlet.monkey_patch(all=False, os=False, select=True, socket=True)
@validator
def app(environ, start_response):
"""Application which cooperatively pauses 10 seconds before responding"""
data = 'Hello, World!\n'
status = '200 OK'
response_headers = [
('Content-type','text/plain'),
('Content-Length', str(len(data))) ]
print 'request received, pausing 10 seconds'
eventlet.sleep(10)
start_response(status, response_headers)
return iter([data])

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -
#
# This file is part of grainbows released under the MIT license.
# See the NOTICE for more information.
import time
class TestIter(object):
def __iter__(self):
lines = ['line 1', 'line 2']
for line in lines:
yield line
time.sleep(10)
def app(environ, start_response):
"""Application which cooperatively pauses 10 seconds before responding"""
data = 'Hello, World!\n'
status = '200 OK'
response_headers = [
('Content-type','text/plain'),
('Transfer-Encoding', "chunked"),
]
print 'request received'
start_response(status, response_headers)
return TestIter()

View File

@ -0,0 +1,141 @@
# -*- coding: utf-8 -
#
# This file is part of grainbow released under the MIT license.
# See the NOTICE for more information.
#
# Example code from Eventlet sources
import collections
import errno
from grainbows.worker_base import ALREADY_HANDLED
from eventlet import pools
import socket
import gevent
from gevent.pool import Pool
class WebSocketWSGI(object):
def __init__(self, handler, origin):
self.handler = handler
self.origin = origin
def verify_client(self, ws):
pass
def __call__(self, environ, start_response):
if not (environ['HTTP_CONNECTION'] == 'Upgrade' and
environ['HTTP_UPGRADE'] == 'WebSocket'):
# need to check a few more things here for true compliance
start_response('400 Bad Request', [('Connection','close')])
return []
sock = environ['wsgi.input'].get_socket()
ws = WebSocket(sock,
environ.get('HTTP_ORIGIN'),
environ.get('HTTP_WEBSOCKET_PROTOCOL'),
environ.get('PATH_INFO'))
self.verify_client(ws)
handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
"Upgrade: WebSocket\r\n"
"Connection: Upgrade\r\n"
"WebSocket-Origin: %s\r\n"
"WebSocket-Location: ws://%s%s\r\n\r\n" % (
self.origin,
environ.get('HTTP_HOST'),
ws.path))
sock.sendall(handshake_reply)
try:
self.handler(ws)
except socket.error, e:
if e[0] != errno.EPIPE:
raise
# use this undocumented feature of grainbows to ensure that it
# doesn't barf on the fact that we didn't call start_response
return ALREADY_HANDLED
def parse_messages(buf):
""" Parses for messages in the buffer *buf*. It is assumed that
the buffer contains the start character for a message, but that it
may contain only part of the rest of the message. NOTE: only understands
lengthless messages for now.
Returns an array of messages, and the buffer remainder that didn't contain
any full messages."""
msgs = []
end_idx = 0
while buf:
assert ord(buf[0]) == 0, "Don't understand how to parse this type of message: %r" % buf
end_idx = buf.find("\xFF")
if end_idx == -1:
break
msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
buf = buf[end_idx+1:]
return msgs, buf
def format_message(message):
# TODO support iterable messages
if isinstance(message, unicode):
message = message.encode('utf-8')
elif not isinstance(message, str):
message = str(message)
packed = "\x00%s\xFF" % message
return packed
class WebSocket(object):
def __init__(self, sock, origin, protocol, path):
self.sock = sock
self.origin = origin
self.protocol = protocol
self.path = path
self._buf = ""
self._msgs = collections.deque()
def send(self, message):
packed = format_message(message)
# if two greenthreads are trying to send at the same time
# on the same socket, sendlock prevents interleaving and corruption
self.sock.sendall(packed)
def wait(self):
while not self._msgs:
# no parsed messages, must mean buf needs more data
delta = self.sock.recv(1024)
if delta == '':
return None
self._buf += delta
msgs, self._buf = parse_messages(self._buf)
self._msgs.extend(msgs)
return self._msgs.popleft()
# demo app
import os
import random
def handle(ws):
""" This is the websocket handler function. Note that we
can dispatch based on path in here, too."""
if ws.path == '/echo':
while True:
m = ws.wait()
if m is None:
break
ws.send(m)
elif ws.path == '/data':
for i in xrange(10000):
ws.send("0 %s %s\n" % (i, random.random()))
gevent.sleep(0.1)
wsapp = WebSocketWSGI(handle, 'http://localhost:8000')
def app(environ, start_response):
""" This resolves to the web page or the websocket depending on
the path."""
if environ['PATH_INFO'] == '/' or environ['PATH_INFO'] == "":
data = open(os.path.join(
os.path.dirname(__file__),
'websocket.html')).read()
start_response('200 OK', [('Content-Type', 'text/html'),
('Content-Length', len(data))])
return [data]
else:
return wsapp(environ, start_response)

View File

@ -0,0 +1,2 @@
arbiter = "egg:gunicorn#eventlet"
worker_connections = 1000

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -
#
# This file is part of grainbows released under the MIT license.
# See the NOTICE for more information.
from wsgiref.validate import validator
def app(environ, start_response):
"""Application which cooperatively pauses 10 seconds before responding"""
data = 'Hello, World!\n'
status = '200 OK'
response_headers = [
('Content-type','text/plain'),
('Content-Length', str(len(data))) ]
start_response(status, response_headers)
return iter([data])

44
examples/websocket.html Normal file
View File

@ -0,0 +1,44 @@
<!DOCTYPE html>
<html>
<head>
<!-- idea and code swiped from
http://assorted.svn.sourceforge.net/viewvc/assorted/real-time-plotter/trunk/src/rtp.html?view=markup -->
<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.1/jquery.min.js"></script>
<script src="http://people.iola.dk/olau/flot/jquery.flot.js"></script>
<script>
window.onload = function() {
var data = {};
var s = new WebSocket("ws://localhost:8000/data");
s.onopen = function() {
//alert('open');
s.send('hi');
};
s.onmessage = function(e) {
//alert('got ' + e.data);
var lines = e.data.split('\n');
for (var i = 0; i < lines.length - 1; i++) {
var parts = lines[i].split(' ');
var d = parts[0], x = parseFloat(parts[1]), y = parseFloat(parts[2]);
if (!(d in data)) data[d] = [];
data[d].push([x,y]);
}
var plots = [];
for (var d in data) plots.push( { data: data[d].slice(data[d].length - 200) } );
$.plot( $("#holder"), plots,
{
series: {
lines: { show: true, fill: true },
},
yaxis: { min: 0 },
} );
s.send('');
};
};
</script>
</head>
<body>
<h3>Plot</h3>
<div id="holder" style="width:600px;height:300px"></div>
</body>
</html>

147
examples/websocket.py Normal file
View File

@ -0,0 +1,147 @@
# -*- coding: utf-8 -
#
# This file is part of grainbow released under the MIT license.
# See the NOTICE for more information.
#
# Example code from Eventlet sources
import collections
import errno
from grainbows.worker_base import ALREADY_HANDLED
from eventlet import pools
import socket
import eventlet
from eventlet.common import get_errno
class WebSocketWSGI(object):
def __init__(self, handler, origin):
self.handler = handler
self.origin = origin
def verify_client(self, ws):
pass
def __call__(self, environ, start_response):
if not (environ['HTTP_CONNECTION'] == 'Upgrade' and
environ['HTTP_UPGRADE'] == 'WebSocket'):
# need to check a few more things here for true compliance
start_response('400 Bad Request', [('Connection','close')])
return []
sock = environ['wsgi.input'].get_socket()
ws = WebSocket(sock,
environ.get('HTTP_ORIGIN'),
environ.get('HTTP_WEBSOCKET_PROTOCOL'),
environ.get('PATH_INFO'))
self.verify_client(ws)
handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
"Upgrade: WebSocket\r\n"
"Connection: Upgrade\r\n"
"WebSocket-Origin: %s\r\n"
"WebSocket-Location: ws://%s%s\r\n\r\n" % (
self.origin,
environ.get('HTTP_HOST'),
ws.path))
sock.sendall(handshake_reply)
try:
self.handler(ws)
except socket.error, e:
if get_errno(e) != errno.EPIPE:
raise
# use this undocumented feature of grainbows to ensure that it
# doesn't barf on the fact that we didn't call start_response
return ALREADY_HANDLED
def parse_messages(buf):
""" Parses for messages in the buffer *buf*. It is assumed that
the buffer contains the start character for a message, but that it
may contain only part of the rest of the message. NOTE: only understands
lengthless messages for now.
Returns an array of messages, and the buffer remainder that didn't contain
any full messages."""
msgs = []
end_idx = 0
while buf:
assert ord(buf[0]) == 0, "Don't understand how to parse this type of message: %r" % buf
end_idx = buf.find("\xFF")
if end_idx == -1:
break
msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
buf = buf[end_idx+1:]
return msgs, buf
def format_message(message):
# TODO support iterable messages
if isinstance(message, unicode):
message = message.encode('utf-8')
elif not isinstance(message, str):
message = str(message)
packed = "\x00%s\xFF" % message
return packed
class WebSocket(object):
def __init__(self, sock, origin, protocol, path):
self.sock = sock
self.origin = origin
self.protocol = protocol
self.path = path
self._buf = ""
self._msgs = collections.deque()
self._sendlock = pools.TokenPool(1)
def send(self, message):
packed = format_message(message)
# if two greenthreads are trying to send at the same time
# on the same socket, sendlock prevents interleaving and corruption
t = self._sendlock.get()
try:
self.sock.sendall(packed)
finally:
self._sendlock.put(t)
def wait(self):
while not self._msgs:
# no parsed messages, must mean buf needs more data
delta = self.sock.recv(1024)
if delta == '':
return None
self._buf += delta
msgs, self._buf = parse_messages(self._buf)
self._msgs.extend(msgs)
return self._msgs.popleft()
# demo app
import os
import random
def handle(ws):
""" This is the websocket handler function. Note that we
can dispatch based on path in here, too."""
if ws.path == '/echo':
while True:
m = ws.wait()
if m is None:
break
ws.send(m)
elif ws.path == '/data':
for i in xrange(10000):
ws.send("0 %s %s\n" % (i, random.random()))
eventlet.sleep(0.1)
wsapp = WebSocketWSGI(handle, 'http://localhost:8000')
def app(environ, start_response):
""" This resolves to the web page or the websocket depending on
the path."""
if environ['PATH_INFO'] == '/' or environ['PATH_INFO'] == "":
data = open(os.path.join(
os.path.dirname(__file__),
'websocket.html')).read()
start_response('200 OK', [('Content-Type', 'text/html'),
('Content-Length', len(data))])
return [data]
else:
return wsapp(environ, start_response)

View File

@ -154,8 +154,8 @@ class Arbiter(object):
self.stop()
self.log.info("Shutting down: %s" % self.master_name)
if self.pidfile:
del self.pidfile
#if self.pidfile:
# del self.pidfile
sys.exit(0)
def handle_chld(self, sig, frame):

View File

126
gunicorn/async/base.py Normal file
View File

@ -0,0 +1,126 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import errno
import os
import select
import socket
import traceback
from gunicorn import http
from gunicorn.http.tee import UnexpectedEOF
from gunicorn import util
from gunicorn.worker import Worker
ALREADY_HANDLED = object()
class KeepaliveResponse(http.Response):
def default_headers(self):
if self.req.parser.should_close:
connection_hdr = "close"
else:
connection_hdr = "keep-alive"
return [
"HTTP/1.1 %s\r\n" % self.status,
"Server: %s\r\n" % self.SERVER_VERSION,
"Date: %s\r\n" % util.http_date(),
"Connection: %s\r\n" % connection_hdr
]
class KeepaliveRequest(http.Request):
def read(self):
ret = select.select([self._sock], [], [], self.conf.keepalive)
if not ret[0]:
return
try:
return super(KeepaliveRequest, self).read()
except socket.error, e:
if e[0] == 54:
return
raise
class KeepaliveWorker(Worker):
def __init__(self, *args, **kwargs):
Worker.__init__(self, *args, **kwargs)
self.nb_connections = 0
self.worker_connections = self.conf.worker_connections
def handle(self, client, addr):
self.nb_connections += 1
try:
self.init_sock(client)
while True:
req = KeepaliveRequest(client, addr, self.address, self.conf)
try:
environ = req.read()
if not environ or not req.parser.headers:
return
response = self.app(environ, req.start_response)
if response == ALREADY_HANDLED:
break
except Exception, e:
#Only send back traceback in HTTP in debug mode.
if not self.debug:
raise
util.write_error(client, traceback.format_exc())
break
KeepaliveResponse(client, response, req).send()
if req.parser.should_close:
break
except socket.error, e:
if e[0] != errno.EPIPE:
self.log.exception("Error processing request.")
else:
self.log.warn("Ignoring EPIPE")
except UnexpectedEOF:
self.log.exception("remote closed the connection unexpectedly.")
except Exception, e:
self.log.exception("Error processing request.")
try:
# Last ditch attempt to notify the client of an error.
mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n"
util.write_nonblock(client, mesg)
except:
pass
return
finally:
self.nb_connections -= 1
util.close(client)
def run(self):
self.init_process()
self.socket.setblocking(0)
while self.alive:
self.notify()
# If our parent changed then we shut down.
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s" % self)
return
if self.nb_connections > self.worker_connections:
continue
try:
ret = select.select([self.socket], [], [], 1)
if ret[0]:
self.accept()
except select.error, e:
if e[0] == errno.EINTR:
continue
if e[0] == errno.EBADF:
continue
raise
except KeyboardInterrupt :
return

View File

@ -0,0 +1,66 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import errno
import collections
import eventlet
from eventlet.green import os
from eventlet.green import socket
from eventlet import greenio
from eventlet.hubs import trampoline
from gunicorn import util
from gunicorn import arbiter
from gunicorn.async.base import KeepaliveWorker
__original_GreenPipe__ = greenio.GreenPipe
class _GreenPipe(__original_GreenPipe__):
def tell(self):
return self.fd.tell()
def seek(self, offset, whence=0):
fd = self.fd
self.read()
fd.seek(offset, whence)
_eventlet_patched = None
def patch_eventlet():
global _eventlet_patched
if _eventlet_patched:
return
greenio.GreenPipe = _GreenPipe
_eventlet_patched = True
class EventletWorker(KeepaliveWorker):
def init_process(self):
super(EventletWorker, self).init_process()
self.pool = eventlet.GreenPool(self.worker_connections)
def accept(self):
try:
client, addr = self.socket.accept()
self.pool.spawn_n(self.handle, client, addr)
except socket.error, e:
if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
return
raise
class EventletArbiter(arbiter.Arbiter):
@classmethod
def setup(cls):
import eventlet
if eventlet.version_info < (0,9,7):
raise RuntimeError("You need eventlet >= 0.9.7")
patch_eventlet()
eventlet.monkey_patch(all=True)
def init_worker(self, worker_age, pid, listener, app, timeout, conf):
return EventletWorker(worker_age, pid, listener, app, timeout, conf)

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import errno
import os
import gevent
from gevent import socket
from gevent.event import Event
from gevent.greenlet import Greenlet
from gevent.pool import Pool
from gunicorn import arbiter
from gunicorn import util
from gunicorn.async.base import KeepaliveWorker
class GEventWorker(KeepaliveWorker):
def init_process(self):
super(GEventWorker, self).init_process()
self.pool = Pool(self.worker_connections)
#self.socket = socket.socket(_sock=self.socket)
def accept(self):
try:
client, addr = self.socket.accept()
self.pool.spawn(self.handle, client, addr)
except socket.error, e:
if e[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
raise
class GEventArbiter(arbiter.Arbiter):
@classmethod
def setup(cls):
from gevent import monkey
monkey.patch_all(thread=True, ssl=True)
def init_worker(self, worker_age, pid, listener, app, timeout, conf):
return GEventWorker(worker_age, pid, listener, app, timeout, conf)

View File

@ -15,7 +15,7 @@ class Config(object):
DEFAULT_CONFIG_FILE = 'gunicorn.conf.py'
DEFAULTS = dict(
arbiter="egg:gunicorn",
arbiter="egg:gunicorn",
backlog=2048,
bind='127.0.0.1:8000',
daemon=False,

View File

@ -8,5 +8,3 @@ from gunicorn.http.request import Request, RequestError
from gunicorn.http.response import Response
__all__ = [Parser, Request, RequestError, Response]
__version__ = '0.4'

View File

@ -3,7 +3,7 @@
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
from gunicorn.util import close, http_date, write, write_chunk
from gunicorn.util import http_date, write, write_chunk
class Response(object):
@ -16,14 +16,17 @@ class Response(object):
self.SERVER_VERSION = req.SERVER_VERSION
self.chunked = req.response_chunked
def send(self):
# send headers
resp_head = [
def default_headers(self):
return [
"HTTP/1.1 %s\r\n" % self.status,
"Server: %s\r\n" % self.SERVER_VERSION,
"Date: %s\r\n" % http_date(),
"Connection: close\r\n"
]
def send(self):
# send headers
resp_head = self.default_headers()
resp_head.extend(["%s: %s\r\n" % (n, v) for n, v in self.headers])
write(self._sock, "%s\r\n" % "".join(resp_head))
@ -32,10 +35,8 @@ class Response(object):
write(self._sock, chunk, self.chunked)
if self.chunked:
# send last chunk
write_chunk(self._sock, "")
close(self._sock)
# send last chunk
write_chunk(self._sock, "")
if hasattr(self.data, "close"):
self.data.close()

View File

@ -154,7 +154,11 @@ class Worker(object):
req = http.Request(client, addr, self.address, self.conf)
try:
response = self.app(req.read(), req.start_response)
environ = req.read()
if not environ or not req.parser.status:
return
response = self.app(environ, req.start_response)
except Exception, e:
# Only send back traceback in HTTP in debug mode.
if not self.debug:
@ -180,4 +184,3 @@ class Worker(object):
pass
finally:
util.close(client)

View File

@ -50,7 +50,8 @@ setup(
[gunicorn.arbiter]
main=gunicorn.arbiter:Arbiter
eventlet=gunicorn.async.eventlet_server:EventletArbiter
gevent=gunicorn.async.gevent_server:GEventArbiter
[paste.server_runner]
main=gunicorn.main:paste_server