apisix/ssl_manager/test_ssl_auto.py

587 lines
23 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
APISIX SSL 证书自动申请测试脚本
完整测试从路由创建到证书申请的整个流程
"""
import os
import sys
import json
import time
import subprocess
import requests
from pathlib import Path
from typing import Dict, Tuple, Optional
from datetime import datetime
# 导入 ssl_manager 的配置和管理器
from ssl_manager import DEFAULT_CONFIG, APISIXSSLManager
# 颜色输出
class Colors:
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
RESET = '\033[0m'
BOLD = '\033[1m'
def print_success(msg: str):
print(f"{Colors.GREEN}{msg}{Colors.RESET}")
def print_error(msg: str):
print(f"{Colors.RED}{msg}{Colors.RESET}")
def print_warning(msg: str):
print(f"{Colors.YELLOW}⚠️ {msg}{Colors.RESET}")
def print_info(msg: str):
print(f"{Colors.BLUE} {msg}{Colors.RESET}")
def print_step(step: int, total: int, msg: str):
print(f"\n{Colors.BOLD}[步骤 {step}/{total}] {msg}{Colors.RESET}")
print("-" * 60)
class SSLTestRunner:
def __init__(self, config_path: str = None):
"""初始化测试运行器"""
# 使用 APISIXSSLManager 来获取配置,确保与 ssl_manager.py 一致
# 如果提供了配置文件,传递给 APISIXSSLManager
ssl_manager = APISIXSSLManager(config_path=config_path)
# 从 ssl_manager 获取配置
self.apisix_admin_url = ssl_manager.apisix_admin_url
self.apisix_admin_key = ssl_manager.apisix_admin_key
self.webroot_path = ssl_manager.webroot_path
self.staging = ssl_manager.staging
self.email = ssl_manager.email
self.cert_dir = ssl_manager.cert_dir
# 保存 ssl_manager 实例,用于同步证书
self.ssl_manager = ssl_manager
self.test_domain = f"test-{int(time.time())}.jingrowtools.cn"
self.test_results = []
def _get_headers(self):
"""获取 APISIX Admin API 请求头"""
return {
'X-API-KEY': self.apisix_admin_key,
'Content-Type': 'application/json'
}
def test_step(self, step_num: int, total: int, name: str, test_func) -> bool:
"""执行测试步骤"""
print_step(step_num, total, name)
try:
result = test_func()
if result:
print_success(f"{name} - 成功")
self.test_results.append((name, True, None))
return True
else:
print_error(f"{name} - 失败")
self.test_results.append((name, False, "测试返回 False"))
return False
except Exception as e:
print_error(f"{name} - 异常: {str(e)}")
self.test_results.append((name, False, str(e)))
return False
def check_dependencies(self) -> bool:
"""检查依赖"""
print_info("检查系统依赖...")
# 检查 certbot
try:
result = subprocess.run(['which', 'certbot'], capture_output=True, text=True)
if result.returncode == 0:
print_success(f"Certbot 已安装: {result.stdout.strip()}")
else:
print_error("Certbot 未安装")
return False
except Exception as e:
print_error(f"检查 Certbot 失败: {e}")
return False
# 检查 Python 模块
try:
import requests
print_success(f"Python requests 模块已安装: {requests.__version__}")
except ImportError:
print_error("Python requests 模块未安装")
return False
# 检查 openssl
try:
result = subprocess.run(['which', 'openssl'], capture_output=True, text=True)
if result.returncode == 0:
print_success(f"OpenSSL 已安装: {result.stdout.strip()}")
else:
print_warning("OpenSSL 未安装(可选)")
except:
pass
return True
def check_apisix_service(self) -> bool:
"""检查 APISIX 服务状态"""
print_info("检查 APISIX 服务...")
try:
response = requests.get(
f"{self.apisix_admin_url}/apisix/admin/routes",
headers=self._get_headers(),
timeout=5
)
if response.status_code == 200:
print_success(f"APISIX Admin API 可访问: {self.apisix_admin_url}")
return True
else:
print_error(f"APISIX Admin API 返回错误: {response.status_code}")
return False
except requests.exceptions.ConnectionError:
print_error(f"无法连接到 APISIX Admin API: {self.apisix_admin_url}")
print_info("请确保 APISIX 服务正在运行")
return False
except Exception as e:
print_error(f"检查 APISIX 服务失败: {e}")
return False
def check_webroot_directory(self) -> bool:
"""检查 webroot 目录"""
print_info(f"检查 webroot 目录: {self.webroot_path}")
webroot_path = Path(self.webroot_path)
challenge_path = webroot_path / '.well-known' / 'acme-challenge'
# 检查目录是否存在
if not webroot_path.exists():
print_error(f"Webroot 目录不存在: {self.webroot_path}")
print_info("请运行: sudo mkdir -p /var/www/certbot")
return False
# 创建验证目录
challenge_path.mkdir(parents=True, exist_ok=True)
print_success(f"验证目录已准备: {challenge_path}")
# 检查权限
if os.access(str(challenge_path), os.W_OK):
print_success("目录可写")
else:
print_warning("目录可能不可写,请检查权限")
return True
def check_webroot_route(self, domain: str) -> bool:
"""检查 webroot 验证路由(通用路由,适用于所有域名)"""
print_info(f"检查 webroot 验证路由(适用于所有域名)...")
# 使用通用的 webroot 路由,不指定 host
route_id = "certbot-webroot"
try:
response = requests.get(
f"{self.apisix_admin_url}/apisix/admin/routes/{route_id}",
headers=self._get_headers(),
timeout=5
)
if response.status_code == 200:
route_data = response.json().get('value', {})
print_success(f"Webroot 路由存在: {route_id}")
print_info(f" Host: {route_data.get('host', 'None (所有域名)')}")
print_info(f" URI: {route_data.get('uri')}")
print_info(f" Priority: {route_data.get('priority', 0)}")
return True
else:
print_warning(f"Webroot 路由不存在: {route_id}")
print_info("将尝试创建 webroot 路由(适用于所有域名)...")
return self.create_webroot_route(domain)
except Exception as e:
print_error(f"检查 webroot 路由失败: {e}")
return False
def create_webroot_route(self, domain: str) -> bool:
"""创建 webroot 验证路由(通用路由,适用于所有域名)"""
print_info(f"创建 webroot 路由(适用于所有域名)")
# 使用通用的 webroot 路由,不指定 host适用于所有域名
route_id = "certbot-webroot"
route_config = {
"uri": "/.well-known/acme-challenge/*",
"name": route_id,
# 不设置 host匹配所有域名
"priority": 99999, # 最高优先级,确保在所有路由之前匹配
"plugins": {
"serverless-pre-function": {
"phase": "rewrite",
"functions": [
"return function(conf, ctx) local uri = ngx.var.uri; if not uri then uri = ctx.var.uri; end local token = string.match(uri, '^/%.well%-known/acme%-challenge/(.+)$'); if not token then ngx.status = 404; ngx.say('Token not found. URI: ' .. (uri or 'nil')); return; end; local path = '/var/www/certbot/.well-known/acme-challenge/' .. token; local file = io.open(path, 'r'); if file then local content = file:read('*all'); file:close(); ngx.header.content_type = 'text/plain'; ngx.say(content); else ngx.status = 404; ngx.say('File not found: ' .. path); end end"
]
}
},
"status": 1
}
try:
response = requests.put(
f"{self.apisix_admin_url}/apisix/admin/routes/{route_id}",
headers=self._get_headers(),
json=route_config,
timeout=10
)
if response.status_code in [200, 201]:
print_success(f"Webroot 路由创建成功: {route_id}")
return True
else:
print_error(f"创建 webroot 路由失败: {response.status_code} - {response.text}")
return False
except Exception as e:
print_error(f"创建 webroot 路由异常: {e}")
return False
def test_verification_path(self, domain: str) -> bool:
"""测试验证路径是否可访问"""
print_info(f"测试验证路径: {domain}")
test_token = f"test-{int(time.time())}"
test_file = Path(self.webroot_path) / '.well-known' / 'acme-challenge' / test_token
test_content = f"test-content-{test_token}"
try:
# 创建测试文件(尝试使用 sudo 或直接写入)
test_file.parent.mkdir(parents=True, exist_ok=True)
# 尝试直接写入
try:
test_file.write_text(test_content)
print_success(f"测试文件已创建: {test_file}")
except PermissionError:
# 如果权限不足,尝试使用 sudo
print_warning("权限不足,尝试使用 sudo 创建测试文件...")
try:
# 使用临时文件避免 shell 注入
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp:
tmp.write(test_content)
tmp_path = tmp.name
subprocess.run(
['sudo', 'cp', tmp_path, str(test_file)],
check=True,
timeout=5
)
subprocess.run(
['sudo', 'chmod', '644', str(test_file)],
check=True,
timeout=5
)
os.unlink(tmp_path) # 清理临时文件
print_success(f"测试文件已创建(使用 sudo: {test_file}")
except Exception as e:
print_error(f"无法创建测试文件: {e}")
print_warning("提示: 请运行以下命令修复权限:")
print_info(" sudo mkdir -p /var/www/certbot/.well-known/acme-challenge")
print_info(" sudo chown -R $USER:$USER /var/www/certbot")
print_warning("或者跳过验证路径测试直接进行证书申请Certbot 会自动处理文件创建)")
# 不返回 False而是继续执行Certbot 会处理文件创建)
return True
# 测试本地访问
time.sleep(1) # 等待路由生效
try:
response = requests.get(
f"http://localhost:9080/.well-known/acme-challenge/{test_token}",
headers={"Host": domain},
timeout=5
)
if response.status_code == 200 and response.text.strip() == test_content:
print_success(f"验证路径测试通过: {response.text.strip()}")
# 清理测试文件
try:
if test_file.exists():
test_file.unlink()
except:
try:
subprocess.run(['sudo', 'rm', str(test_file)], timeout=5, check=False)
except:
pass
return True
else:
print_warning(f"验证路径测试失败: status={response.status_code}, content={response.text[:100]}")
print_info("继续执行Certbot 会自动处理验证文件...")
# 清理测试文件
try:
if test_file.exists():
subprocess.run(['sudo', 'rm', str(test_file)], timeout=5, check=False)
except:
pass
# 不返回 False允许继续执行Certbot 会处理)
return True
except Exception as e:
print_warning(f"验证路径访问异常: {e}")
print_info("继续执行Certbot 会自动处理验证文件...")
return True
except Exception as e:
print_error(f"测试验证路径异常: {e}")
return False
def create_test_route(self, domain: str, additional_domains: list = None) -> bool:
"""创建测试路由"""
print_info(f"创建测试路由: {domain}")
# 构建域名列表(主域名 + 额外域名)
hosts = [domain]
if additional_domains:
hosts.extend(additional_domains)
print_info(f"路由将包含域名: {', '.join(hosts)}")
route_config = {
"uri": "/*",
"name": domain,
"methods": ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"],
"hosts": hosts, # 使用 hosts 数组支持多个域名
"upstream": {
"nodes": [
{
"host": "192.168.10.2",
"port": 8201,
"weight": 1
}
],
"type": "roundrobin",
"scheme": "http",
"pass_host": "pass"
},
"status": 1
}
try:
response = requests.put(
f"{self.apisix_admin_url}/apisix/admin/routes/{domain}",
headers=self._get_headers(),
json=route_config,
timeout=10
)
if response.status_code in [200, 201]:
print_success(f"测试路由创建成功: {domain}")
return True
else:
print_error(f"创建测试路由失败: {response.status_code} - {response.text}")
return False
except Exception as e:
print_error(f"创建测试路由异常: {e}")
return False
def request_certificate(self, domain: str, additional_domains: list = None) -> bool:
"""申请证书"""
if additional_domains:
print_info(f"申请证书: {domain} + {additional_domains} (staging={self.staging})")
else:
print_info(f"申请证书: {domain} (staging={self.staging})")
# 使用 ssl_manager 的 request_certificate 方法,它已经支持额外域名
try:
result = self.ssl_manager.request_certificate(domain, additional_domains)
if result:
print_success(f"证书申请成功: {domain}")
return True
else:
print_error(f"证书申请失败: {domain}")
return False
except Exception as e:
print_error(f"证书申请异常: {e}")
return False
def sync_certificate_to_apisix(self, domain: str) -> bool:
"""同步证书到 APISIX使用 ssl_manager 的方法)"""
print_info(f"同步证书到 APISIX: {domain}")
# 先读取证书文件,然后使用 ssl_manager 的方法来上传证书
try:
# 使用 ssl_manager 的 read_cert_files 方法读取证书
cert_data = self.ssl_manager.read_cert_files(domain)
if not cert_data:
print_error(f"无法读取证书文件: {domain}")
return False
# 调用 upload_cert_to_apisix 并传递证书内容
result = self.ssl_manager.upload_cert_to_apisix(
domain,
cert_data['cert'],
cert_data['key']
)
if result:
print_success(f"证书已同步到 APISIX: {domain}")
return True
else:
print_error(f"证书同步失败: {domain}")
return False
except Exception as e:
print_error(f"证书同步异常: {e}")
return False
def verify_certificate(self, domain: str) -> bool:
"""验证证书"""
print_info(f"验证证书: {domain}")
cert_file = Path('/etc/letsencrypt/live') / domain / 'fullchain.pem'
if not cert_file.exists():
print_error("证书文件不存在")
return False
try:
result = subprocess.run(
['openssl', 'x509', '-in', str(cert_file), '-noout', '-subject', '-dates'],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
print_success("证书信息:")
for line in result.stdout.strip().split('\n'):
print_info(f" {line}")
return True
else:
print_error(f"读取证书信息失败: {result.stderr}")
return False
except Exception as e:
print_error(f"验证证书异常: {e}")
return False
def cleanup(self, domain: str, cleanup_route: bool = True, cleanup_ssl: bool = True):
"""清理测试数据"""
print_info("清理测试数据...")
if cleanup_route:
try:
# 删除测试路由
response = requests.delete(
f"{self.apisix_admin_url}/apisix/admin/routes/{domain}",
headers=self._get_headers(),
timeout=10
)
if response.status_code in [200, 204]:
print_success(f"测试路由已删除: {domain}")
except:
pass
if cleanup_ssl:
try:
# 删除 SSL 配置
ssl_id = domain.replace('.', '_')
response = requests.delete(
f"{self.apisix_admin_url}/apisix/admin/ssls/{ssl_id}",
headers=self._get_headers(),
timeout=10
)
if response.status_code in [200, 204]:
print_success(f"SSL 配置已删除: {ssl_id}")
except:
pass
def run_full_test(self, domain: str = None, additional_domains: list = None, cleanup: bool = False):
"""运行完整测试"""
if not domain:
domain = self.test_domain
print(f"\n{Colors.BOLD}{'='*60}")
print(f"APISIX SSL 证书自动申请测试")
print(f"测试域名: {domain}")
if additional_domains:
print(f"额外域名: {', '.join(additional_domains)}")
print(f"Staging 模式: {self.staging}")
print(f"{'='*60}{Colors.RESET}\n")
steps = [
(1, "检查系统依赖", lambda: self.check_dependencies()),
(2, "检查 APISIX 服务", lambda: self.check_apisix_service()),
(3, "检查 Webroot 目录", lambda: self.check_webroot_directory()),
(4, "检查/创建 Webroot 路由", lambda: self.check_webroot_route(domain)),
(5, "测试验证路径", lambda: self.test_verification_path(domain)),
(6, "创建测试路由", lambda: self.create_test_route(domain, additional_domains)),
(7, "申请 SSL 证书", lambda: self.request_certificate(domain, additional_domains)),
# 注意:证书申请成功后会自动上传到 APISIX不需要单独同步步骤
(8, "验证证书信息", lambda: self.verify_certificate(domain)),
]
success_count = 0
for step_num, step_name, test_func in steps:
if self.test_step(step_num, len(steps), step_name, test_func):
success_count += 1
else:
print_warning("测试中断,后续步骤可能无法执行")
break
# 打印总结
print(f"\n{Colors.BOLD}{'='*60}")
print(f"测试总结")
print(f"{'='*60}{Colors.RESET}\n")
print(f"总步骤数: {len(steps)}")
print(f"成功: {Colors.GREEN}{success_count}{Colors.RESET}")
print(f"失败: {Colors.RED}{len(steps) - success_count}{Colors.RESET}")
print(f"\n详细结果:")
for name, success, error in self.test_results:
status = f"{Colors.GREEN}✅ 成功{Colors.RESET}" if success else f"{Colors.RED}❌ 失败{Colors.RESET}"
print(f" {status} - {name}")
if error:
print(f" 错误: {error}")
# 清理
if cleanup and success_count == len(steps):
print(f"\n{Colors.YELLOW}清理测试数据...{Colors.RESET}")
self.cleanup(domain)
return success_count == len(steps)
def main():
import argparse
parser = argparse.ArgumentParser(description='APISIX SSL 证书自动申请测试脚本')
parser.add_argument('--domain', '-d', help='测试域名(不指定则自动生成)')
parser.add_argument('--additional-domains', '-a', nargs='+', help='额外域名(如 www 子域名)')
parser.add_argument('--config', '-c', help='配置文件路径(可选,用于覆盖默认配置)')
parser.add_argument('--cleanup', action='store_true', help='测试完成后清理测试数据')
parser.add_argument('--no-cleanup', action='store_true', help='测试完成后不清理测试数据')
args = parser.parse_args()
runner = SSLTestRunner(args.config)
if args.domain:
domain = args.domain
else:
domain = runner.test_domain
print_warning(f"未指定域名,使用自动生成的测试域名: {domain}")
print_info("注意:此域名需要 DNS 解析到当前服务器才能申请证书")
# 处理额外域名
additional_domains = []
if args.additional_domains:
additional_domains.extend(args.additional_domains)
cleanup = args.cleanup or (not args.no_cleanup and not args.domain)
success = runner.run_full_test(
domain,
additional_domains=additional_domains if additional_domains else None,
cleanup=cleanup
)
sys.exit(0 if success else 1)
if __name__ == '__main__':
main()