diff --git a/gunicorn/http/sendfile.py b/gunicorn/http/sendfile.py new file mode 100644 index 00000000..71b393b8 --- /dev/null +++ b/gunicorn/http/sendfile.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 - +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +import ctypes +import ctypes.util +import errno +import os +import sys + +if sys.version_info >= (2, 6): + _libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True) + _sendfile = _libc.sendfile +else: + _sendfile = None + +if _sendfile: + if sys.platform == 'darwin': + # MacOS X - int sendfile(int fd, int s, off_t offset, off_t *len, + # struct sf_hdtr *hdtr, int flags); + + _sendfile.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_uint64, + ctypes.POINTER(ctypes.c_uint64), ctypes.c_voidp, + ctypes.c_int] + + def sendfile(fileno, sockno, offset, nbytes): + _nbytes = ctypes.c_uint64(nbytes) + result = _sendfile(fileno, sockno, offset, _nbytes, None, 0) + if result == -1: + e = ctypes.get_errno() + if e == errno.EAGAIN and _nbytes.value: + return _nbytes.value + raise OSError(e, os.strerror(e)) + return _nbytes.value + + elif sys.platform == 'linux2': + # Linux - size_t sendfile(int out_fd, int in_fd, off_t *offset, + # size_t count); + + _sendfile.argtypes = [ctypes.c_int, ctypes.c_int, + ctypes.POINTER(ctypes.c_uint64), ctypes.c_size_t] + + def sendfile(fileno, sockno, offset, nbytes): + _offset = ctypes.c_uint64(offset) + result = _sendfile(sockno, fileno, _offset, nbytes) + if result == -1: + e = ctypes.get_errno() + raise OSError(e, os.strerror(e)) + return result + + else: + sendfile = None +else: + sendfile = None diff --git a/gunicorn/http/wsgi.py b/gunicorn/http/wsgi.py index 752d3304..4a3907a8 100644 --- a/gunicorn/http/wsgi.py +++ b/gunicorn/http/wsgi.py @@ -12,10 +12,30 @@ from urllib import unquote from gunicorn import SERVER_SOFTWARE import gunicorn.util as util +try: + # Python 3.3 has os.sendfile(). + from os import sendfile +except: + from sendfile import sendfile + NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+') log = logging.getLogger(__name__) +class FileWrapper: + + def __init__(self, filelike, blksize=8192): + self.filelike = filelike + self.blksize = blksize + if hasattr(filelike, 'close'): + self.close = filelike.close + + def __getitem__(self, key): + data = self.filelike.read(self.blksize) + if data: + return data + raise IndexError + def create(req, sock, client, server, cfg): resp = Response(req, sock) @@ -26,6 +46,7 @@ def create(req, sock, client, server, cfg): "wsgi.multithread": False, "wsgi.multiprocess": (cfg.workers > 1), "wsgi.run_once": False, + "wsgi.file_wrapper": FileWrapper, "gunicorn.socket": sock, "SERVER_SOFTWARE": SERVER_SOFTWARE, "REQUEST_METHOD": req.method, @@ -226,6 +247,54 @@ class Response(object): self.sent += tosend util.write(self.sock, arg, self.chunked) + def sendfile_all(self, fileno, sockno, offset, nbytes): + # Send file in at most 1GB blocks as some operating + # systems can have problems with sending files in blocks + # over 2GB. + + BLKSIZE = 0x3FFFFFFF + + if nbytes > BLKSIZE: + for m in range(0, nbytes, BLKSIZE): + self.sendfile_all(fileno, sockno, offset, min(nbytes, BLKSIZE)) + offset += BLKSIZE + nbytes -= BLKSIZE + else: + sent = 0 + sent += sendfile(fileno, sockno, offset+sent, nbytes-sent) + while sent != nbytes: + sent += sendfile(fileno, sockno, offset+sent, nbytes-sent) + + def write_file(self, respiter): + if sendfile and hasattr(respiter.filelike, 'fileno') and \ + hasattr(respiter.filelike, 'tell'): + + fileno = respiter.filelike.fileno() + fd_offset = os.lseek(fileno, 0, os.SEEK_CUR) + fo_offset = respiter.filelike.tell() + nbytes = max(os.fstat(fileno).st_size - fo_offset, 0) + + if self.clength: + nbytes = min(nbytes, self.clength) + + if nbytes == 0: + return + + self.send_headers() + + if self.is_chunked(): + self.sock.sendall("%X\r\n" % nbytes) + + self.sendfile_all(fileno, self.sock.fileno(), fo_offset, nbytes) + + if self.is_chunked(): + self.sock.sendall("\r\n") + + os.lseek(fileno, fd_offset, os.SEEK_SET) + else: + for item in respiter: + self.write(item) + def close(self): if not self.headers_sent: self.send_headers() diff --git a/gunicorn/workers/async.py b/gunicorn/workers/async.py index 4f6127b8..486acec3 100644 --- a/gunicorn/workers/async.py +++ b/gunicorn/workers/async.py @@ -62,11 +62,13 @@ class AsyncWorker(base.Worker): respiter = self.wsgi(environ, resp.start_response) if respiter == ALREADY_HANDLED: return False - for item in respiter: - resp.write(item) - resp.close() - if hasattr(respiter, "close"): - respiter.close() + try: + for item in respiter: + resp.write(item) + resp.close() + finally: + if hasattr(respiter, "close"): + respiter.close() if resp.should_close(): raise StopIteration() except StopIteration: diff --git a/gunicorn/workers/sync.py b/gunicorn/workers/sync.py index a9e66d56..fe895d4b 100644 --- a/gunicorn/workers/sync.py +++ b/gunicorn/workers/sync.py @@ -94,11 +94,16 @@ class SyncWorker(base.Worker): self.log.info("Autorestarting worker after current request.") self.alive = False respiter = self.wsgi(environ, resp.start_response) - for item in respiter: - resp.write(item) - resp.close() - if hasattr(respiter, "close"): - respiter.close() + try: + if isinstance(respiter, environ['wsgi.file_wrapper']): + resp.write_file(respiter) + else: + for item in respiter: + resp.write(item) + resp.close() + finally: + if hasattr(respiter, "close"): + respiter.close() except socket.error: raise except Exception, e: