feat: 添加速率限制处理机制,优化代码质量

主要改进:

1. 速率限制处理:
   - 添加 RateLimitError 异常类,用于标识速率限制错误
   - 在 ssl_manager.py 中检测 Let's Encrypt 速率限制错误
   - 解析重试时间,提供详细的错误提示
   - 在 route_watcher.py 中记录被限制的域名和重试时间
   - 自动跳过限制期间的域名,避免持续触发限制
   - 限制解除后自动恢复申请

2. 代码优化:
   - 修复重复导入 sys 的问题
   - 修复 API 调用未使用 session 连接复用的问题
   - 移除未使用的 _get_apisix_headers 方法
   - 将 RateLimitError 导入移到文件顶部

优势:
- 避免持续触发速率限制,形成死循环
- 自动等待限制解除,无需手动干预
- 提升代码质量和可维护性
- 充分利用 HTTP 连接复用,提升性能
This commit is contained in:
jingrow 2026-01-01 20:24:36 +00:00
parent 5d60b822e2
commit f34c2e28d1
2 changed files with 113 additions and 25 deletions

View File

@ -13,9 +13,10 @@ import logging
import requests
import ipaddress
from typing import Set, Optional, Dict
import sys
from datetime import datetime
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from ssl_manager import APISIXSSLManager
from ssl_manager import APISIXSSLManager, RateLimitError
# 配置日志
logging.basicConfig(
@ -46,6 +47,9 @@ class RouteWatcher:
'X-API-KEY': self.apisix_admin_key,
'Content-Type': 'application/json'
})
# 速率限制记录:{domain: retry_after_timestamp}
self.rate_limited_domains: Dict[str, float] = {}
def _get_apisix_headers(self):
"""获取 APISIX Admin API 请求头"""
@ -57,9 +61,8 @@ class RouteWatcher:
def get_all_routes(self) -> list:
"""获取所有路由"""
try:
response = requests.get(
response = self.session.get(
f"{self.apisix_admin_url}/apisix/admin/routes",
headers=self._get_apisix_headers(),
timeout=10
)
@ -77,9 +80,8 @@ class RouteWatcher:
def get_all_ssls(self) -> list:
"""获取所有 SSL 配置"""
try:
response = requests.get(
response = self.session.get(
f"{self.apisix_admin_url}/apisix/admin/ssls",
headers=self._get_apisix_headers(),
timeout=10
)
@ -176,6 +178,19 @@ class RouteWatcher:
logger.info(f"域名已有 SSL 配置: {domain}")
return False
# 检查是否在速率限制期间
if domain in self.rate_limited_domains:
retry_after = self.rate_limited_domains[domain]
current_time = time.time()
if current_time < retry_after:
remaining_minutes = int((retry_after - current_time) / 60) + 1
logger.debug(f"域名 {domain} 仍在速率限制期间,剩余约 {remaining_minutes} 分钟后重试")
return False
else:
# 限制已解除,移除记录
del self.rate_limited_domains[domain]
logger.info(f"域名 {domain} 速率限制已解除,将重新尝试申请证书")
return True
def process_new_domains(self):
@ -212,10 +227,18 @@ class RouteWatcher:
try:
if self.ssl_manager.request_certificate(domain):
logger.info(f"证书申请成功: {domain}")
# 申请成功,清除速率限制记录(如果存在)
self.rate_limited_domains.pop(domain, None)
else:
logger.error(f"证书申请失败: {domain}")
except Exception as e:
logger.error(f"处理域名异常 {domain}: {e}")
# 检查是否是速率限制错误
if isinstance(e, RateLimitError):
logger.warning(f"域名 {e.domain} 遇到速率限制,将在 {datetime.fromtimestamp(e.retry_after_timestamp).strftime('%Y-%m-%d %H:%M:%S')} 后自动重试")
# 记录速率限制的域名和重试时间
self.rate_limited_domains[e.domain] = e.retry_after_timestamp
else:
logger.error(f"处理域名异常 {domain}: {e}")
else:
# 多个域名,合并到一个证书申请(使用 SAN
primary_domain = domains_to_request[0]
@ -225,10 +248,20 @@ class RouteWatcher:
try:
if self.ssl_manager.request_certificate(primary_domain, additional_domains):
logger.info(f"证书申请成功: {primary_domain} (包含 {total_domains} 个域名)")
# 申请成功,清除速率限制记录
for d in [primary_domain] + additional_domains:
self.rate_limited_domains.pop(d, None)
else:
logger.error(f"证书申请失败: {primary_domain} + {additional_domains}")
except Exception as e:
logger.error(f"处理域名异常 {primary_domain} + {additional_domains}: {e}")
# 检查是否是速率限制错误
if isinstance(e, RateLimitError):
logger.warning(f"域名 {e.domain} 遇到速率限制,将在 {datetime.fromtimestamp(e.retry_after_timestamp).strftime('%Y-%m-%d %H:%M:%S')} 后自动重试")
# 记录速率限制的域名和重试时间(所有相关域名)
for d in [primary_domain] + additional_domains:
self.rate_limited_domains[d] = e.retry_after_timestamp
else:
logger.error(f"处理域名异常 {primary_domain} + {additional_domains}: {e}")
def run(self, interval: int = 60):
"""运行监听服务"""

View File

@ -32,6 +32,14 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
class RateLimitError(Exception):
"""速率限制异常"""
def __init__(self, domain: str, retry_after_timestamp: float):
self.domain = domain
self.retry_after_timestamp = retry_after_timestamp
super().__init__(f"域名 {domain} 遇到速率限制,请在 {datetime.fromtimestamp(retry_after_timestamp).strftime('%Y-%m-%d %H:%M:%S')} 后重试")
# 默认配置(可通过环境变量覆盖)
DEFAULT_CONFIG = {
'apisix_admin_url': 'http://localhost:9180',
@ -345,29 +353,76 @@ class APISIXSSLManager:
logger.error(f"无法读取证书文件: {domain}")
return False
else:
# 检查是否是网络超时错误
# 检查错误类型
error_output = result.stderr or ""
stdout_output = result.stdout or ""
combined_output = error_output + stdout_output
# 检查是否是速率限制错误
is_rate_limit = "too many" in combined_output.lower() or "rate limit" in combined_output.lower() or "retry after" in combined_output.lower()
# 检查是否是网络超时错误
is_timeout_error = "ReadTimeout" in error_output or "timed out" in error_output.lower()
# 提取重试时间(如果有)
retry_after_timestamp = None
if is_rate_limit:
retry_match = re.search(r'retry after ([0-9-: ]+)', combined_output, re.IGNORECASE)
if retry_match:
try:
retry_time_str = retry_match.group(1).strip()
# 解析时间字符串格式2026-01-01 20:17:09 UTC
retry_datetime = datetime.strptime(retry_time_str, '%Y-%m-%d %H:%M:%S %Z')
retry_after_timestamp = retry_datetime.timestamp()
except Exception:
# 如果解析失败,默认等待 1 小时
retry_after_timestamp = time.time() + 3600
else:
# 如果没有找到具体时间,默认等待 1 小时
retry_after_timestamp = time.time() + 3600
logger.error(f"证书申请失败 (退出码: {result.returncode})")
if result.stdout:
logger.error(f"标准输出: {result.stdout}")
if result.stderr:
logger.error(f"错误输出: {result.stderr}")
# 处理速率限制错误(不应该重试,直接返回)
if is_rate_limit:
logger.error("=" * 60)
logger.error("⚠️ 遇到 Let's Encrypt 速率限制")
logger.error("=" * 60)
logger.error("可能的原因:")
logger.error("1. 在短时间内申请了太多证书")
logger.error("2. 多次验证失败(可能是 webroot 路由配置问题)")
logger.error("3. 达到了 Let's Encrypt 的速率限制")
if retry_after_timestamp:
retry_datetime = datetime.fromtimestamp(retry_after_timestamp)
logger.error(f"建议在以下时间后重试: {retry_datetime.strftime('%Y-%m-%d %H:%M:%S')}")
else:
logger.error("建议等待 1 小时后再重试")
logger.error("")
logger.error("解决方案:")
logger.error("1. 检查 webroot 路由是否正确配置")
logger.error("2. 确保域名可以正常访问 /.well-known/acme-challenge/ 路径")
logger.error("3. 等待速率限制解除后再重试")
logger.error("4. 查看详细日志: /var/log/letsencrypt/letsencrypt.log")
logger.error("=" * 60)
# 返回特殊值,表示速率限制(需要外部处理)
raise RateLimitError(domain, retry_after_timestamp or (time.time() + 3600))
# 处理网络超时错误
if is_timeout_error and attempt < max_retries:
logger.warning(f"证书申请网络超时 (尝试 {attempt}/{max_retries}),将重试...")
continue
else:
logger.error(f"证书申请失败 (退出码: {result.returncode})")
if result.stdout:
logger.error(f"标准输出: {result.stdout}")
if result.stderr:
logger.error(f"错误输出: {result.stderr}")
# 如果是网络超时且已尝试所有次数,给出提示
if is_timeout_error:
logger.error("网络连接超时,可能的原因:")
logger.error("1. 服务器无法访问 Let's Encrypt 服务器 (acme-staging-v02.api.letsencrypt.org 或 acme-v02.api.letsencrypt.org)")
logger.error("2. 防火墙阻止了 HTTPS 连接")
logger.error("3. 网络不稳定,建议稍后重试")
logger.error("4. 可以检查网络连接: curl -I https://acme-staging-v02.api.letsencrypt.org/directory")
return False
elif is_timeout_error:
logger.error("网络连接超时,可能的原因:")
logger.error("1. 服务器无法访问 Let's Encrypt 服务器 (acme-staging-v02.api.letsencrypt.org 或 acme-v02.api.letsencrypt.org)")
logger.error("2. 防火墙阻止了 HTTPS 连接")
logger.error("3. 网络不稳定,建议稍后重试")
logger.error("4. 可以检查网络连接: curl -I https://acme-staging-v02.api.letsencrypt.org/directory")
return False
except subprocess.TimeoutExpired:
if attempt < max_retries: