#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

import os
import re
import sys
import tempfile

import pytest

from gunicorn import config
from gunicorn.config import _get_default_control_socket
from gunicorn.app.base import Application
from gunicorn.app.wsgiapp import WSGIApplication
from gunicorn.errors import ConfigError
from gunicorn.util import load_class
from gunicorn.workers.sync import SyncWorker
from gunicorn import glogging
from gunicorn.instrument import statsd

# Directory containing this test module; fixture paths are built from it.
dirname = os.path.dirname(__file__)


def cfg_module():
    """Return the dotted module name of the primary test configuration."""
    return 'config.test_cfg'


def alt_cfg_module():
    """Return the dotted module name of the alternate test configuration."""
    return 'config.test_cfg_alt'


def cfg_file():
    """Return the filesystem path of the primary test configuration."""
    return os.path.join(dirname, "config", "test_cfg.py")


def alt_cfg_file():
    """Return the filesystem path of the alternate test configuration."""
    return os.path.join(dirname, "config", "test_cfg_alt.py")


def cfg_file_with_wsgi_app():
    """Return the path of the test configuration that sets ``wsgi_app``."""
    return os.path.join(dirname, "config", "test_cfg_with_wsgi_app.py")


def paster_ini():
    """Return the path of the paster ``nose.ini`` example file."""
    return os.path.join(
        dirname, "..", "examples", "frameworks", "pylonstest", "nose.ini"
    )


class AltArgs:
    """Context manager that temporarily replaces ``sys.argv``.

    ``args`` becomes ``sys.argv`` for the duration of the ``with`` block;
    the previous value is restored on exit, including when the block raises.
    """

    def __init__(self, args=None):
        self.args = args or []
        # Kept for backward compatibility with code that reads ``orig``
        # before entering; refreshed in ``__enter__``.
        self.orig = sys.argv

    def __enter__(self):
        # Snapshot at entry rather than at construction, so an instance that
        # is created early (or reused) restores the argv that was actually
        # in effect when the block started.
        self.orig = sys.argv
        sys.argv = self.args
        return self

    def __exit__(self, exc_type, exc_inst, traceback):
        sys.argv = self.orig


class NoConfigApp(Application):
    """Minimal ``Application`` whose ``init`` and ``load`` hooks do nothing."""

    def __init__(self):
        super().__init__("no_usage", prog="gunicorn_test")

    def init(self, parser, opts, args):
        pass

    def load(self):
        pass


class CustomWorker(SyncWorker):
    """Trivial ``SyncWorker`` subclass used as a custom worker class."""
    pass


class WSGIApp(WSGIApplication):
    """``WSGIApplication`` whose ``load`` hook does nothing."""

    def __init__(self):
        super().__init__("no_usage", prog="gunicorn_test")

    def load(self):
        pass


def test_worker_class():
    """A class object is accepted as ``worker_class`` and by ``load_class``."""
    c = config.Config()
    c.set("worker_class", CustomWorker)
    assert c.worker_class == CustomWorker

    try:
        # The previous check, isinstance(..., object), is true for every
        # value and verified nothing. Assert the class round-trips instead.
        # NOTE(review): relies on load_class returning class objects
        # unchanged - confirm against gunicorn.util.load_class.
        assert load_class(c.worker_class) is CustomWorker
    except AttributeError:
        pytest.fail("'load_class doesn't support type class argument'")


def test_defaults():
    """Each known setting's value equals its validated default."""
    c = config.Config()
    for s in config.KNOWN_SETTINGS:
        assert c.settings[s.name].validator(s.default) == c.settings[s.name].get()


