#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ APISIX SSL 证书自动申请测试脚本 完整测试从路由创建到证书申请的整个流程 """ import os import sys import json import time import subprocess import requests from pathlib import Path from typing import Dict, Tuple, Optional from datetime import datetime # 导入 ssl_manager 的配置和管理器 from ssl_manager import DEFAULT_CONFIG, APISIXSSLManager # 颜色输出 class Colors: GREEN = '\033[92m' RED = '\033[91m' YELLOW = '\033[93m' BLUE = '\033[94m' RESET = '\033[0m' BOLD = '\033[1m' def print_success(msg: str): print(f"{Colors.GREEN}✅ {msg}{Colors.RESET}") def print_error(msg: str): print(f"{Colors.RED}❌ {msg}{Colors.RESET}") def print_warning(msg: str): print(f"{Colors.YELLOW}⚠️ {msg}{Colors.RESET}") def print_info(msg: str): print(f"{Colors.BLUE}ℹ️ {msg}{Colors.RESET}") def print_step(step: int, total: int, msg: str): print(f"\n{Colors.BOLD}[步骤 {step}/{total}] {msg}{Colors.RESET}") print("-" * 60) class SSLTestRunner: def __init__(self, config_path: str = None): """初始化测试运行器""" # 使用 APISIXSSLManager 来获取配置,确保与 ssl_manager.py 一致 # 如果提供了配置文件,传递给 APISIXSSLManager ssl_manager = APISIXSSLManager(config_path=config_path) # 从 ssl_manager 获取配置 self.apisix_admin_url = ssl_manager.apisix_admin_url self.apisix_admin_key = ssl_manager.apisix_admin_key self.webroot_path = ssl_manager.webroot_path self.staging = ssl_manager.staging self.email = ssl_manager.email self.cert_dir = ssl_manager.cert_dir # 保存 ssl_manager 实例,用于同步证书 self.ssl_manager = ssl_manager self.test_domain = f"test-{int(time.time())}.jingrowtools.cn" self.test_results = [] def _get_headers(self): """获取 APISIX Admin API 请求头""" return { 'X-API-KEY': self.apisix_admin_key, 'Content-Type': 'application/json' } def test_step(self, step_num: int, total: int, name: str, test_func) -> bool: """执行测试步骤""" print_step(step_num, total, name) try: result = test_func() if result: print_success(f"{name} - 成功") self.test_results.append((name, True, None)) return True else: print_error(f"{name} - 失败") self.test_results.append((name, False, "测试返回 False")) return False except Exception as e: print_error(f"{name} - 异常: {str(e)}") self.test_results.append((name, False, str(e))) return False def check_dependencies(self) -> bool: """检查依赖""" print_info("检查系统依赖...") # 检查 certbot try: result = subprocess.run(['which', 'certbot'], capture_output=True, text=True) if result.returncode == 0: print_success(f"Certbot 已安装: {result.stdout.strip()}") else: print_error("Certbot 未安装") return False except Exception as e: print_error(f"检查 Certbot 失败: {e}") return False # 检查 Python 模块 try: import requests print_success(f"Python requests 模块已安装: {requests.__version__}") except ImportError: print_error("Python requests 模块未安装") return False # 检查 openssl try: result = subprocess.run(['which', 'openssl'], capture_output=True, text=True) if result.returncode == 0: print_success(f"OpenSSL 已安装: {result.stdout.strip()}") else: print_warning("OpenSSL 未安装(可选)") except: pass return True def check_apisix_service(self) -> bool: """检查 APISIX 服务状态""" print_info("检查 APISIX 服务...") try: response = requests.get( f"{self.apisix_admin_url}/apisix/admin/routes", headers=self._get_headers(), timeout=5 ) if response.status_code == 200: print_success(f"APISIX Admin API 可访问: {self.apisix_admin_url}") return True else: print_error(f"APISIX Admin API 返回错误: {response.status_code}") return False except requests.exceptions.ConnectionError: print_error(f"无法连接到 APISIX Admin API: {self.apisix_admin_url}") print_info("请确保 APISIX 服务正在运行") return False except Exception as e: print_error(f"检查 APISIX 服务失败: {e}") return False def check_webroot_directory(self) -> bool: """检查 webroot 目录""" print_info(f"检查 webroot 目录: {self.webroot_path}") webroot_path = Path(self.webroot_path) challenge_path = webroot_path / '.well-known' / 'acme-challenge' # 检查目录是否存在 if not webroot_path.exists(): print_error(f"Webroot 目录不存在: {self.webroot_path}") print_info("请运行: sudo mkdir -p /var/www/certbot") return False # 创建验证目录 challenge_path.mkdir(parents=True, exist_ok=True) print_success(f"验证目录已准备: {challenge_path}") # 检查权限 if os.access(str(challenge_path), os.W_OK): print_success("目录可写") else: print_warning("目录可能不可写,请检查权限") return True def check_webroot_route(self, domain: str) -> bool: """检查 webroot 验证路由""" print_info(f"检查 webroot 验证路由: {domain}...") # 为每个域名创建独立的 webroot 路由 route_id = f"certbot-webroot-{domain}" try: response = requests.get( f"{self.apisix_admin_url}/apisix/admin/routes/{route_id}", headers=self._get_headers(), timeout=5 ) if response.status_code == 200: route_data = response.json().get('value', {}) print_success(f"Webroot 路由存在: {route_id}") print_info(f" Host: {route_data.get('host', 'None (所有域名)')}") print_info(f" URI: {route_data.get('uri')}") print_info(f" Priority: {route_data.get('priority', 0)}") return True else: print_warning(f"Webroot 路由不存在: {route_id}") print_info("将尝试创建 webroot 路由(适用于所有域名)...") return self.create_webroot_route(domain) except Exception as e: print_error(f"检查 webroot 路由失败: {e}") return False def create_webroot_route(self, domain: str) -> bool: """创建 webroot 验证路由(为每个域名创建独立路由)""" print_info(f"创建 webroot 路由: {domain}") # 为每个域名创建独立的 webroot 路由,确保 host 匹配 route_id = f"certbot-webroot-{domain}" route_config = { "uri": "/.well-known/acme-challenge/*", "name": route_id, "host": domain, # 设置 host,确保正确匹配 "priority": 10000, # 高优先级,在域名路由之前匹配 "plugins": { "serverless-pre-function": { "phase": "rewrite", "functions": [ "return function(conf, ctx) local uri = ngx.var.uri; if not uri then uri = ctx.var.uri; end local token = string.match(uri, '^/%.well%-known/acme%-challenge/(.+)$'); if not token then ngx.status = 404; ngx.say('Token not found. URI: ' .. (uri or 'nil')); return; end; local path = '/var/www/certbot/.well-known/acme-challenge/' .. token; local file = io.open(path, 'r'); if file then local content = file:read('*all'); file:close(); ngx.header.content_type = 'text/plain'; ngx.say(content); else ngx.status = 404; ngx.say('File not found: ' .. path); end end" ] } }, "status": 1 } try: response = requests.put( f"{self.apisix_admin_url}/apisix/admin/routes/{route_id}", headers=self._get_headers(), json=route_config, timeout=10 ) if response.status_code in [200, 201]: print_success(f"Webroot 路由创建成功: {route_id}") return True else: print_error(f"创建 webroot 路由失败: {response.status_code} - {response.text}") return False except Exception as e: print_error(f"创建 webroot 路由异常: {e}") return False def test_verification_path(self, domain: str) -> bool: """测试验证路径是否可访问""" print_info(f"测试验证路径: {domain}") test_token = f"test-{int(time.time())}" test_file = Path(self.webroot_path) / '.well-known' / 'acme-challenge' / test_token test_content = f"test-content-{test_token}" try: # 创建测试文件(尝试使用 sudo 或直接写入) test_file.parent.mkdir(parents=True, exist_ok=True) # 尝试直接写入 try: test_file.write_text(test_content) print_success(f"测试文件已创建: {test_file}") except PermissionError: # 如果权限不足,尝试使用 sudo print_warning("权限不足,尝试使用 sudo 创建测试文件...") try: # 使用临时文件避免 shell 注入 import tempfile with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp: tmp.write(test_content) tmp_path = tmp.name subprocess.run( ['sudo', 'cp', tmp_path, str(test_file)], check=True, timeout=5 ) subprocess.run( ['sudo', 'chmod', '644', str(test_file)], check=True, timeout=5 ) os.unlink(tmp_path) # 清理临时文件 print_success(f"测试文件已创建(使用 sudo): {test_file}") except Exception as e: print_error(f"无法创建测试文件: {e}") print_warning("提示: 请运行以下命令修复权限:") print_info(" sudo mkdir -p /var/www/certbot/.well-known/acme-challenge") print_info(" sudo chown -R $USER:$USER /var/www/certbot") print_warning("或者跳过验证路径测试,直接进行证书申请(Certbot 会自动处理文件创建)") # 不返回 False,而是继续执行(Certbot 会处理文件创建) return True # 测试本地访问 time.sleep(1) # 等待路由生效 try: response = requests.get( f"http://localhost:9080/.well-known/acme-challenge/{test_token}", headers={"Host": domain}, timeout=5 ) if response.status_code == 200 and response.text.strip() == test_content: print_success(f"验证路径测试通过: {response.text.strip()}") # 清理测试文件 try: if test_file.exists(): test_file.unlink() except: try: subprocess.run(['sudo', 'rm', str(test_file)], timeout=5, check=False) except: pass return True else: print_warning(f"验证路径测试失败: status={response.status_code}, content={response.text[:100]}") print_info("继续执行,Certbot 会自动处理验证文件...") # 清理测试文件 try: if test_file.exists(): subprocess.run(['sudo', 'rm', str(test_file)], timeout=5, check=False) except: pass # 不返回 False,允许继续执行(Certbot 会处理) return True except Exception as e: print_warning(f"验证路径访问异常: {e}") print_info("继续执行,Certbot 会自动处理验证文件...") return True except Exception as e: print_error(f"测试验证路径异常: {e}") return False def create_test_route(self, domain: str) -> bool: """创建测试路由""" print_info(f"创建测试路由: {domain}") route_config = { "uri": "/*", "name": domain, "methods": ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"], "host": domain, "upstream": { "nodes": [ { "host": "192.168.10.2", "port": 8201, "weight": 1 } ], "type": "roundrobin", "scheme": "http", "pass_host": "pass" }, "status": 1 } try: response = requests.put( f"{self.apisix_admin_url}/apisix/admin/routes/{domain}", headers=self._get_headers(), json=route_config, timeout=10 ) if response.status_code in [200, 201]: print_success(f"测试路由创建成功: {domain}") return True else: print_error(f"创建测试路由失败: {response.status_code} - {response.text}") return False except Exception as e: print_error(f"创建测试路由异常: {e}") return False def request_certificate(self, domain: str) -> bool: """申请证书""" print_info(f"申请证书: {domain} (staging={self.staging})") cmd = [ self.ssl_manager.certbot_path, 'certonly', '--webroot', '--webroot-path', self.webroot_path, '--non-interactive', '--agree-tos', '--email', self.email, '--cert-name', domain, '-d', domain ] if self.staging: cmd.append('--staging') try: print_info(f"执行命令: {' '.join(cmd)}") result = subprocess.run( cmd, capture_output=True, text=True, timeout=300 ) if result.returncode == 0: print_success(f"证书申请成功: {domain}") print_info(result.stdout) return True else: print_error(f"证书申请失败: {result.stderr}") return False except subprocess.TimeoutExpired: print_error("证书申请超时") return False except Exception as e: print_error(f"证书申请异常: {e}") return False def sync_certificate_to_apisix(self, domain: str) -> bool: """同步证书到 APISIX(使用 ssl_manager 的方法)""" print_info(f"同步证书到 APISIX: {domain}") # 先读取证书文件,然后使用 ssl_manager 的方法来上传证书 try: # 使用 ssl_manager 的 read_cert_files 方法读取证书 cert_data = self.ssl_manager.read_cert_files(domain) if not cert_data: print_error(f"无法读取证书文件: {domain}") return False # 调用 upload_cert_to_apisix 并传递证书内容 result = self.ssl_manager.upload_cert_to_apisix( domain, cert_data['cert'], cert_data['key'] ) if result: print_success(f"证书已同步到 APISIX: {domain}") return True else: print_error(f"证书同步失败: {domain}") return False except Exception as e: print_error(f"证书同步异常: {e}") return False def verify_certificate(self, domain: str) -> bool: """验证证书""" print_info(f"验证证书: {domain}") cert_file = Path('/etc/letsencrypt/live') / domain / 'fullchain.pem' if not cert_file.exists(): print_error("证书文件不存在") return False try: result = subprocess.run( ['openssl', 'x509', '-in', str(cert_file), '-noout', '-subject', '-dates'], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: print_success("证书信息:") for line in result.stdout.strip().split('\n'): print_info(f" {line}") return True else: print_error(f"读取证书信息失败: {result.stderr}") return False except Exception as e: print_error(f"验证证书异常: {e}") return False def cleanup(self, domain: str, cleanup_route: bool = True, cleanup_ssl: bool = True): """清理测试数据""" print_info("清理测试数据...") if cleanup_route: try: # 删除测试路由 response = requests.delete( f"{self.apisix_admin_url}/apisix/admin/routes/{domain}", headers=self._get_headers(), timeout=10 ) if response.status_code in [200, 204]: print_success(f"测试路由已删除: {domain}") except: pass if cleanup_ssl: try: # 删除 SSL 配置 ssl_id = domain.replace('.', '_') response = requests.delete( f"{self.apisix_admin_url}/apisix/admin/ssls/{ssl_id}", headers=self._get_headers(), timeout=10 ) if response.status_code in [200, 204]: print_success(f"SSL 配置已删除: {ssl_id}") except: pass def run_full_test(self, domain: str = None, cleanup: bool = False): """运行完整测试""" if not domain: domain = self.test_domain print(f"\n{Colors.BOLD}{'='*60}") print(f"APISIX SSL 证书自动申请测试") print(f"测试域名: {domain}") print(f"Staging 模式: {self.staging}") print(f"{'='*60}{Colors.RESET}\n") steps = [ (1, "检查系统依赖", lambda: self.check_dependencies()), (2, "检查 APISIX 服务", lambda: self.check_apisix_service()), (3, "检查 Webroot 目录", lambda: self.check_webroot_directory()), (4, "检查/创建 Webroot 路由", lambda: self.check_webroot_route(domain)), (5, "测试验证路径", lambda: self.test_verification_path(domain)), (6, "创建测试路由", lambda: self.create_test_route(domain)), (7, "申请 SSL 证书", lambda: self.request_certificate(domain)), (8, "同步证书到 APISIX", lambda: self.sync_certificate_to_apisix(domain)), (9, "验证证书信息", lambda: self.verify_certificate(domain)), ] success_count = 0 for step_num, step_name, test_func in steps: if self.test_step(step_num, len(steps), step_name, test_func): success_count += 1 else: print_warning("测试中断,后续步骤可能无法执行") break # 打印总结 print(f"\n{Colors.BOLD}{'='*60}") print(f"测试总结") print(f"{'='*60}{Colors.RESET}\n") print(f"总步骤数: {len(steps)}") print(f"成功: {Colors.GREEN}{success_count}{Colors.RESET}") print(f"失败: {Colors.RED}{len(steps) - success_count}{Colors.RESET}") print(f"\n详细结果:") for name, success, error in self.test_results: status = f"{Colors.GREEN}✅ 成功{Colors.RESET}" if success else f"{Colors.RED}❌ 失败{Colors.RESET}" print(f" {status} - {name}") if error: print(f" 错误: {error}") # 清理 if cleanup and success_count == len(steps): print(f"\n{Colors.YELLOW}清理测试数据...{Colors.RESET}") self.cleanup(domain) return success_count == len(steps) def main(): import argparse parser = argparse.ArgumentParser(description='APISIX SSL 证书自动申请测试脚本') parser.add_argument('--domain', '-d', help='测试域名(不指定则自动生成)') parser.add_argument('--config', '-c', help='配置文件路径(可选,用于覆盖默认配置)') parser.add_argument('--cleanup', action='store_true', help='测试完成后清理测试数据') parser.add_argument('--no-cleanup', action='store_true', help='测试完成后不清理测试数据') args = parser.parse_args() runner = SSLTestRunner(args.config) if args.domain: domain = args.domain else: domain = runner.test_domain print_warning(f"未指定域名,使用自动生成的测试域名: {domain}") print_info("注意:此域名需要 DNS 解析到当前服务器才能申请证书") cleanup = args.cleanup or (not args.no_cleanup and not args.domain) success = runner.run_full_test(domain, cleanup=cleanup) sys.exit(0 if success else 1) if __name__ == '__main__': main()