mirror of
https://github.com/frappe/gunicorn.git
synced 2026-01-14 11:09:11 +08:00
load application from factory function (#2178)
* load application from factory function Use `ast.parse` to validate that the string passed to the CLI is either an attribute name or a function call. Use `ast.literal_eval` to parse any positional and keyword arguments to the function. Call the function to get the real application. Co-authored-by: Connor Brinton <connor.brinton@gmail.com> * test coverage for util.import_app * document app factory pattern
This commit is contained in:
parent
94ab209117
commit
19cb68f4c3
@ -44,8 +44,29 @@ Example with the test app:
|
|||||||
|
|
||||||
You can now run the app with the following command::
|
You can now run the app with the following command::
|
||||||
|
|
||||||
|
.. code-block:: text
|
||||||
|
|
||||||
$ gunicorn --workers=2 test:app
|
$ gunicorn --workers=2 test:app
|
||||||
|
|
||||||
|
The variable name can also be a function call. In that case the name
|
||||||
|
will be imported from the module, then called to get the application
|
||||||
|
object. This is commonly referred to as the "application factory"
|
||||||
|
pattern.
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
def create_app():
|
||||||
|
app = FrameworkApp()
|
||||||
|
...
|
||||||
|
return app
|
||||||
|
|
||||||
|
.. code-block:: text
|
||||||
|
|
||||||
|
$ gunicorn --workers=2 'test:create_app()'
|
||||||
|
|
||||||
|
Positional and keyword arguments can also be passed, but it is
|
||||||
|
recommended to load configuration from environment variables rather than
|
||||||
|
the command line.
|
||||||
|
|
||||||
Commonly Used Arguments
|
Commonly Used Arguments
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|||||||
@ -2,7 +2,7 @@
|
|||||||
#
|
#
|
||||||
# This file is part of gunicorn released under the MIT license.
|
# This file is part of gunicorn released under the MIT license.
|
||||||
# See the NOTICE for more information.
|
# See the NOTICE for more information.
|
||||||
|
import ast
|
||||||
import email.utils
|
import email.utils
|
||||||
import errno
|
import errno
|
||||||
import fcntl
|
import fcntl
|
||||||
@ -320,6 +320,32 @@ def write_error(sock, status_int, reason, mesg):
|
|||||||
write_nonblock(sock, http.encode('latin1'))
|
write_nonblock(sock, http.encode('latin1'))
|
||||||
|
|
||||||
|
|
||||||
|
def _called_with_wrong_args(f):
|
||||||
|
"""Check whether calling a function raised a ``TypeError`` because
|
||||||
|
the call failed or because something in the function raised the
|
||||||
|
error.
|
||||||
|
|
||||||
|
:param f: The function that was called.
|
||||||
|
:return: ``True`` if the call failed.
|
||||||
|
"""
|
||||||
|
tb = sys.exc_info()[2]
|
||||||
|
|
||||||
|
try:
|
||||||
|
while tb is not None:
|
||||||
|
if tb.tb_frame.f_code is f.__code__:
|
||||||
|
# In the function, it was called successfully.
|
||||||
|
return False
|
||||||
|
|
||||||
|
tb = tb.tb_next
|
||||||
|
|
||||||
|
# Didn't reach the function.
|
||||||
|
return True
|
||||||
|
finally:
|
||||||
|
# Delete tb to break a circular reference in Python 2.
|
||||||
|
# https://docs.python.org/2/library/sys.html#sys.exc_info
|
||||||
|
del tb
|
||||||
|
|
||||||
|
|
||||||
def import_app(module):
|
def import_app(module):
|
||||||
parts = module.split(":", 1)
|
parts = module.split(":", 1)
|
||||||
if len(parts) == 1:
|
if len(parts) == 1:
|
||||||
@ -335,13 +361,65 @@ def import_app(module):
|
|||||||
raise ImportError(msg % (module.rsplit(".", 1)[0], obj))
|
raise ImportError(msg % (module.rsplit(".", 1)[0], obj))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
# Parse obj as a single expression to determine if it's a valid
|
||||||
|
# attribute name or function call.
|
||||||
|
try:
|
||||||
|
expression = ast.parse(obj, mode="eval").body
|
||||||
|
except SyntaxError:
|
||||||
|
raise AppImportError(
|
||||||
|
"Failed to parse %r as an attribute name or function call." % obj
|
||||||
|
)
|
||||||
|
|
||||||
|
if isinstance(expression, ast.Name):
|
||||||
|
name = expression.id
|
||||||
|
args = kwargs = None
|
||||||
|
elif isinstance(expression, ast.Call):
|
||||||
|
# Ensure the function name is an attribute name only.
|
||||||
|
if not isinstance(expression.func, ast.Name):
|
||||||
|
raise AppImportError("Function reference must be a simple name: %r" % obj)
|
||||||
|
|
||||||
|
name = expression.func.id
|
||||||
|
|
||||||
|
# Parse the positional and keyword arguments as literals.
|
||||||
|
try:
|
||||||
|
args = [ast.literal_eval(arg) for arg in expression.args]
|
||||||
|
kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in expression.keywords}
|
||||||
|
except ValueError:
|
||||||
|
# literal_eval gives cryptic error messages, show a generic
|
||||||
|
# message with the full expression instead.
|
||||||
|
raise AppImportError(
|
||||||
|
"Failed to parse arguments as literal values: %r" % obj
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise AppImportError(
|
||||||
|
"Failed to parse %r as an attribute name or function call." % obj
|
||||||
|
)
|
||||||
|
|
||||||
is_debug = logging.root.level == logging.DEBUG
|
is_debug = logging.root.level == logging.DEBUG
|
||||||
try:
|
try:
|
||||||
app = getattr(mod, obj)
|
app = getattr(mod, name)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
if is_debug:
|
if is_debug:
|
||||||
traceback.print_exception(*sys.exc_info())
|
traceback.print_exception(*sys.exc_info())
|
||||||
raise AppImportError("Failed to find application object %r in %r" % (obj, module))
|
raise AppImportError("Failed to find attribute %r in %r." % (name, module))
|
||||||
|
|
||||||
|
# If the expression was a function call, call the retrieved object
|
||||||
|
# to get the real application.
|
||||||
|
if args is not None:
|
||||||
|
try:
|
||||||
|
app = app(*args, **kwargs)
|
||||||
|
except TypeError as e:
|
||||||
|
# If the TypeError was due to bad arguments to the factory
|
||||||
|
# function, show Python's nice error message without a
|
||||||
|
# traceback.
|
||||||
|
if _called_with_wrong_args(app):
|
||||||
|
raise AppImportError(
|
||||||
|
"".join(traceback.format_exception_only(TypeError, e)).strip()
|
||||||
|
)
|
||||||
|
|
||||||
|
# Otherwise it was raised from within the function, show the
|
||||||
|
# full traceback.
|
||||||
|
raise
|
||||||
|
|
||||||
if app is None:
|
if app is None:
|
||||||
raise AppImportError("Failed to find application object: %r" % obj)
|
raise AppImportError("Failed to find application object: %r" % obj)
|
||||||
|
|||||||
@ -7,19 +7,32 @@ from wsgiref.validate import validator
|
|||||||
HOST = "127.0.0.1"
|
HOST = "127.0.0.1"
|
||||||
|
|
||||||
|
|
||||||
@validator
|
def create_app(name="World", count=1):
|
||||||
def app(environ, start_response):
|
message = (('Hello, %s!\n' % name) * count).encode("utf8")
|
||||||
"""Simplest possible application object"""
|
length = str(len(message))
|
||||||
|
|
||||||
data = b'Hello, World!\n'
|
@validator
|
||||||
status = '200 OK'
|
def app(environ, start_response):
|
||||||
|
"""Simplest possible application object"""
|
||||||
|
|
||||||
response_headers = [
|
status = '200 OK'
|
||||||
('Content-type', 'text/plain'),
|
|
||||||
('Content-Length', str(len(data))),
|
response_headers = [
|
||||||
]
|
('Content-type', 'text/plain'),
|
||||||
start_response(status, response_headers)
|
('Content-Length', length),
|
||||||
return iter([data])
|
]
|
||||||
|
start_response(status, response_headers)
|
||||||
|
return iter([message])
|
||||||
|
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
app = application = create_app()
|
||||||
|
none_app = None
|
||||||
|
|
||||||
|
|
||||||
|
def error_factory():
|
||||||
|
raise TypeError("inner")
|
||||||
|
|
||||||
|
|
||||||
def requires_mac_ver(*min_version):
|
def requires_mac_ver(*min_version):
|
||||||
|
|||||||
@ -2,6 +2,7 @@
|
|||||||
#
|
#
|
||||||
# This file is part of gunicorn released under the MIT license.
|
# This file is part of gunicorn released under the MIT license.
|
||||||
# See the NOTICE for more information.
|
# See the NOTICE for more information.
|
||||||
|
import os
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@ -60,17 +61,49 @@ def test_warn(capsys):
|
|||||||
assert '!!! WARNING: test warn' in err
|
assert '!!! WARNING: test warn' in err
|
||||||
|
|
||||||
|
|
||||||
def test_import_app():
|
@pytest.mark.parametrize(
|
||||||
assert util.import_app('support:app')
|
"value",
|
||||||
|
[
|
||||||
|
"support",
|
||||||
|
"support:app",
|
||||||
|
"support:create_app()",
|
||||||
|
"support:create_app('Gunicorn', 3)",
|
||||||
|
"support:create_app(count=3)",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_import_app_good(value):
|
||||||
|
assert util.import_app(value)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("value", "exc_type", "msg"),
|
||||||
|
[
|
||||||
|
("a:app", ImportError, "No module"),
|
||||||
|
("support:create_app(", AppImportError, "Failed to parse"),
|
||||||
|
("support:create.app()", AppImportError, "Function reference"),
|
||||||
|
("support:create_app(Gunicorn)", AppImportError, "literal values"),
|
||||||
|
("support:create.app", AppImportError, "attribute name"),
|
||||||
|
("support:wrong_app", AppImportError, "find attribute"),
|
||||||
|
("support:error_factory(1)", AppImportError, "error_factory() takes"),
|
||||||
|
("support:error_factory()", TypeError, "inner"),
|
||||||
|
("support:none_app", AppImportError, "find application object"),
|
||||||
|
("support:HOST", AppImportError, "callable"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_import_app_bad(value, exc_type, msg):
|
||||||
|
with pytest.raises(exc_type) as exc_info:
|
||||||
|
util.import_app(value)
|
||||||
|
|
||||||
|
assert msg in str(exc_info.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_import_app_py_ext(monkeypatch):
|
||||||
|
monkeypatch.chdir(os.path.dirname(__file__))
|
||||||
|
|
||||||
with pytest.raises(ImportError) as exc_info:
|
with pytest.raises(ImportError) as exc_info:
|
||||||
util.import_app('a:app')
|
util.import_app("support.py")
|
||||||
assert 'No module' in str(exc_info.value)
|
|
||||||
|
|
||||||
with pytest.raises(AppImportError) as exc_info:
|
assert "did you mean" in str(exc_info.value)
|
||||||
util.import_app('support:wrong_app')
|
|
||||||
msg = "Failed to find application object 'wrong_app' in 'support'"
|
|
||||||
assert msg in str(exc_info.value)
|
|
||||||
|
|
||||||
|
|
||||||
def test_to_bytestring():
|
def test_to_bytestring():
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user