def test_property_access():
    """Settings are readable as attributes and writable only via ``set``."""
    c = config.Config()
    for s in config.KNOWN_SETTINGS:
        getattr(c, s.name)

    # Class was loaded
    assert c.worker_class == SyncWorker

    # logger class was loaded
    assert c.logger_class == glogging.Logger

    # Workers defaults to 1
    assert c.workers == 1
    c.set("workers", 3)
    assert c.workers == 3

    # Address is parsed
    assert c.address == [("127.0.0.1", 8000)]

    # User and group defaults
    assert os.geteuid() == c.uid
    assert os.getegid() == c.gid

    # Proc name
    assert "gunicorn" == c.proc_name

    # Not a config property
    pytest.raises(AttributeError, getattr, c, "foo")

    # Force to be not an error
    class Baz:
        def get(self):
            return 3.14

    c.settings["foo"] = Baz()
    assert c.foo == 3.14

    # Attempt to set a cfg not via c.set
    pytest.raises(AttributeError, setattr, c, "proc_name", "baz")

    # No setting for name
    pytest.raises(AttributeError, c.set, "baz", "bar")


def test_bool_validation():
    """Boolean settings accept bools and "true"/"false" strings only."""
    c = config.Config()
    assert c.preload_app is False
    c.set("preload_app", True)
    assert c.preload_app is True
    c.set("preload_app", "true")
    assert c.preload_app is True
    c.set("preload_app", "false")
    assert c.preload_app is False
    pytest.raises(ValueError, c.set, "preload_app", "zilch")
    pytest.raises(TypeError, c.set, "preload_app", 4)


def test_pos_int_validation():
    """Positive-int settings accept ints and int-like strings."""
    c = config.Config()
    assert c.workers == 1
    c.set("workers", 4)
    assert c.workers == 4
    c.set("workers", "5")
    assert c.workers == 5
    c.set("workers", "0xFF")
    assert c.workers == 255
    c.set("workers", True)
    assert c.workers == 1  # Yes. That's right...
    pytest.raises(ValueError, c.set, "workers", -21)
    pytest.raises(TypeError, c.set, "workers", c)


def test_str_validation():
    """String settings are stripped; non-strings are rejected."""
    c = config.Config()
    assert c.proc_name == "gunicorn"
    c.set("proc_name", " foo ")
    assert c.proc_name == "foo"
    pytest.raises(TypeError, c.set, "proc_name", 2)


def test_str_to_addr_list_validation():
    """Address-list settings are validated but stored as strings."""
    c = config.Config()
    # Values remain as strings for backward compatibility
    assert c.proxy_allow_ips == ["127.0.0.1", "::1"]
    assert c.forwarded_allow_ips == ["127.0.0.1", "::1"]
    # Single IPs are validated but kept as strings
    c.set("forwarded_allow_ips", "127.0.0.1,192.0.2.1")
    assert c.forwarded_allow_ips == ["127.0.0.1", "192.0.2.1"]
    # CIDR networks are supported and kept as strings
    c.set("forwarded_allow_ips", "127.0.0.0/8,192.168.0.0/16")
    assert c.forwarded_allow_ips == ["127.0.0.0/8", "192.168.0.0/16"]
    # Wildcard is preserved as string
    c.set("forwarded_allow_ips", "*")
    assert c.forwarded_allow_ips == ["*"]
    c.set("forwarded_allow_ips", "")
    assert c.forwarded_allow_ips == []
    c.set("forwarded_allow_ips", None)
    assert c.forwarded_allow_ips == []
    # demand addresses are specified unambiguously
    pytest.raises(TypeError, c.set, "forwarded_allow_ips", 1)
    # demand networks are specified unambiguously
    pytest.raises(ValueError, c.set, "forwarded_allow_ips", "127.0.0")
    # detect typos
    pytest.raises(ValueError, c.set, "forwarded_allow_ips", "::f:")
    # dangerous typos such as accidentally permitting half the internet
    # clearly recognizable - masked bits are not zero
    pytest.raises(ValueError, c.set, "forwarded_allow_ips", "100.64.0.0/1")


def test_str_to_list():
    """Comma-separated strings become lists; empty and None become ``[]``."""
    c = config.Config()
    assert c.forwarder_headers == ["SCRIPT_NAME", "PATH_INFO"]
    c.set("forwarder_headers", "SCRIPT_NAME,REMOTE_USER")
    assert c.forwarder_headers == ["SCRIPT_NAME", "REMOTE_USER"]
    c.set("forwarder_headers", "")
    assert c.forwarder_headers == []
    c.set("forwarder_headers", None)
    assert c.forwarder_headers == []


