Compare commits
2 Commits
7c464126a9
...
d81d3362af
| Author | SHA1 | Date | |
|---|---|---|---|
| d81d3362af | |||
| b30fafd34f |
@ -11,7 +11,8 @@ import json
|
|||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
import requests
|
import requests
|
||||||
from typing import Set, Optional
|
import ipaddress
|
||||||
|
from typing import Set, Optional, Dict
|
||||||
import sys
|
import sys
|
||||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||||
from ssl_manager import APISIXSSLManager
|
from ssl_manager import APISIXSSLManager
|
||||||
@ -39,7 +40,12 @@ class RouteWatcher:
|
|||||||
self.apisix_admin_url = os.getenv('APISIX_ADMIN_URL', 'http://localhost:9180')
|
self.apisix_admin_url = os.getenv('APISIX_ADMIN_URL', 'http://localhost:9180')
|
||||||
self.apisix_admin_key = os.getenv('APISIX_ADMIN_KEY', '8206e6e42b6b53243c52a767cc633137')
|
self.apisix_admin_key = os.getenv('APISIX_ADMIN_KEY', '8206e6e42b6b53243c52a767cc633137')
|
||||||
|
|
||||||
# 不再使用已处理列表,直接检查实际 SSL 配置
|
# 创建 HTTP 会话,复用连接
|
||||||
|
self.session = requests.Session()
|
||||||
|
self.session.headers.update({
|
||||||
|
'X-API-KEY': self.apisix_admin_key,
|
||||||
|
'Content-Type': 'application/json'
|
||||||
|
})
|
||||||
|
|
||||||
def _get_apisix_headers(self):
|
def _get_apisix_headers(self):
|
||||||
"""获取 APISIX Admin API 请求头"""
|
"""获取 APISIX Admin API 请求头"""
|
||||||
@ -131,29 +137,55 @@ class RouteWatcher:
|
|||||||
domains.update(snis)
|
domains.update(snis)
|
||||||
return domains
|
return domains
|
||||||
|
|
||||||
def should_request_cert(self, domain: str) -> bool:
|
def _is_valid_domain(self, domain: str) -> bool:
|
||||||
"""判断是否需要申请证书"""
|
"""检查是否为有效域名(非 IP 地址和本地域名)"""
|
||||||
# 跳过本地域名
|
# 跳过本地域名
|
||||||
if domain in ['localhost', '127.0.0.1', '0.0.0.0']:
|
if domain in ['localhost', '127.0.0.1', '0.0.0.0']:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# 跳过 IP 地址
|
# 检查是否为 IP 地址(支持 IPv4 和 IPv6)
|
||||||
if domain.replace('.', '').isdigit():
|
try:
|
||||||
|
ipaddress.ip_address(domain)
|
||||||
|
return False
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _build_ssl_domains_set(self, ssls: list) -> Set[str]:
|
||||||
|
"""构建所有已配置 SSL 的域名集合(用于快速查找)"""
|
||||||
|
ssl_domains = set()
|
||||||
|
for ssl in ssls:
|
||||||
|
domains = self.extract_domains_from_ssl(ssl)
|
||||||
|
ssl_domains.update(domains)
|
||||||
|
return ssl_domains
|
||||||
|
|
||||||
|
def should_request_cert(self, domain: str, existing_ssl_domains: Set[str]) -> bool:
|
||||||
|
"""判断是否需要申请证书
|
||||||
|
|
||||||
|
Args:
|
||||||
|
domain: 要检查的域名
|
||||||
|
existing_ssl_domains: 已存在的 SSL 域名集合(用于快速查找)
|
||||||
|
"""
|
||||||
|
# 检查是否为有效域名
|
||||||
|
if not self._is_valid_domain(domain):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# 检查是否已有 SSL 配置(直接检查实际配置,最准确)
|
# 检查是否已有 SSL 配置
|
||||||
ssls = self.get_all_ssls()
|
if domain in existing_ssl_domains:
|
||||||
for ssl in ssls:
|
logger.info(f"域名已有 SSL 配置: {domain}")
|
||||||
ssl_domains = self.extract_domains_from_ssl(ssl)
|
return False
|
||||||
if domain in ssl_domains:
|
|
||||||
logger.info(f"域名已有 SSL 配置: {domain}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def process_new_domains(self):
|
def process_new_domains(self):
|
||||||
"""处理新域名"""
|
"""处理新域名"""
|
||||||
|
# 优化:只获取一次路由和 SSL 配置,避免重复 API 调用
|
||||||
routes = self.get_all_routes()
|
routes = self.get_all_routes()
|
||||||
|
ssls = self.get_all_ssls()
|
||||||
|
|
||||||
|
# 构建已存在的 SSL 域名集合(用于快速查找)
|
||||||
|
existing_ssl_domains = self._build_ssl_domains_set(ssls)
|
||||||
|
|
||||||
# 按路由处理,同一路由的多个域名合并到一个证书
|
# 按路由处理,同一路由的多个域名合并到一个证书
|
||||||
for route in routes:
|
for route in routes:
|
||||||
@ -167,8 +199,8 @@ class RouteWatcher:
|
|||||||
if not domains:
|
if not domains:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 过滤出需要申请证书的域名
|
# 过滤出需要申请证书的域名(使用缓存的 SSL 域名集合)
|
||||||
domains_to_request = [d for d in domains if self.should_request_cert(d)]
|
domains_to_request = [d for d in domains if self.should_request_cert(d, existing_ssl_domains)]
|
||||||
|
|
||||||
if not domains_to_request:
|
if not domains_to_request:
|
||||||
continue
|
continue
|
||||||
|
|||||||
@ -18,8 +18,7 @@ import requests
|
|||||||
import logging
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional, List, Dict
|
from typing import Optional, List, Dict
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime
|
||||||
import base64
|
|
||||||
|
|
||||||
# 配置日志
|
# 配置日志
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
@ -65,6 +64,10 @@ class APISIXSSLManager:
|
|||||||
|
|
||||||
# 验证配置
|
# 验证配置
|
||||||
self._validate_config()
|
self._validate_config()
|
||||||
|
|
||||||
|
# 创建 HTTP 会话,复用连接
|
||||||
|
self.session = requests.Session()
|
||||||
|
self.session.headers.update(self._get_apisix_headers())
|
||||||
|
|
||||||
def load_config(self, config_path: str):
|
def load_config(self, config_path: str):
|
||||||
"""从配置文件加载配置(可选,用于覆盖默认配置)"""
|
"""从配置文件加载配置(可选,用于覆盖默认配置)"""
|
||||||
@ -221,13 +224,11 @@ class APISIXSSLManager:
|
|||||||
|
|
||||||
logger.info(f"配置 SNI 域名列表: {cert_domains}")
|
logger.info(f"配置 SNI 域名列表: {cert_domains}")
|
||||||
|
|
||||||
headers = self._get_apisix_headers()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 先检查是否已存在相同 SNI 的配置
|
# 先检查是否已存在相同 SNI 的配置
|
||||||
# 方法1:通过 ID 查找(如果之前创建时使用了这个 ID)
|
# 方法1:通过 ID 查找(如果之前创建时使用了这个 ID)
|
||||||
check_url = f"{self.apisix_admin_url}/apisix/admin/ssls/{ssl_id}"
|
check_url = f"{self.apisix_admin_url}/apisix/admin/ssls/{ssl_id}"
|
||||||
response = requests.get(check_url, headers=headers, timeout=10)
|
response = self.session.get(check_url, timeout=10)
|
||||||
|
|
||||||
existing_ssl_id = None
|
existing_ssl_id = None
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
@ -236,7 +237,7 @@ class APISIXSSLManager:
|
|||||||
else:
|
else:
|
||||||
# 方法2:查询所有 SSL 配置,检查是否有相同 SNI 的配置
|
# 方法2:查询所有 SSL 配置,检查是否有相同 SNI 的配置
|
||||||
all_ssls_url = f"{self.apisix_admin_url}/apisix/admin/ssls"
|
all_ssls_url = f"{self.apisix_admin_url}/apisix/admin/ssls"
|
||||||
all_response = requests.get(all_ssls_url, headers=headers, timeout=10)
|
all_response = self.session.get(all_ssls_url, timeout=10)
|
||||||
if all_response.status_code == 200:
|
if all_response.status_code == 200:
|
||||||
all_ssls = all_response.json()
|
all_ssls = all_response.json()
|
||||||
ssl_list = all_ssls.get('list', []) if isinstance(all_ssls, dict) else all_ssls
|
ssl_list = all_ssls.get('list', []) if isinstance(all_ssls, dict) else all_ssls
|
||||||
@ -261,18 +262,16 @@ class APISIXSSLManager:
|
|||||||
# 更新现有证书(更新时需要 id)
|
# 更新现有证书(更新时需要 id)
|
||||||
logger.info(f"更新 APISIX SSL 配置: {domain} (ID: {existing_ssl_id})")
|
logger.info(f"更新 APISIX SSL 配置: {domain} (ID: {existing_ssl_id})")
|
||||||
ssl_config["id"] = existing_ssl_id
|
ssl_config["id"] = existing_ssl_id
|
||||||
response = requests.put(
|
response = self.session.put(
|
||||||
f"{self.apisix_admin_url}/apisix/admin/ssls/{existing_ssl_id}",
|
f"{self.apisix_admin_url}/apisix/admin/ssls/{existing_ssl_id}",
|
||||||
headers=headers,
|
|
||||||
json=ssl_config,
|
json=ssl_config,
|
||||||
timeout=10
|
timeout=10
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# 创建新证书(POST 时不包含 id,让 APISIX 自动生成)
|
# 创建新证书(POST 时不包含 id,让 APISIX 自动生成)
|
||||||
logger.info(f"创建 APISIX SSL 配置: {domain}")
|
logger.info(f"创建 APISIX SSL 配置: {domain}")
|
||||||
response = requests.post(
|
response = self.session.post(
|
||||||
f"{self.apisix_admin_url}/apisix/admin/ssls",
|
f"{self.apisix_admin_url}/apisix/admin/ssls",
|
||||||
headers=headers,
|
|
||||||
json=ssl_config,
|
json=ssl_config,
|
||||||
timeout=10
|
timeout=10
|
||||||
)
|
)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user