diff --git a/ssl_manager/ssl_manager.py b/ssl_manager/ssl_manager.py index 6632f58..5239e90 100755 --- a/ssl_manager/ssl_manager.py +++ b/ssl_manager/ssl_manager.py @@ -295,6 +295,78 @@ class APISIXSSLManager: logger.error(f"上传证书到 APISIX 失败: {e}") return False + def ensure_webroot_route(self, domain: str = None) -> bool: + """确保 webroot 路由存在并正确配置 + + Args: + domain: 域名,如果提供则创建针对该域名的路由,否则创建通用路由 + """ + if domain: + route_id = f"certbot-webroot-{domain.replace('.', '-')}" + else: + route_id = "certbot-webroot" + + route_url = f"{self.apisix_admin_url}/apisix/admin/routes/{route_id}" + + try: + # 检查路由是否存在 + response = self.session.get(route_url, timeout=10) + + route_config = { + "uri": "/.well-known/acme-challenge/*", + "name": route_id, + "priority": 99999, + "plugins": { + "serverless-pre-function": { + "phase": "rewrite", + "functions": [ + "return function(conf, ctx) local uri = ngx.var.uri; if not uri then uri = ctx.var.uri; end local token = string.match(uri, \"^/%.well%-known/acme%-challenge/(.+)$\"); if not token then ngx.status = 404; ngx.say(\"Token not found. URI: \" .. (uri or \"nil\")); return; end; local path = \"/var/www/certbot/.well-known/acme-challenge/\" .. token; local file = io.open(path, \"r\"); if file then local content = file:read(\"*all\"); file:close(); ngx.header.content_type = \"text/plain\"; ngx.say(content); else ngx.status = 404; ngx.say(\"File not found: \" .. path); end end" + ] + } + }, + "status": 1 + } + + # 如果指定了域名,设置 host 字段以确保正确匹配 + if domain: + route_config["host"] = domain + + if response.status_code == 200: + # 路由存在,检查是否需要更新 + existing_route = response.json().get('value', {}) + existing_priority = existing_route.get('priority', 0) + existing_host = existing_route.get('host') + + # 检查是否需要更新(优先级或 host 不匹配) + needs_update = (existing_priority < 99999) or (domain and existing_host != domain) + + if needs_update: + logger.info(f"更新 webroot 路由: {route_id}") + response = self.session.put(route_url, json=route_config, timeout=10) + if response.status_code in [200, 201]: + logger.info("Webroot 路由更新成功") + return True + else: + logger.warning(f"Webroot 路由更新失败: {response.status_code} - {response.text}") + return False + else: + logger.debug(f"Webroot 路由已存在且配置正确: {route_id}") + return True + else: + # 路由不存在,创建它 + logger.info(f"创建 webroot 路由: {route_id}") + response = self.session.put(route_url, json=route_config, timeout=10) + if response.status_code in [200, 201]: + logger.info("Webroot 路由创建成功") + return True + else: + logger.error(f"Webroot 路由创建失败: {response.status_code} - {response.text}") + return False + + except Exception as e: + logger.error(f"确保 webroot 路由时出错: {e}") + return False + def request_certificate(self, domain: str, additional_domains: List[str] = None, max_retries: int = 3) -> bool: """申请 Let's Encrypt 证书 @@ -303,6 +375,9 @@ class APISIXSSLManager: additional_domains: 额外域名列表 max_retries: 最大重试次数(默认3次) """ + # 确保 webroot 路由存在(为当前域名创建) + self.ensure_webroot_route(domain) + domains = [domain] if additional_domains: domains.extend(additional_domains)