def test_callable_validation():
    """Hook settings reject non-callables and callables of the wrong arity."""
    c = config.Config()

    def func(a, b):
        pass

    c.set("pre_fork", func)
    assert c.pre_fork == func
    pytest.raises(TypeError, c.set, "pre_fork", 1)
    pytest.raises(TypeError, c.set, "pre_fork", lambda x: True)


def test_reload_engine_validation():
    """``reload_engine`` defaults to "auto" and rejects unknown engines."""
    c = config.Config()

    assert c.reload_engine == "auto"

    c.set('reload_engine', 'poll')
    assert c.reload_engine == 'poll'

    pytest.raises(ConfigError, c.set, "reload_engine", "invalid")


def test_callable_validation_for_string():
    """``validate_callable`` resolves dotted paths and rejects bad ones."""
    from os.path import isdir as testfunc
    assert config.validate_callable(-1)("os.path.isdir") == testfunc

    # invalid values tests
    pytest.raises(
        TypeError, config.validate_callable(-1), ""
    )
    pytest.raises(
        TypeError, config.validate_callable(-1), "os.path.not_found_func"
    )
    pytest.raises(
        TypeError, config.validate_callable(-1), "notfoundmodule.func"
    )


def test_cmd_line():
    """Command-line flags populate the corresponding settings."""
    with AltArgs(["prog_name", "-b", "blargh"]):
        app = NoConfigApp()
        assert app.cfg.bind == ["blargh"]
    with AltArgs(["prog_name", "-w", "3"]):
        app = NoConfigApp()
        assert app.cfg.workers == 3
    with AltArgs(["prog_name", "--preload"]):
        app = NoConfigApp()
        assert app.cfg.preload_app


def test_cmd_line_invalid_setting(capsys):
    """An unknown flag exits and reports the argument on stderr."""
    with AltArgs(["prog_name", "-q", "bar"]):
        with pytest.raises(SystemExit):
            NoConfigApp()
        _, err = capsys.readouterr()
        assert "error: unrecognized arguments: -q" in err


def test_app_config():
    """An app built with no arguments carries every validated default."""
    with AltArgs():
        app = NoConfigApp()
    for s in config.KNOWN_SETTINGS:
        assert app.cfg.settings[s.name].validator(s.default) == app.cfg.settings[s.name].get()


def test_load_config():
    """``-c <path>`` loads settings from a config file."""
    with AltArgs(["prog_name", "-c", cfg_file()]):
        app = NoConfigApp()
    assert app.cfg.bind == ["unix:/tmp/bar/baz"]
    assert app.cfg.workers == 3
    assert app.cfg.proc_name == "fooey"


def test_load_config_explicit_file():
    """``-c file:<path>`` loads settings from a config file."""
    with AltArgs(["prog_name", "-c", "file:%s" % cfg_file()]):
        app = NoConfigApp()
    assert app.cfg.bind == ["unix:/tmp/bar/baz"]
    assert app.cfg.workers == 3
    assert app.cfg.proc_name == "fooey"


def test_load_config_module():
    """``-c python:<module>`` loads settings from an importable module."""
    with AltArgs(["prog_name", "-c", "python:%s" % cfg_module()]):
        app = NoConfigApp()
    assert app.cfg.bind == ["unix:/tmp/bar/baz"]
    assert app.cfg.workers == 3
    assert app.cfg.proc_name == "fooey"


def test_cli_overrides_config():
    """A command-line flag wins over the same setting in a config file."""
    with AltArgs(["prog_name", "-c", cfg_file(), "-b", "blarney"]):
        app = NoConfigApp()
    assert app.cfg.bind == ["blarney"]
    assert app.cfg.proc_name == "fooey"


def test_cli_overrides_config_module():
    """A command-line flag wins over the same setting in a config module."""
    with AltArgs(["prog_name", "-c", "python:%s" % cfg_module(), "-b", "blarney"]):
        app = NoConfigApp()
    assert app.cfg.bind == ["blarney"]
    assert app.cfg.proc_name == "fooey"


