mirror of
https://github.com/frappe/gunicorn.git
synced 2026-01-14 11:09:11 +08:00
Set blocking to 0 back since we prevented inheritence of the socket the socket.
This commit is contained in:
parent
bfeb4f9416
commit
db01c210a2
@ -135,7 +135,8 @@ class Arbiter(object):
|
|||||||
log.error("Unhandled signal: %s" % signame)
|
log.error("Unhandled signal: %s" % signame)
|
||||||
continue
|
continue
|
||||||
log.info("Handling signal: %s" % signame)
|
log.info("Handling signal: %s" % signame)
|
||||||
handler()
|
handler()
|
||||||
|
self.wakeup()
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
break
|
break
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
|||||||
@ -17,13 +17,14 @@ import logging
|
|||||||
from gunicorn import __version__
|
from gunicorn import __version__
|
||||||
from gunicorn.http.http_parser import HttpParser
|
from gunicorn.http.http_parser import HttpParser
|
||||||
from gunicorn.http.tee import TeeInput
|
from gunicorn.http.tee import TeeInput
|
||||||
from gunicorn.util import CHUNK_SIZE, read_partial, normalize_name
|
from gunicorn.util import CHUNK_SIZE, read_partial, \
|
||||||
|
normalize_name
|
||||||
|
|
||||||
|
|
||||||
NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')
|
NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class RequestError(Exception):
|
class RequestError(Exception):
|
||||||
@ -57,7 +58,7 @@ class HTTPRequest(object):
|
|||||||
self.parser = HttpParser()
|
self.parser = HttpParser()
|
||||||
self.start_response_called = False
|
self.start_response_called = False
|
||||||
|
|
||||||
self.log = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
def read(self):
|
def read(self):
|
||||||
environ = {}
|
environ = {}
|
||||||
@ -75,13 +76,12 @@ class HTTPRequest(object):
|
|||||||
if i != -1: break
|
if i != -1: break
|
||||||
|
|
||||||
if not headers:
|
if not headers:
|
||||||
print "ici :()"
|
|
||||||
environ.update(self.DEFAULTS)
|
environ.update(self.DEFAULTS)
|
||||||
return environ
|
return environ
|
||||||
|
|
||||||
self.log.info("%s", self.parser.status)
|
log.info("%s", self.parser.status)
|
||||||
|
|
||||||
self.log.info("Got headers:\n%s" % headers)
|
log.info("Got headers:\n%s" % headers)
|
||||||
|
|
||||||
if self.parser.headers_dict.get('Except', '').lower() == "100-continue":
|
if self.parser.headers_dict.get('Except', '').lower() == "100-continue":
|
||||||
self.socket.send("100 Continue\n")
|
self.socket.send("100 Continue\n")
|
||||||
|
|||||||
@ -122,12 +122,13 @@ class TeeInput(object):
|
|||||||
data = read_partial(self.socket, length)
|
data = read_partial(self.socket, length)
|
||||||
self.buf += data
|
self.buf += data
|
||||||
chunk, self.buf = self.parser.filter_body(self.buf)
|
chunk, self.buf = self.parser.filter_body(self.buf)
|
||||||
print self.buf
|
|
||||||
if chunk:
|
if chunk:
|
||||||
print chunk
|
|
||||||
self.tmp.write(chunk)
|
self.tmp.write(chunk)
|
||||||
self.tmp.seek(0, os.SEEK_END)
|
self.tmp.seek(0, os.SEEK_END)
|
||||||
return chunk
|
return chunk
|
||||||
|
if not data:
|
||||||
|
self._is_socket = False
|
||||||
|
break
|
||||||
self._finalize()
|
self._finalize()
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
@ -136,7 +137,7 @@ class TeeInput(object):
|
|||||||
if any."""
|
if any."""
|
||||||
if self.parser.body_eof():
|
if self.parser.body_eof():
|
||||||
# handle trailing headers
|
# handle trailing headers
|
||||||
if self.parser.is_chunked:
|
if self.parser.is_chunked and self._is_socket:
|
||||||
while not self.parser.trailing_header(self.buf):
|
while not self.parser.trailing_header(self.buf):
|
||||||
data = read_partial(self.socket, CHUNK_SIZE)
|
data = read_partial(self.socket, CHUNK_SIZE)
|
||||||
if not data: break
|
if not data: break
|
||||||
|
|||||||
@ -8,6 +8,7 @@ import select
|
|||||||
import socket
|
import socket
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
timeout_default = object()
|
timeout_default = object()
|
||||||
|
|
||||||
CHUNK_SIZE = (16 * 1024)
|
CHUNK_SIZE = (16 * 1024)
|
||||||
@ -37,14 +38,15 @@ def close(sock):
|
|||||||
def read_partial(sock, length):
|
def read_partial(sock, length):
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
ret = select.select([sock.fileno()], [], [])
|
ret = select.select([sock.fileno()], [], [], 0)
|
||||||
if ret[0]: break
|
if ret[0]: break
|
||||||
except select.error, e:
|
except select.error, e:
|
||||||
if e[0] == errno.EINTR:
|
if e[0] == errno.EINTR:
|
||||||
break
|
continue
|
||||||
raise
|
raise
|
||||||
data = sock.recv(length)
|
data = sock.recv(length)
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
def write(sock, data):
|
def write(sock, data):
|
||||||
buf = ""
|
buf = ""
|
||||||
|
|||||||
@ -18,6 +18,8 @@ import time
|
|||||||
from gunicorn import http
|
from gunicorn import http
|
||||||
from gunicorn import util
|
from gunicorn import util
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
class Worker(object):
|
class Worker(object):
|
||||||
|
|
||||||
SIGNALS = map(
|
SIGNALS = map(
|
||||||
@ -37,14 +39,19 @@ class Worker(object):
|
|||||||
self.close_on_exec(socket)
|
self.close_on_exec(socket)
|
||||||
self.close_on_exec(fd)
|
self.close_on_exec(fd)
|
||||||
|
|
||||||
|
|
||||||
|
# Set blocking to 0 back since we
|
||||||
|
# prevented inheritence of the socket
|
||||||
|
# the socket.
|
||||||
|
socket.setblocking(0)
|
||||||
|
|
||||||
self.socket = socket
|
self.socket = socket
|
||||||
self.address = socket.getsockname()
|
self.address = socket.getsockname()
|
||||||
|
|
||||||
self.app = app
|
self.app = app
|
||||||
self.alive = True
|
self.alive = True
|
||||||
|
|
||||||
self.log = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
def close_on_exec(self, fd):
|
def close_on_exec(self, fd):
|
||||||
flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
|
flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
|
||||||
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
|
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
|
||||||
@ -72,6 +79,31 @@ class Worker(object):
|
|||||||
self.init_signals()
|
self.init_signals()
|
||||||
spinner = 0
|
spinner = 0
|
||||||
while self.alive:
|
while self.alive:
|
||||||
|
|
||||||
|
nr = 0
|
||||||
|
# Accept until we hit EAGAIN. We're betting that when we're
|
||||||
|
# processing clients that more clients are waiting. When
|
||||||
|
# there's no more clients waiting we go back to the select()
|
||||||
|
# loop and wait for some lovin.
|
||||||
|
while self.alive:
|
||||||
|
try:
|
||||||
|
client, addr = self.socket.accept()
|
||||||
|
|
||||||
|
|
||||||
|
# handle connection
|
||||||
|
self.handle(client, addr)
|
||||||
|
|
||||||
|
# Update the fd mtime on each client completion
|
||||||
|
# to signal that this worker process is alive.
|
||||||
|
spinner = (spinner+1) % 2
|
||||||
|
self._fchmod(spinner)
|
||||||
|
nr += 1
|
||||||
|
except socket.error, e:
|
||||||
|
if e[0] in [errno.EAGAIN, errno.ECONNABORTED,
|
||||||
|
errno.EWOULDBLOCK]:
|
||||||
|
break # Uh oh!
|
||||||
|
raise
|
||||||
|
if nr == 0: break
|
||||||
|
|
||||||
while self.alive:
|
while self.alive:
|
||||||
spinner = (spinner+1) % 2
|
spinner = (spinner+1) % 2
|
||||||
@ -90,28 +122,6 @@ class Worker(object):
|
|||||||
|
|
||||||
spinner = (spinner+1) % 2
|
spinner = (spinner+1) % 2
|
||||||
self._fchmod(spinner)
|
self._fchmod(spinner)
|
||||||
|
|
||||||
# Accept until we hit EAGAIN. We're betting that when we're
|
|
||||||
# processing clients that more clients are waiting. When
|
|
||||||
# there's no more clients waiting we go back to the select()
|
|
||||||
# loop and wait for some lovin.
|
|
||||||
while self.alive:
|
|
||||||
try:
|
|
||||||
client, addr = self.socket.accept()
|
|
||||||
client.setblocking(0)
|
|
||||||
|
|
||||||
# handle connection
|
|
||||||
self.handle(client, addr)
|
|
||||||
|
|
||||||
# Update the fd mtime on each client completion
|
|
||||||
# to signal that this worker process is alive.
|
|
||||||
spinner = (spinner+1) % 2
|
|
||||||
self._fchmod(spinner)
|
|
||||||
except socket.error, e:
|
|
||||||
if e[0] in [errno.EAGAIN, errno.ECONNABORTED,
|
|
||||||
errno.EWOULDBLOCK]:
|
|
||||||
break # Uh oh!
|
|
||||||
raise
|
|
||||||
|
|
||||||
def handle(self, client, addr):
|
def handle(self, client, addr):
|
||||||
self.close_on_exec(client)
|
self.close_on_exec(client)
|
||||||
@ -120,7 +130,7 @@ class Worker(object):
|
|||||||
response = self.app(req.read(), req.start_response)
|
response = self.app(req.read(), req.start_response)
|
||||||
http.HTTPResponse(client, response, req).send()
|
http.HTTPResponse(client, response, req).send()
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.log.exception("Error processing request. [%s]" % str(e))
|
log.exception("Error processing request. [%s]" % str(e))
|
||||||
# try to send something if an error happend
|
# try to send something if an error happend
|
||||||
msg = "HTTP/1.1 500 Internal Server Error\r\n\r\n"
|
msg = "HTTP/1.1 500 Internal Server Error\r\n\r\n"
|
||||||
util.write_nonblock(client, msg)
|
util.write_nonblock(client, msg)
|
||||||
|
|||||||
@ -17,6 +17,7 @@ def test_001(buf, p):
|
|||||||
])
|
])
|
||||||
body, tr = p.filter_body(buf[i:])
|
body, tr = p.filter_body(buf[i:])
|
||||||
t.eq(body, '{"nom": "nom"}')
|
t.eq(body, '{"nom": "nom"}')
|
||||||
|
print t.eq(p.body_eof(), True)
|
||||||
|
|
||||||
@t.request("002.http")
|
@t.request("002.http")
|
||||||
def test_002(buf, p):
|
def test_002(buf, p):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user