save for backup. introduce read_partial.

This commit is contained in:
Benoit Chesneau 2010-01-17 03:30:57 +01:00
parent ecd684eaed
commit b0271b6625
8 changed files with 130 additions and 67 deletions

View File

@ -34,7 +34,7 @@ import socket
import sys import sys
import time import time
from worker import Worker from .worker import Worker
logging.basicConfig(level=logging.DEBUG, format="%(levelname)s: %(message)s") logging.basicConfig(level=logging.DEBUG, format="%(levelname)s: %(message)s")
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -127,7 +127,7 @@ class Arbiter(object):
return sock return sock
def set_sockopts(self, sock): def set_sockopts(self, sock):
sock.setblocking(0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0)
if hasattr(socket, "TCP_CORK"): if hasattr(socket, "TCP_CORK"):
@ -139,11 +139,9 @@ class Arbiter(object):
self.manage_workers() self.manage_workers()
while True: while True:
try: try:
sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
if sig is None: if sig is None:
self.murder_workers()
self.reap_workers()
self.manage_workers()
self.sleep() self.sleep()
continue continue
@ -159,6 +157,9 @@ class Arbiter(object):
log.info("Handling signal: %s" % signame) log.info("Handling signal: %s" % signame)
handler() handler()
self.reap_workers()
self.murder_workers()
self.manage_workers()
except StopIteration: except StopIteration:
break break
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -24,5 +24,5 @@
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE. # OTHER DEALINGS IN THE SOFTWARE.
from request import HTTPRequest from request import HTTPRequest, RequestError
from response import HTTPResponse from response import HTTPResponse

View File

@ -43,19 +43,18 @@ class HttpParser(object):
# wee could be smarter here # wee could be smarter here
# by just reading the array, but converting # by just reading the array, but converting
# is enough for now # is enough for now
cs = "".join(buf)
ld = len("\r\n\r\n") ld = len("\r\n\r\n")
i = cs.find("\r\n\r\n") i = buf.find("\r\n\r\n")
if i != -1: if i != -1:
if i > 0: if i > 0:
r = cs[:i] r = buf[:i]
buf = create_string_buffer(cs[i+ ld:]) pos = i+ld
return self.finalize_headers(headers, r) return self.finalize_headers(headers, r, pos)
return None return -1
def finalize_headers(self, headers, headers_str): def finalize_headers(self, headers, headers_str, pos):
lines = headers_str.split("\r\n") lines = headers_str.split("\r\n")
# parse first line of headers # parse first line of headers
self._first_line(lines.pop(0)) self._first_line(lines.pop(0))
@ -73,7 +72,7 @@ class HttpParser(object):
pass pass
headers.update(self._headers) headers.update(self._headers)
self._content_len = int(self._headers.get('Content-Length') or 0) self._content_len = int(self._headers.get('Content-Length') or 0)
return headers return pos
def _first_line(self, line): def _first_line(self, line):
method, path, version = line.strip().split(" ") method, path, version = line.strip().split(" ")

View File

