fix: keep forwarded_allow_ips as strings for backward compatibility (#3459)

The CIDR network support added in 24.1.0 changed forwarded_allow_ips
and proxy_allow_ips from string lists to ipaddress.ip_network objects.
This broke external tools like uvicorn that expect strings.

This fix validates IP/CIDR format during config parsing but keeps the
string representation. Network objects are cached in Config methods
(forwarded_allow_networks() and proxy_allow_networks()) for efficient
IP checking without repeated conversions.

Also uses strict mode for ip_network validation to detect mistakes like
192.168.1.1/24 where host bits are set (should be 192.168.1.0/24).

Fixes #3458
This commit is contained in:
Benoit Chesneau 2026-01-23 23:51:25 +01:00 committed by GitHub
parent d73ff4b1d8
commit e9a3f30a0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 86 additions and 41 deletions

View File

@ -39,17 +39,21 @@ VERSION_RE = re.compile(r"HTTP/(\d)\.(\d)")
RFC9110_5_5_INVALID_AND_DANGEROUS = re.compile(r"[\0\r\n]")
def _ip_in_allow_list(ip_str, allow_list):
"""Check if IP address is in the allow list (which may contain networks)."""
def _ip_in_allow_list(ip_str, allow_list, networks):
"""Check if IP address is in the allow list.
Args:
ip_str: The IP address string to check
allow_list: The original allow list (strings, may contain "*")
networks: Pre-computed ipaddress.ip_network objects from config
"""
if '*' in allow_list:
return True
try:
ip = ipaddress.ip_address(ip_str)
except ValueError:
return False
for network in allow_list:
if network == '*':
return True
for network in networks:
if ip in network:
return True
return False
@ -219,7 +223,8 @@ class AsyncRequest:
def _proxy_protocol_access_check(self):
"""Check if proxy protocol is allowed from this peer."""
if (isinstance(self.peer_addr, tuple) and
not _ip_in_allow_list(self.peer_addr[0], self.cfg.proxy_allow_ips)):
not _ip_in_allow_list(self.peer_addr[0], self.cfg.proxy_allow_ips,
self.cfg.proxy_allow_networks())):
raise ForbiddenProxyRequest(self.peer_addr[0])
async def _parse_proxy_protocol_v1(self, buf):
@ -430,7 +435,8 @@ class AsyncRequest:
if from_trailer:
pass
elif (not isinstance(self.peer_addr, tuple)
or _ip_in_allow_list(self.peer_addr[0], cfg.forwarded_allow_ips)):
or _ip_in_allow_list(self.peer_addr[0], cfg.forwarded_allow_ips,
cfg.forwarded_allow_networks())):
secure_scheme_headers = cfg.secure_scheme_headers
forwarder_headers = cfg.forwarder_headers

View File

@ -47,6 +47,8 @@ class Config:
def __init__(self, usage=None, prog=None):
self.settings = make_settings()
self._forwarded_allow_networks = None
self._proxy_allow_networks = None
self.usage = usage
self.prog = prog or os.path.basename(sys.argv[0])
self.env_orig = os.environ.copy()
@ -174,6 +176,26 @@ class Config:
def is_ssl(self):
return self.certfile or self.keyfile
def forwarded_allow_networks(self):
"""Return cached network objects for forwarded_allow_ips (internal use)."""
if self._forwarded_allow_networks is None:
self._forwarded_allow_networks = [
ipaddress.ip_network(addr)
for addr in self.forwarded_allow_ips
if addr != "*"
]
return self._forwarded_allow_networks
def proxy_allow_networks(self):
"""Return cached network objects for proxy_allow_ips (internal use)."""
if self._proxy_allow_networks is None:
self._proxy_allow_networks = [
ipaddress.ip_network(addr)
for addr in self.proxy_allow_ips
if addr != "*"
]
return self._proxy_allow_networks
@property
def ssl_options(self):
opts = {}
@ -407,15 +429,16 @@ def validate_list_of_existing_files(val):
def validate_string_to_addr_list(val):
val = validate_string_to_list(val)
result = []
for addr in val:
if addr == "*":
result.append(addr)
continue
# Support both single IPs and CIDR networks
result.append(ipaddress.ip_network(addr, strict=False))
# Validate that it's a valid IP address or CIDR network
# but keep the string representation for backward compatibility.
# Use strict mode to detect mistakes like 192.168.1.1/24 where
# host bits are set (should be 192.168.1.0/24).
ipaddress.ip_network(addr)
return result
return val
def validate_string_to_list(val):

View File

