diff --git a/ssl_manager/route_watcher.py b/ssl_manager/route_watcher.py index 53be08d..b299873 100755 --- a/ssl_manager/route_watcher.py +++ b/ssl_manager/route_watcher.py @@ -7,12 +7,11 @@ APISIX 路由监听服务 import os import sys -import json import time import logging import requests import ipaddress -from typing import Set, Optional, Dict +from typing import Set, Optional, Dict, List from datetime import datetime sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) @@ -51,13 +50,6 @@ class RouteWatcher: # 速率限制记录:{domain: retry_after_timestamp} self.rate_limited_domains: Dict[str, float] = {} - def _get_apisix_headers(self): - """获取 APISIX Admin API 请求头""" - return { - 'X-API-KEY': self.apisix_admin_key, - 'Content-Type': 'application/json' - } - def get_all_routes(self) -> list: """获取所有路由""" try: @@ -193,6 +185,53 @@ class RouteWatcher: return True + def _handle_cert_request(self, primary_domain: str, additional_domains: List[str] = None): + """处理证书申请(单个或多个域名) + + Args: + primary_domain: 主域名 + additional_domains: 额外域名列表(可选) + + Returns: + bool: 是否成功 + """ + domains_list = [primary_domain] + (additional_domains or []) + + if additional_domains: + total_domains = len(additional_domains) + 1 + logger.info(f"发现同一路由中的多个域名,合并申请证书: {primary_domain} + {additional_domains}") + else: + logger.info(f"发现新域名,准备申请证书: {primary_domain}") + + try: + if self.ssl_manager.request_certificate(primary_domain, additional_domains): + if additional_domains: + logger.info(f"证书申请成功: {primary_domain} (包含 {len(additional_domains) + 1} 个域名)") + else: + logger.info(f"证书申请成功: {primary_domain}") + # 申请成功,清除速率限制记录 + for d in domains_list: + self.rate_limited_domains.pop(d, None) + return True + else: + if additional_domains: + logger.error(f"证书申请失败: {primary_domain} + {additional_domains}") + else: + logger.error(f"证书申请失败: {primary_domain}") + return False + except RateLimitError as e: + logger.warning(f"域名 {e.domain} 遇到速率限制,将在 {datetime.fromtimestamp(e.retry_after_timestamp).strftime('%Y-%m-%d %H:%M:%S')} 后自动重试") + # 记录速率限制的域名和重试时间(所有相关域名) + for d in domains_list: + self.rate_limited_domains[d] = e.retry_after_timestamp + return False + except Exception as e: + if additional_domains: + logger.error(f"处理域名异常 {primary_domain} + {additional_domains}: {e}") + else: + logger.error(f"处理域名异常 {primary_domain}: {e}") + return False + def process_new_domains(self): """处理新域名""" # 优化:只获取一次路由和 SSL 配置,避免重复 API 调用 @@ -220,48 +259,10 @@ class RouteWatcher: if not domains_to_request: continue - # 如果只有一个域名,单独申请 - if len(domains_to_request) == 1: - domain = domains_to_request[0] - logger.info(f"发现新域名,准备申请证书: {domain}") - try: - if self.ssl_manager.request_certificate(domain): - logger.info(f"证书申请成功: {domain}") - # 申请成功,清除速率限制记录(如果存在) - self.rate_limited_domains.pop(domain, None) - else: - logger.error(f"证书申请失败: {domain}") - except Exception as e: - # 检查是否是速率限制错误 - if isinstance(e, RateLimitError): - logger.warning(f"域名 {e.domain} 遇到速率限制,将在 {datetime.fromtimestamp(e.retry_after_timestamp).strftime('%Y-%m-%d %H:%M:%S')} 后自动重试") - # 记录速率限制的域名和重试时间 - self.rate_limited_domains[e.domain] = e.retry_after_timestamp - else: - logger.error(f"处理域名异常 {domain}: {e}") - else: - # 多个域名,合并到一个证书申请(使用 SAN) - primary_domain = domains_to_request[0] - additional_domains = domains_to_request[1:] - total_domains = len(additional_domains) + 1 - logger.info(f"发现同一路由中的多个域名,合并申请证书: {primary_domain} + {additional_domains}") - try: - if self.ssl_manager.request_certificate(primary_domain, additional_domains): - logger.info(f"证书申请成功: {primary_domain} (包含 {total_domains} 个域名)") - # 申请成功,清除速率限制记录 - for d in [primary_domain] + additional_domains: - self.rate_limited_domains.pop(d, None) - else: - logger.error(f"证书申请失败: {primary_domain} + {additional_domains}") - except Exception as e: - # 检查是否是速率限制错误 - if isinstance(e, RateLimitError): - logger.warning(f"域名 {e.domain} 遇到速率限制,将在 {datetime.fromtimestamp(e.retry_after_timestamp).strftime('%Y-%m-%d %H:%M:%S')} 后自动重试") - # 记录速率限制的域名和重试时间(所有相关域名) - for d in [primary_domain] + additional_domains: - self.rate_limited_domains[d] = e.retry_after_timestamp - else: - logger.error(f"处理域名异常 {primary_domain} + {additional_domains}: {e}") + # 处理证书申请(单个或多个域名) + primary_domain = domains_to_request[0] + additional_domains = domains_to_request[1:] if len(domains_to_request) > 1 else None + self._handle_cert_request(primary_domain, additional_domains) def run(self, interval: int = 60): """运行监听服务"""