@pytest.fixture
def create_config_file(tmp_path, monkeypatch):
    """Create ``gunicorn.conf.py`` in the working directory for one test.

    The working directory is switched to a per-test temporary directory
    first, so the fixture neither overwrites nor deletes a real
    ``gunicorn.conf.py`` that happens to sit where the tests are run from.
    Yields the (closed) file object; its ``.name`` is the config path.
    """
    monkeypatch.chdir(tmp_path)
    # Build the path from os.getcwd(), as the original fixture did, so that
    # it compares equal to what the code under test derives from the cwd.
    default_config = os.path.join(os.path.abspath(os.getcwd()),
                                  'gunicorn.conf.py')
    with open(default_config, 'w+') as default:
        default.write("bind='0.0.0.0:9090'")

    yield default

    os.unlink(default_config)


def test_default_config_file(create_config_file):
    """A ``gunicorn.conf.py`` in the working directory is picked up."""
    assert config.get_default_config_file() == create_config_file.name

    with AltArgs(["prog_name"]):
        app = NoConfigApp()
    assert app.cfg.bind == ["0.0.0.0:9090"]


def test_post_request():
    """``post_request`` hooks taking 2, 3 or 4 arguments are all callable."""
    c = config.Config()

    def post_request_4(worker, req, environ, resp):
        return 4

    def post_request_3(worker, req, environ):
        return 3

    def post_request_2(worker, req):
        return 2

    c.set("post_request", post_request_4)
    assert c.post_request(1, 2, 3, 4) == 4

    c.set("post_request", post_request_3)
    assert c.post_request(1, 2, 3, 4) == 3

    c.set("post_request", post_request_2)
    assert c.post_request(1, 2, 3, 4) == 2


def test_nworkers_changed():
    """A three-argument ``nworkers_changed`` hook is stored and callable."""
    c = config.Config()

    def nworkers_changed_3(server, new_value, old_value):
        return 3

    c.set("nworkers_changed", nworkers_changed_3)
    assert c.nworkers_changed(1, 2, 3) == 3


def test_statsd_host():
    """``statsd_host`` parses host, host:port and unix socket forms."""
    c = config.Config()
    assert c.statsd_host is None
    c.set("statsd_host", "localhost")
    assert c.statsd_host == ("localhost", 8125)
    c.set("statsd_host", "statsd:7777")
    assert c.statsd_host == ("statsd", 7777)
    c.set("statsd_host", "unix:///path/to.sock")
    assert c.statsd_host == "/path/to.sock"
    pytest.raises(TypeError, c.set, "statsd_host", 666)
    pytest.raises(TypeError, c.set, "statsd_host", "host:string")


def test_statsd_host_with_unix_as_hostname():
    # This is a regression test for major release 20. After this release
    # we should consider modifying the behavior of util.parse_address to
    # simplify gunicorn's code
    c = config.Config()
    c.set("statsd_host", "unix:7777")
    assert c.statsd_host == ("unix", 7777)
    c.set("statsd_host", "unix://some.socket")
    assert c.statsd_host == "some.socket"


def test_statsd_changes_logger():
    """Setting ``statsd_host`` switches the default logger to ``Statsd``."""
    c = config.Config()
    assert c.logger_class == glogging.Logger
    c.set('statsd_host', 'localhost:12345')
    assert c.logger_class == statsd.Statsd


class MyLogger(glogging.Logger):
    # dummy custom logger class for testing
    pass


def test_always_use_configured_logger():
    """An explicitly configured logger is kept when statsd is enabled."""
    c = config.Config()
    c.set('logger_class', __name__ + '.MyLogger')
    assert c.logger_class == MyLogger
    c.set('statsd_host', 'localhost:12345')
    # still uses custom logger over statsd
    assert c.logger_class == MyLogger


