diff --git a/examples/eventlet_longpoll.py b/examples/eventlet_longpoll.py new file mode 100644 index 00000000..97ba6b61 --- /dev/null +++ b/examples/eventlet_longpoll.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 - +# +# This file is part of grainbows released under the MIT license. +# See the NOTICE for more information. + +import eventlet +import time +eventlet.monkey_patch(all=False, os=True, select=True, socket=True) + +class TestIter(object): + + def __iter__(self): + lines = ['line 1\n', 'line 2\n'] + for line in lines: + yield line + time.sleep(10) + +def app(environ, start_response): + """Application which cooperatively pauses 10 seconds before responding""" + data = 'Hello, World!\n' + status = '200 OK' + response_headers = [ + ('Content-type','text/plain'), + ('Transfer-Encoding', "chunked"), + ] + print 'request received' + start_response(status, response_headers) + return TestIter() + \ No newline at end of file diff --git a/examples/eventlet_test.py b/examples/eventlet_test.py new file mode 100644 index 00000000..f2d3ce11 --- /dev/null +++ b/examples/eventlet_test.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 - +# +# This file is part of grainbows released under the MIT license. +# See the NOTICE for more information. + +from wsgiref.validate import validator + +import eventlet + +eventlet.monkey_patch(all=False, os=False, select=True, socket=True) + + +@validator +def app(environ, start_response): + """Application which cooperatively pauses 10 seconds before responding""" + data = 'Hello, World!\n' + status = '200 OK' + response_headers = [ + ('Content-type','text/plain'), + ('Content-Length', str(len(data))) ] + print 'request received, pausing 10 seconds' + eventlet.sleep(10) + start_response(status, response_headers) + return iter([data]) \ No newline at end of file diff --git a/examples/gevent_longpoll.py b/examples/gevent_longpoll.py new file mode 100644 index 00000000..d308aa5c --- /dev/null +++ b/examples/gevent_longpoll.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 - +# +# This file is part of grainbows released under the MIT license. +# See the NOTICE for more information. + + +import time +class TestIter(object): + + def __iter__(self): + lines = ['line 1', 'line 2'] + for line in lines: + yield line + time.sleep(10) + +def app(environ, start_response): + """Application which cooperatively pauses 10 seconds before responding""" + data = 'Hello, World!\n' + status = '200 OK' + response_headers = [ + ('Content-type','text/plain'), + ('Transfer-Encoding', "chunked"), + ] + print 'request received' + start_response(status, response_headers) + return TestIter() diff --git a/examples/gevent_websocket.py b/examples/gevent_websocket.py new file mode 100644 index 00000000..a5ecda0d --- /dev/null +++ b/examples/gevent_websocket.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 - +# +# This file is part of grainbow released under the MIT license. +# See the NOTICE for more information. +# +# Example code from Eventlet sources + +import collections +import errno +from grainbows.worker_base import ALREADY_HANDLED +from eventlet import pools +import socket +import gevent +from gevent.pool import Pool + +class WebSocketWSGI(object): + def __init__(self, handler, origin): + self.handler = handler + self.origin = origin + + def verify_client(self, ws): + pass + + def __call__(self, environ, start_response): + if not (environ['HTTP_CONNECTION'] == 'Upgrade' and + environ['HTTP_UPGRADE'] == 'WebSocket'): + # need to check a few more things here for true compliance + start_response('400 Bad Request', [('Connection','close')]) + return [] + + sock = environ['wsgi.input'].get_socket() + ws = WebSocket(sock, + environ.get('HTTP_ORIGIN'), + environ.get('HTTP_WEBSOCKET_PROTOCOL'), + environ.get('PATH_INFO')) + self.verify_client(ws) + handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" + "Upgrade: WebSocket\r\n" + "Connection: Upgrade\r\n" + "WebSocket-Origin: %s\r\n" + "WebSocket-Location: ws://%s%s\r\n\r\n" % ( + self.origin, + environ.get('HTTP_HOST'), + ws.path)) + sock.sendall(handshake_reply) + try: + self.handler(ws) + except socket.error, e: + if e[0] != errno.EPIPE: + raise + # use this undocumented feature of grainbows to ensure that it + # doesn't barf on the fact that we didn't call start_response + return ALREADY_HANDLED + +def parse_messages(buf): + """ Parses for messages in the buffer *buf*. It is assumed that + the buffer contains the start character for a message, but that it + may contain only part of the rest of the message. NOTE: only understands + lengthless messages for now. + + Returns an array of messages, and the buffer remainder that didn't contain + any full messages.""" + msgs = [] + end_idx = 0 + while buf: + assert ord(buf[0]) == 0, "Don't understand how to parse this type of message: %r" % buf + end_idx = buf.find("\xFF") + if end_idx == -1: + break + msgs.append(buf[1:end_idx].decode('utf-8', 'replace')) + buf = buf[end_idx+1:] + return msgs, buf + +def format_message(message): + # TODO support iterable messages + if isinstance(message, unicode): + message = message.encode('utf-8') + elif not isinstance(message, str): + message = str(message) + packed = "\x00%s\xFF" % message + return packed + + +class WebSocket(object): + def __init__(self, sock, origin, protocol, path): + self.sock = sock + self.origin = origin + self.protocol = protocol + self.path = path + self._buf = "" + self._msgs = collections.deque() + + def send(self, message): + packed = format_message(message) + # if two greenthreads are trying to send at the same time + # on the same socket, sendlock prevents interleaving and corruption + self.sock.sendall(packed) + + def wait(self): + while not self._msgs: + # no parsed messages, must mean buf needs more data + delta = self.sock.recv(1024) + if delta == '': + return None + self._buf += delta + msgs, self._buf = parse_messages(self._buf) + self._msgs.extend(msgs) + return self._msgs.popleft() + + +# demo app +import os +import random +def handle(ws): + """ This is the websocket handler function. Note that we + can dispatch based on path in here, too.""" + if ws.path == '/echo': + while True: + m = ws.wait() + if m is None: + break + ws.send(m) + + elif ws.path == '/data': + for i in xrange(10000): + ws.send("0 %s %s\n" % (i, random.random())) + gevent.sleep(0.1) + +wsapp = WebSocketWSGI(handle, 'http://localhost:8000') +def app(environ, start_response): + """ This resolves to the web page or the websocket depending on + the path.""" + if environ['PATH_INFO'] == '/' or environ['PATH_INFO'] == "": + data = open(os.path.join( + os.path.dirname(__file__), + 'websocket.html')).read() + start_response('200 OK', [('Content-Type', 'text/html'), + ('Content-Length', len(data))]) + return [data] + else: + return wsapp(environ, start_response) diff --git a/examples/gunicorn.conf.py b/examples/gunicorn.conf.py new file mode 100644 index 00000000..664afb79 --- /dev/null +++ b/examples/gunicorn.conf.py @@ -0,0 +1,2 @@ +arbiter = "egg:gunicorn#eventlet" +worker_connections = 1000 diff --git a/examples/test_keepalive.py b/examples/test_keepalive.py new file mode 100644 index 00000000..8a4f99ad --- /dev/null +++ b/examples/test_keepalive.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 - +# +# This file is part of grainbows released under the MIT license. +# See the NOTICE for more information. + +from wsgiref.validate import validator + + +def app(environ, start_response): + """Application which cooperatively pauses 10 seconds before responding""" + data = 'Hello, World!\n' + status = '200 OK' + response_headers = [ + ('Content-type','text/plain'), + ('Content-Length', str(len(data))) ] + start_response(status, response_headers) + return iter([data]) \ No newline at end of file diff --git a/examples/websocket.html b/examples/websocket.html new file mode 100644 index 00000000..ba4cc494 --- /dev/null +++ b/examples/websocket.html @@ -0,0 +1,44 @@ + + + + + + + + + +

