实现@jingrow.whitelist跨app支持,需要完整访问路径

This commit is contained in:
jingrow 2025-10-29 18:59:15 +08:00
parent aceca1e49d
commit dab5fa4cda
2 changed files with 73 additions and 42 deletions

View File

@ -63,7 +63,7 @@ _whitelisted_functions: Dict[str, Dict[str, Any]] = {}
def whitelist(allow_guest: bool = False, methods: List[str] = None):
if methods is None:
methods = ['POST']
methods = ['GET', 'POST']
def decorator(func):
module_name = func.__module__

View File

@ -8,8 +8,9 @@ Jingrow 白名单路由服务
from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import JSONResponse
import importlib
import inspect
from typing import Any, Dict, List
import sys
from pathlib import Path
from typing import Any, Dict
import logging
from jingrow import get_whitelisted_function, is_whitelisted
from jingrow.utils.auth import get_jingrow_api_headers, get_session_api_headers
@ -18,6 +19,25 @@ from jingrow.utils.jingrow_api import get_logged_user
logger = logging.getLogger(__name__)
router = APIRouter()
# 确保 apps 目录在 sys.path 中(仅初始化一次)
_apps_path_initialized = False
def _ensure_apps_on_sys_path():
"""确保 apps 目录在 sys.path 中,支持跨 app 导入"""
global _apps_path_initialized
if _apps_path_initialized:
return
try:
project_root = Path(__file__).resolve().parents[4]
apps_dir = project_root / "apps"
if apps_dir.exists() and str(apps_dir) not in sys.path:
sys.path.insert(0, str(apps_dir))
except Exception:
pass
finally:
_apps_path_initialized = True
async def authenticate_request(request: Request, allow_guest: bool) -> bool:
"""
认证请求支持两种认证方式
@ -60,70 +80,81 @@ async def authenticate_request(request: Request, allow_guest: bool) -> bool:
async def _process_whitelist_call(request: Request, full_module_path: str):
"""通用处理:接收完整点分路径 '<package.module.function>' 并执行调用"""
try:
# 解析模块路径并先导入模块,确保装饰器执行
async def _get_request_data(req: Request) -> Dict[str, Any]:
"""GET 使用查询参数,其他方法使用 JSON body"""
if req.method == 'GET':
return dict(req.query_params)
try:
return await req.json()
except Exception:
return {}
# 确保 apps 目录在 sys.path 中(支持跨 app 导入)
_ensure_apps_on_sys_path()
# 先导入模块,确保装饰器执行
module_info = parse_module_path(full_module_path)
module = import_module(module_info['module_path'])
# 模块导入后再检查是否是白名单函数
if is_whitelisted(full_module_path):
whitelist_info = get_whitelisted_function(full_module_path)
# 使用实际模块路径匹配白名单(装饰器注册时使用 func.__module__
actual_module_name = module.__name__
actual_whitelist_path = f"{actual_module_name}.{module_info['function_name']}"
# 检查白名单:优先使用实际模块路径,也支持原始路径
whitelist_info = None
for check_path in [actual_whitelist_path, full_module_path]:
if is_whitelisted(check_path):
whitelist_info = get_whitelisted_function(check_path)
break
if whitelist_info:
func = whitelist_info['function']
# 检查HTTP方法
# 检查 HTTP 方法
if request.method not in whitelist_info['methods']:
raise HTTPException(status_code=405, detail=f"Method {request.method} not allowed for this endpoint")
# 检查权限(如果需要)
raise HTTPException(status_code=405, detail=f"Method {request.method} not allowed")
# 检查权限
if not whitelist_info['allow_guest']:
if not await authenticate_request(request, whitelist_info['allow_guest']):
raise HTTPException(status_code=401, detail="Authentication required")
# 获取请求数据
request_data = await request.json()
# 调用函数
# 获取请求数据并调用函数
request_data = await _get_request_data(request)
result = func(**request_data)
return JSONResponse(content={
"success": True,
"data": result
})
return JSONResponse(content={"success": True, "data": result})
# 非白名单:按原逻辑调用模块函数
function_name = module_info['function_name']
if not hasattr(module, function_name):
raise HTTPException(status_code=404, detail=f"Function {function_name} not found in module {module_info['module_path']}")
raise HTTPException(status_code=404, detail=f"Function {function_name} not found")
func = getattr(module, function_name)
# 获取请求数据
request_data = await request.json()
# 调用函数
request_data = await _get_request_data(request)
result = func(**request_data)
return JSONResponse(content={
"success": True,
"data": result
})
return JSONResponse(content={"success": True, "data": result})
except HTTPException:
raise
except Exception as e:
logger.error(f"Request handler error: {e}")
logger.error(f"Request handler error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post("/{module_path:path}")
@router.api_route("/{module_path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def handle_request(request: Request, module_path: str):
"""
兼容旧路径直接传入完整点分路径 '<package.module.function>'
动态路由支持完整点分路径例如 demo.demo.pagetype.hello_world.hello_world.test_whitelist
"""
return await _process_whitelist_call(request, module_path)
@router.post("/{app}/{module_path:path}")
@router.api_route("/{app}/{module_path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def handle_request_with_app_prefix(request: Request, app: str, module_path: str):
"""
新路径支持以 app 作为前缀例如 app.module.function
动态路由支持 app 前缀格式例如 /demo/demo.pagetype.hello_world.hello_world.test_whitelist
"""
full_module_path = f"{app}.{module_path}"
return await _process_whitelist_call(request, full_module_path)