refactor: 优化路由监听器代码,减少重复和冗余

主要优化:

1. 移除未使用的代码:
   - 删除未使用的 import json
   - 删除未使用的 _get_apisix_headers() 方法(headers 已在 session 中设置)

2. 减少代码重复:
   - 提取 _handle_cert_request() 公共方法
   - 统一单个和多个域名的证书申请处理逻辑
   - 减少约 40 行重复代码

3. 代码简化:
   - 简化 process_new_domains() 中的逻辑
   - 提高代码可读性和可维护性

优势:
- 更简洁:移除所有冗余代码
- 更易维护:减少重复,统一处理逻辑
- 更符合 DRY 原则:单一职责,代码复用
This commit is contained in:
jingrow 2026-01-01 20:27:44 +00:00
parent 9d3a4f0f71
commit a5ccb1f951

View File

@ -7,12 +7,11 @@ APISIX 路由监听服务
import os
import sys
import json
import time
import logging
import requests
import ipaddress
from typing import Set, Optional, Dict
from typing import Set, Optional, Dict, List
from datetime import datetime
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
@ -51,13 +50,6 @@ class RouteWatcher:
# 速率限制记录:{domain: retry_after_timestamp}
self.rate_limited_domains: Dict[str, float] = {}
def _get_apisix_headers(self):
"""获取 APISIX Admin API 请求头"""
return {
'X-API-KEY': self.apisix_admin_key,
'Content-Type': 'application/json'
}
def get_all_routes(self) -> list:
"""获取所有路由"""
try:
@ -193,6 +185,53 @@ class RouteWatcher:
return True
def _handle_cert_request(self, primary_domain: str, additional_domains: List[str] = None):
"""处理证书申请(单个或多个域名)
Args:
primary_domain: 主域名
additional_domains: 额外域名列表可选
Returns:
bool: 是否成功
"""
domains_list = [primary_domain] + (additional_domains or [])
if additional_domains:
total_domains = len(additional_domains) + 1
logger.info(f"发现同一路由中的多个域名,合并申请证书: {primary_domain} + {additional_domains}")
else:
logger.info(f"发现新域名,准备申请证书: {primary_domain}")
try:
if self.ssl_manager.request_certificate(primary_domain, additional_domains):
if additional_domains:
logger.info(f"证书申请成功: {primary_domain} (包含 {len(additional_domains) + 1} 个域名)")
else:
logger.info(f"证书申请成功: {primary_domain}")
# 申请成功,清除速率限制记录
for d in domains_list:
self.rate_limited_domains.pop(d, None)
return True
else:
if additional_domains:
logger.error(f"证书申请失败: {primary_domain} + {additional_domains}")
else:
logger.error(f"证书申请失败: {primary_domain}")
return False
except RateLimitError as e:
logger.warning(f"域名 {e.domain} 遇到速率限制,将在 {datetime.fromtimestamp(e.retry_after_timestamp).strftime('%Y-%m-%d %H:%M:%S')} 后自动重试")
# 记录速率限制的域名和重试时间(所有相关域名)
for d in domains_list:
self.rate_limited_domains[d] = e.retry_after_timestamp
return False
except Exception as e:
if additional_domains:
logger.error(f"处理域名异常 {primary_domain} + {additional_domains}: {e}")
else:
logger.error(f"处理域名异常 {primary_domain}: {e}")
return False
def process_new_domains(self):
"""处理新域名"""
# 优化:只获取一次路由和 SSL 配置,避免重复 API 调用
@ -220,48 +259,10 @@ class RouteWatcher:
if not domains_to_request:
continue
# 如果只有一个域名,单独申请
if len(domains_to_request) == 1:
domain = domains_to_request[0]
logger.info(f"发现新域名,准备申请证书: {domain}")
try:
if self.ssl_manager.request_certificate(domain):
logger.info(f"证书申请成功: {domain}")
# 申请成功,清除速率限制记录(如果存在)
self.rate_limited_domains.pop(domain, None)
else:
logger.error(f"证书申请失败: {domain}")
except Exception as e:
# 检查是否是速率限制错误
if isinstance(e, RateLimitError):
logger.warning(f"域名 {e.domain} 遇到速率限制,将在 {datetime.fromtimestamp(e.retry_after_timestamp).strftime('%Y-%m-%d %H:%M:%S')} 后自动重试")
# 记录速率限制的域名和重试时间
self.rate_limited_domains[e.domain] = e.retry_after_timestamp
else:
logger.error(f"处理域名异常 {domain}: {e}")
else:
# 多个域名,合并到一个证书申请(使用 SAN
primary_domain = domains_to_request[0]
additional_domains = domains_to_request[1:]
total_domains = len(additional_domains) + 1
logger.info(f"发现同一路由中的多个域名,合并申请证书: {primary_domain} + {additional_domains}")
try:
if self.ssl_manager.request_certificate(primary_domain, additional_domains):
logger.info(f"证书申请成功: {primary_domain} (包含 {total_domains} 个域名)")
# 申请成功,清除速率限制记录
for d in [primary_domain] + additional_domains:
self.rate_limited_domains.pop(d, None)
else:
logger.error(f"证书申请失败: {primary_domain} + {additional_domains}")
except Exception as e:
# 检查是否是速率限制错误
if isinstance(e, RateLimitError):
logger.warning(f"域名 {e.domain} 遇到速率限制,将在 {datetime.fromtimestamp(e.retry_after_timestamp).strftime('%Y-%m-%d %H:%M:%S')} 后自动重试")
# 记录速率限制的域名和重试时间(所有相关域名)
for d in [primary_domain] + additional_domains:
self.rate_limited_domains[d] = e.retry_after_timestamp
else:
logger.error(f"处理域名异常 {primary_domain} + {additional_domains}: {e}")
# 处理证书申请(单个或多个域名)
primary_domain = domains_to_request[0]
additional_domains = domains_to_request[1:] if len(domains_to_request) > 1 else None
self._handle_cert_request(primary_domain, additional_domains)
def run(self, interval: int = 60):
"""运行监听服务"""