@ -58,17 +58,21 @@ VERSION_RE = re.compile(r"HTTP/(\d)\.(\d)")
RFC9110_5_5_INVALID_AND_DANGEROUS = re.compile(r"[\0\r\n]")
def _ip_in_allow_list(ip_str, allow_list):
"""Check if IP address is in the allow list (which may contain networks)."""
def _ip_in_allow_list(ip_str, allow_list, networks):
"""Check if IP address is in the allow list.
Args:
ip_str: The IP address string to check
allow_list: The original allow list (strings, may contain "*")
networks: Pre-computed ipaddress.ip_network objects from config
"""
if '*' in allow_list:
return True
try:
ip = ipaddress.ip_address(ip_str)
except ValueError:
return False
for network in allow_list:
if network == '*':
return True
for network in networks:
if ip in network:
return True
return False
@ -127,7 +131,8 @@ class Message:
# .. or we are just behind a proxy who does not remove conflicting trailers
pass
elif (not isinstance(self.peer_addr, tuple)
or _ip_in_allow_list(self.peer_addr[0], cfg.forwarded_allow_ips)):
or _ip_in_allow_list(self.peer_addr[0], cfg.forwarded_allow_ips,
cfg.forwarded_allow_networks())):
secure_scheme_headers = cfg.secure_scheme_headers
forwarder_headers = cfg.forwarder_headers
@ -407,7 +412,8 @@ class Request(Message):
def proxy_protocol_access_check(self):
"""Check if proxy protocol is allowed from this peer."""
if (isinstance(self.peer_addr, tuple) and
not _ip_in_allow_list(self.peer_addr[0], self.cfg.proxy_allow_ips)):
not _ip_in_allow_list(self.peer_addr[0], self.cfg.proxy_allow_ips,
self.cfg.proxy_allow_networks())):
raise ForbiddenProxyRequest(self.peer_addr[0])
def _parse_proxy_protocol_v1(self, unreader, buf):

View File

@ -50,8 +50,10 @@ class MockConfig:
def __init__(self):
self.is_ssl = False
self.proxy_protocol = "off"
self.proxy_allow_ips = [ipaddress.ip_network("127.0.0.1")]
self.forwarded_allow_ips = [ipaddress.ip_network("127.0.0.1")]
self.proxy_allow_ips = ["127.0.0.1"]
self.forwarded_allow_ips = ["127.0.0.1"]
self._proxy_allow_networks = None
self._forwarded_allow_networks = None
self.secure_scheme_headers = {}
self.forwarder_headers = []
self.limit_request_line = 8190
@ -64,6 +66,24 @@ class MockConfig:
self.strip_header_spaces = False
self.header_map = "refuse"
def forwarded_allow_networks(self):
if self._forwarded_allow_networks is None:
self._forwarded_allow_networks = [
ipaddress.ip_network(addr)
for addr in self.forwarded_allow_ips
if addr != "*"
]
return self._forwarded_allow_networks
def proxy_allow_networks(self):
if self._proxy_allow_networks is None:
self._proxy_allow_networks = [
ipaddress.ip_network(addr)
for addr in self.proxy_allow_ips
if addr != "*"
]
return self._proxy_allow_networks
# AsyncUnreader Tests

View File

@ -2,7 +2,6 @@
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import ipaddress
import os
import re
import sys
@ -166,27 +165,15 @@ def test_str_validation():
def test_str_to_addr_list_validation():
c = config.Config()
# Default values are now network objects
assert c.proxy_allow_ips == [
ipaddress.ip_network("127.0.0.1/32"),
ipaddress.ip_network("::1/128")
]
assert c.forwarded_allow_ips == [
ipaddress.ip_network("127.0.0.1/32"),
ipaddress.ip_network("::1/128")
]
# Single IPs are converted to /32 or /128 networks
# Values remain as strings for backward compatibility
assert c.proxy_allow_ips == ["127.0.0.1", "::1"]
assert c.forwarded_allow_ips == ["127.0.0.1", "::1"]
# Single IPs are validated but kept as strings
c.set("forwarded_allow_ips", "127.0.0.1,192.0.2.1")
assert c.forwarded_allow_ips == [
ipaddress.ip_network("127.0.0.1/32"),
ipaddress.ip_network("192.0.2.1/32")
]
# CIDR networks are supported
assert c.forwarded_allow_ips == ["127.0.0.1", "192.0.2.1"]
# CIDR networks are supported and kept as strings
c.set("forwarded_allow_ips", "127.0.0.0/8,192.168.0.0/16")
assert c.forwarded_allow_ips == [
ipaddress.ip_network("127.0.0.0/8"),
ipaddress.ip_network("192.168.0.0/16")
]
assert c.forwarded_allow_ips == ["127.0.0.0/8", "192.168.0.0/16"]
# Wildcard is preserved as string
c.set("forwarded_allow_ips", "*")
assert c.forwarded_allow_ips == ["*"]
@ -200,6 +187,9 @@ def test_str_to_addr_list_validation():
pytest.raises(ValueError, c.set, "forwarded_allow_ips", "127.0.0")
# detect typos
pytest.raises(ValueError, c.set, "forwarded_allow_ips", "::f:")
# dangerous typos such as accidentally permitting half the internet
# clearly recognizable - masked bits are not zero
pytest.raises(ValueError, c.set, "forwarded_allow_ips", "100.64.0.0/1")
def test_str_to_list():