Compare commits

...

3 Commits

Author SHA1 Message Date
a5ccb1f951 refactor: 优化路由监听器代码,减少重复和冗余
主要优化:

1. 移除未使用的代码:
   - 删除未使用的 import json
   - 删除未使用的 _get_apisix_headers() 方法(headers 已在 session 中设置)

2. 减少代码重复:
   - 提取 _handle_cert_request() 公共方法
   - 统一单个和多个域名的证书申请处理逻辑
   - 减少约 40 行重复代码

3. 代码简化:
   - 简化 process_new_domains() 中的逻辑
   - 提高代码可读性和可维护性

优势:
- 更简洁:移除所有冗余代码
- 更易维护:减少重复,统一处理逻辑
- 更符合 DRY 原则:单一职责,代码复用
2026-01-01 20:27:44 +00:00
9d3a4f0f71 fix: 修复速率限制错误仍会重试的问题
问题:
- 遇到速率限制时抛出 RateLimitError 异常
- 但被外层的 except Exception 捕获,继续重试
- 导致持续触发速率限制,无法停止

修复:
- 在异常处理中优先检查 RateLimitError
- 如果是速率限制错误,直接重新抛出,不进行重试
- 让 route_watcher 正确捕获并记录到限制列表

效果:
- 遇到速率限制时立即停止重试
- 正确记录到 rate_limited_domains
- 后续检查自动跳过,直到限制解除
2026-01-01 20:26:01 +00:00
f34c2e28d1 feat: 添加速率限制处理机制,优化代码质量
主要改进:

1. 速率限制处理:
   - 添加 RateLimitError 异常类,用于标识速率限制错误
   - 在 ssl_manager.py 中检测 Let's Encrypt 速率限制错误
   - 解析重试时间,提供详细的错误提示
   - 在 route_watcher.py 中记录被限制的域名和重试时间
   - 自动跳过限制期间的域名,避免持续触发限制
   - 限制解除后自动恢复申请

2. 代码优化:
   - 修复重复导入 sys 的问题
   - 修复 API 调用未使用 session 连接复用的问题
   - 移除未使用的 _get_apisix_headers 方法
   - 将 RateLimitError 导入移到文件顶部

优势:
- 避免持续触发速率限制,形成死循环
- 自动等待限制解除,无需手动干预
- 提升代码质量和可维护性
- 充分利用 HTTP 连接复用,提升性能
2026-01-01 20:24:36 +00:00
2 changed files with 148 additions and 56 deletions

View File

