Merge pull request #1064 from berkerpeksag/tests-cleanup

Tests cleanup
This commit is contained in:
Randall Leeds 2015-07-07 12:36:54 -07:00
commit 1864a3c779
14 changed files with 261 additions and 284 deletions

View File

@ -32,3 +32,18 @@ def requires_mac_ver(*min_version):
wrapper.min_version = min_version
return wrapper
return decorator
try:
from types import SimpleNamespace
except ImportError:
class SimpleNamespace(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def __repr__(self):
keys = sorted(self.__dict__)
items = ("{}={!r}".format(k, self.__dict__[k]) for k in keys)
return "{}({})".format(type(self).__name__, ", ".join(items))
def __eq__(self, other):
return self.__dict__ == other.__dict__

View File

@ -77,40 +77,3 @@ class http_request(object):
func(req)
run.func_name = func.func_name
return run
def eq(a, b):
assert a == b, "%r != %r" % (a, b)
def ne(a, b):
assert a != b, "%r == %r" % (a, b)
def lt(a, b):
assert a < b, "%r >= %r" % (a, b)
def gt(a, b):
assert a > b, "%r <= %r" % (a, b)
def isin(a, b):
assert a in b, "%r is not in %r" % (a, b)
def isnotin(a, b):
assert a not in b, "%r is in %r" % (a, b)
def has(a, b):
assert hasattr(a, b), "%r has no attribute %r" % (a, b)
def hasnot(a, b):
assert not hasattr(a, b), "%r has an attribute %r" % (a, b)
def istype(a, b):
assert isinstance(a, b), "%r is not an instance of %r" % (a, b)
def raises(exctype, func, *args, **kwargs):
try:
func(*args, **kwargs)
except exctype:
pass
else:
func_name = getattr(func, "func_name", "<builtin_function>")
raise AssertionError("Function %s did not raise %s" % (
func_name, exctype.__name__))

View File

@ -1,28 +0,0 @@
import datetime
import t
from gunicorn.config import Config
from gunicorn.glogging import Logger
class Mock(object):
def __init__(self, **kwargs):
for attr in kwargs:
setattr(self, attr, kwargs[attr])
def test_atoms_defaults():
response = Mock(status='200', response_length=1024,
headers=(('Content-Type', 'application/json'),),
sent=1024)
request = Mock(headers=(('Accept', 'application/json'), ))
environ = {'REQUEST_METHOD': 'GET', 'RAW_URI': 'http://my.uri',
'SERVER_PROTOCOL': 'HTTP/1.1'}
logger = Logger(Config())
atoms = logger.atoms(response, request, environ,
datetime.timedelta(seconds=1))
t.istype(atoms, dict)
t.eq(atoms['r'], 'GET http://my.uri HTTP/1.1')
t.eq(atoms['{accept}i'], 'application/json')
t.eq(atoms['{content-type}o'], 'application/json')

View File

@ -1,70 +0,0 @@
# -*- coding: utf-8 -
# Copyright 2013 Dariusz Suchojad <dsuch at zato.io>
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import ssl
import sys
from unittest import TestCase
from gunicorn.config import (
KeyFile, CertFile, SSLVersion, CACerts, SuppressRaggedEOFs,
DoHandshakeOnConnect, Setting,
)
if sys.version_info >= (2, 7):
from gunicorn.config import Ciphers
class SSLTestCase(TestCase):
def test_settings_classes(self):
""" Tests all settings options and their defaults.
"""
self.assertTrue(issubclass(KeyFile, Setting))
self.assertEquals(KeyFile.name, 'keyfile')
self.assertEquals(KeyFile.section, 'Ssl')
self.assertEquals(KeyFile.cli, ['--keyfile'])
self.assertEquals(KeyFile.meta, 'FILE')
self.assertEquals(KeyFile.default, None)
self.assertTrue(issubclass(CertFile, Setting))
self.assertEquals(CertFile.name, 'certfile')
self.assertEquals(CertFile.section, 'Ssl')
self.assertEquals(CertFile.cli, ['--certfile'])
self.assertEquals(CertFile.default, None)
self.assertTrue(issubclass(SSLVersion, Setting))
self.assertEquals(SSLVersion.name, 'ssl_version')
self.assertEquals(SSLVersion.section, 'Ssl')
self.assertEquals(SSLVersion.cli, ['--ssl-version'])
self.assertEquals(SSLVersion.default, ssl.PROTOCOL_TLSv1)
self.assertTrue(issubclass(CACerts, Setting))
self.assertEquals(CACerts.name, 'ca_certs')
self.assertEquals(CACerts.section, 'Ssl')
self.assertEquals(CACerts.cli, ['--ca-certs'])
self.assertEquals(CACerts.meta, 'FILE')
self.assertEquals(CACerts.default, None)
self.assertTrue(issubclass(SuppressRaggedEOFs, Setting))
self.assertEquals(SuppressRaggedEOFs.name, 'suppress_ragged_eofs')
self.assertEquals(SuppressRaggedEOFs.section, 'Ssl')
self.assertEquals(SuppressRaggedEOFs.cli, ['--suppress-ragged-eofs'])
self.assertEquals(SuppressRaggedEOFs.action, 'store_true')
self.assertEquals(SuppressRaggedEOFs.default, True)
self.assertTrue(issubclass(DoHandshakeOnConnect, Setting))
self.assertEquals(DoHandshakeOnConnect.name, 'do_handshake_on_connect')
self.assertEquals(DoHandshakeOnConnect.section, 'Ssl')
self.assertEquals(DoHandshakeOnConnect.cli, ['--do-handshake-on-connect'])
self.assertEquals(DoHandshakeOnConnect.action, 'store_true')
self.assertEquals(DoHandshakeOnConnect.default, False)
if sys.version_info >= (2, 7):
self.assertTrue(issubclass(Ciphers, Setting))
self.assertEquals(Ciphers.name, 'ciphers')
self.assertEquals(Ciphers.section, 'Ssl')
self.assertEquals(Ciphers.cli, ['--ciphers'])
self.assertEquals(Ciphers.default, 'TLSv1')

View File

@ -3,13 +3,13 @@
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import t
import os
from gunicorn.app.base import BaseApplication
import gunicorn.app.base
import gunicorn.arbiter
class PreloadedAppWithEnvSettings(BaseApplication):
class PreloadedAppWithEnvSettings(gunicorn.app.base.BaseApplication):
"""
Simple application that makes use of the 'preload' feature to
start the application before spawning worker processes and sets
@ -18,11 +18,9 @@ class PreloadedAppWithEnvSettings(BaseApplication):
def init(self, parser, opts, args):
"""No-op"""
pass
def load(self):
"""No-op"""
pass
def load_config(self):
"""Set the 'preload_app' and 'raw_env' settings in order to verify their
@ -42,8 +40,8 @@ class PreloadedAppWithEnvSettings(BaseApplication):
def verify_env_vars():
t.eq(os.getenv('SOME_PATH'), '/tmp/something')
t.eq(os.getenv('OTHER_PATH'), '/tmp/something/else')
assert os.getenv('SOME_PATH') == '/tmp/something'
assert os.getenv('OTHER_PATH') == '/tmp/something/else'
def test_env_vars_available_during_preload():

View File

@ -8,6 +8,8 @@ import t
import os
import sys
import pytest
from gunicorn import config
from gunicorn.app.base import Application
from gunicorn.workers.sync import SyncWorker
@ -45,8 +47,7 @@ class NoConfigApp(Application):
def test_defaults():
c = config.Config()
for s in config.KNOWN_SETTINGS:
t.eq(c.settings[s.name].validator(s.default),
c.settings[s.name].get())
assert c.settings[s.name].validator(s.default) == c.settings[s.name].get()
def test_property_access():
c = config.Config()
@ -54,108 +55,105 @@ def test_property_access():
getattr(c, s.name)
# Class was loaded
t.eq(c.worker_class, SyncWorker)
assert c.worker_class == SyncWorker
# Workers defaults to 1
t.eq(c.workers, 1)
assert c.workers == 1
c.set("workers", 3)
t.eq(c.workers, 3)
assert c.workers == 3
# Address is parsed
t.eq(c.address, [("127.0.0.1", 8000)])
assert c.address == [("127.0.0.1", 8000)]
# User and group defaults
t.eq(os.geteuid(), c.uid)
t.eq(os.getegid(), c.gid)
assert os.geteuid() == c.uid
assert os.getegid() == c.gid
# Proc name
t.eq("gunicorn", c.proc_name)
assert "gunicorn" == c.proc_name
# Not a config property
t.raises(AttributeError, getattr, c, "foo")
pytest.raises(AttributeError, getattr, c, "foo")
# Force to be not an error
class Baz(object):
def get(self):
return 3.14
c.settings["foo"] = Baz()
t.eq(c.foo, 3.14)
assert c.foo == 3.14
# Attempt to set a cfg not via c.set
t.raises(AttributeError, setattr, c, "proc_name", "baz")
pytest.raises(AttributeError, setattr, c, "proc_name", "baz")
# No setting for name
t.raises(AttributeError, c.set, "baz", "bar")
pytest.raises(AttributeError, c.set, "baz", "bar")
def test_bool_validation():
c = config.Config()
t.eq(c.preload_app, False)
assert c.preload_app is False
c.set("preload_app", True)
t.eq(c.preload_app, True)
assert c.preload_app is True
c.set("preload_app", "true")
t.eq(c.preload_app, True)
assert c.preload_app is True
c.set("preload_app", "false")
t.eq(c.preload_app, False)
t.raises(ValueError, c.set, "preload_app", "zilch")
t.raises(TypeError, c.set, "preload_app", 4)
assert c.preload_app is False
pytest.raises(ValueError, c.set, "preload_app", "zilch")
pytest.raises(TypeError, c.set, "preload_app", 4)
def test_pos_int_validation():
c = config.Config()
t.eq(c.workers, 1)
assert c.workers == 1
c.set("workers", 4)
t.eq(c.workers, 4)
assert c.workers == 4
c.set("workers", "5")
t.eq(c.workers, 5)
assert c.workers == 5
c.set("workers", "0xFF")
t.eq(c.workers, 255)
assert c.workers == 255
c.set("workers", True)
t.eq(c.workers, 1) # Yes. That's right...
t.raises(ValueError, c.set, "workers", -21)
t.raises(TypeError, c.set, "workers", c)
assert c.workers == 1 # Yes. That's right...
pytest.raises(ValueError, c.set, "workers", -21)
pytest.raises(TypeError, c.set, "workers", c)
def test_str_validation():
c = config.Config()
t.eq(c.proc_name, "gunicorn")
assert c.proc_name == "gunicorn"
c.set("proc_name", " foo ")
t.eq(c.proc_name, "foo")
t.raises(TypeError, c.set, "proc_name", 2)
assert c.proc_name == "foo"
pytest.raises(TypeError, c.set, "proc_name", 2)
def test_str_to_list_validation():
c = config.Config()
t.eq(c.forwarded_allow_ips, ["127.0.0.1"])
assert c.forwarded_allow_ips == ["127.0.0.1"]
c.set("forwarded_allow_ips", "127.0.0.1,192.168.0.1")
t.eq(c.forwarded_allow_ips, ["127.0.0.1", "192.168.0.1"])
assert c.forwarded_allow_ips == ["127.0.0.1", "192.168.0.1"]
c.set("forwarded_allow_ips", "")
t.eq(c.forwarded_allow_ips, [])
assert c.forwarded_allow_ips == []
c.set("forwarded_allow_ips", None)
t.eq(c.forwarded_allow_ips, [])
t.raises(TypeError, c.set, "forwarded_allow_ips", 1)
assert c.forwarded_allow_ips == []
pytest.raises(TypeError, c.set, "forwarded_allow_ips", 1)
def test_callable_validation():
c = config.Config()
def func(a, b):
pass
c.set("pre_fork", func)
t.eq(c.pre_fork, func)
t.raises(TypeError, c.set, "pre_fork", 1)
t.raises(TypeError, c.set, "pre_fork", lambda x: True)
assert c.pre_fork == func
pytest.raises(TypeError, c.set, "pre_fork", 1)
pytest.raises(TypeError, c.set, "pre_fork", lambda x: True)
def test_callable_validation_for_string():
from os.path import isdir as testfunc
t.eq(
config.validate_callable(-1)("os.path.isdir"),
testfunc
)
assert config.validate_callable(-1)("os.path.isdir") == testfunc
# invalid values tests
t.raises(
pytest.raises(
TypeError,
config.validate_callable(-1), ""
)
t.raises(
pytest.raises(
TypeError,
config.validate_callable(-1), "os.path.not_found_func"
)
t.raises(
pytest.raises(
TypeError,
config.validate_callable(-1), "notfoundmodule.func"
)
@ -164,60 +162,65 @@ def test_callable_validation_for_string():
def test_cmd_line():
with AltArgs(["prog_name", "-b", "blargh"]):
app = NoConfigApp()
t.eq(app.cfg.bind, ["blargh"])
assert app.cfg.bind == ["blargh"]
with AltArgs(["prog_name", "-w", "3"]):
app = NoConfigApp()
t.eq(app.cfg.workers, 3)
assert app.cfg.workers == 3
with AltArgs(["prog_name", "--preload"]):
app = NoConfigApp()
t.eq(app.cfg.preload_app, True)
assert app.cfg.preload_app
def test_app_config():
with AltArgs():
app = NoConfigApp()
for s in config.KNOWN_SETTINGS:
t.eq(app.cfg.settings[s.name].validator(s.default),
app.cfg.settings[s.name].get())
assert app.cfg.settings[s.name].validator(s.default) == app.cfg.settings[s.name].get()
def test_load_config():
with AltArgs(["prog_name", "-c", cfg_file()]):
app = NoConfigApp()
t.eq(app.cfg.bind, ["unix:/tmp/bar/baz"])
t.eq(app.cfg.workers, 3)
t.eq(app.cfg.proc_name, "fooey")
assert app.cfg.bind == ["unix:/tmp/bar/baz"]
assert app.cfg.workers == 3
assert app.cfg.proc_name == "fooey"
def test_load_config_module():
with AltArgs(["prog_name", "-c", cfg_module()]):
app = NoConfigApp()
t.eq(app.cfg.bind, ["unix:/tmp/bar/baz"])
t.eq(app.cfg.workers, 3)
t.eq(app.cfg.proc_name, "fooey")
assert app.cfg.bind == ["unix:/tmp/bar/baz"]
assert app.cfg.workers == 3
assert app.cfg.proc_name == "fooey"
def test_cli_overrides_config():
with AltArgs(["prog_name", "-c", cfg_file(), "-b", "blarney"]):
app = NoConfigApp()
t.eq(app.cfg.bind, ["blarney"])
t.eq(app.cfg.proc_name, "fooey")
assert app.cfg.bind == ["blarney"]
assert app.cfg.proc_name == "fooey"
def test_cli_overrides_config_module():
with AltArgs(["prog_name", "-c", cfg_module(), "-b", "blarney"]):
app = NoConfigApp()
t.eq(app.cfg.bind, ["blarney"])
t.eq(app.cfg.proc_name, "fooey")
assert app.cfg.bind == ["blarney"]
assert app.cfg.proc_name == "fooey"
def test_default_config_file():
@pytest.fixture
def create_config_file(request):
default_config = os.path.join(os.path.abspath(os.getcwd()),
'gunicorn.conf.py')
'gunicorn.conf.py')
with open(default_config, 'w+') as default:
default.write("bind='0.0.0.0:9090'")
t.eq(config.get_default_config_file(), default_config)
def fin():
os.unlink(default_config)
request.addfinalizer(fin)
return default
def test_default_config_file(create_config_file):
assert config.get_default_config_file() == create_config_file.name
with AltArgs(["prog_name"]):
app = NoConfigApp()
t.eq(app.cfg.bind, ["0.0.0.0:9090"])
os.unlink(default_config)
assert app.cfg.bind == ["0.0.0.0:9090"]
def test_post_request():
c = config.Config()
@ -232,19 +235,20 @@ def test_post_request():
return 2
c.set("post_request", post_request_4)
t.eq(4, c.post_request(1, 2, 3, 4))
assert c.post_request(1, 2, 3, 4) == 4
c.set("post_request", post_request_3)
t.eq(3, c.post_request(1, 2, 3, 4))
assert c.post_request(1, 2, 3, 4) == 3
c.set("post_request", post_request_2)
t.eq(2, c.post_request(1, 2, 3, 4))
assert c.post_request(1, 2, 3, 4) == 2
def test_nworkers_changed():
c = config.Config()
def nworkers_changed_3(server, new_value, old_value):
return 3
c.set("nworkers_changed", nworkers_changed_3)
t.eq(3, c.nworkers_changed(1, 2, 3))
assert c.nworkers_changed(1, 2, 3) == 3

View File

@ -5,7 +5,7 @@ from gunicorn.six import BytesIO
def assert_readline(payload, size, expected):
body = Body(BytesIO(payload))
t.eq(body.readline(size), expected)
assert body.readline(size) == expected
def test_readline_empty_body():
@ -20,23 +20,23 @@ def test_readline_zero_size():
def test_readline_new_line_before_size():
body = Body(BytesIO(b"abc\ndef"))
t.eq(body.readline(4), b"abc\n")
t.eq(body.readline(), b"def")
assert body.readline(4) == b"abc\n"
assert body.readline() == b"def"
def test_readline_new_line_after_size():
body = Body(BytesIO(b"abc\ndef"))
t.eq(body.readline(2), b"ab")
t.eq(body.readline(), b"c\n")
assert body.readline(2) == b"ab"
assert body.readline() == b"c\n"
def test_readline_no_new_line():
body = Body(BytesIO(b"abcdef"))
t.eq(body.readline(), b"abcdef")
assert body.readline() == b"abcdef"
body = Body(BytesIO(b"abcdef"))
t.eq(body.readline(2), b"ab")
t.eq(body.readline(2), b"cd")
t.eq(body.readline(2), b"ef")
assert body.readline(2) == b"ab"
assert body.readline(2) == b"cd"
assert body.readline(2) == b"ef"
def test_readline_buffer_loaded():
@ -45,16 +45,15 @@ def test_readline_buffer_loaded():
body.read(1) # load internal buffer
reader.write(b"g\nhi")
reader.seek(7)
print(reader.getvalue())
t.eq(body.readline(), b"bc\n")
t.eq(body.readline(), b"defg\n")
t.eq(body.readline(), b"hi")
assert body.readline() == b"bc\n"
assert body.readline() == b"defg\n"
assert body.readline() == b"hi"
def test_readline_buffer_loaded_with_size():
body = Body(BytesIO(b"abc\ndef"))
body.read(1) # load internal buffer
t.eq(body.readline(2), b"bc")
t.eq(body.readline(2), b"\n")
t.eq(body.readline(2), b"de")
t.eq(body.readline(2), b"f")
body.read(1) # load internal buffer
assert body.readline(2) == b"bc"
assert body.readline(2) == b"\n"
assert body.readline(2) == b"de"
assert body.readline(2) == b"f"

View File

@ -3,17 +3,18 @@
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import treq
import glob
import os
import pytest
import treq
dirname = os.path.dirname(__file__)
reqdir = os.path.join(dirname, "requests", "invalid")
httpfiles = glob.glob(os.path.join(reqdir, "*.http"))
@pytest.mark.parametrize("fname", httpfiles)
def test_http_parser(fname):
env = treq.load_py(os.path.splitext(fname)[0] + ".py")

26
tests/test_logger.py Normal file
View File

@ -0,0 +1,26 @@
import datetime
import t
from gunicorn.config import Config
from gunicorn.glogging import Logger
from support import SimpleNamespace
def test_atoms_defaults():
response = SimpleNamespace(
status='200', response_length=1024,
headers=(('Content-Type', 'application/json'),), sent=1024,
)
request = SimpleNamespace(headers=(('Accept', 'application/json'),))
environ = {
'REQUEST_METHOD': 'GET', 'RAW_URI': 'http://my.uri',
'SERVER_PROTOCOL': 'HTTP/1.1',
}
logger = Logger(Config())
atoms = logger.atoms(response, request, environ, datetime.timedelta(seconds=1))
assert isinstance(atoms, dict)
assert atoms['r'] == 'GET http://my.uri HTTP/1.1'
assert atoms['{accept}i'] == 'application/json'
assert atoms['{content-type}o'] == 'application/json'

81
tests/test_ssl.py Normal file
View File

@ -0,0 +1,81 @@
# -*- coding: utf-8 -
# Copyright 2013 Dariusz Suchojad <dsuch at zato.io>
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import sys
import pytest
from gunicorn.config import (
KeyFile, CertFile, SSLVersion, CACerts, SuppressRaggedEOFs,
DoHandshakeOnConnect, Setting,
)
ssl = pytest.importorskip('ssl')
def test_keyfile():
assert issubclass(KeyFile, Setting)
assert KeyFile.name == 'keyfile'
assert KeyFile.section == 'Ssl'
assert KeyFile.cli == ['--keyfile']
assert KeyFile.meta == 'FILE'
assert KeyFile.default is None
def test_certfile():
assert issubclass(CertFile, Setting)
assert CertFile.name == 'certfile'
assert CertFile.section == 'Ssl'
assert CertFile.cli == ['--certfile']
assert CertFile.default is None
def test_ssl_version():
assert issubclass(SSLVersion, Setting)
assert SSLVersion.name == 'ssl_version'
assert SSLVersion.section == 'Ssl'
assert SSLVersion.cli == ['--ssl-version']
assert SSLVersion.default == ssl.PROTOCOL_TLSv1
def test_cacerts():
assert issubclass(CACerts, Setting)
assert CACerts.name == 'ca_certs'
assert CACerts.section == 'Ssl'
assert CACerts.cli == ['--ca-certs']
assert CACerts.meta == 'FILE'
assert CACerts.default is None
def test_suppress_ragged_eofs():
assert issubclass(SuppressRaggedEOFs, Setting)
assert SuppressRaggedEOFs.name == 'suppress_ragged_eofs'
assert SuppressRaggedEOFs.section == 'Ssl'
assert SuppressRaggedEOFs.cli == ['--suppress-ragged-eofs']
assert SuppressRaggedEOFs.action == 'store_true'
assert SuppressRaggedEOFs.default is True
def test_do_handshake_on_connect():
assert issubclass(DoHandshakeOnConnect, Setting)
assert DoHandshakeOnConnect.name == 'do_handshake_on_connect'
assert DoHandshakeOnConnect.section == 'Ssl'
assert DoHandshakeOnConnect.cli == ['--do-handshake-on-connect']
assert DoHandshakeOnConnect.action == 'store_true'
assert DoHandshakeOnConnect.default is False
@pytest.mark.skipif(sys.version_info < (2, 7),
reason="requires Python 2.7+")
def test_ciphers():
from gunicorn.config import Ciphers
assert issubclass(Ciphers, Setting)
assert Ciphers.name == 'ciphers'
assert Ciphers.section == 'Ssl'
assert Ciphers.cli == ['--ciphers']
assert Ciphers.default == 'TLSv1'

View File

@ -3,11 +3,6 @@ import socket
import t
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
import logging
import tempfile
import shutil
@ -15,13 +10,15 @@ import os
from gunicorn.config import Config
from gunicorn.instrument.statsd import Statsd
from gunicorn.six import StringIO
from support import SimpleNamespace
class TestException(Exception):
class StatsdTestException(Exception):
pass
class MockSocket(object):
"Pretend to be a UDP socket"
def __init__(self, failp):
@ -30,7 +27,7 @@ class MockSocket(object):
def send(self, msg):
if self.failp:
raise TestException("Should not interrupt the logger")
raise StatsdTestException("Should not interrupt the logger")
sock_dir = tempfile.mkdtemp()
sock_file = os.path.join(sock_dir, "test.sock")
@ -50,13 +47,9 @@ class MockSocket(object):
server.close()
shutil.rmtree(sock_dir)
def reset(self):
self.msgs = []
class MockResponse(object):
def __init__(self, status):
self.status = status
def test_statsd_fail():
"UDP socket fails"
@ -78,30 +71,30 @@ def test_instrument():
# Regular message
logger.info("Blah", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"gunicorn.test:666|g")
t.eq(sio.getvalue(), "Blah\n")
assert logger.sock.msgs[0] == b"gunicorn.test:666|g"
assert sio.getvalue() == "Blah\n"
logger.sock.reset()
# Only metrics, no logging
logger.info("", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"gunicorn.test:666|g")
t.eq(sio.getvalue(), "Blah\n") # log is unchanged
assert logger.sock.msgs[0] == b"gunicorn.test:666|g"
assert sio.getvalue() == "Blah\n" # log is unchanged
logger.sock.reset()
# Debug logging also supports metrics
logger.debug("", extra={"mtype": "gauge", "metric": "gunicorn.debug", "value": 667})
t.eq(logger.sock.msgs[0], b"gunicorn.debug:667|g")
t.eq(sio.getvalue(), "Blah\n") # log is unchanged
assert logger.sock.msgs[0] == b"gunicorn.debug:667|g"
assert sio.getvalue() == "Blah\n" # log is unchanged
logger.sock.reset()
logger.critical("Boom")
t.eq(logger.sock.msgs[0], b"gunicorn.log.critical:1|c|@1.0")
assert logger.sock.msgs[0] == b"gunicorn.log.critical:1|c|@1.0"
logger.sock.reset()
logger.access(MockResponse("200 OK"), None, {}, timedelta(seconds=7))
t.eq(logger.sock.msgs[0], b"gunicorn.request.duration:7000.0|ms")
t.eq(logger.sock.msgs[1], b"gunicorn.requests:1|c|@1.0")
t.eq(logger.sock.msgs[2], b"gunicorn.request.status.200:1|c|@1.0")
logger.access(SimpleNamespace(status="200 OK"), None, {}, timedelta(seconds=7))
assert logger.sock.msgs[0] == b"gunicorn.request.duration:7000.0|ms"
assert logger.sock.msgs[1] == b"gunicorn.requests:1|c|@1.0"
assert logger.sock.msgs[2] == b"gunicorn.request.status.200:1|c|@1.0"
def test_prefix():
c = Config()
@ -110,7 +103,7 @@ def test_prefix():
logger.sock = MockSocket(False)
logger.info("Blah", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"test.gunicorn.test:666|g")
assert logger.sock.msgs[0] == b"test.gunicorn.test:666|g"
def test_prefix_no_dot():
c = Config()
@ -119,7 +112,7 @@ def test_prefix_no_dot():
logger.sock = MockSocket(False)
logger.info("Blah", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"test.gunicorn.test:666|g")
assert logger.sock.msgs[0] == b"test.gunicorn.test:666|g"
def test_prefix_multiple_dots():
c = Config()
@ -128,7 +121,7 @@ def test_prefix_multiple_dots():
logger.sock = MockSocket(False)
logger.info("Blah", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"test.gunicorn.test:666|g")
assert logger.sock.msgs[0] == b"test.gunicorn.test:666|g"
def test_prefix_nested():
c = Config()
@ -137,4 +130,4 @@ def test_prefix_nested():
logger.sock = MockSocket(False)
logger.info("Blah", extra={"mtype": "gauge", "metric": "gunicorn.test", "value": 666})
t.eq(logger.sock.msgs[0], b"test.asdf.gunicorn.test:666|g")
assert logger.sock.msgs[0] == b"test.asdf.gunicorn.test:666|g"

View File

@ -3,30 +3,25 @@
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import treq
import glob
import os
import pytest
import treq
dirname = os.path.dirname(__file__)
reqdir = os.path.join(dirname, "requests", "valid")
httpfiles = glob.glob(os.path.join(reqdir, "*.http"))
def a_case(fname):
@pytest.mark.parametrize("fname", httpfiles)
def test_http_parser(fname):
env = treq.load_py(os.path.splitext(fname)[0] + ".py")
expect = env['request']
cfg = env['cfg']
req = treq.request(fname, expect)
for case in req.gen_cases(cfg):
case[0](*case[1:])
def test_http_parser():
for fname in glob.glob(os.path.join(reqdir, "*.http")):
if os.getenv("GUNS_BLAZING"):
env = treq.load_py(os.path.splitext(fname)[0] + ".py")
expect = env['request']
cfg = env['cfg']
req = treq.request(fname, expect)
for case in req.gen_cases(cfg):
yield case
else:
yield (a_case, fname)

View File

@ -253,18 +253,18 @@ class request(object):
p = RequestParser(cfg, sender())
for req in p:
self.same(req, sizer, matcher, cases.pop(0))
t.eq(len(cases), 0)
assert len(cases) == 0
def same(self, req, sizer, matcher, exp):
t.eq(req.method, exp["method"])
t.eq(req.uri, exp["uri"]["raw"])
t.eq(req.path, exp["uri"]["path"])
t.eq(req.query, exp["uri"]["query"])
t.eq(req.fragment, exp["uri"]["fragment"])
t.eq(req.version, exp["version"])
t.eq(req.headers, exp["headers"])
assert req.method == exp["method"]
assert req.uri == exp["uri"]["raw"]
assert req.path == exp["uri"]["path"]
assert req.query == exp["uri"]["query"]
assert req.fragment == exp["uri"]["fragment"]
assert req.version == exp["version"]
assert req.headers == exp["headers"]
matcher(req, exp["body"], sizer)
t.eq(req.trailers, exp.get("trailers", []))
assert req.trailers == exp.get("trailers", [])
class badrequest(object):
def __init__(self, fname):