Use socket.sendfile() instead of os.sendfile().

Fixes #2223.

Unfortunately, eventlet doesn't implement GreenSocket.sendfile, so we have to do it for it.

Add gevent and eventlet to tox.ini and add tests to make sure we can at least import the workers. Some tests that this actually functions would be nice...

Update the gevent and eventlet setup extras to require the versions that are enforced in their worker modules.
This commit is contained in:
Jason Madden 2020-01-04 06:31:25 -06:00
parent 9c1438f013
commit 2d40e6dace
No known key found for this signature in database
GPG Key ID: 349F84431A08B99E
10 changed files with 77 additions and 58 deletions

View File

@ -1,33 +1,31 @@
sudo: false
language: python language: python
matrix: matrix:
include: include:
- python: 3.8 - python: 3.8
env: TOXENV=lint env: TOXENV=lint
dist: xenial
sudo: true
- python: 3.5 - python: 3.5
env: TOXENV=py35 env: TOXENV=py35
- python: 3.6 - python: 3.6
env: TOXENV=py36 env: TOXENV=py36
- python: 3.7 - python: 3.7
env: TOXENV=py37 env: TOXENV=py37
dist: xenial
sudo: true
- python: pypy3 - python: pypy3
env: TOXENV=pypy3 env:
- TOXENV=pypy3
# Embedded c-ares takes a long time to build and
# as-of 2020-01-04 there are no PyPy3 manylinux
# wheels for gevent on PyPI.
- GEVENTSETUP_EMBED_CARES=no
dist: xenial dist: xenial
- python: 3.8 - python: 3.8
env: TOXENV=py38 env: TOXENV=py38
dist: xenial
sudo: true
- python: 3.8 - python: 3.8
env: TOXENV=docs-lint env: TOXENV=docs-lint
dist: xenial install: pip install -U tox coverage
sudo: true
install: pip install tox
# TODO: https://github.com/tox-dev/tox/issues/149 # TODO: https://github.com/tox-dev/tox/issues/149
script: tox --recreate script: tox --recreate
after_success:
- if [ -f .coverage ]; then coverage report ; fi
cache: cache:
directories: directories:
- .tox - .tox

View File

@ -360,12 +360,6 @@ class Response(object):
offset = os.lseek(fileno, 0, os.SEEK_CUR) offset = os.lseek(fileno, 0, os.SEEK_CUR)
if self.response_length is None: if self.response_length is None:
filesize = os.fstat(fileno).st_size filesize = os.fstat(fileno).st_size
# The file may be special and sendfile will fail.
# It may also be zero-length, but that is okay.
if filesize == 0:
return False
nbytes = filesize - offset nbytes = filesize - offset
else: else:
nbytes = self.response_length nbytes = self.response_length
@ -378,12 +372,7 @@ class Response(object):
chunk_size = "%X\r\n" % nbytes chunk_size = "%X\r\n" % nbytes
self.sock.sendall(chunk_size.encode('utf-8')) self.sock.sendall(chunk_size.encode('utf-8'))
sockno = self.sock.fileno() self.sock.sendfile(respiter.filelike, count=nbytes)
sent = 0
while sent != nbytes:
count = min(nbytes - sent, BLKSIZE)
sent += os.sendfile(sockno, fileno, offset + sent, count)
if self.is_chunked(): if self.is_chunked():
self.sock.sendall(b"\r\n") self.sock.sendall(b"\r\n")

View File