def test_load_enviroment_variables_config(monkeypatch):
    """Flags in ``GUNICORN_CMD_ARGS`` are applied to the configuration."""
    monkeypatch.setenv("GUNICORN_CMD_ARGS", "--workers=4")
    with AltArgs():
        app = NoConfigApp()
    assert app.cfg.workers == 4


def test_config_file_environment_variable(monkeypatch):
    """``--config`` from the environment is used unless given on the CLI."""
    monkeypatch.setenv("GUNICORN_CMD_ARGS", "--config=" + alt_cfg_file())
    with AltArgs():
        app = NoConfigApp()
    assert app.cfg.proc_name == "not-fooey"
    assert app.cfg.config == alt_cfg_file()
    with AltArgs(["prog_name", "--config", cfg_file()]):
        app = NoConfigApp()
    assert app.cfg.proc_name == "fooey"
    assert app.cfg.config == cfg_file()


def test_invalid_enviroment_variables_config(monkeypatch, capsys):
    """An unknown flag in ``GUNICORN_CMD_ARGS`` exits with an error."""
    monkeypatch.setenv("GUNICORN_CMD_ARGS", "--foo=bar")
    with AltArgs():
        with pytest.raises(SystemExit):
            NoConfigApp()
        _, err = capsys.readouterr()
        assert "error: unrecognized arguments: --foo" in err


def test_cli_overrides_enviroment_variables_module(monkeypatch):
    """A command-line flag wins over the same flag in the environment."""
    monkeypatch.setenv("GUNICORN_CMD_ARGS", "--workers=4")
    with AltArgs(["prog_name", "-c", cfg_file(), "--workers", "3"]):
        app = NoConfigApp()
    assert app.cfg.workers == 3
@pytest.mark.parametrize("options, expected", [
    (["app:app"], 'app:app'),
    (["-c", cfg_file(), "app:app"], 'app:app'),
    (["-c", cfg_file_with_wsgi_app(), "app:app"], 'app:app'),
    (["-c", cfg_file_with_wsgi_app()], 'app1:app1'),
])
def test_wsgi_app_config(options, expected):
    """The app URI comes from the CLI, falling back to the config file."""
    cmdline = ["prog_name"]
    cmdline.extend(options)
    with AltArgs(cmdline):
        app = WSGIApp()
    assert app.app_uri == expected


@pytest.mark.parametrize("options", [
    ([]),
    (["-c", cfg_file()]),
])
def test_non_wsgi_app(options, capsys):
    """Without any application module the WSGI app exits with an error."""
    cmdline = ["prog_name"]
    cmdline.extend(options)
    with AltArgs(cmdline):
        with pytest.raises(SystemExit):
            WSGIApp()
        _, err = capsys.readouterr()
        assert "Error: No application module specified." in err


@pytest.mark.parametrize("options, expected", [
    (["myapp:app"], False),
    (["--reload", "myapp:app"], True),
    (["--reload", "--", "myapp:app"], True),
    (["--reload", "-w 2", "myapp:app"], True),
])
def test_reload(options, expected):
    """``--reload`` enables ``reload`` regardless of the other arguments."""
    cmdline = ["prog_name"]
    cmdline.extend(options)
    with AltArgs(cmdline):
        app = NoConfigApp()
    assert app.cfg.reload == expected


@pytest.mark.parametrize("options, expected", [
    (["--umask", "0", "myapp:app"], 0),
    (["--umask", "0o0", "myapp:app"], 0),
    (["--umask", "0x0", "myapp:app"], 0),
    (["--umask", "0xFF", "myapp:app"], 255),
    (["--umask", "0022", "myapp:app"], 18),
])
def test_umask_config(options, expected):
    """``--umask`` accepts decimal, ``0o``, ``0x`` and leading-zero octal."""
    cmdline = ["prog_name"]
    cmdline.extend(options)
    with AltArgs(cmdline):
        app = NoConfigApp()
    assert app.cfg.umask == expected


def _test_ssl_version(options, expected):
    # Not collected by pytest: the leading underscore hides it and nothing
    # in the visible part of this module parametrizes or calls it.
    cmdline = ["prog_name"]
    cmdline.extend(options)
    with AltArgs(cmdline):
        app = NoConfigApp()
    assert app.cfg.ssl_version == expected