@ -24,38 +24,40 @@
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE. # OTHER DEALINGS IN THE SOFTWARE.
from ctypes import create_string_buffer import errno
from ctypes import *
import re import re
import StringIO import StringIO
import socket
import sys import sys
from urllib import unquote from urllib import unquote
import array
import logging
from gunicorn import __version__ from gunicorn import __version__
from gunicorn.http.http_parser import HttpParser from .http_parser import HttpParser
from gunicorn.http.tee import TeeInput from .tee import TeeInput
from gunicorn.util import CHUNK_SIZE from ..util import CHUNK_SIZE, read_partial
NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+') NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')
log = logging.getLogger(__name__)
def _normalize_name(name): def _normalize_name(name):
return "-".join([w.lower().capitalize() for w in name.split("-")]) return "-".join([w.lower().capitalize() for w in name.split("-")])
class RequestError(Exception): class RequestError(Exception):
""" raised when something wrong happend"""
def __init__(self, status_code, reason):
self.status_code = status_code
self.reason = reason
Exception.__init__(self, (status_code, reason))
class HTTPRequest(object): class HTTPRequest(object):
SERVER_VERSION = "gunicorn/%s" % __version__ SERVER_VERSION = "gunicorn/%s" % __version__
def __init__(self, socket, client_address, server_address): def __init__(self, socket, client_address, server_address, wid):
self.socket = socket.dup() self.wid = wid
self.socket = socket
self.client_address = client_address self.client_address = client_address
self.server_address = server_address self.server_address = server_address
self.response_status = None self.response_status = None
@ -67,13 +69,25 @@ class HTTPRequest(object):
def read(self): def read(self):
headers = {} headers = {}
remain = CHUNK_SIZE remain = CHUNK_SIZE
buf = create_string_buffer(remain) buf = ""
remain -= self.socket.recv_into(buf, remain) buf = read_partial(self.socket, CHUNK_SIZE)
while not self.parser.headers(headers, buf): i = self.parser.headers(headers, buf)
data = create_string_buffer(remain) if i == -1 and buf:
remain -= self.socket.recv_into(data, remain) while True:
buf = create_string_buffer(data.value + buf.value) data = read_partial(self.socket, CHUNK_SIZE)
if not data: break
buf += data
i = self.parser.headers(headers, buf)
if i != -1: break
if not headers:
return
buf = buf[i:]
log.info("worker %s. got headers:\n%s" % (self.wid, headers))
if headers.get('Except', '').lower() == "100-continue": if headers.get('Except', '').lower() == "100-continue":
self.socket.send("100 Continue\n") self.socket.send("100 Continue\n")

View File

@ -24,13 +24,15 @@
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE. # OTHER DEALINGS IN THE SOFTWARE.
import errno
import time import time
from gunicorn.util import http_date from ..util import http_date, write
class HTTPResponse(object): class HTTPResponse(object):
def __init__(self, sock, response, req): def __init__(self, sock, response, req):
self.req = req
self.sock = sock self.sock = sock
self.data = response self.data = response
self.headers = req.response_headers or {} self.headers = req.response_headers or {}
@ -38,23 +40,24 @@ class HTTPResponse(object):
self.SERVER_VERSION = req.SERVER_VERSION self.SERVER_VERSION = req.SERVER_VERSION
def send(self): def send(self):
# send headers if self.req.parser.headers:
resp_head = [] # send headers
resp_head.append("HTTP/1.1 %s\r\n" % (self.status)) resp_head = []
resp_head.append("HTTP/1.0 %s\r\n" % (self.status))
resp_head.append("Server: %s\r\n" % self.SERVER_VERSION) resp_head.append("Server: %s\r\n" % self.SERVER_VERSION)
resp_head.append("Date: %s\r\n" % http_date()) resp_head.append("Date: %s\r\n" % http_date())
# broken clients # broken clients
resp_head.append("Status: %s\r\n" % str(self.status)) resp_head.append("Status: %s\r\n" % str(self.status))
# always close the conenction # always close the conenction
resp_head.append("Connection: close\r\n") resp_head.append("Connection: close\r\n")
for name, value in self.headers.items(): for name, value in self.headers.items():
resp_head.append("%s: %s\r\n" % (name, value)) resp_head.append("%s: %s\r\n" % (name, value))
self.sock.send("%s\r\n" % "".join(resp_head)) write(self.sock, "%s\r\n" % "".join(resp_head))
for chunk in self.data: for chunk in self.data:
self.sock.send(chunk) write(self.sock, chunk)
self.sock.close() self.sock.close()

View File

@ -36,7 +36,7 @@ import StringIO
import tempfile import tempfile
from ctypes import create_string_buffer from ctypes import create_string_buffer
from gunicorn.util import MAX_BODY, CHUNK_SIZE from ..util import MAX_BODY, CHUNK_SIZE
class TeeInput(object): class TeeInput(object):