@ -4,8 +4,6 @@
# See the NOTICE for more information. # See the NOTICE for more information.
from functools import partial from functools import partial
import errno
import os
import sys import sys
try: try:
@ -19,22 +17,49 @@ else:
from eventlet import hubs, greenthread from eventlet import hubs, greenthread
from eventlet.greenio import GreenSocket from eventlet.greenio import GreenSocket
from eventlet.hubs import trampoline
from eventlet.wsgi import ALREADY_HANDLED as EVENTLET_ALREADY_HANDLED from eventlet.wsgi import ALREADY_HANDLED as EVENTLET_ALREADY_HANDLED
import greenlet import greenlet
from gunicorn.workers.base_async import AsyncWorker from gunicorn.workers.base_async import AsyncWorker
def _eventlet_sendfile(fdout, fdin, offset, nbytes, _os_sendfile=os.sendfile): def _eventlet_socket_sendfile(self, file, offset=0, count=None):
while True: # Based on the implementation in gevent which in turn is slightly
try: # modified from the standard library implementation.
return _os_sendfile(fdout, fdin, offset, nbytes) if self.gettimeout() == 0:
except OSError as e: raise ValueError("non-blocking sockets are not supported")
if e.args[0] == errno.EAGAIN: if offset:
trampoline(fdout, write=True) file.seek(offset)
else: blocksize = min(count, 8192) if count else 8192
raise total_sent = 0
# localize variable access to minimize overhead
file_read = file.read
sock_send = self.send
try:
while True:
if count:
blocksize = min(count - total_sent, blocksize)
if blocksize <= 0:
break
data = memoryview(file_read(blocksize))
if not data:
break # EOF
while True:
try:
sent = sock_send(data)
except BlockingIOError:
continue
else:
total_sent += sent
if sent < len(data):
data = data[sent:]
else:
break
return total_sent
finally:
if total_sent > 0 and hasattr(file, 'seek'):
file.seek(offset + total_sent)
def _eventlet_serve(sock, handle, concurrency): def _eventlet_serve(sock, handle, concurrency):
@ -79,7 +104,17 @@ def _eventlet_stop(client, server, conn):
def patch_sendfile(): def patch_sendfile():
setattr(os, "sendfile", _eventlet_sendfile) # As of eventlet 0.25.1, GreenSocket.sendfile doesn't exist,
# meaning the native implementations of socket.sendfile will be used.
# If os.sendfile exists, it will attempt to use that, failing explicitly
# if the socket is in non-blocking mode, which the underlying
# socket object /is/. Even the regular _sendfile_use_send will
# fail in that way; plus, it would use the underlying socket.send which isn't
# properly cooperative. So we have to monkey-patch a working socket.sendfile()
# into GreenSocket; in this method, `self.send` will be the GreenSocket's
# send method which is properly cooperative.
if not hasattr(GreenSocket, 'sendfile'):
GreenSocket.sendfile = _eventlet_socket_sendfile
class EventletWorker(AsyncWorker): class EventletWorker(AsyncWorker):

View File

@ -3,7 +3,6 @@
# This file is part of gunicorn released under the MIT license. # This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information. # See the NOTICE for more information.
import errno
import os import os
import sys import sys
from datetime import datetime from datetime import datetime
@ -30,21 +29,6 @@ from gunicorn.workers.base_async import AsyncWorker
VERSION = "gevent/%s gunicorn/%s" % (gevent.__version__, gunicorn.__version__) VERSION = "gevent/%s gunicorn/%s" % (gevent.__version__, gunicorn.__version__)
def _gevent_sendfile(fdout, fdin, offset, nbytes, _os_sendfile=os.sendfile):
while True:
try:
return _os_sendfile(fdout, fdin, offset, nbytes)
except OSError as e:
if e.args[0] == errno.EAGAIN:
socket.wait_write(fdout)
else:
raise
def patch_sendfile():
setattr(os, "sendfile", _gevent_sendfile)
class GeventWorker(AsyncWorker): class GeventWorker(AsyncWorker):
server_class = None server_class = None
@ -53,9 +37,6 @@ class GeventWorker(AsyncWorker):
def patch(self): def patch(self):
monkey.patch_all() monkey.patch_all()
# monkey patch sendfile to make it none blocking
patch_sendfile()
# patch sockets # patch sockets
sockets = [] sockets = []
for s in self.sockets: for s in self.sockets:

View File

@ -1,4 +1,6 @@
aiohttp aiohttp
gevent
eventlet
coverage coverage
pytest pytest
pytest-cov pytest-cov

View File

@ -76,8 +76,8 @@ install_requires = [
] ]
extras_require = { extras_require = {
'gevent': ['gevent>=0.13'], 'gevent': ['gevent>=1.4.0'],
'eventlet': ['eventlet>=0.9.7'], 'eventlet': ['eventlet>=0.24.1'],
'tornado': ['tornado>=0.2'], 'tornado': ['tornado>=0.2'],
'gthread': [], 'gthread': [],
'setproctitle': ['setproctitle'], 'setproctitle': ['setproctitle'],

View File

View File

@ -0,0 +1,7 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
def test_import():
__import__('gunicorn.workers.geventlet')

View File

@ -0,0 +1,7 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
def test_import():
__import__('gunicorn.workers.ggevent')

View File

@ -4,7 +4,7 @@ skipsdist = True
[testenv] [testenv]
usedevelop = True usedevelop = True
commands = py.test {posargs} commands = py.test --cov=gunicorn {posargs}
deps = deps =
-rrequirements_test.txt -rrequirements_test.txt