From e9a3f30a0f2e4d91a39afa51ffc8f22a76d9c776 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Fri, 23 Jan 2026 23:51:25 +0100 Subject: [PATCH] fix: keep forwarded_allow_ips as strings for backward compatibility (#3459) The CIDR network support added in 24.1.0 changed forwarded_allow_ips and proxy_allow_ips from string lists to ipaddress.ip_network objects. This broke external tools like uvicorn that expect strings. This fix validates IP/CIDR format during config parsing but keeps the string representation. Network objects are cached in Config methods (forwarded_allow_networks() and proxy_allow_networks()) for efficient IP checking without repeated conversions. Also uses strict mode for ip_network validation to detect mistakes like 192.168.1.1/24 where host bits are set (should be 192.168.1.0/24). Fixes #3458 --- gunicorn/asgi/message.py | 20 +++++++++++++------- gunicorn/config.py | 33 ++++++++++++++++++++++++++++----- gunicorn/http/message.py | 20 +++++++++++++------- tests/test_asgi.py | 24 ++++++++++++++++++++++-- tests/test_config.py | 30 ++++++++++-------------------- 5 files changed, 86 insertions(+), 41 deletions(-) diff --git a/gunicorn/asgi/message.py b/gunicorn/asgi/message.py index 1bb26b99..b73e1b0b 100644 --- a/gunicorn/asgi/message.py +++ b/gunicorn/asgi/message.py @@ -39,17 +39,21 @@ VERSION_RE = re.compile(r"HTTP/(\d)\.(\d)") RFC9110_5_5_INVALID_AND_DANGEROUS = re.compile(r"[\0\r\n]") -def _ip_in_allow_list(ip_str, allow_list): - """Check if IP address is in the allow list (which may contain networks).""" +def _ip_in_allow_list(ip_str, allow_list, networks): + """Check if IP address is in the allow list. + + Args: + ip_str: The IP address string to check + allow_list: The original allow list (strings, may contain "*") + networks: Pre-computed ipaddress.ip_network objects from config + """ if '*' in allow_list: return True try: ip = ipaddress.ip_address(ip_str) except ValueError: return False - for network in allow_list: - if network == '*': - return True + for network in networks: if ip in network: return True return False @@ -219,7 +223,8 @@ class AsyncRequest: def _proxy_protocol_access_check(self): """Check if proxy protocol is allowed from this peer.""" if (isinstance(self.peer_addr, tuple) and - not _ip_in_allow_list(self.peer_addr[0], self.cfg.proxy_allow_ips)): + not _ip_in_allow_list(self.peer_addr[0], self.cfg.proxy_allow_ips, + self.cfg.proxy_allow_networks())): raise ForbiddenProxyRequest(self.peer_addr[0]) async def _parse_proxy_protocol_v1(self, buf): @@ -430,7 +435,8 @@ class AsyncRequest: if from_trailer: pass elif (not isinstance(self.peer_addr, tuple) - or _ip_in_allow_list(self.peer_addr[0], cfg.forwarded_allow_ips)): + or _ip_in_allow_list(self.peer_addr[0], cfg.forwarded_allow_ips, + cfg.forwarded_allow_networks())): secure_scheme_headers = cfg.secure_scheme_headers forwarder_headers = cfg.forwarder_headers diff --git a/gunicorn/config.py b/gunicorn/config.py index 58141e55..ffbc1b27 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -47,6 +47,8 @@ class Config: def __init__(self, usage=None, prog=None): self.settings = make_settings() + self._forwarded_allow_networks = None + self._proxy_allow_networks = None self.usage = usage self.prog = prog or os.path.basename(sys.argv[0]) self.env_orig = os.environ.copy() @@ -174,6 +176,26 @@ class Config: def is_ssl(self): return self.certfile or self.keyfile + def forwarded_allow_networks(self): + """Return cached network objects for forwarded_allow_ips (internal use).""" + if self._forwarded_allow_networks is None: + self._forwarded_allow_networks = [ + ipaddress.ip_network(addr) + for addr in self.forwarded_allow_ips + if addr != "*" + ] + return self._forwarded_allow_networks + + def proxy_allow_networks(self): + """Return cached network objects for proxy_allow_ips (internal use).""" + if self._proxy_allow_networks is None: + self._proxy_allow_networks = [ + ipaddress.ip_network(addr) + for addr in self.proxy_allow_ips + if addr != "*" + ] + return self._proxy_allow_networks + @property def ssl_options(self): opts = {} @@ -407,15 +429,16 @@ def validate_list_of_existing_files(val): def validate_string_to_addr_list(val): val = validate_string_to_list(val) - result = [] for addr in val: if addr == "*": - result.append(addr) continue - # Support both single IPs and CIDR networks - result.append(ipaddress.ip_network(addr, strict=False)) + # Validate that it's a valid IP address or CIDR network + # but keep the string representation for backward compatibility. + # Use strict mode to detect mistakes like 192.168.1.1/24 where + # host bits are set (should be 192.168.1.0/24). + ipaddress.ip_network(addr) - return result + return val def validate_string_to_list(val): diff --git a/gunicorn/http/message.py b/gunicorn/http/message.py index 81132b34..1f939c21 100644 --- a/gunicorn/http/message.py +++ b/gunicorn/http/message.py @@ -58,17 +58,21 @@ VERSION_RE = re.compile(r"HTTP/(\d)\.(\d)") RFC9110_5_5_INVALID_AND_DANGEROUS = re.compile(r"[\0\r\n]") -def _ip_in_allow_list(ip_str, allow_list): - """Check if IP address is in the allow list (which may contain networks).""" +def _ip_in_allow_list(ip_str, allow_list, networks): + """Check if IP address is in the allow list. + + Args: + ip_str: The IP address string to check + allow_list: The original allow list (strings, may contain "*") + networks: Pre-computed ipaddress.ip_network objects from config + """ if '*' in allow_list: return True try: ip = ipaddress.ip_address(ip_str) except ValueError: return False - for network in allow_list: - if network == '*': - return True + for network in networks: if ip in network: return True return False @@ -127,7 +131,8 @@ class Message: # .. or we are just behind a proxy who does not remove conflicting trailers pass elif (not isinstance(self.peer_addr, tuple) - or _ip_in_allow_list(self.peer_addr[0], cfg.forwarded_allow_ips)): + or _ip_in_allow_list(self.peer_addr[0], cfg.forwarded_allow_ips, + cfg.forwarded_allow_networks())): secure_scheme_headers = cfg.secure_scheme_headers forwarder_headers = cfg.forwarder_headers @@ -407,7 +412,8 @@ class Request(Message): def proxy_protocol_access_check(self): """Check if proxy protocol is allowed from this peer.""" if (isinstance(self.peer_addr, tuple) and - not _ip_in_allow_list(self.peer_addr[0], self.cfg.proxy_allow_ips)): + not _ip_in_allow_list(self.peer_addr[0], self.cfg.proxy_allow_ips, + self.cfg.proxy_allow_networks())): raise ForbiddenProxyRequest(self.peer_addr[0]) def _parse_proxy_protocol_v1(self, unreader, buf): diff --git a/tests/test_asgi.py b/tests/test_asgi.py index e39ae91a..4e75f7ac 100644 --- a/tests/test_asgi.py +++ b/tests/test_asgi.py @@ -50,8 +50,10 @@ class MockConfig: def __init__(self): self.is_ssl = False self.proxy_protocol = "off" - self.proxy_allow_ips = [ipaddress.ip_network("127.0.0.1")] - self.forwarded_allow_ips = [ipaddress.ip_network("127.0.0.1")] + self.proxy_allow_ips = ["127.0.0.1"] + self.forwarded_allow_ips = ["127.0.0.1"] + self._proxy_allow_networks = None + self._forwarded_allow_networks = None self.secure_scheme_headers = {} self.forwarder_headers = [] self.limit_request_line = 8190 @@ -64,6 +66,24 @@ class MockConfig: self.strip_header_spaces = False self.header_map = "refuse" + def forwarded_allow_networks(self): + if self._forwarded_allow_networks is None: + self._forwarded_allow_networks = [ + ipaddress.ip_network(addr) + for addr in self.forwarded_allow_ips + if addr != "*" + ] + return self._forwarded_allow_networks + + def proxy_allow_networks(self): + if self._proxy_allow_networks is None: + self._proxy_allow_networks = [ + ipaddress.ip_network(addr) + for addr in self.proxy_allow_ips + if addr != "*" + ] + return self._proxy_allow_networks + # AsyncUnreader Tests diff --git a/tests/test_config.py b/tests/test_config.py index 0aff90f5..7ec3d8e7 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -2,7 +2,6 @@ # This file is part of gunicorn released under the MIT license. # See the NOTICE for more information. -import ipaddress import os import re import sys @@ -166,27 +165,15 @@ def test_str_validation(): def test_str_to_addr_list_validation(): c = config.Config() - # Default values are now network objects - assert c.proxy_allow_ips == [ - ipaddress.ip_network("127.0.0.1/32"), - ipaddress.ip_network("::1/128") - ] - assert c.forwarded_allow_ips == [ - ipaddress.ip_network("127.0.0.1/32"), - ipaddress.ip_network("::1/128") - ] - # Single IPs are converted to /32 or /128 networks + # Values remain as strings for backward compatibility + assert c.proxy_allow_ips == ["127.0.0.1", "::1"] + assert c.forwarded_allow_ips == ["127.0.0.1", "::1"] + # Single IPs are validated but kept as strings c.set("forwarded_allow_ips", "127.0.0.1,192.0.2.1") - assert c.forwarded_allow_ips == [ - ipaddress.ip_network("127.0.0.1/32"), - ipaddress.ip_network("192.0.2.1/32") - ] - # CIDR networks are supported + assert c.forwarded_allow_ips == ["127.0.0.1", "192.0.2.1"] + # CIDR networks are supported and kept as strings c.set("forwarded_allow_ips", "127.0.0.0/8,192.168.0.0/16") - assert c.forwarded_allow_ips == [ - ipaddress.ip_network("127.0.0.0/8"), - ipaddress.ip_network("192.168.0.0/16") - ] + assert c.forwarded_allow_ips == ["127.0.0.0/8", "192.168.0.0/16"] # Wildcard is preserved as string c.set("forwarded_allow_ips", "*") assert c.forwarded_allow_ips == ["*"] @@ -200,6 +187,9 @@ def test_str_to_addr_list_validation(): pytest.raises(ValueError, c.set, "forwarded_allow_ips", "127.0.0") # detect typos pytest.raises(ValueError, c.set, "forwarded_allow_ips", "::f:") + # dangerous typos such as accidentally permitting half the internet + # clearly recognizable - masked bits are not zero + pytest.raises(ValueError, c.set, "forwarded_allow_ips", "100.64.0.0/1") def test_str_to_list():