View File

@ -24,8 +24,13 @@
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE. # OTHER DEALINGS IN THE SOFTWARE.
import errno
import select
import socket
import time import time
timeout_default = object()
CHUNK_SIZE = (16 * 1024) CHUNK_SIZE = (16 * 1024)
MAX_BODY = 1024 * (80 + 32) MAX_BODY = 1024 * (80 + 32)
@ -35,6 +40,40 @@ weekdayname = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
monthname = [None, monthname = [None,
'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
def read_partial(sock, length):
while True:
try:
ret = select.select([sock.fileno()], [], [], 2.0)
if ret[0]: break
except socket.error, e:
if e[0] == errno.EINTR:
break
raise
data = sock.recv(length)
return data
def write(sock, data):
for i in xrange(2):
print i
try:
return sock.send(data)
except socket.error:
if i == 2:
print "raise"
raise
def write_nonblock(sock, data):
while True:
try:
ret = select.select([], [sock.fileno()], [], 2.0)
if ret[1]: break
except socket.error, e:
if e[0] == errno.EINTR:
break
raise
sock.send(data)
def import_app(module): def import_app(module):
parts = module.rsplit(":", 1) parts = module.rsplit(":", 1)

View File

@ -35,8 +35,8 @@ import sys
import tempfile import tempfile
import time import time
from gunicorn import http from . import http
from gunicorn import util from . import util
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -79,7 +79,7 @@ class Worker(object):
self.alive = False self.alive = False
def handle_exit(self, sig, frame): def handle_exit(self, sig, frame):
sys.exit(-1) sys.exit(0)
def _fchmod(self, mode): def _fchmod(self, mode):
if getattr(os, 'fchmod', None): if getattr(os, 'fchmod', None):
@ -102,20 +102,25 @@ class Worker(object):
except select.error, e: except select.error, e:
if e[0] == errno.EINTR: if e[0] == errno.EINTR:
break break
elif e[0] == errno.EBADF:
return
raise raise
spinner = (spinner+1) % 2
self._fchmod(spinner)
# Accept until we hit EAGAIN. We're betting that when we're # Accept until we hit EAGAIN. We're betting that when we're
# processing clients that more clients are waiting. When # processing clients that more clients are waiting. When
# there's no more clients waiting we go back to the select() # there's no more clients waiting we go back to the select()
# loop and wait for some lovin. # loop and wait for some lovin.
while self.alive: while self.alive:
try: try:
conn, addr = self.socket.accept() client, addr = self.socket.accept()
conn.setblocking(1) client.setblocking(0)
# handle connection # handle connection
self.handle(conn, addr) self.handle(client, addr)
# Update the fd mtime on each client completion # Update the fd mtime on each client completion
# to signal that this worker process is alive. # to signal that this worker process is alive.
spinner = (spinner+1) % 2 spinner = (spinner+1) % 2
@ -124,16 +129,18 @@ class Worker(object):
if e[0] in [errno.EAGAIN, errno.ECONNABORTED]: if e[0] in [errno.EAGAIN, errno.ECONNABORTED]:
break # Uh oh! break # Uh oh!
raise raise
def handle(self, conn, client): def handle(self, client, addr):
self.close_on_exec(conn) self.close_on_exec(client)
try: try:
req = http.HTTPRequest(conn, client, self.address) req = http.HTTPRequest(client, addr, self.address, self.id)
response = self.app(req.read(), req.start_response) response = self.app(req.read(), req.start_response)
http.HTTPResponse(conn, response, req).send() http.HTTPResponse(client, response, req).send()
except Exception, e: except Exception, e:
log.exception("Error processing request. [%s]" % str(e)) log.exception("Error processing request. [%s]" % str(e))
if e[0] == 32: msg = "HTTP/1.1 500 Internal Server Error\r\n\r\n"
raise util.write_nonblock(client, msg)
conn.send("HTTP/1.1 500 Internal Server Error\r\n\r\n") client.close()
conn.close()