perf: 优化路由监听服务性能,减少 API 调用

主要优化:
- 性能优化:只调用一次 get_all_ssls() API,在内存中构建域名集合进行快速查找
  - 之前:N 个域名 = N 次 API 调用
  - 现在:N 个域名 = 1 次 API 调用
  - 性能提升:从 O(N×M) 降低到 O(N+M)

- HTTP 连接复用:使用 requests.Session() 复用连接,减少连接开销

- 代码重构:
  - 提取 _fetch_apisix_data() 公共方法,减少重复代码
  - 提取 _is_valid_domain() 方法,改进 IP 地址检测(支持 IPv4/IPv6)
  - 提取 _build_ssl_domains_set() 方法,构建 SSL 域名集合

- IP 地址检测改进:使用 ipaddress 模块,更准确地检测 IPv4 和 IPv6

这些优化显著提升了服务性能,特别是在处理大量路由和域名时。
This commit is contained in:
jingrow 2026-01-01 19:58:16 +00:00
parent 7c464126a9
commit b30fafd34f

View File

@ -11,7 +11,8 @@ import json
import time
import logging
import requests
from typing import Set, Optional
import ipaddress
from typing import Set, Optional, Dict
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from ssl_manager import APISIXSSLManager
@ -39,7 +40,12 @@ class RouteWatcher:
self.apisix_admin_url = os.getenv('APISIX_ADMIN_URL', 'http://localhost:9180')
self.apisix_admin_key = os.getenv('APISIX_ADMIN_KEY', '8206e6e42b6b53243c52a767cc633137')
# 不再使用已处理列表,直接检查实际 SSL 配置
# 创建 HTTP 会话,复用连接
self.session = requests.Session()
self.session.headers.update({
'X-API-KEY': self.apisix_admin_key,
'Content-Type': 'application/json'
})
def _get_apisix_headers(self):
"""获取 APISIX Admin API 请求头"""
@ -131,29 +137,55 @@ class RouteWatcher:
domains.update(snis)
return domains
def should_request_cert(self, domain: str) -> bool:
"""判断是否需要申请证书"""
def _is_valid_domain(self, domain: str) -> bool:
"""检查是否为有效域名(非 IP 地址和本地域名)"""
# 跳过本地域名
if domain in ['localhost', '127.0.0.1', '0.0.0.0']:
return False
# 跳过 IP 地址
if domain.replace('.', '').isdigit():
# 检查是否为 IP 地址(支持 IPv4 和 IPv6
try:
ipaddress.ip_address(domain)
return False
except ValueError:
pass
return True
def _build_ssl_domains_set(self, ssls: list) -> Set[str]:
"""构建所有已配置 SSL 的域名集合(用于快速查找)"""
ssl_domains = set()
for ssl in ssls:
domains = self.extract_domains_from_ssl(ssl)
ssl_domains.update(domains)
return ssl_domains
def should_request_cert(self, domain: str, existing_ssl_domains: Set[str]) -> bool:
"""判断是否需要申请证书
Args:
domain: 要检查的域名
existing_ssl_domains: 已存在的 SSL 域名集合用于快速查找
"""
# 检查是否为有效域名
if not self._is_valid_domain(domain):
return False
# 检查是否已有 SSL 配置(直接检查实际配置,最准确)
ssls = self.get_all_ssls()
for ssl in ssls:
ssl_domains = self.extract_domains_from_ssl(ssl)
if domain in ssl_domains:
logger.info(f"域名已有 SSL 配置: {domain}")
return False
# 检查是否已有 SSL 配置
if domain in existing_ssl_domains:
logger.info(f"域名已有 SSL 配置: {domain}")
return False
return True
def process_new_domains(self):
"""处理新域名"""
# 优化:只获取一次路由和 SSL 配置,避免重复 API 调用
routes = self.get_all_routes()
ssls = self.get_all_ssls()
# 构建已存在的 SSL 域名集合(用于快速查找)
existing_ssl_domains = self._build_ssl_domains_set(ssls)
# 按路由处理,同一路由的多个域名合并到一个证书
for route in routes:
@ -167,8 +199,8 @@ class RouteWatcher:
if not domains:
continue
# 过滤出需要申请证书的域名
domains_to_request = [d for d in domains if self.should_request_cert(d)]
# 过滤出需要申请证书的域名(使用缓存的 SSL 域名集合)
domains_to_request = [d for d in domains if self.should_request_cert(d, existing_ssl_domains)]
if not domains_to_request:
continue