mirror of
https://github.com/frappe/gunicorn.git
synced 2026-01-14 11:09:11 +08:00
support websocket hybi13
This commit is contained in:
parent
da11bce9a7
commit
90e698aa3c
@ -1,28 +1,25 @@
|
|||||||
# -*- coding: utf-8 -
|
|
||||||
#
|
|
||||||
# This file is part of gunicorn released under the MIT license.
|
|
||||||
# See the NOTICE for more information.
|
|
||||||
#
|
|
||||||
# Example code from Eventlet sources
|
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
import errno
|
import errno
|
||||||
import re
|
import re
|
||||||
from hashlib import md5
|
from hashlib import md5, sha1
|
||||||
|
import base64
|
||||||
|
from base64 import b64encode, b64decode
|
||||||
import socket
|
import socket
|
||||||
import struct
|
import struct
|
||||||
|
import logging
|
||||||
|
from socket import error as SocketError
|
||||||
|
|
||||||
import gevent
|
|
||||||
from gevent.pool import Pool
|
|
||||||
from gunicorn.workers.async import ALREADY_HANDLED
|
from gunicorn.workers.async import ALREADY_HANDLED
|
||||||
|
|
||||||
# Parts adapted from http://code.google.com/p/pywebsocket/
|
logger = logging.getLogger(__name__)
|
||||||
# mod_pywebsocket/handshake/handshake.py
|
|
||||||
|
WS_KEY = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
|
||||||
|
|
||||||
class WebSocketWSGI(object):
|
class WebSocketWSGI(object):
|
||||||
def __init__(self, handler):
|
def __init__(self, handler):
|
||||||
self.handler = handler
|
self.handler = handler
|
||||||
|
|
||||||
def verify_client(self, ws):
|
def verify_client(self, ws):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -37,41 +34,63 @@ class WebSocketWSGI(object):
|
|||||||
return part
|
return part
|
||||||
|
|
||||||
def __call__(self, environ, start_response):
|
def __call__(self, environ, start_response):
|
||||||
if not (environ['HTTP_CONNECTION'] == 'Upgrade' and
|
if not (environ.get('HTTP_CONNECTION').find('Upgrade') != -1 and
|
||||||
environ['HTTP_UPGRADE'] == 'WebSocket'):
|
environ['HTTP_UPGRADE'].lower() == 'websocket'):
|
||||||
# need to check a few more things here for true compliance
|
# need to check a few more things here for true compliance
|
||||||
start_response('400 Bad Request', [('Connection','close')])
|
start_response('400 Bad Request', [('Connection','close')])
|
||||||
return []
|
return []
|
||||||
|
|
||||||
sock = environ['gunicorn.socket']
|
sock = environ['gunicorn.socket']
|
||||||
|
|
||||||
ws = WebSocket(sock,
|
version = environ.get('HTTP_SEC_WEBSOCKET_VERSION')
|
||||||
environ.get('HTTP_ORIGIN'),
|
|
||||||
environ.get('HTTP_WEBSOCKET_PROTOCOL'),
|
|
||||||
environ.get('PATH_INFO'))
|
|
||||||
|
|
||||||
key1 = self._get_key_value(environ.get('HTTP_SEC_WEBSOCKET_KEY1'))
|
ws = WebSocket(sock, environ, version)
|
||||||
key2 = self._get_key_value(environ.get('HTTP_SEC_WEBSOCKET_KEY2'))
|
|
||||||
|
|
||||||
handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
|
handshake_reply = ("HTTP/1.1 101 Switching Protocols\r\n"
|
||||||
"Upgrade: WebSocket\r\n"
|
"Upgrade: websocket\r\n"
|
||||||
"Connection: Upgrade\r\n")
|
"Connection: Upgrade\r\n")
|
||||||
|
|
||||||
if key1 and key2:
|
key = environ.get('HTTP_SEC_WEBSOCKET_KEY')
|
||||||
challenge = ""
|
if key:
|
||||||
challenge += struct.pack("!I", key1) # network byteorder int
|
ws_key = base64.b64decode(key)
|
||||||
challenge += struct.pack("!I", key2) # network byteorder int
|
if len(ws_key) != 16:
|
||||||
challenge += environ['wsgi.input'].read()
|
start_response('400 Bad Request', [('Connection','close')])
|
||||||
|
return []
|
||||||
|
|
||||||
|
protocols = []
|
||||||
|
subprotocols = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL')
|
||||||
|
ws_protocols = []
|
||||||
|
if subprotocols:
|
||||||
|
for s in subprotocols.split(','):
|
||||||
|
s = s.strip()
|
||||||
|
if s in protocols:
|
||||||
|
ws_protocols.append(s)
|
||||||
|
if ws_protocols:
|
||||||
|
handshake_reply += 'Sec-WebSocket-Protocol: %s\r\n' % ', '.join(ws_protocols)
|
||||||
|
|
||||||
|
exts = []
|
||||||
|
extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS')
|
||||||
|
ws_extensions = []
|
||||||
|
if extensions:
|
||||||
|
for ext in extensions.split(','):
|
||||||
|
ext = ext.strip()
|
||||||
|
if ext in exts:
|
||||||
|
ws_extensions.append(ext)
|
||||||
|
if ws_extensions:
|
||||||
|
handshake_reply += 'Sec-WebSocket-Extensions: %s\r\n' % ', '.join(ws_extensions)
|
||||||
|
|
||||||
handshake_reply += (
|
handshake_reply += (
|
||||||
"Sec-WebSocket-Origin: %s\r\n"
|
"Sec-WebSocket-Origin: %s\r\n"
|
||||||
"Sec-WebSocket-Location: ws://%s%s\r\n"
|
"Sec-WebSocket-Location: ws://%s%s\r\n"
|
||||||
"Sec-WebSocket-Protocol: %s\r\n"
|
"Sec-WebSocket-Version: %s\r\n"
|
||||||
"\r\n%s" % (
|
"Sec-WebSocket-Accept: %s\r\n\r\n"
|
||||||
environ.get('HTTP_ORIGIN'),
|
% (
|
||||||
environ.get('HTTP_HOST'),
|
environ.get('HTTP_ORIGIN'),
|
||||||
ws.path,
|
environ.get('HTTP_HOST'),
|
||||||
environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL'),
|
ws.path,
|
||||||
md5(challenge).digest()))
|
version,
|
||||||
|
base64.b64encode(sha1(key + WS_KEY).digest())
|
||||||
|
))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
||||||
@ -93,91 +112,299 @@ class WebSocketWSGI(object):
|
|||||||
# doesn't barf on the fact that we didn't call start_response
|
# doesn't barf on the fact that we didn't call start_response
|
||||||
return ALREADY_HANDLED
|
return ALREADY_HANDLED
|
||||||
|
|
||||||
def parse_messages(buf):
|
|
||||||
""" Parses for messages in the buffer *buf*. It is assumed that
|
|
||||||
the buffer contains the start character for a message, but that it
|
|
||||||
may contain only part of the rest of the message. NOTE: only understands
|
|
||||||
lengthless messages for now.
|
|
||||||
|
|
||||||
Returns an array of messages, and the buffer remainder that didn't contain
|
|
||||||
any full messages."""
|
|
||||||
msgs = []
|
|
||||||
end_idx = 0
|
|
||||||
while buf:
|
|
||||||
assert ord(buf[0]) == 0, "Don't understand how to parse this type of message: %r" % buf
|
|
||||||
end_idx = buf.find("\xFF")
|
|
||||||
if end_idx == -1:
|
|
||||||
break
|
|
||||||
msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
|
|
||||||
buf = buf[end_idx+1:]
|
|
||||||
return msgs, buf
|
|
||||||
|
|
||||||
def format_message(message):
|
|
||||||
# TODO support iterable messages
|
|
||||||
if isinstance(message, unicode):
|
|
||||||
message = message.encode('utf-8')
|
|
||||||
elif not isinstance(message, str):
|
|
||||||
message = str(message)
|
|
||||||
packed = "\x00%s\xFF" % message
|
|
||||||
return packed
|
|
||||||
|
|
||||||
|
|
||||||
class WebSocket(object):
|
class WebSocket(object):
|
||||||
def __init__(self, sock, origin, protocol, path):
|
"""A websocket object that handles the details of
|
||||||
self.sock = sock
|
serialization/deserialization to the socket.
|
||||||
self.origin = origin
|
|
||||||
self.protocol = protocol
|
The primary way to interact with a :class:`WebSocket` object is to
|
||||||
self.path = path
|
call :meth:`send` and :meth:`wait` in order to pass messages back
|
||||||
|
and forth with the browser. Also available are the following
|
||||||
|
properties:
|
||||||
|
|
||||||
|
path
|
||||||
|
The path value of the request. This is the same as the WSGI PATH_INFO variable, but more convenient.
|
||||||
|
protocol
|
||||||
|
The value of the Websocket-Protocol header.
|
||||||
|
origin
|
||||||
|
The value of the 'Origin' header.
|
||||||
|
environ
|
||||||
|
The full WSGI environment for this request.
|
||||||
|
|
||||||
|
"""
|
||||||
|
def __init__(self, sock, environ, version=76):
|
||||||
|
"""
|
||||||
|
:param socket: The eventlet socket
|
||||||
|
:type socket: :class:`eventlet.greenio.GreenSocket`
|
||||||
|
:param environ: The wsgi environment
|
||||||
|
:param version: The WebSocket spec version to follow (default is 76)
|
||||||
|
"""
|
||||||
|
self.socket = sock
|
||||||
|
self.origin = environ.get('HTTP_ORIGIN')
|
||||||
|
self.protocol = environ.get('HTTP_WEBSOCKET_PROTOCOL')
|
||||||
|
self.path = environ.get('PATH_INFO')
|
||||||
|
self.environ = environ
|
||||||
|
self.version = version
|
||||||
|
self.websocket_closed = False
|
||||||
self._buf = ""
|
self._buf = ""
|
||||||
self._msgs = collections.deque()
|
self._msgs = collections.deque()
|
||||||
|
#self._sendlock = semaphore.Semaphore()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def encode_hybi(buf, opcode, base64=False):
|
||||||
|
""" Encode a HyBi style WebSocket frame.
|
||||||
|
Optional opcode:
|
||||||
|
0x0 - continuation
|
||||||
|
0x1 - text frame (base64 encode buf)
|
||||||
|
0x2 - binary frame (use raw buf)
|
||||||
|
0x8 - connection close
|
||||||
|
0x9 - ping
|
||||||
|
0xA - pong
|
||||||
|
"""
|
||||||
|
if base64:
|
||||||
|
buf = b64encode(buf)
|
||||||
|
|
||||||
|
b1 = 0x80 | (opcode & 0x0f) # FIN + opcode
|
||||||
|
payload_len = len(buf)
|
||||||
|
if payload_len <= 125:
|
||||||
|
header = struct.pack('>BB', b1, payload_len)
|
||||||
|
elif payload_len > 125 and payload_len < 65536:
|
||||||
|
header = struct.pack('>BBH', b1, 126, payload_len)
|
||||||
|
elif payload_len >= 65536:
|
||||||
|
header = struct.pack('>BBQ', b1, 127, payload_len)
|
||||||
|
|
||||||
|
#print("Encoded: %s" % repr(header + buf))
|
||||||
|
|
||||||
|
return header + buf, len(header), 0
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def decode_hybi(buf, base64=False):
|
||||||
|
""" Decode HyBi style WebSocket packets.
|
||||||
|
Returns:
|
||||||
|
{'fin' : 0_or_1,
|
||||||
|
'opcode' : number,
|
||||||
|
'mask' : 32_bit_number,
|
||||||
|
'hlen' : header_bytes_number,
|
||||||
|
'length' : payload_bytes_number,
|
||||||
|
'payload' : decoded_buffer,
|
||||||
|
'left' : bytes_left_number,
|
||||||
|
'close_code' : number,
|
||||||
|
'close_reason' : string}
|
||||||
|
"""
|
||||||
|
|
||||||
|
f = {'fin' : 0,
|
||||||
|
'opcode' : 0,
|
||||||
|
'mask' : 0,
|
||||||
|
'hlen' : 2,
|
||||||
|
'length' : 0,
|
||||||
|
'payload' : None,
|
||||||
|
'left' : 0,
|
||||||
|
'close_code' : None,
|
||||||
|
'close_reason' : None}
|
||||||
|
|
||||||
|
blen = len(buf)
|
||||||
|
f['left'] = blen
|
||||||
|
|
||||||
|
if blen < f['hlen']:
|
||||||
|
return f # Incomplete frame header
|
||||||
|
|
||||||
|
b1, b2 = struct.unpack_from(">BB", buf)
|
||||||
|
f['opcode'] = b1 & 0x0f
|
||||||
|
f['fin'] = (b1 & 0x80) >> 7
|
||||||
|
has_mask = (b2 & 0x80) >> 7
|
||||||
|
|
||||||
|
f['length'] = b2 & 0x7f
|
||||||
|
|
||||||
|
if f['length'] == 126:
|
||||||
|
f['hlen'] = 4
|
||||||
|
if blen < f['hlen']:
|
||||||
|
return f # Incomplete frame header
|
||||||
|
(f['length'],) = struct.unpack_from('>xxH', buf)
|
||||||
|
elif f['length'] == 127:
|
||||||
|
f['hlen'] = 10
|
||||||
|
if blen < f['hlen']:
|
||||||
|
return f # Incomplete frame header
|
||||||
|
(f['length'],) = struct.unpack_from('>xxQ', buf)
|
||||||
|
|
||||||
|
full_len = f['hlen'] + has_mask * 4 + f['length']
|
||||||
|
|
||||||
|
if blen < full_len: # Incomplete frame
|
||||||
|
return f # Incomplete frame header
|
||||||
|
|
||||||
|
# Number of bytes that are part of the next frame(s)
|
||||||
|
f['left'] = blen - full_len
|
||||||
|
|
||||||
|
# Process 1 frame
|
||||||
|
if has_mask:
|
||||||
|
# unmask payload
|
||||||
|
f['mask'] = buf[f['hlen']:f['hlen']+4]
|
||||||
|
b = c = ''
|
||||||
|
if f['length'] >= 4:
|
||||||
|
data = struct.unpack('<I', buf[f['hlen']:f['hlen']+4])[0]
|
||||||
|
of1 = f['hlen']+4
|
||||||
|
b = ''
|
||||||
|
for i in xrange(0, int(f['length']/4)):
|
||||||
|
mask = struct.unpack('<I', buf[of1+4*i:of1+4*(i+1)])[0]
|
||||||
|
b += struct.pack('I', data ^ mask)
|
||||||
|
|
||||||
|
if f['length'] % 4:
|
||||||
|
l = f['length'] % 4
|
||||||
|
of1 = f['hlen']
|
||||||
|
of2 = full_len - l
|
||||||
|
c = ''
|
||||||
|
for i in range(0, l):
|
||||||
|
mask = struct.unpack('B', buf[of1 + i])[0]
|
||||||
|
data = struct.unpack('B', buf[of2 + i])[0]
|
||||||
|
c += chr(data ^ mask)
|
||||||
|
|
||||||
|
f['payload'] = b + c
|
||||||
|
else:
|
||||||
|
print("Unmasked frame: %s" % repr(buf))
|
||||||
|
f['payload'] = buf[(f['hlen'] + has_mask * 4):full_len]
|
||||||
|
|
||||||
|
if base64 and f['opcode'] in [1, 2]:
|
||||||
|
try:
|
||||||
|
f['payload'] = b64decode(f['payload'])
|
||||||
|
except:
|
||||||
|
print("Exception while b64decoding buffer: %s" %
|
||||||
|
repr(buf))
|
||||||
|
raise
|
||||||
|
|
||||||
|
if f['opcode'] == 0x08:
|
||||||
|
if f['length'] >= 2:
|
||||||
|
f['close_code'] = struct.unpack_from(">H", f['payload'])
|
||||||
|
if f['length'] > 3:
|
||||||
|
f['close_reason'] = f['payload'][2:]
|
||||||
|
|
||||||
|
return f
|
||||||
|
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _pack_message(message):
|
||||||
|
"""Pack the message inside ``00`` and ``FF``
|
||||||
|
|
||||||
|
As per the dataframing section (5.3) for the websocket spec
|
||||||
|
"""
|
||||||
|
if isinstance(message, unicode):
|
||||||
|
message = message.encode('utf-8')
|
||||||
|
elif not isinstance(message, str):
|
||||||
|
message = str(message)
|
||||||
|
packed = "\x00%s\xFF" % message
|
||||||
|
return packed
|
||||||
|
|
||||||
|
def _parse_messages(self):
|
||||||
|
""" Parses for messages in the buffer *buf*. It is assumed that
|
||||||
|
the buffer contains the start character for a message, but that it
|
||||||
|
may contain only part of the rest of the message.
|
||||||
|
|
||||||
|
Returns an array of messages, and the buffer remainder that
|
||||||
|
didn't contain any full messages."""
|
||||||
|
msgs = []
|
||||||
|
end_idx = 0
|
||||||
|
buf = self._buf
|
||||||
|
while buf:
|
||||||
|
if self.version in ['7', '8', '13']:
|
||||||
|
frame = self.decode_hybi(buf, base64=False)
|
||||||
|
#print("Received buf: %s, frame: %s" % (repr(buf), frame))
|
||||||
|
|
||||||
|
if frame['payload'] == None:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if frame['opcode'] == 0x8: # connection close
|
||||||
|
self.websocket_closed = True
|
||||||
|
break
|
||||||
|
#elif frame['opcode'] == 0x1:
|
||||||
|
else:
|
||||||
|
msgs.append(frame['payload']);
|
||||||
|
#msgs.append(frame['payload'].decode('utf-8', 'replace'));
|
||||||
|
#buf = buf[-frame['left']:]
|
||||||
|
if frame['left']:
|
||||||
|
buf = buf[-frame['left']:]
|
||||||
|
else:
|
||||||
|
buf = ''
|
||||||
|
|
||||||
|
|
||||||
|
else:
|
||||||
|
frame_type = ord(buf[0])
|
||||||
|
if frame_type == 0:
|
||||||
|
# Normal message.
|
||||||
|
end_idx = buf.find("\xFF")
|
||||||
|
if end_idx == -1: #pragma NO COVER
|
||||||
|
break
|
||||||
|
msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
|
||||||
|
buf = buf[end_idx+1:]
|
||||||
|
elif frame_type == 255:
|
||||||
|
# Closing handshake.
|
||||||
|
assert ord(buf[1]) == 0, "Unexpected closing handshake: %r" % buf
|
||||||
|
self.websocket_closed = True
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise ValueError("Don't understand how to parse this type of message: %r" % buf)
|
||||||
|
self._buf = buf
|
||||||
|
return msgs
|
||||||
|
|
||||||
def send(self, message):
|
def send(self, message):
|
||||||
packed = format_message(message)
|
"""Send a message to the browser.
|
||||||
|
|
||||||
|
*message* should be convertable to a string; unicode objects should be
|
||||||
|
encodable as utf-8. Raises socket.error with errno of 32
|
||||||
|
(broken pipe) if the socket has already been closed by the client."""
|
||||||
|
if self.version in ['7', '8', '13']:
|
||||||
|
packed, lenhead, lentail = self.encode_hybi(message, opcode=0x01, base64=False)
|
||||||
|
else:
|
||||||
|
packed = self._pack_message(message)
|
||||||
# if two greenthreads are trying to send at the same time
|
# if two greenthreads are trying to send at the same time
|
||||||
# on the same socket, sendlock prevents interleaving and corruption
|
# on the same socket, sendlock prevents interleaving and corruption
|
||||||
self.sock.sendall(packed)
|
#self._sendlock.acquire()
|
||||||
|
try:
|
||||||
|
self.socket.sendall(packed)
|
||||||
|
finally:
|
||||||
|
pass
|
||||||
|
#self._sendlock.release()
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
|
"""Waits for and deserializes messages.
|
||||||
|
|
||||||
|
Returns a single message; the oldest not yet processed. If the client
|
||||||
|
has already closed the connection, returns None. This is different
|
||||||
|
from normal socket behavior because the empty string is a valid
|
||||||
|
websocket message."""
|
||||||
while not self._msgs:
|
while not self._msgs:
|
||||||
|
# Websocket might be closed already.
|
||||||
|
if self.websocket_closed:
|
||||||
|
return None
|
||||||
# no parsed messages, must mean buf needs more data
|
# no parsed messages, must mean buf needs more data
|
||||||
delta = self.sock.recv(1024)
|
delta = self.socket.recv(8096)
|
||||||
if delta == '':
|
if delta == '':
|
||||||
return None
|
return None
|
||||||
self._buf += delta
|
self._buf += delta
|
||||||
msgs, self._buf = parse_messages(self._buf)
|
msgs = self._parse_messages()
|
||||||
self._msgs.extend(msgs)
|
self._msgs.extend(msgs)
|
||||||
return self._msgs.popleft()
|
return self._msgs.popleft()
|
||||||
|
|
||||||
|
def _send_closing_frame(self, ignore_send_errors=False):
|
||||||
|
"""Sends the closing frame to the client, if required."""
|
||||||
|
if self.version in ['7', '8', '13'] and not self.websocket_closed:
|
||||||
|
msg = ''
|
||||||
|
#if code != None:
|
||||||
|
# msg = struct.pack(">H%ds" % (len(reason)), code)
|
||||||
|
|
||||||
# demo app
|
buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False)
|
||||||
import os
|
self.socket.sendall(buf)
|
||||||
import random
|
self.websocket_closed = True
|
||||||
def handle(ws):
|
|
||||||
""" This is the websocket handler function. Note that we
|
elif self.version == 76 and not self.websocket_closed:
|
||||||
can dispatch based on path in here, too."""
|
try:
|
||||||
if ws.path == '/echo':
|
self.socket.sendall("\xff\x00")
|
||||||
while True:
|
except SocketError:
|
||||||
m = ws.wait()
|
# Sometimes, like when the remote side cuts off the connection,
|
||||||
if m is None:
|
# we don't care about this.
|
||||||
break
|
if not ignore_send_errors: #pragma NO COVER
|
||||||
ws.send(m)
|
raise
|
||||||
|
self.websocket_closed = True
|
||||||
elif ws.path == '/data':
|
|
||||||
for i in xrange(10000):
|
def close(self):
|
||||||
ws.send("0 %s %s\n" % (i, random.random()))
|
"""Forcibly close the websocket; generally it is preferable to
|
||||||
gevent.sleep(0.1)
|
return from the handler method."""
|
||||||
|
self._send_closing_frame()
|
||||||
wsapp = WebSocketWSGI(handle)
|
self.socket.shutdown(True)
|
||||||
def app(environ, start_response):
|
self.socket.close()
|
||||||
""" This resolves to the web page or the websocket depending on
|
|
||||||
the path."""
|
|
||||||
if environ['PATH_INFO'] == '/' or environ['PATH_INFO'] == "":
|
|
||||||
data = open(os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
'websocket.html')).read()
|
|
||||||
data = data % environ
|
|
||||||
start_response('200 OK', [('Content-Type', 'text/html'),
|
|
||||||
('Content-Length', len(data))])
|
|
||||||
return [data]
|
|
||||||
else:
|
|
||||||
return wsapp(environ, start_response)
|
|
||||||
|
|||||||
@ -1,30 +1,30 @@
|
|||||||
# -*- coding: utf-8 -
|
|
||||||
#
|
|
||||||
# This file is part of gunicorn released under the MIT license.
|
|
||||||
# See the NOTICE for more information.
|
|
||||||
#
|
|
||||||
# Example code from Eventlet sources
|
|
||||||
|
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
import errno
|
import errno
|
||||||
from hashlib import md5
|
|
||||||
import re
|
import re
|
||||||
|
from hashlib import md5, sha1
|
||||||
|
import base64
|
||||||
|
from base64 import b64encode, b64decode
|
||||||
import socket
|
import socket
|
||||||
import struct
|
import struct
|
||||||
|
import logging
|
||||||
|
from socket import error as SocketError
|
||||||
|
|
||||||
|
import eventlet
|
||||||
from gunicorn.workers.async import ALREADY_HANDLED
|
from gunicorn.workers.async import ALREADY_HANDLED
|
||||||
from eventlet import pools
|
from eventlet import pools
|
||||||
|
|
||||||
import eventlet
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
WS_KEY = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
|
||||||
|
|
||||||
class WebSocketWSGI(object):
|
class WebSocketWSGI(object):
|
||||||
def __init__(self, handler):
|
def __init__(self, handler):
|
||||||
self.handler = handler
|
self.handler = handler
|
||||||
|
|
||||||
def verify_client(self, ws):
|
def verify_client(self, ws):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _get_key_value(self, key_value):
|
def _get_key_value(self, key_value):
|
||||||
if not key_value:
|
if not key_value:
|
||||||
return
|
return
|
||||||
@ -36,52 +36,75 @@ class WebSocketWSGI(object):
|
|||||||
return part
|
return part
|
||||||
|
|
||||||
def __call__(self, environ, start_response):
|
def __call__(self, environ, start_response):
|
||||||
if not (environ['HTTP_CONNECTION'] == 'Upgrade' and
|
if not (environ.get('HTTP_CONNECTION').find('Upgrade') != -1 and
|
||||||
environ['HTTP_UPGRADE'] == 'WebSocket'):
|
environ['HTTP_UPGRADE'].lower() == 'websocket'):
|
||||||
# need to check a few more things here for true compliance
|
# need to check a few more things here for true compliance
|
||||||
start_response('400 Bad Request', [('Connection','close')])
|
start_response('400 Bad Request', [('Connection','close')])
|
||||||
return []
|
return []
|
||||||
|
|
||||||
sock = environ['gunicorn.socket']
|
|
||||||
ws = WebSocket(sock,
|
|
||||||
environ.get('HTTP_ORIGIN'),
|
|
||||||
environ.get('HTTP_WEBSOCKET_PROTOCOL'),
|
|
||||||
environ.get('PATH_INFO'))
|
|
||||||
|
|
||||||
handshake_reply = ("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
|
sock = environ['gunicorn.socket']
|
||||||
"Upgrade: WebSocket\r\n"
|
|
||||||
|
version = environ.get('HTTP_SEC_WEBSOCKET_VERSION')
|
||||||
|
|
||||||
|
ws = WebSocket(sock, environ, version)
|
||||||
|
|
||||||
|
handshake_reply = ("HTTP/1.1 101 Switching Protocols\r\n"
|
||||||
|
"Upgrade: websocket\r\n"
|
||||||
"Connection: Upgrade\r\n")
|
"Connection: Upgrade\r\n")
|
||||||
|
|
||||||
self.verify_client(ws)
|
key = environ.get('HTTP_SEC_WEBSOCKET_KEY')
|
||||||
|
if key:
|
||||||
key1 = self._get_key_value(environ.get('HTTP_SEC_WEBSOCKET_KEY1'))
|
ws_key = base64.b64decode(key)
|
||||||
key2 = self._get_key_value(environ.get('HTTP_SEC_WEBSOCKET_KEY2'))
|
if len(ws_key) != 16:
|
||||||
|
start_response('400 Bad Request', [('Connection','close')])
|
||||||
|
return []
|
||||||
if key1 and key2:
|
|
||||||
challenge = ""
|
protocols = []
|
||||||
challenge += struct.pack("!I", key1) # network byteorder int
|
subprotocols = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL')
|
||||||
challenge += struct.pack("!I", key2) # network byteorder int
|
ws_protocols = []
|
||||||
challenge += environ['wsgi.input'].read()
|
if subprotocols:
|
||||||
|
for s in subprotocols.split(','):
|
||||||
|
s = s.strip()
|
||||||
|
if s in protocols:
|
||||||
|
ws_protocols.append(s)
|
||||||
|
if ws_protocols:
|
||||||
|
handshake_reply += 'Sec-WebSocket-Protocol: %s\r\n' % ', '.join(ws_protocols)
|
||||||
|
|
||||||
|
exts = []
|
||||||
|
extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS')
|
||||||
|
ws_extensions = []
|
||||||
|
if extensions:
|
||||||
|
for ext in extensions.split(','):
|
||||||
|
ext = ext.strip()
|
||||||
|
if ext in exts:
|
||||||
|
ws_extensions.append(ext)
|
||||||
|
if ws_extensions:
|
||||||
|
handshake_reply += 'Sec-WebSocket-Extensions: %s\r\n' % ', '.join(ws_extensions)
|
||||||
|
|
||||||
handshake_reply += (
|
handshake_reply += (
|
||||||
"Sec-WebSocket-Origin: %s\r\n"
|
"Sec-WebSocket-Origin: %s\r\n"
|
||||||
"Sec-WebSocket-Location: ws://%s%s\r\n"
|
"Sec-WebSocket-Location: ws://%s%s\r\n"
|
||||||
"Sec-WebSocket-Protocol: %s\r\n"
|
"Sec-WebSocket-Version: %s\r\n"
|
||||||
"\r\n%s" % (
|
"Sec-WebSocket-Accept: %s\r\n\r\n"
|
||||||
environ.get('HTTP_ORIGIN'),
|
% (
|
||||||
environ.get('HTTP_HOST'),
|
environ.get('HTTP_ORIGIN'),
|
||||||
ws.path,
|
environ.get('HTTP_HOST'),
|
||||||
environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL'),
|
ws.path,
|
||||||
md5(challenge).digest()))
|
version,
|
||||||
|
base64.b64encode(sha1(key + WS_KEY).digest())
|
||||||
|
))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
||||||
handshake_reply += (
|
handshake_reply += (
|
||||||
"WebSocket-Origin: %s\r\n"
|
"WebSocket-Origin: %s\r\n"
|
||||||
"WebSocket-Location: ws://%s%s\r\n\r\n" % (
|
"WebSocket-Location: ws://%s%s\r\n\r\n" % (
|
||||||
environ.get('HTTP_ORIGIN'),
|
environ.get('HTTP_ORIGIN'),
|
||||||
environ.get('HTTP_HOST'),
|
environ.get('HTTP_HOST'),
|
||||||
ws.path))
|
ws.path))
|
||||||
|
|
||||||
sock.sendall(handshake_reply)
|
sock.sendall(handshake_reply)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.handler(ws)
|
self.handler(ws)
|
||||||
except socket.error, e:
|
except socket.error, e:
|
||||||
@ -91,66 +114,302 @@ class WebSocketWSGI(object):
|
|||||||
# doesn't barf on the fact that we didn't call start_response
|
# doesn't barf on the fact that we didn't call start_response
|
||||||
return ALREADY_HANDLED
|
return ALREADY_HANDLED
|
||||||
|
|
||||||
def parse_messages(buf):
|
|
||||||
""" Parses for messages in the buffer *buf*. It is assumed that
|
|
||||||
the buffer contains the start character for a message, but that it
|
|
||||||
may contain only part of the rest of the message. NOTE: only understands
|
|
||||||
lengthless messages for now.
|
|
||||||
|
|
||||||
Returns an array of messages, and the buffer remainder that didn't contain
|
|
||||||
any full messages."""
|
|
||||||
msgs = []
|
|
||||||
end_idx = 0
|
|
||||||
while buf:
|
|
||||||
assert ord(buf[0]) == 0, "Don't understand how to parse this type of message: %r" % buf
|
|
||||||
end_idx = buf.find("\xFF")
|
|
||||||
if end_idx == -1:
|
|
||||||
break
|
|
||||||
msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
|
|
||||||
buf = buf[end_idx+1:]
|
|
||||||
return msgs, buf
|
|
||||||
|
|
||||||
def format_message(message):
|
|
||||||
# TODO support iterable messages
|
|
||||||
if isinstance(message, unicode):
|
|
||||||
message = message.encode('utf-8')
|
|
||||||
elif not isinstance(message, str):
|
|
||||||
message = str(message)
|
|
||||||
packed = "\x00%s\xFF" % message
|
|
||||||
return packed
|
|
||||||
|
|
||||||
|
|
||||||
class WebSocket(object):
|
class WebSocket(object):
|
||||||
def __init__(self, sock, origin, protocol, path):
|
"""A websocket object that handles the details of
|
||||||
self.sock = sock
|
serialization/deserialization to the socket.
|
||||||
self.origin = origin
|
|
||||||
self.protocol = protocol
|
The primary way to interact with a :class:`WebSocket` object is to
|
||||||
self.path = path
|
call :meth:`send` and :meth:`wait` in order to pass messages back
|
||||||
|
and forth with the browser. Also available are the following
|
||||||
|
properties:
|
||||||
|
|
||||||
|
path
|
||||||
|
The path value of the request. This is the same as the WSGI PATH_INFO variable, but more convenient.
|
||||||
|
protocol
|
||||||
|
The value of the Websocket-Protocol header.
|
||||||
|
origin
|
||||||
|
The value of the 'Origin' header.
|
||||||
|
environ
|
||||||
|
The full WSGI environment for this request.
|
||||||
|
|
||||||
|
"""
|
||||||
|
def __init__(self, sock, environ, version=76):
|
||||||
|
"""
|
||||||
|
:param socket: The eventlet socket
|
||||||
|
:type socket: :class:`eventlet.greenio.GreenSocket`
|
||||||
|
:param environ: The wsgi environment
|
||||||
|
:param version: The WebSocket spec version to follow (default is 76)
|
||||||
|
"""
|
||||||
|
self.socket = sock
|
||||||
|
self.origin = environ.get('HTTP_ORIGIN')
|
||||||
|
self.protocol = environ.get('HTTP_WEBSOCKET_PROTOCOL')
|
||||||
|
self.path = environ.get('PATH_INFO')
|
||||||
|
self.environ = environ
|
||||||
|
self.version = version
|
||||||
|
self.websocket_closed = False
|
||||||
self._buf = ""
|
self._buf = ""
|
||||||
self._msgs = collections.deque()
|
self._msgs = collections.deque()
|
||||||
self._sendlock = pools.TokenPool(1)
|
self._sendlock = pools.TokenPool(1)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def encode_hybi(buf, opcode, base64=False):
|
||||||
|
""" Encode a HyBi style WebSocket frame.
|
||||||
|
Optional opcode:
|
||||||
|
0x0 - continuation
|
||||||
|
0x1 - text frame (base64 encode buf)
|
||||||
|
0x2 - binary frame (use raw buf)
|
||||||
|
0x8 - connection close
|
||||||
|
0x9 - ping
|
||||||
|
0xA - pong
|
||||||
|
"""
|
||||||
|
if base64:
|
||||||
|
buf = b64encode(buf)
|
||||||
|
|
||||||
|
b1 = 0x80 | (opcode & 0x0f) # FIN + opcode
|
||||||
|
payload_len = len(buf)
|
||||||
|
if payload_len <= 125:
|
||||||
|
header = struct.pack('>BB', b1, payload_len)
|
||||||
|
elif payload_len > 125 and payload_len < 65536:
|
||||||
|
header = struct.pack('>BBH', b1, 126, payload_len)
|
||||||
|
elif payload_len >= 65536:
|
||||||
|
header = struct.pack('>BBQ', b1, 127, payload_len)
|
||||||
|
|
||||||
|
#print("Encoded: %s" % repr(header + buf))
|
||||||
|
|
||||||
|
return header + buf, len(header), 0
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def decode_hybi(buf, base64=False):
|
||||||
|
""" Decode HyBi style WebSocket packets.
|
||||||
|
Returns:
|
||||||
|
{'fin' : 0_or_1,
|
||||||
|
'opcode' : number,
|
||||||
|
'mask' : 32_bit_number,
|
||||||
|
'hlen' : header_bytes_number,
|
||||||
|
'length' : payload_bytes_number,
|
||||||
|
'payload' : decoded_buffer,
|
||||||
|
'left' : bytes_left_number,
|
||||||
|
'close_code' : number,
|
||||||
|
'close_reason' : string}
|
||||||
|
"""
|
||||||
|
|
||||||
|
f = {'fin' : 0,
|
||||||
|
'opcode' : 0,
|
||||||
|
'mask' : 0,
|
||||||
|
'hlen' : 2,
|
||||||
|
'length' : 0,
|
||||||
|
'payload' : None,
|
||||||
|
'left' : 0,
|
||||||
|
'close_code' : None,
|
||||||
|
'close_reason' : None}
|
||||||
|
|
||||||
|
blen = len(buf)
|
||||||
|
f['left'] = blen
|
||||||
|
|
||||||
|
if blen < f['hlen']:
|
||||||
|
return f # Incomplete frame header
|
||||||
|
|
||||||
|
b1, b2 = struct.unpack_from(">BB", buf)
|
||||||
|
f['opcode'] = b1 & 0x0f
|
||||||
|
f['fin'] = (b1 & 0x80) >> 7
|
||||||
|
has_mask = (b2 & 0x80) >> 7
|
||||||
|
|
||||||
|
f['length'] = b2 & 0x7f
|
||||||
|
|
||||||
|
if f['length'] == 126:
|
||||||
|
f['hlen'] = 4
|
||||||
|
if blen < f['hlen']:
|
||||||
|
return f # Incomplete frame header
|
||||||
|
(f['length'],) = struct.unpack_from('>xxH', buf)
|
||||||
|
elif f['length'] == 127:
|
||||||
|
f['hlen'] = 10
|
||||||
|
if blen < f['hlen']:
|
||||||
|
return f # Incomplete frame header
|
||||||
|
(f['length'],) = struct.unpack_from('>xxQ', buf)
|
||||||
|
|
||||||
|
full_len = f['hlen'] + has_mask * 4 + f['length']
|
||||||
|
|
||||||
|
if blen < full_len: # Incomplete frame
|
||||||
|
return f # Incomplete frame header
|
||||||
|
|
||||||
|
# Number of bytes that are part of the next frame(s)
|
||||||
|
f['left'] = blen - full_len
|
||||||
|
|
||||||
|
# Process 1 frame
|
||||||
|
if has_mask:
|
||||||
|
# unmask payload
|
||||||
|
f['mask'] = buf[f['hlen']:f['hlen']+4]
|
||||||
|
b = c = ''
|
||||||
|
if f['length'] >= 4:
|
||||||
|
data = struct.unpack('<I', buf[f['hlen']:f['hlen']+4])[0]
|
||||||
|
of1 = f['hlen']+4
|
||||||
|
b = ''
|
||||||
|
for i in xrange(0, int(f['length']/4)):
|
||||||
|
mask = struct.unpack('<I', buf[of1+4*i:of1+4*(i+1)])[0]
|
||||||
|
b += struct.pack('I', data ^ mask)
|
||||||
|
|
||||||
|
if f['length'] % 4:
|
||||||
|
l = f['length'] % 4
|
||||||
|
of1 = f['hlen']
|
||||||
|
of2 = full_len - l
|
||||||
|
c = ''
|
||||||
|
for i in range(0, l):
|
||||||
|
mask = struct.unpack('B', buf[of1 + i])[0]
|
||||||
|
data = struct.unpack('B', buf[of2 + i])[0]
|
||||||
|
c += chr(data ^ mask)
|
||||||
|
|
||||||
|
f['payload'] = b + c
|
||||||
|
else:
|
||||||
|
print("Unmasked frame: %s" % repr(buf))
|
||||||
|
f['payload'] = buf[(f['hlen'] + has_mask * 4):full_len]
|
||||||
|
|
||||||
|
if base64 and f['opcode'] in [1, 2]:
|
||||||
|
try:
|
||||||
|
f['payload'] = b64decode(f['payload'])
|
||||||
|
except:
|
||||||
|
print("Exception while b64decoding buffer: %s" %
|
||||||
|
repr(buf))
|
||||||
|
raise
|
||||||
|
|
||||||
|
if f['opcode'] == 0x08:
|
||||||
|
if f['length'] >= 2:
|
||||||
|
f['close_code'] = struct.unpack_from(">H", f['payload'])
|
||||||
|
if f['length'] > 3:
|
||||||
|
f['close_reason'] = f['payload'][2:]
|
||||||
|
|
||||||
|
return f
|
||||||
|
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _pack_message(message):
|
||||||
|
"""Pack the message inside ``00`` and ``FF``
|
||||||
|
|
||||||
|
As per the dataframing section (5.3) for the websocket spec
|
||||||
|
"""
|
||||||
|
if isinstance(message, unicode):
|
||||||
|
message = message.encode('utf-8')
|
||||||
|
elif not isinstance(message, str):
|
||||||
|
message = str(message)
|
||||||
|
packed = "\x00%s\xFF" % message
|
||||||
|
return packed
|
||||||
|
|
||||||
|
def _parse_messages(self):
|
||||||
|
""" Parses for messages in the buffer *buf*. It is assumed that
|
||||||
|
the buffer contains the start character for a message, but that it
|
||||||
|
may contain only part of the rest of the message.
|
||||||
|
|
||||||
|
Returns an array of messages, and the buffer remainder that
|
||||||
|
didn't contain any full messages."""
|
||||||
|
msgs = []
|
||||||
|
end_idx = 0
|
||||||
|
buf = self._buf
|
||||||
|
while buf:
|
||||||
|
if self.version in ['7', '8', '13']:
|
||||||
|
frame = self.decode_hybi(buf, base64=False)
|
||||||
|
#print("Received buf: %s, frame: %s" % (repr(buf), frame))
|
||||||
|
|
||||||
|
if frame['payload'] == None:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if frame['opcode'] == 0x8: # connection close
|
||||||
|
self.websocket_closed = True
|
||||||
|
break
|
||||||
|
#elif frame['opcode'] == 0x1:
|
||||||
|
else:
|
||||||
|
msgs.append(frame['payload']);
|
||||||
|
#msgs.append(frame['payload'].decode('utf-8', 'replace'));
|
||||||
|
#buf = buf[-frame['left']:]
|
||||||
|
if frame['left']:
|
||||||
|
buf = buf[-frame['left']:]
|
||||||
|
else:
|
||||||
|
buf = ''
|
||||||
|
|
||||||
|
|
||||||
|
else:
|
||||||
|
frame_type = ord(buf[0])
|
||||||
|
if frame_type == 0:
|
||||||
|
# Normal message.
|
||||||
|
end_idx = buf.find("\xFF")
|
||||||
|
if end_idx == -1: #pragma NO COVER
|
||||||
|
break
|
||||||
|
msgs.append(buf[1:end_idx].decode('utf-8', 'replace'))
|
||||||
|
buf = buf[end_idx+1:]
|
||||||
|
elif frame_type == 255:
|
||||||
|
# Closing handshake.
|
||||||
|
assert ord(buf[1]) == 0, "Unexpected closing handshake: %r" % buf
|
||||||
|
self.websocket_closed = True
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise ValueError("Don't understand how to parse this type of message: %r" % buf)
|
||||||
|
self._buf = buf
|
||||||
|
return msgs
|
||||||
|
|
||||||
def send(self, message):
|
def send(self, message):
|
||||||
packed = format_message(message)
|
"""Send a message to the browser.
|
||||||
|
|
||||||
|
*message* should be convertable to a string; unicode objects should be
|
||||||
|
encodable as utf-8. Raises socket.error with errno of 32
|
||||||
|
(broken pipe) if the socket has already been closed by the client."""
|
||||||
|
if self.version in ['7', '8', '13']:
|
||||||
|
packed, lenhead, lentail = self.encode_hybi(message, opcode=0x01, base64=False)
|
||||||
|
else:
|
||||||
|
packed = self._pack_message(message)
|
||||||
# if two greenthreads are trying to send at the same time
|
# if two greenthreads are trying to send at the same time
|
||||||
# on the same socket, sendlock prevents interleaving and corruption
|
# on the same socket, sendlock prevents interleaving and corruption
|
||||||
|
#self._sendlock.acquire()
|
||||||
t = self._sendlock.get()
|
t = self._sendlock.get()
|
||||||
try:
|
try:
|
||||||
self.sock.sendall(packed)
|
self.socket.sendall(packed)
|
||||||
finally:
|
finally:
|
||||||
self._sendlock.put(t)
|
self._sendlock.put(t)
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
|
"""Waits for and deserializes messages.
|
||||||
|
|
||||||
|
Returns a single message; the oldest not yet processed. If the client
|
||||||
|
has already closed the connection, returns None. This is different
|
||||||
|
from normal socket behavior because the empty string is a valid
|
||||||
|
websocket message."""
|
||||||
while not self._msgs:
|
while not self._msgs:
|
||||||
|
# Websocket might be closed already.
|
||||||
|
if self.websocket_closed:
|
||||||
|
return None
|
||||||
# no parsed messages, must mean buf needs more data
|
# no parsed messages, must mean buf needs more data
|
||||||
delta = self.sock.recv(1024)
|
delta = self.socket.recv(8096)
|
||||||
if delta == '':
|
if delta == '':
|
||||||
return None
|
return None
|
||||||
self._buf += delta
|
self._buf += delta
|
||||||
msgs, self._buf = parse_messages(self._buf)
|
msgs = self._parse_messages()
|
||||||
self._msgs.extend(msgs)
|
self._msgs.extend(msgs)
|
||||||
return self._msgs.popleft()
|
return self._msgs.popleft()
|
||||||
|
|
||||||
|
def _send_closing_frame(self, ignore_send_errors=False):
|
||||||
|
"""Sends the closing frame to the client, if required."""
|
||||||
|
if self.version in ['7', '8', '13'] and not self.websocket_closed:
|
||||||
|
msg = ''
|
||||||
|
#if code != None:
|
||||||
|
# msg = struct.pack(">H%ds" % (len(reason)), code)
|
||||||
|
|
||||||
|
buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False)
|
||||||
|
self.socket.sendall(buf)
|
||||||
|
self.websocket_closed = True
|
||||||
|
|
||||||
|
elif self.version == 76 and not self.websocket_closed:
|
||||||
|
try:
|
||||||
|
self.socket.sendall("\xff\x00")
|
||||||
|
except SocketError:
|
||||||
|
# Sometimes, like when the remote side cuts off the connection,
|
||||||
|
# we don't care about this.
|
||||||
|
if not ignore_send_errors: #pragma NO COVER
|
||||||
|
raise
|
||||||
|
self.websocket_closed = True
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""Forcibly close the websocket; generally it is preferable to
|
||||||
|
return from the handler method."""
|
||||||
|
self._send_closing_frame()
|
||||||
|
self.socket.shutdown(True)
|
||||||
|
self.socket.close()
|
||||||
|
|
||||||
# demo app
|
# demo app
|
||||||
import os
|
import os
|
||||||
@ -184,3 +443,4 @@ def app(environ, start_response):
|
|||||||
return [data]
|
return [data]
|
||||||
else:
|
else:
|
||||||
return wsapp(environ, start_response)
|
return wsapp(environ, start_response)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user