Convert tests to use py.test assertions.

Closes #950
This commit is contained in:
Berker Peksag 2015-07-03 07:38:36 +03:00
parent d478968d59
commit 337900037f
8 changed files with 172 additions and 201 deletions

View File

@ -77,40 +77,3 @@ class http_request(object):
func(req)
run.func_name = func.func_name
return run
def eq(a, b):
assert a == b, "%r != %r" % (a, b)
def ne(a, b):
assert a != b, "%r == %r" % (a, b)
def lt(a, b):
assert a < b, "%r >= %r" % (a, b)
def gt(a, b):
assert a > b, "%r <= %r" % (a, b)
def isin(a, b):
assert a in b, "%r is not in %r" % (a, b)
def isnotin(a, b):
assert a not in b, "%r is in %r" % (a, b)
def has(a, b):
assert hasattr(a, b), "%r has no attribute %r" % (a, b)
def hasnot(a, b):
assert not hasattr(a, b), "%r has an attribute %r" % (a, b)
def istype(a, b):
assert isinstance(a, b), "%r is not an instance of %r" % (a, b)
def raises(exctype, func, *args, **kwargs):
try:
func(*args, **kwargs)
except exctype:
pass
else:
func_name = getattr(func, "func_name", "<builtin_function>")
raise AssertionError("Function %s did not raise %s" % (
func_name, exctype.__name__))

View File

@ -8,6 +8,8 @@ import t
import os
import sys
import pytest
from gunicorn import config
from gunicorn.app.base import Application
from gunicorn.workers.sync import SyncWorker
@ -45,8 +47,7 @@ class NoConfigApp(Application):
def test_defaults():
c = config.Config()
for s in config.KNOWN_SETTINGS:
t.eq(c.settings[s.name].validator(s.default),
c.settings[s.name].get())
assert c.settings[s.name].validator(s.default) == c.settings[s.name].get()
def test_property_access():
c = config.Config()
@ -54,108 +55,105 @@ def test_property_access():
getattr(c, s.name)
# Class was loaded
t.eq(c.worker_class, SyncWorker)
assert c.worker_class == SyncWorker
# Workers defaults to 1
t.eq(c.workers, 1)
assert c.workers == 1
c.set("workers", 3)
t.eq(c.workers, 3)
assert c.workers == 3
# Address is parsed
t.eq(c.address, [("127.0.0.1", 8000)])
assert c.address == [("127.0.0.1", 8000)]
# User and group defaults
t.eq(os.geteuid(), c.uid)
t.eq(os.getegid(), c.gid)
assert os.geteuid() == c.uid
assert os.getegid() == c.gid
# Proc name
t.eq("gunicorn", c.proc_name)
assert "gunicorn" == c.proc_name
# Not a config property
t.raises(AttributeError, getattr, c, "foo")
pytest.raises(AttributeError, getattr, c, "foo")
# Force to be not an error
class Baz(object):
def get(self):
return 3.14
c.settings["foo"] = Baz()
t.eq(c.foo, 3.14)
assert c.foo == 3.14
# Attempt to set a cfg not via c.set
t.raises(AttributeError, setattr, c, "proc_name", "baz")
pytest.raises(AttributeError, setattr, c, "proc_name", "baz")
# No setting for name
t.raises(AttributeError, c.set, "baz", "bar")
pytest.raises(AttributeError, c.set, "baz", "bar")
def test_bool_validation():
c = config.Config()
t.eq(c.preload_app, False)
assert c.preload_app is False
c.set("preload_app", True)
t.eq(c.preload_app, True)
assert c.preload_app is True
c.set("preload_app", "true")
t.eq(c.preload_app, True)
assert c.preload_app is True
c.set("preload_app", "false")
t.eq(c.preload_app, False)
t.raises(ValueError, c.set, "preload_app", "zilch")
t.raises(TypeError, c.set, "preload_app", 4)
assert c.preload_app is False
pytest.raises(ValueError, c.set, "preload_app", "zilch")
pytest.raises(TypeError, c.set, "preload_app", 4)
def test_pos_int_validation():
c = config.Config()
t.eq(c.workers, 1)
assert c.workers == 1
c.set("workers", 4)
t.eq(c.workers, 4)
assert c.workers == 4
c.set("workers", "5")
t.eq(c.workers, 5)
assert c.workers == 5
c.set("workers", "0xFF")
t.eq(c.workers, 255)
assert c.workers == 255
c.set("workers", True)
t.eq(c.workers, 1) # Yes. That's right...
t.raises(ValueError, c.set, "workers", -21)
t.raises(TypeError, c.set, "workers", c)
assert c.workers == 1 # Yes. That's right...
pytest.raises(ValueError, c.set, "workers", -21)
pytest.raises(TypeError, c.set, "workers", c)
def test_str_validation():
c = config.Config()
t.eq(c.proc_name, "gunicorn")
assert c.proc_name == "gunicorn"
c.set("proc_name", " foo ")
t.eq(c.proc_name, "foo")
t.raises(TypeError, c.set, "proc_name", 2)
assert c.proc_name == "foo"
pytest.raises(TypeError, c.set, "proc_name", 2)
def test_str_to_list_validation():
c = config.Config()
t.eq(c.forwarded_allow_ips, ["127.0.0.1"])
assert c.forwarded_allow_ips == ["127.0.0.1"]
c.set("forwarded_allow_ips", "127.0.0.1,192.168.0.1")
t.eq(c.forwarded_allow_ips, ["127.0.0.1", "192.168.0.1"])
assert c.forwarded_allow_ips == ["127.0.0.1", "192.168.0.1"]
c.set("forwarded_allow_ips", "")
t.eq(c.forwarded_allow_ips, [])
assert c.forwarded_allow_ips == []
c.set("forwarded_allow_ips", None)
t.eq(c.forwarded_allow_ips, [])
t.raises(TypeError, c.set, "forwarded_allow_ips", 1)
assert c.forwarded_allow_ips == []
pytest.raises(TypeError, c.set, "forwarded_allow_ips", 1)
def test_callable_validation():
c = config.Config()
def func(a, b):
pass
c.set("pre_fork", func)
t.eq(c.pre_fork, func)
t.raises(TypeError, c.set, "pre_fork", 1)
t.raises(TypeError, c.set, "pre_fork", lambda x: True)
assert c.pre_fork == func
pytest.raises(TypeError, c.set, "pre_fork", 1)
pytest.raises(TypeError, c.set, "pre_fork", lambda x: True)
def test_callable_validation_for_string():
from os.path import isdir as testfunc
t.eq(
config.validate_callable(-1)("os.path.isdir"),
testfunc
)
assert config.validate_callable(-1)("os.path.isdir") == testfunc
# invalid values tests
t.raises(
pytest.raises(
TypeError,
config.validate_callable(-1), ""
)
t.raises(
pytest.raises(
TypeError,
config.validate_callable(-1), "os.path.not_found_func"
)
t.raises(
pytest.raises(
TypeError,
config.validate_callable(-1), "notfoundmodule.func"
)
@ -164,46 +162,45 @@ def test_callable_validation_for_string():
def test_cmd_line():
with AltArgs(["prog_name", "-b", "blargh"]):
app = NoConfigApp()
t.eq(app.cfg.bind, ["blargh"])
assert app.cfg.bind == ["blargh"]
with AltArgs(["prog_name", "-w", "3"]):
app = NoConfigApp()
t.eq(app.cfg.workers, 3)
assert app.cfg.workers == 3
with AltArgs(["prog_name", "--preload"]):
app = NoConfigApp()
t.eq(app.cfg.preload_app, True)
assert app.cfg.preload_app
def test_app_config():
with AltArgs():
app = NoConfigApp()
for s in config.KNOWN_SETTINGS:
t.eq(app.cfg.settings[s.name].validator(s.default),
app.cfg.settings[s.name].get())
assert app.cfg.settings[s.name].validator(s.default) == app.cfg.settings[s.name].get()
def test_load_config():
with AltArgs(["prog_name", "-c", cfg_file()]):
app = NoConfigApp()
t.eq(app.cfg.bind, ["unix:/tmp/bar/baz"])
t.eq(app.cfg.workers, 3)
t.eq(app.cfg.proc_name, "fooey")
assert app.cfg.bind == ["unix:/tmp/bar/baz"]
assert app.cfg.workers == 3
assert app.cfg.proc_name == "fooey"
def test_load_config_module():
with AltArgs(["prog_name", "-c", cfg_module()]):
app = NoConfigApp()
t.eq(app.cfg.bind, ["unix:/tmp/bar/baz"])
t.eq(app.cfg.workers, 3)
t.eq(app.cfg.proc_name, "fooey")
assert app.cfg.bind == ["unix:/tmp/bar/baz"]
assert app.cfg.workers == 3
assert app.cfg.proc_name == "fooey"
def test_cli_overrides_config():
with AltArgs(["prog_name", "-c", cfg_file(), "-b", "blarney"]):
app = NoConfigApp()
t.eq(app.cfg.bind, ["blarney"])
t.eq(app.cfg.proc_name, "fooey")
assert app.cfg.bind == ["blarney"]
assert app.cfg.proc_name == "fooey"
def test_cli_overrides_config_module():
with AltArgs(["prog_name", "-c", cfg_module(), "-b", "blarney"]):
app = NoConfigApp()
t.eq(app.cfg.bind, ["blarney"])
t.eq(app.cfg.proc_name, "fooey")
assert app.cfg.bind == ["blarney"]
assert app.cfg.proc_name == "fooey"
def test_default_config_file():
default_config = os.path.join(os.path.abspath(os.getcwd()),
@ -232,19 +229,20 @@ def test_post_request():
return 2
c.set("post_request", post_request_4)
t.eq(4, c.post_request(1, 2, 3, 4))
assert c.post_request(1, 2, 3, 4) == 4
c.set("post_request", post_request_3)
t.eq(3, c.post_request(1, 2, 3, 4))
assert c.post_request(1, 2, 3, 4) == 3
c.set("post_request", post_request_2)
t.eq(2, c.post_request(1, 2, 3, 4))
assert c.post_request(1, 2, 3, 4) == 2
def test_nworkers_changed():
c = config.Config()
def nworkers_changed_3(server, new_value, old_value):
return 3
c.set("nworkers_changed", nworkers_changed_3)
t.eq(3, c.nworkers_changed(1, 2, 3))
assert c.nworkers_changed(1, 2, 3) == 3

View File

@ -5,7 +5,7 @@ from gunicorn.six import BytesIO
def assert_readline(payload, size, expected):
body = Body(BytesIO(payload))
t.eq(body.readline(size), expected)
assert body.readline(size) == expected
def test_readline_empty_body():
@ -20,23 +20,23 @@ def test_readline_zero_size():
def test_readline_new_line_before_size():
body = Body(BytesIO(b"abc\ndef"))
t.eq(body.readline(4), b"abc\n")
t.eq(body.readline(), b"def")
assert body.readline(4) == b"abc\n"
assert body.readline() == b"def"
def test_readline_new_line_after_size():
body = Body(BytesIO(b"abc\ndef"))
t.eq(body.readline(2), b"ab")
t.eq(body.readline(), b"c\n")
assert body.readline(2) == b"ab"
assert body.readline() == b"c\n"
def test_readline_no_new_line():
body = Body(BytesIO(b"abcdef"))
t.eq(body.readline(), b"abcdef")
assert body.readline() == b"abcdef"
body = Body(BytesIO(b"abcdef"))
t.eq(body.readline(2), b"ab")
t.eq(body.readline(2), b"cd")
t.eq(body.readline(2), b"ef")
assert body.readline(2) == b"ab"
assert body.readline(2) == b"cd"
assert body.readline(2) == b"ef"
def test_readline_buffer_loaded():
@ -45,16 +45,15 @@ def test_readline_buffer_loaded():
body.read(1) # load internal buffer
reader.write(b"g\nhi")
reader.seek(7)
print(reader.getvalue())
t.eq(body.readline(), b"bc\n")
t.eq(body.readline(), b"defg\n")
t.eq(body.readline(), b"hi")
assert body.readline() == b"bc\n"
assert body.readline() == b"defg\n"
assert body.readline() == b"hi"
def test_readline_buffer_loaded_with_size():
body = Body(BytesIO(b"abc\ndef"))
body.read(1) # load internal buffer
t.eq(body.readline(2), b"bc")
t.eq(body.readline(2), b"\n")
t.eq(body.readline(2), b"de")
t.eq(body.readline(2), b"f")
body.read(1) # load internal buffer
assert body.readline(2) == b"bc"
assert body.readline(2) == b"\n"
assert body.readline(2) == b"de"
assert body.readline(2) == b"f"

View File

@ -22,7 +22,7 @@ def test_atoms_defaults():
logger = Logger(Config())
atoms = logger.atoms(response, request, environ,
datetime.timedelta(seconds=1))
t.istype(atoms, dict)
t.eq(atoms['r'], 'GET http://my.uri HTTP/1.1')
t.eq(atoms['{accept}i'], 'application/json')
t.eq(atoms['{content-type}o'], 'application/json')
assert isinstance(atoms, dict)
assert atoms['r'] == 'GET http://my.uri HTTP/1.1'
assert atoms['{accept}i'] == 'application/json'
assert atoms['{content-type}o'] == 'application/json'

View File

@ -5,66 +5,77 @@
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import ssl
import sys
from unittest import TestCase
import pytest
from gunicorn.config import (
KeyFile, CertFile, SSLVersion, CACerts, SuppressRaggedEOFs,
DoHandshakeOnConnect, Setting,
)
if sys.version_info >= (2, 7):
ssl = pytest.importorskip('ssl')
def test_keyfile():
assert issubclass(KeyFile, Setting)
assert KeyFile.name == 'keyfile'
assert KeyFile.section == 'Ssl'
assert KeyFile.cli == ['--keyfile']
assert KeyFile.meta == 'FILE'
assert KeyFile.default is None
def test_certfile():
assert issubclass(CertFile, Setting)
assert CertFile.name == 'certfile'
assert CertFile.section == 'Ssl'
assert CertFile.cli == ['--certfile']
assert CertFile.default is None
def test_ssl_version():
assert issubclass(SSLVersion, Setting)
assert SSLVersion.name == 'ssl_version'
assert SSLVersion.section == 'Ssl'
assert SSLVersion.cli == ['--ssl-version']
assert SSLVersion.default == ssl.PROTOCOL_TLSv1
def test_cacerts():
assert issubclass(CACerts, Setting)
assert CACerts.name == 'ca_certs'
assert CACerts.section == 'Ssl'
assert CACerts.cli == ['--ca-certs']
assert CACerts.meta == 'FILE'
assert CACerts.default is None
def test_suppress_ragged_eofs():
assert issubclass(SuppressRaggedEOFs, Setting)
assert SuppressRaggedEOFs.name == 'suppress_ragged_eofs'
assert SuppressRaggedEOFs.section == 'Ssl'
assert SuppressRaggedEOFs.cli == ['--suppress-ragged-eofs']
assert SuppressRaggedEOFs.action == 'store_true'
assert SuppressRaggedEOFs.default is True
def test_do_handshake_on_connect():
assert issubclass(DoHandshakeOnConnect, Setting)
assert DoHandshakeOnConnect.name == 'do_handshake_on_connect'
assert DoHandshakeOnConnect.section == 'Ssl'
assert DoHandshakeOnConnect.cli == ['--do-handshake-on-connect']
assert DoHandshakeOnConnect.action == 'store_true'
assert DoHandshakeOnConnect.default is False
@pytest.mark.skipif(sys.version_info < (2, 7),
reason="requires Python 2.7+")
def test_ciphers():
from gunicorn.config import Ciphers
class SSLTestCase(TestCase):
def test_settings_classes(self):
""" Tests all settings options and their defaults.
"""
self.assertTrue(issubclass(KeyFile, Setting))
self.assertEquals(KeyFile.name, 'keyfile')
self.assertEquals(KeyFile.section, 'Ssl')
self.assertEquals(KeyFile.cli, ['--keyfile'])
self.assertEquals(KeyFile.meta, 'FILE')
self.assertEquals(KeyFile.default, None)
self.assertTrue(issubclass(CertFile, Setting))
self.assertEquals(CertFile.name, 'certfile')
self.assertEquals(CertFile.section, 'Ssl')
self.assertEquals(CertFile.cli, ['--certfile'])
self.assertEquals(CertFile.default, None)
self.assertTrue(issubclass(SSLVersion, Setting))
self.assertEquals(SSLVersion.name, 'ssl_version')
self.assertEquals(SSLVersion.section, 'Ssl')
self.assertEquals(SSLVersion.cli, ['--ssl-version'])
self.assertEquals(SSLVersion.default, ssl.PROTOCOL_TLSv1)
self.assertTrue(issubclass(CACerts, Setting))
self.assertEquals(CACerts.name, 'ca_certs')
self.assertEquals(CACerts.section, 'Ssl')
self.assertEquals(CACerts.cli, ['--ca-certs'])
self.assertEquals(CACerts.meta, 'FILE')
self.assertEquals(CACerts.default, None)
self.assertTrue(issubclass(SuppressRaggedEOFs, Setting))
self.assertEquals(SuppressRaggedEOFs.name, 'suppress_ragged_eofs')
self.assertEquals(SuppressRaggedEOFs.section, 'Ssl')
self.assertEquals(SuppressRaggedEOFs.cli, ['--suppress-ragged-eofs'])
self.assertEquals(SuppressRaggedEOFs.action, 'store_true')
self.assertEquals(SuppressRaggedEOFs.default, True)
self.assertTrue(issubclass(DoHandshakeOnConnect, Setting))
self.assertEquals(DoHandshakeOnConnect.name, 'do_handshake_on_connect')
self.assertEquals(DoHandshakeOnConnect.section, 'Ssl')
self.assertEquals(DoHandshakeOnConnect.cli, ['--do-handshake-on-connect'])
self.assertEquals(DoHandshakeOnConnect.action, 'store_true')
self.assertEquals(DoHandshakeOnConnect.default, False)
if sys.version_info >= (2, 7):
self.assertTrue(issubclass(Ciphers, Setting))
self.assertEquals(Ciphers.name, 'ciphers')
self.assertEquals(Ciphers.section, 'Ssl')
self.assertEquals(Ciphers.cli, ['--ciphers'])
self.assertEquals(Ciphers.default, 'TLSv1')
assert issubclass(Ciphers, Setting)
assert Ciphers.name == 'ciphers'
assert Ciphers.section == 'Ssl'
assert Ciphers.cli == ['--ciphers']
assert Ciphers.default == 'TLSv1'

View File

@ -42,8 +42,8 @@ class PreloadedAppWithEnvSettings(BaseApplication):
def verify_env_vars():
t.eq(os.getenv('SOME_PATH'), '/tmp/something')
t.eq(os.getenv('OTHER_PATH'), '/tmp/something/else')
assert os.getenv('SOME_PATH') == '/tmp/something'
assert os.getenv('OTHER_PATH') == '/tmp/something/else'
def test_env_vars_available_during_preload():

View File

@ -78,30 +78,30 @@ def test_instrument():
# Regular message
logger.info("Blah", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"gunicorn.test:666|g")
t.eq(sio.getvalue(), "Blah\n")
assert logger.sock.msgs[0] == b"gunicorn.test:666|g"
assert sio.getvalue() == "Blah\n"
logger.sock.reset()
# Only metrics, no logging
logger.info("", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"gunicorn.test:666|g")
t.eq(sio.getvalue(), "Blah\n") # log is unchanged
assert logger.sock.msgs[0] == b"gunicorn.test:666|g"
assert sio.getvalue() == "Blah\n" # log is unchanged
logger.sock.reset()
# Debug logging also supports metrics
logger.debug("", extra={"mtype": "gauge", "metric": "gunicorn.debug", "value": 667})
t.eq(logger.sock.msgs[0], b"gunicorn.debug:667|g")
t.eq(sio.getvalue(), "Blah\n") # log is unchanged
assert logger.sock.msgs[0] == b"gunicorn.debug:667|g"
assert sio.getvalue() == "Blah\n" # log is unchanged
logger.sock.reset()
logger.critical("Boom")
t.eq(logger.sock.msgs[0], b"gunicorn.log.critical:1|c|@1.0")
assert logger.sock.msgs[0] == b"gunicorn.log.critical:1|c|@1.0"
logger.sock.reset()
logger.access(MockResponse("200 OK"), None, {}, timedelta(seconds=7))
t.eq(logger.sock.msgs[0], b"gunicorn.request.duration:7000.0|ms")
t.eq(logger.sock.msgs[1], b"gunicorn.requests:1|c|@1.0")
t.eq(logger.sock.msgs[2], b"gunicorn.request.status.200:1|c|@1.0")
assert logger.sock.msgs[0] == b"gunicorn.request.duration:7000.0|ms"
assert logger.sock.msgs[1] == b"gunicorn.requests:1|c|@1.0"
assert logger.sock.msgs[2] == b"gunicorn.request.status.200:1|c|@1.0"
def test_prefix():
c = Config()
@ -110,7 +110,7 @@ def test_prefix():
logger.sock = MockSocket(False)
logger.info("Blah", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"test.gunicorn.test:666|g")
assert logger.sock.msgs[0] == b"test.gunicorn.test:666|g"
def test_prefix_no_dot():
c = Config()
@ -119,7 +119,7 @@ def test_prefix_no_dot():
logger.sock = MockSocket(False)
logger.info("Blah", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"test.gunicorn.test:666|g")
assert logger.sock.msgs[0] == b"test.gunicorn.test:666|g"
def test_prefix_multiple_dots():
c = Config()
@ -128,7 +128,7 @@ def test_prefix_multiple_dots():
logger.sock = MockSocket(False)
logger.info("Blah", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"test.gunicorn.test:666|g")
assert logger.sock.msgs[0] == b"test.gunicorn.test:666|g"
def test_prefix_nested():
c = Config()
@ -137,4 +137,4 @@ def test_prefix_nested():
logger.sock = MockSocket(False)
logger.info("Blah", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"test.asdf.gunicorn.test:666|g")
assert logger.sock.msgs[0] == b"test.asdf.gunicorn.test:666|g"

View File

@ -253,18 +253,18 @@ class request(object):
p = RequestParser(cfg, sender())
for req in p:
self.same(req, sizer, matcher, cases.pop(0))
t.eq(len(cases), 0)
assert len(cases) == 0
def same(self, req, sizer, matcher, exp):
t.eq(req.method, exp["method"])
t.eq(req.uri, exp["uri"]["raw"])
t.eq(req.path, exp["uri"]["path"])
t.eq(req.query, exp["uri"]["query"])
t.eq(req.fragment, exp["uri"]["fragment"])
t.eq(req.version, exp["version"])
t.eq(req.headers, exp["headers"])
assert req.method == exp["method"]
assert req.uri == exp["uri"]["raw"]
assert req.path == exp["uri"]["path"]
assert req.query == exp["uri"]["query"]
assert req.fragment == exp["uri"]["fragment"]
assert req.version == exp["version"]
assert req.headers == exp["headers"]
matcher(req, exp["body"], sizer)
t.eq(req.trailers, exp.get("trailers", []))
assert req.trailers == exp.get("trailers", [])
class badrequest(object):
def __init__(self, fname):