Compare commits
2 Commits
7c464126a9
...
d81d3362af
| Author | SHA1 | Date | |
|---|---|---|---|
| d81d3362af | |||
| b30fafd34f |
@ -11,7 +11,8 @@ import json
|
||||
import time
|
||||
import logging
|
||||
import requests
|
||||
from typing import Set, Optional
|
||||
import ipaddress
|
||||
from typing import Set, Optional, Dict
|
||||
import sys
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
from ssl_manager import APISIXSSLManager
|
||||
@ -39,7 +40,12 @@ class RouteWatcher:
|
||||
self.apisix_admin_url = os.getenv('APISIX_ADMIN_URL', 'http://localhost:9180')
|
||||
self.apisix_admin_key = os.getenv('APISIX_ADMIN_KEY', '8206e6e42b6b53243c52a767cc633137')
|
||||
|
||||
# 不再使用已处理列表,直接检查实际 SSL 配置
|
||||
# 创建 HTTP 会话,复用连接
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update({
|
||||
'X-API-KEY': self.apisix_admin_key,
|
||||
'Content-Type': 'application/json'
|
||||
})
|
||||
|
||||
def _get_apisix_headers(self):
|
||||
"""获取 APISIX Admin API 请求头"""
|
||||
@ -131,29 +137,55 @@ class RouteWatcher:
|
||||
domains.update(snis)
|
||||
return domains
|
||||
|
||||
def should_request_cert(self, domain: str) -> bool:
|
||||
"""判断是否需要申请证书"""
|
||||
def _is_valid_domain(self, domain: str) -> bool:
|
||||
"""检查是否为有效域名(非 IP 地址和本地域名)"""
|
||||
# 跳过本地域名
|
||||
if domain in ['localhost', '127.0.0.1', '0.0.0.0']:
|
||||
return False
|
||||
|
||||
# 跳过 IP 地址
|
||||
if domain.replace('.', '').isdigit():
|
||||
# 检查是否为 IP 地址(支持 IPv4 和 IPv6)
|
||||
try:
|
||||
ipaddress.ip_address(domain)
|
||||
return False
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return True
|
||||
|
||||
def _build_ssl_domains_set(self, ssls: list) -> Set[str]:
|
||||
"""构建所有已配置 SSL 的域名集合(用于快速查找)"""
|
||||
ssl_domains = set()
|
||||
for ssl in ssls:
|
||||
domains = self.extract_domains_from_ssl(ssl)
|
||||
ssl_domains.update(domains)
|
||||
return ssl_domains
|
||||
|
||||
def should_request_cert(self, domain: str, existing_ssl_domains: Set[str]) -> bool:
|
||||
"""判断是否需要申请证书
|
||||
|
||||
Args:
|
||||
domain: 要检查的域名
|
||||
existing_ssl_domains: 已存在的 SSL 域名集合(用于快速查找)
|
||||
"""
|
||||
# 检查是否为有效域名
|
||||
if not self._is_valid_domain(domain):
|
||||
return False
|
||||
|
||||
# 检查是否已有 SSL 配置(直接检查实际配置,最准确)
|
||||
ssls = self.get_all_ssls()
|
||||
for ssl in ssls:
|
||||
ssl_domains = self.extract_domains_from_ssl(ssl)
|
||||
if domain in ssl_domains:
|
||||
logger.info(f"域名已有 SSL 配置: {domain}")
|
||||
return False
|
||||
# 检查是否已有 SSL 配置
|
||||
if domain in existing_ssl_domains:
|
||||
logger.info(f"域名已有 SSL 配置: {domain}")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def process_new_domains(self):
|
||||
"""处理新域名"""
|
||||
# 优化:只获取一次路由和 SSL 配置,避免重复 API 调用
|
||||
routes = self.get_all_routes()
|
||||
ssls = self.get_all_ssls()
|
||||
|
||||
# 构建已存在的 SSL 域名集合(用于快速查找)
|
||||
existing_ssl_domains = self._build_ssl_domains_set(ssls)
|
||||
|
||||
# 按路由处理,同一路由的多个域名合并到一个证书
|
||||
for route in routes:
|
||||
@ -167,8 +199,8 @@ class RouteWatcher:
|
||||
if not domains:
|
||||
continue
|
||||
|
||||
# 过滤出需要申请证书的域名
|
||||
domains_to_request = [d for d in domains if self.should_request_cert(d)]
|
||||
# 过滤出需要申请证书的域名(使用缓存的 SSL 域名集合)
|
||||
domains_to_request = [d for d in domains if self.should_request_cert(d, existing_ssl_domains)]
|
||||
|
||||
if not domains_to_request:
|
||||
continue
|
||||
|
||||
@ -18,8 +18,7 @@ import requests
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Optional, List, Dict
|
||||
from datetime import datetime, timedelta
|
||||
import base64
|
||||
from datetime import datetime
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
@ -65,6 +64,10 @@ class APISIXSSLManager:
|
||||
|
||||
# 验证配置
|
||||
self._validate_config()
|
||||
|
||||
# 创建 HTTP 会话,复用连接
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update(self._get_apisix_headers())
|
||||
|
||||
def load_config(self, config_path: str):
|
||||
"""从配置文件加载配置(可选,用于覆盖默认配置)"""
|
||||
@ -221,13 +224,11 @@ class APISIXSSLManager:
|
||||
|
||||
logger.info(f"配置 SNI 域名列表: {cert_domains}")
|
||||
|
||||
headers = self._get_apisix_headers()
|
||||
|
||||
try:
|
||||
# 先检查是否已存在相同 SNI 的配置
|
||||
# 方法1:通过 ID 查找(如果之前创建时使用了这个 ID)
|
||||
check_url = f"{self.apisix_admin_url}/apisix/admin/ssls/{ssl_id}"
|
||||
response = requests.get(check_url, headers=headers, timeout=10)
|
||||
response = self.session.get(check_url, timeout=10)
|
||||
|
||||
existing_ssl_id = None
|
||||
if response.status_code == 200:
|
||||
@ -236,7 +237,7 @@ class APISIXSSLManager:
|
||||
else:
|
||||
# 方法2:查询所有 SSL 配置,检查是否有相同 SNI 的配置
|
||||
all_ssls_url = f"{self.apisix_admin_url}/apisix/admin/ssls"
|
||||
all_response = requests.get(all_ssls_url, headers=headers, timeout=10)
|
||||
all_response = self.session.get(all_ssls_url, timeout=10)
|
||||
if all_response.status_code == 200:
|
||||
all_ssls = all_response.json()
|
||||
ssl_list = all_ssls.get('list', []) if isinstance(all_ssls, dict) else all_ssls
|
||||
@ -261,18 +262,16 @@ class APISIXSSLManager:
|
||||
# 更新现有证书(更新时需要 id)
|
||||
logger.info(f"更新 APISIX SSL 配置: {domain} (ID: {existing_ssl_id})")
|
||||
ssl_config["id"] = existing_ssl_id
|
||||
response = requests.put(
|
||||
response = self.session.put(
|
||||
f"{self.apisix_admin_url}/apisix/admin/ssls/{existing_ssl_id}",
|
||||
headers=headers,
|
||||
json=ssl_config,
|
||||
timeout=10
|
||||
)
|
||||
else:
|
||||
# 创建新证书(POST 时不包含 id,让 APISIX 自动生成)
|
||||
logger.info(f"创建 APISIX SSL 配置: {domain}")
|
||||
response = requests.post(
|
||||
response = self.session.post(
|
||||
f"{self.apisix_admin_url}/apisix/admin/ssls",
|
||||
headers=headers,
|
||||
json=ssl_config,
|
||||
timeout=10
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user