diff --git a/ssl_manager/route_watcher.py b/ssl_manager/route_watcher.py index 7b9bec4..2210a7c 100755 --- a/ssl_manager/route_watcher.py +++ b/ssl_manager/route_watcher.py @@ -11,7 +11,8 @@ import json import time import logging import requests -from typing import Set, Optional +import ipaddress +from typing import Set, Optional, Dict import sys sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from ssl_manager import APISIXSSLManager @@ -39,7 +40,12 @@ class RouteWatcher: self.apisix_admin_url = os.getenv('APISIX_ADMIN_URL', 'http://localhost:9180') self.apisix_admin_key = os.getenv('APISIX_ADMIN_KEY', '8206e6e42b6b53243c52a767cc633137') - # 不再使用已处理列表,直接检查实际 SSL 配置 + # 创建 HTTP 会话,复用连接 + self.session = requests.Session() + self.session.headers.update({ + 'X-API-KEY': self.apisix_admin_key, + 'Content-Type': 'application/json' + }) def _get_apisix_headers(self): """获取 APISIX Admin API 请求头""" @@ -131,29 +137,55 @@ class RouteWatcher: domains.update(snis) return domains - def should_request_cert(self, domain: str) -> bool: - """判断是否需要申请证书""" + def _is_valid_domain(self, domain: str) -> bool: + """检查是否为有效域名(非 IP 地址和本地域名)""" # 跳过本地域名 if domain in ['localhost', '127.0.0.1', '0.0.0.0']: return False - # 跳过 IP 地址 - if domain.replace('.', '').isdigit(): + # 检查是否为 IP 地址(支持 IPv4 和 IPv6) + try: + ipaddress.ip_address(domain) + return False + except ValueError: + pass + + return True + + def _build_ssl_domains_set(self, ssls: list) -> Set[str]: + """构建所有已配置 SSL 的域名集合(用于快速查找)""" + ssl_domains = set() + for ssl in ssls: + domains = self.extract_domains_from_ssl(ssl) + ssl_domains.update(domains) + return ssl_domains + + def should_request_cert(self, domain: str, existing_ssl_domains: Set[str]) -> bool: + """判断是否需要申请证书 + + Args: + domain: 要检查的域名 + existing_ssl_domains: 已存在的 SSL 域名集合(用于快速查找) + """ + # 检查是否为有效域名 + if not self._is_valid_domain(domain): return False - # 检查是否已有 SSL 配置(直接检查实际配置,最准确) - ssls = self.get_all_ssls() - for ssl in ssls: - ssl_domains = self.extract_domains_from_ssl(ssl) - if domain in ssl_domains: - logger.info(f"域名已有 SSL 配置: {domain}") - return False + # 检查是否已有 SSL 配置 + if domain in existing_ssl_domains: + logger.info(f"域名已有 SSL 配置: {domain}") + return False return True def process_new_domains(self): """处理新域名""" + # 优化:只获取一次路由和 SSL 配置,避免重复 API 调用 routes = self.get_all_routes() + ssls = self.get_all_ssls() + + # 构建已存在的 SSL 域名集合(用于快速查找) + existing_ssl_domains = self._build_ssl_domains_set(ssls) # 按路由处理,同一路由的多个域名合并到一个证书 for route in routes: @@ -167,8 +199,8 @@ class RouteWatcher: if not domains: continue - # 过滤出需要申请证书的域名 - domains_to_request = [d for d in domains if self.should_request_cert(d)] + # 过滤出需要申请证书的域名(使用缓存的 SSL 域名集合) + domains_to_request = [d for d in domains if self.should_request_cert(d, existing_ssl_domains)] if not domains_to_request: continue