Plot

+
+ + \ No newline at end of file diff --git a/examples/websocket.py b/examples/websocket.py new file mode 100644 index 00000000..81d65453 --- /dev/null +++ b/examples/websocket.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 - +# +# This file is part of grainbow released under the MIT license. +# See the NOTICE for more information. +# +# Example code from Eventlet sources + + +import collections +import errno +from grainbows.worker_base import ALREADY_HANDLED +from eventlet import pools +import socket +import eventlet +from eventlet.common import get_errno + +class WebSocketWSGI(object): + def __init__(self, handler, origin): + self.handler = handler + self.origin = origin + + def verify_client(self, ws): + pass + + def __call__(self, environ, start_response): + if not (environ['HTTP_CONNECTION'] == 'Upgrade' and + environ['HTTP_UPGRADE'] == 'WebSocket'): + # need to check a few more things here for true compliance + start_response('400 Bad Request', [('Connection','close')]) + return [] + + sock = environ['wsgi.input'].get_socket() + ws = WebSocket(sock, + environ.get('HTTP_ORIGIN'), + environ.get('HTTP_WEBSOCKET_PROTOCOL'), + environ.get('PATH_INFO')) + self.verify_client(ws) + handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n" + "Upgrade: WebSocket\r\n" + "Connection: Upgrade\r\n" + "WebSocket-Origin: %s\r\n" + "WebSocket-Location: ws://%s%s\r\n\r\n" % ( + self.origin, + environ.get('HTTP_HOST'), + ws.path)) + sock.sendall(handshake_reply) + try: + self.handler(ws) + except socket.error, e: + if get_errno(e) != errno.EPIPE: + raise + # use this undocumented feature of grainbows to ensure that it + # doesn't barf on the fact that we didn't call start_response + return ALREADY_HANDLED + +def parse_messages(buf): + """ Parses for messages in the buffer *buf*. It is assumed that + the buffer contains the start character for a message, but that it + may contain only part of the rest of the message. NOTE: only understands + lengthless messages for now. + + Returns an array of messages, and the buffer remainder that didn't contain + any full messages.""" + msgs = [] + end_idx = 0 + while buf: + assert ord(buf[0]) == 0, "Don't understand how to parse this type of message: %r" % buf + end_idx = buf.find("\xFF") + if end_idx == -1: + break + msgs.append(buf[1:end_idx].decode('utf-8', 'replace')) + buf = buf[end_idx+1:] + return msgs, buf + +def format_message(message): + # TODO support iterable messages + if isinstance(message, unicode): + message = message.encode('utf-8') + elif not isinstance(message, str): + message = str(message) + packed = "\x00%s\xFF" % message + return packed + + +class WebSocket(object): + def __init__(self, sock, origin, protocol, path): + self.sock = sock + self.origin = origin + self.protocol = protocol + self.path = path + self._buf = "" + self._msgs = collections.deque() + self._sendlock = pools.TokenPool(1) + + def send(self, message): + packed = format_message(message) + # if two greenthreads are trying to send at the same time + # on the same socket, sendlock prevents interleaving and corruption + t = self._sendlock.get() + try: + self.sock.sendall(packed) + finally: + self._sendlock.put(t) + + def wait(self): + while not self._msgs: + # no parsed messages, must mean buf needs more data + delta = self.sock.recv(1024) + if delta == '': + return None + self._buf += delta + msgs, self._buf = parse_messages(self._buf) + self._msgs.extend(msgs) + return self._msgs.popleft() + + +# demo app +import os +import random +def handle(ws): + """ This is the websocket handler function. Note that we + can dispatch based on path in here, too.""" + if ws.path == '/echo': + while True: + m = ws.wait() + if m is None: + break + ws.send(m) + + elif ws.path == '/data': + for i in xrange(10000): + ws.send("0 %s %s\n" % (i, random.random())) + eventlet.sleep(0.1) + +wsapp = WebSocketWSGI(handle, 'http://localhost:8000') +def app(environ, start_response): + """ This resolves to the web page or the websocket depending on + the path.""" + if environ['PATH_INFO'] == '/' or environ['PATH_INFO'] == "": + data = open(os.path.join( + os.path.dirname(__file__), + 'websocket.html')).read() + start_response('200 OK', [('Content-Type', 'text/html'), + ('Content-Length', len(data))]) + return [data] + else: + return wsapp(environ, start_response) diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py index 11da6fdb..0da150a1 100644 --- a/gunicorn/arbiter.py +++ b/gunicorn/arbiter.py @@ -154,8 +154,8 @@ class Arbiter(object): self.stop() self.log.info("Shutting down: %s" % self.master_name) - if self.pidfile: - del self.pidfile + #if self.pidfile: + # del self.pidfile sys.exit(0) def handle_chld(self, sig, frame): diff --git a/gunicorn/async/__init__.py b/gunicorn/async/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/gunicorn/async/base.py b/gunicorn/async/base.py new file mode 100644 index 00000000..08259bbf --- /dev/null +++ b/gunicorn/async/base.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import errno +import os +import select +import socket +import traceback + +from gunicorn import http +from gunicorn.http.tee import UnexpectedEOF +from gunicorn import util +from gunicorn.worker import Worker + +ALREADY_HANDLED = object() + +class KeepaliveResponse(http.Response): + + def default_headers(self): + if self.req.parser.should_close: + connection_hdr = "close" + else: + connection_hdr = "keep-alive" + + return [ + "HTTP/1.1 %s\r\n" % self.status, + "Server: %s\r\n" % self.SERVER_VERSION, + "Date: %s\r\n" % util.http_date(), + "Connection: %s\r\n" % connection_hdr + ] + +class KeepaliveRequest(http.Request): + + def read(self): + ret = select.select([self._sock], [], [], self.conf.keepalive) + if not ret[0]: + return + try: + return super(KeepaliveRequest, self).read() + except socket.error, e: + if e[0] == 54: + return + raise + +class KeepaliveWorker(Worker): + + def __init__(self, *args, **kwargs): + Worker.__init__(self, *args, **kwargs) + self.nb_connections = 0 + self.worker_connections = self.conf.worker_connections + + def handle(self, client, addr): + + self.nb_connections += 1 + try: + self.init_sock(client) + while True: + req = KeepaliveRequest(client, addr, self.address, self.conf) + + try: + environ = req.read() + if not environ or not req.parser.headers: + return + response = self.app(environ, req.start_response) + if response == ALREADY_HANDLED: + break + except Exception, e: + #Only send back traceback in HTTP in debug mode. + if not self.debug: + raise + util.write_error(client, traceback.format_exc()) + break + + KeepaliveResponse(client, response, req).send() + if req.parser.should_close: + break + except socket.error, e: + if e[0] != errno.EPIPE: + self.log.exception("Error processing request.") + else: + self.log.warn("Ignoring EPIPE") + except UnexpectedEOF: + self.log.exception("remote closed the connection unexpectedly.") + except Exception, e: + self.log.exception("Error processing request.") + try: + # Last ditch attempt to notify the client of an error. + mesg = "HTTP/1.0 500 Internal Server Error\r\n\r\n" + util.write_nonblock(client, mesg) + except: + pass + return + finally: + self.nb_connections -= 1 + util.close(client) + + def run(self): + self.init_process() + self.socket.setblocking(0) + + while self.alive: + self.notify() + + # If our parent changed then we shut down. + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s" % self) + return + + if self.nb_connections > self.worker_connections: + continue + + try: + ret = select.select([self.socket], [], [], 1) + if ret[0]: + self.accept() + except select.error, e: + if e[0] == errno.EINTR: + continue + if e[0] == errno.EBADF: + continue + raise + except KeyboardInterrupt : + return + diff --git a/gunicorn/async/eventlet_server.py b/gunicorn/async/eventlet_server.py new file mode 100644 index 00000000..b619b253 --- /dev/null +++ b/gunicorn/async/eventlet_server.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import errno + +import collections +import eventlet +from eventlet.green import os +from eventlet.green import socket +from eventlet import greenio +from eventlet.hubs import trampoline + +from gunicorn import util +from gunicorn import arbiter +from gunicorn.async.base import KeepaliveWorker + +__original_GreenPipe__ = greenio.GreenPipe + +class _GreenPipe(__original_GreenPipe__): + + def tell(self): + return self.fd.tell() + + def seek(self, offset, whence=0): + fd = self.fd + self.read() + fd.seek(offset, whence) + +_eventlet_patched = None +def patch_eventlet(): + global _eventlet_patched + if _eventlet_patched: + return + greenio.GreenPipe = _GreenPipe + _eventlet_patched = True + +class EventletWorker(KeepaliveWorker): + + def init_process(self): + super(EventletWorker, self).init_process() + self.pool = eventlet.GreenPool(self.worker_connections) + + def accept(self): + try: + client, addr = self.socket.accept() + self.pool.spawn_n(self.handle, client, addr) + except socket.error, e: + if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN): + return + raise + + +class EventletArbiter(arbiter.Arbiter): + + @classmethod + def setup(cls): + import eventlet + if eventlet.version_info < (0,9,7): + raise RuntimeError("You need eventlet >= 0.9.7") + patch_eventlet() + eventlet.monkey_patch(all=True) + + def init_worker(self, worker_age, pid, listener, app, timeout, conf): + return EventletWorker(worker_age, pid, listener, app, timeout, conf) diff --git a/gunicorn/async/gevent_server.py b/gunicorn/async/gevent_server.py new file mode 100644 index 00000000..41bc7377 --- /dev/null +++ b/gunicorn/async/gevent_server.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + + +import errno +import os + +import gevent +from gevent import socket +from gevent.event import Event +from gevent.greenlet import Greenlet +from gevent.pool import Pool + + +from gunicorn import arbiter +from gunicorn import util +from gunicorn.async.base import KeepaliveWorker + +class GEventWorker(KeepaliveWorker): + + def init_process(self): + super(GEventWorker, self).init_process() + self.pool = Pool(self.worker_connections) + #self.socket = socket.socket(_sock=self.socket) + + def accept(self): + try: + client, addr = self.socket.accept() + self.pool.spawn(self.handle, client, addr) + except socket.error, e: + if e[0] not in (errno.EAGAIN, errno.EWOULDBLOCK): + raise + +class GEventArbiter(arbiter.Arbiter): + + @classmethod + def setup(cls): + from gevent import monkey + monkey.patch_all(thread=True, ssl=True) + + def init_worker(self, worker_age, pid, listener, app, timeout, conf): + return GEventWorker(worker_age, pid, listener, app, timeout, conf) diff --git a/gunicorn/config.py b/gunicorn/config.py index 1bec3827..bfe38121 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -15,7 +15,7 @@ class Config(object): DEFAULT_CONFIG_FILE = 'gunicorn.conf.py' DEFAULTS = dict( - arbiter="egg:gunicorn", + arbiter="egg:gunicorn", backlog=2048, bind='127.0.0.1:8000', daemon=False, diff --git a/gunicorn/http/__init__.py b/gunicorn/http/__init__.py index dcbceff4..9a6f53bb 100644 --- a/gunicorn/http/__init__.py +++ b/gunicorn/http/__init__.py @@ -8,5 +8,3 @@ from gunicorn.http.request import Request, RequestError from gunicorn.http.response import Response __all__ = [Parser, Request, RequestError, Response] - -__version__ = '0.4' \ No newline at end of file diff --git a/gunicorn/http/response.py b/gunicorn/http/response.py index 498f8e58..438b4593 100644 --- a/gunicorn/http/response.py +++ b/gunicorn/http/response.py @@ -3,7 +3,7 @@ # This file is part of gunicorn released under the MIT license. # See the NOTICE for more information. -from gunicorn.util import close, http_date, write, write_chunk +from gunicorn.util import http_date, write, write_chunk class Response(object): @@ -16,14 +16,17 @@ class Response(object): self.SERVER_VERSION = req.SERVER_VERSION self.chunked = req.response_chunked - def send(self): - # send headers - resp_head = [ + def default_headers(self): + return [ "HTTP/1.1 %s\r\n" % self.status, "Server: %s\r\n" % self.SERVER_VERSION, "Date: %s\r\n" % http_date(), "Connection: close\r\n" ] + + def send(self): + # send headers + resp_head = self.default_headers() resp_head.extend(["%s: %s\r\n" % (n, v) for n, v in self.headers]) write(self._sock, "%s\r\n" % "".join(resp_head)) @@ -32,10 +35,8 @@ class Response(object): write(self._sock, chunk, self.chunked) if self.chunked: - # send last chunk - write_chunk(self._sock, "") - - close(self._sock) + # send last chunk + write_chunk(self._sock, "") if hasattr(self.data, "close"): self.data.close() diff --git a/gunicorn/worker.py b/gunicorn/worker.py index 9cd393ba..414658bb 100644 --- a/gunicorn/worker.py +++ b/gunicorn/worker.py @@ -154,7 +154,11 @@ class Worker(object): req = http.Request(client, addr, self.address, self.conf) try: - response = self.app(req.read(), req.start_response) + environ = req.read() + if not environ or not req.parser.status: + return + + response = self.app(environ, req.start_response) except Exception, e: # Only send back traceback in HTTP in debug mode. if not self.debug: @@ -180,4 +184,3 @@ class Worker(object): pass finally: util.close(client) - diff --git a/setup.py b/setup.py index 2fe84c4a..72cfaa14 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,8 @@ setup( [gunicorn.arbiter] main=gunicorn.arbiter:Arbiter - + eventlet=gunicorn.async.eventlet_server:EventletArbiter + gevent=gunicorn.async.gevent_server:GEventArbiter [paste.server_runner] main=gunicorn.main:paste_server