def test_bind_fd():
    """An ``fd://`` bind address is stored verbatim."""
    with AltArgs(["prog_name", "-b", "fd://42"]):
        app = NoConfigApp()
    assert app.cfg.bind == ["fd://42"]


def test_repr():
    """A setting's ``repr`` includes its current value."""
    c = config.Config()
    c.set("workers", 5)

    assert "with value 5" in repr(c.settings['workers'])


def test_str():
    """``str(config)`` renders one ``name = value`` line per setting."""
    c = config.Config()
    o = str(c)

    # match the first few lines, some different types, but don't go OTT
    # to avoid needless test fails with changes
    OUTPUT_MATCH = {
        'access_log_format': '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"',
        'accesslog': 'None',
        'backlog': '2048',
        'bind': "['127.0.0.1:8000']",
        'capture_output': 'False',
        # An empty expected value can never match the line of a callable
        # hook, so the original '' could not pass.
        # NOTE(review): restored on the assumption that Config.__str__
        # renders callables as "<QualName()>" - confirm against
        # gunicorn.config.
        'child_exit': '<ChildExit.child_exit()>',
    }
    for i, line in enumerate(o.splitlines()):
        m = re.match(r'^(\w+)\s+= ', line)
        assert m, "Line {} didn't match expected format: {!r}".format(i, line)

        key = m.group(1)
        try:
            s = OUTPUT_MATCH.pop(key)
        except KeyError:
            continue

        line_re = r'^{}\s+= {}$'.format(key, re.escape(s))
        assert re.match(line_re, line), '{!r} != {!r}'.format(line_re, line)

        if not OUTPUT_MATCH:
            break
    else:
        # pytest.fail rather than ``assert False``: asserts are stripped
        # under ``python -O``, which would let this branch pass silently.
        pytest.fail('missing expected setting lines? {}'.format(
            OUTPUT_MATCH.keys()
        ))


# Tests for _get_default_control_socket
class TestGetDefaultControlSocket:
    """Tests for the _get_default_control_socket function."""

    def test_uses_xdg_runtime_dir_when_set_and_exists(self, monkeypatch):
        """When XDG_RUNTIME_DIR is set and exists, use it."""
        with tempfile.TemporaryDirectory() as tmpdir:
            monkeypatch.setenv('XDG_RUNTIME_DIR', tmpdir)
            result = _get_default_control_socket()
            assert result == os.path.join(tmpdir, 'gunicorn.ctl')

    def test_falls_back_when_xdg_runtime_dir_not_exists(self, monkeypatch):
        """When XDG_RUNTIME_DIR is set but doesn't exist, fall back to home."""
        monkeypatch.setenv('XDG_RUNTIME_DIR', '/nonexistent/path/that/does/not/exist')
        monkeypatch.setenv('HOME', '/home/testuser')
        result = _get_default_control_socket()
        assert result == '/home/testuser/.gunicorn/gunicorn.ctl'

    def test_falls_back_when_xdg_runtime_dir_not_set(self, monkeypatch):
        """When XDG_RUNTIME_DIR is not set, use home directory."""
        monkeypatch.delenv('XDG_RUNTIME_DIR', raising=False)
        monkeypatch.setenv('HOME', '/home/testuser')
        result = _get_default_control_socket()
        assert result == '/home/testuser/.gunicorn/gunicorn.ctl'

    def test_uses_home_directory_structure(self, monkeypatch):
        """Verify the path structure uses .gunicorn subdirectory."""
        monkeypatch.delenv('XDG_RUNTIME_DIR', raising=False)
        with tempfile.TemporaryDirectory() as tmpdir:
            monkeypatch.setenv('HOME', tmpdir)
            result = _get_default_control_socket()
            assert result == os.path.join(tmpdir, '.gunicorn', 'gunicorn.ctl')
            # Build the suffix with os.path.join so it uses the same
            # separator as the assertion above.
            assert result.endswith(os.path.join('.gunicorn', 'gunicorn.ctl'))