@ -7,15 +7,15 @@ APISIX 路由监听服务
import os
import sys
import json
import time
import logging
import requests
import ipaddress
from typing import Set, Optional, Dict
import sys
from typing import Set, Optional, Dict, List
from datetime import datetime
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from ssl_manager import APISIXSSLManager
from ssl_manager import APISIXSSLManager, RateLimitError
# 配置日志
logging.basicConfig(
@ -46,20 +46,15 @@ class RouteWatcher:
'X-API-KEY': self.apisix_admin_key,
'Content-Type': 'application/json'
})
def _get_apisix_headers(self):
"""获取 APISIX Admin API 请求头"""
return {
'X-API-KEY': self.apisix_admin_key,
'Content-Type': 'application/json'
}
# 速率限制记录:{domain: retry_after_timestamp}
self.rate_limited_domains: Dict[str, float] = {}
def get_all_routes(self) -> list:
"""获取所有路由"""
try:
response = requests.get(
response = self.session.get(
f"{self.apisix_admin_url}/apisix/admin/routes",
headers=self._get_apisix_headers(),
timeout=10
)
@ -77,9 +72,8 @@ class RouteWatcher:
def get_all_ssls(self) -> list:
"""获取所有 SSL 配置"""
try:
response = requests.get(
response = self.session.get(
f"{self.apisix_admin_url}/apisix/admin/ssls",
headers=self._get_apisix_headers(),
timeout=10
)
@ -176,8 +170,68 @@ class RouteWatcher:
logger.info(f"域名已有 SSL 配置: {domain}")
return False
# 检查是否在速率限制期间
if domain in self.rate_limited_domains:
retry_after = self.rate_limited_domains[domain]
current_time = time.time()
if current_time < retry_after:
remaining_minutes = int((retry_after - current_time) / 60) + 1
logger.debug(f"域名 {domain} 仍在速率限制期间,剩余约 {remaining_minutes} 分钟后重试")
return False
else:
# 限制已解除,移除记录
del self.rate_limited_domains[domain]
logger.info(f"域名 {domain} 速率限制已解除,将重新尝试申请证书")
return True
def _handle_cert_request(self, primary_domain: str, additional_domains: List[str] = None):
"""处理证书申请(单个或多个域名)
Args:
primary_domain: 主域名
additional_domains: 额外域名列表可选
Returns:
bool: 是否成功
"""
domains_list = [primary_domain] + (additional_domains or [])
if additional_domains:
total_domains = len(additional_domains) + 1
logger.info(f"发现同一路由中的多个域名,合并申请证书: {primary_domain} + {additional_domains}")
else:
logger.info(f"发现新域名,准备申请证书: {primary_domain}")
try:
if self.ssl_manager.request_certificate(primary_domain, additional_domains):
if additional_domains:
logger.info(f"证书申请成功: {primary_domain} (包含 {len(additional_domains) + 1} 个域名)")
else:
logger.info(f"证书申请成功: {primary_domain}")
# 申请成功,清除速率限制记录
for d in domains_list:
self.rate_limited_domains.pop(d, None)
return True
else:
if additional_domains:
logger.error(f"证书申请失败: {primary_domain} + {additional_domains}")
else:
logger.error(f"证书申请失败: {primary_domain}")
return False
except RateLimitError as e:
logger.warning(f"域名 {e.domain} 遇到速率限制,将在 {datetime.fromtimestamp(e.retry_after_timestamp).strftime('%Y-%m-%d %H:%M:%S')} 后自动重试")
# 记录速率限制的域名和重试时间(所有相关域名)
for d in domains_list:
self.rate_limited_domains[d] = e.retry_after_timestamp
return False
except Exception as e:
if additional_domains:
logger.error(f"处理域名异常 {primary_domain} + {additional_domains}: {e}")
else:
logger.error(f"处理域名异常 {primary_domain}: {e}")
return False
def process_new_domains(self):
"""处理新域名"""
# 优化:只获取一次路由和 SSL 配置,避免重复 API 调用
@ -205,30 +259,10 @@ class RouteWatcher:
if not domains_to_request:
continue
# 如果只有一个域名,单独申请
if len(domains_to_request) == 1:
domain = domains_to_request[0]
logger.info(f"发现新域名,准备申请证书: {domain}")
try:
if self.ssl_manager.request_certificate(domain):
logger.info(f"证书申请成功: {domain}")
else:
logger.error(f"证书申请失败: {domain}")
except Exception as e:
logger.error(f"处理域名异常 {domain}: {e}")
else:
# 多个域名,合并到一个证书申请(使用 SAN
primary_domain = domains_to_request[0]
additional_domains = domains_to_request[1:]
total_domains = len(additional_domains) + 1
logger.info(f"发现同一路由中的多个域名,合并申请证书: {primary_domain} + {additional_domains}")
try:
if self.ssl_manager.request_certificate(primary_domain, additional_domains):
logger.info(f"证书申请成功: {primary_domain} (包含 {total_domains} 个域名)")
else:
logger.error(f"证书申请失败: {primary_domain} + {additional_domains}")
except Exception as e:
logger.error(f"处理域名异常 {primary_domain} + {additional_domains}: {e}")
# 处理证书申请(单个或多个域名)
primary_domain = domains_to_request[0]
additional_domains = domains_to_request[1:] if len(domains_to_request) > 1 else None
self._handle_cert_request(primary_domain, additional_domains)
def run(self, interval: int = 60):
"""运行监听服务"""

View File

@ -32,6 +32,14 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
class RateLimitError(Exception):
"""速率限制异常"""
def __init__(self, domain: str, retry_after_timestamp: float):
self.domain = domain
self.retry_after_timestamp = retry_after_timestamp
super().__init__(f"域名 {domain} 遇到速率限制,请在 {datetime.fromtimestamp(retry_after_timestamp).strftime('%Y-%m-%d %H:%M:%S')} 后重试")
# 默认配置(可通过环境变量覆盖)
DEFAULT_CONFIG = {
'apisix_admin_url': 'http://localhost:9180',
@ -345,30 +353,80 @@ class APISIXSSLManager:
logger.error(f"无法读取证书文件: {domain}")
return False
else:
# 检查是否是网络超时错误
# 检查错误类型
error_output = result.stderr or ""
stdout_output = result.stdout or ""
combined_output = error_output + stdout_output
# 检查是否是速率限制错误
is_rate_limit = "too many" in combined_output.lower() or "rate limit" in combined_output.lower() or "retry after" in combined_output.lower()
# 检查是否是网络超时错误
is_timeout_error = "ReadTimeout" in error_output or "timed out" in error_output.lower()
# 提取重试时间(如果有)
retry_after_timestamp = None
if is_rate_limit:
retry_match = re.search(r'retry after ([0-9-: ]+)', combined_output, re.IGNORECASE)
if retry_match:
try:
retry_time_str = retry_match.group(1).strip()
# 解析时间字符串格式2026-01-01 20:17:09 UTC
retry_datetime = datetime.strptime(retry_time_str, '%Y-%m-%d %H:%M:%S %Z')
retry_after_timestamp = retry_datetime.timestamp()
except Exception:
# 如果解析失败,默认等待 1 小时
retry_after_timestamp = time.time() + 3600
else:
# 如果没有找到具体时间,默认等待 1 小时
retry_after_timestamp = time.time() + 3600
logger.error(f"证书申请失败 (退出码: {result.returncode})")
if result.stdout:
logger.error(f"标准输出: {result.stdout}")
if result.stderr:
logger.error(f"错误输出: {result.stderr}")
# 处理速率限制错误(不应该重试,直接返回)
if is_rate_limit:
logger.error("=" * 60)
logger.error("⚠️ 遇到 Let's Encrypt 速率限制")
logger.error("=" * 60)
logger.error("可能的原因:")
logger.error("1. 在短时间内申请了太多证书")
logger.error("2. 多次验证失败(可能是 webroot 路由配置问题)")
logger.error("3. 达到了 Let's Encrypt 的速率限制")
if retry_after_timestamp:
retry_datetime = datetime.fromtimestamp(retry_after_timestamp)
logger.error(f"建议在以下时间后重试: {retry_datetime.strftime('%Y-%m-%d %H:%M:%S')}")
else:
logger.error("建议等待 1 小时后再重试")
logger.error("")
logger.error("解决方案:")
logger.error("1. 检查 webroot 路由是否正确配置")
logger.error("2. 确保域名可以正常访问 /.well-known/acme-challenge/ 路径")
logger.error("3. 等待速率限制解除后再重试")
logger.error("4. 查看详细日志: /var/log/letsencrypt/letsencrypt.log")
logger.error("=" * 60)
# 返回特殊值,表示速率限制(需要外部处理)
raise RateLimitError(domain, retry_after_timestamp or (time.time() + 3600))
# 处理网络超时错误
if is_timeout_error and attempt < max_retries:
logger.warning(f"证书申请网络超时 (尝试 {attempt}/{max_retries}),将重试...")
continue
else:
logger.error(f"证书申请失败 (退出码: {result.returncode})")
if result.stdout:
logger.error(f"标准输出: {result.stdout}")
if result.stderr:
logger.error(f"错误输出: {result.stderr}")
# 如果是网络超时且已尝试所有次数,给出提示
if is_timeout_error:
logger.error("网络连接超时,可能的原因:")
logger.error("1. 服务器无法访问 Let's Encrypt 服务器 (acme-staging-v02.api.letsencrypt.org 或 acme-v02.api.letsencrypt.org)")
logger.error("2. 防火墙阻止了 HTTPS 连接")
logger.error("3. 网络不稳定,建议稍后重试")
logger.error("4. 可以检查网络连接: curl -I https://acme-staging-v02.api.letsencrypt.org/directory")
return False
elif is_timeout_error:
logger.error("网络连接超时,可能的原因:")
logger.error("1. 服务器无法访问 Let's Encrypt 服务器 (acme-staging-v02.api.letsencrypt.org 或 acme-v02.api.letsencrypt.org)")
logger.error("2. 防火墙阻止了 HTTPS 连接")
logger.error("3. 网络不稳定,建议稍后重试")
logger.error("4. 可以检查网络连接: curl -I https://acme-staging-v02.api.letsencrypt.org/directory")
return False
except RateLimitError:
# 速率限制错误不应该重试,直接抛出
raise
except subprocess.TimeoutExpired:
if attempt < max_retries:
logger.warning(f"证书申请超时 (尝试 {attempt}/{max_retries}),将重试...")