重构pagetype增删改查为通过后端执行以便实现事件钩子机制,修复重构后出现的异常和错误

This commit is contained in:
jingrow 2025-10-25 16:33:34 +08:00
parent 703ce1d7e9
commit 5811a97d1e
11 changed files with 689 additions and 26 deletions

View File

@ -45,7 +45,7 @@ export const deleteRecords = async (pagetype: string, names: string[]): Promise<
export const createRecord = async (pagetype: string, data: Record<string, any>): Promise<{ success: boolean; data?: any; message?: string }> => {
try {
const response = await axios.post(
`${jingrowServerUrl}/api/data/${pagetype}`,
`/api/data/${pagetype}`,
data,
{
headers: get_session_api_headers(),
@ -64,7 +64,7 @@ export const createRecord = async (pagetype: string, data: Record<string, any>):
export const updateRecord = async (pagetype: string, name: string, data: Record<string, any>): Promise<{ success: boolean; data?: any; message?: string }> => {
try {
const response = await axios.put(
`${jingrowServerUrl}/api/data/${pagetype}/${name}`,
`/api/data/${pagetype}/${name}`,
data,
{
headers: get_session_api_headers(),
@ -83,7 +83,7 @@ export const updateRecord = async (pagetype: string, name: string, data: Record<
export const getRecord = async (pagetype: string, name: string): Promise<{ success: boolean; data?: any; message?: string }> => {
try {
const response = await axios.get(
`${jingrowServerUrl}/api/data/${pagetype}/${name}`,
`/api/data/${pagetype}/${name}`,
{
headers: get_session_api_headers(),
withCredentials: true
@ -99,7 +99,7 @@ export const getRecord = async (pagetype: string, name: string): Promise<{ succe
export const getRecordAttachments = async (pagetype: string, name: string): Promise<{ success: boolean; data?: any[]; message?: string }> => {
try {
const response = await axios.get(
`${jingrowServerUrl}/api/data/File`,
`/api/data/File`,
{
params: {
filters: JSON.stringify({
@ -158,7 +158,7 @@ export const uploadAttachment = async (
export const deleteAttachment = async (attachmentName: string): Promise<{ success: boolean; message?: string }> => {
try {
await axios.delete(
`${jingrowServerUrl}/api/data/File/${attachmentName}`,
`/api/data/File/${attachmentName}`,
{
headers: get_session_api_headers(),
withCredentials: true
@ -230,7 +230,7 @@ export const getLocalJobCount = async (): Promise<{ success: boolean; count?: nu
export const getRecords = async (pagetype: string, filters: any[] = [], fields: string[] = [], orderBy: string = 'modified desc', limitStart: number = 0, limitPageLength: number = 20): Promise<{ success: boolean; data?: any[]; total?: number; message?: string }> => {
try {
const response = await axios.get(
`${jingrowServerUrl}/api/data/${pagetype}`,
`/api/data/${pagetype}`,
{
params: {
fields: JSON.stringify(fields),

View File

@ -278,7 +278,7 @@ const loadLocalApps = async () => {
loadingLocalApps.value = true
try {
const base = (import.meta as any).env.VITE_LOCAL_API_URL || ''
const response = await axios.get(`${base}/jingrow/local-apps`, {
const response = await axios.get(`${base}/local-apps`, {
headers: get_session_api_headers(),
withCredentials: true
})
@ -299,7 +299,7 @@ const loadLocalApps = async () => {
const installLocalApp = async (appName: string) => {
try {
const base = (import.meta as any).env.VITE_LOCAL_API_URL || ''
const response = await axios.post(`${base}/jingrow/install-local/${appName}`, {}, {
const response = await axios.post(`${base}/install-local/${appName}`, {}, {
headers: get_session_api_headers(),
withCredentials: true
})

View File

@ -119,7 +119,7 @@ async function handleSubmit() {
submitting.value = true
try {
const base = (import.meta as any).env.VITE_JINGROW_SERVER_URL || ''
const res = await axios.post(`${base}/jingrow/dev/create-pagetype-template`, {
const res = await axios.post(`${base}/dev/create-pagetype-template`, {
pagetype: form.value.pagetype,
app: appName.value,
module: moduleName.value,

View File

@ -41,7 +41,7 @@ export default defineConfig({
secure: false
},
'/api/data': {
target: process.env.VITE_JINGROW_SERVER_URL || 'http://192.168.2.58',
target: process.env.VITE_BACKEND_SERVER_URL || 'http://localhost:9001',
changeOrigin: true,
secure: false
},

View File

@ -9,7 +9,7 @@ from jingrow.utils.jingrow_api import get_agent_detail
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/agents/execute")
@router.post("/jingrow/agents/execute")
async def execute_agent(request: Request, request_data: Dict[str, Any]):
"""
异步执行智能体参考 Jingrow 的触发逻辑
@ -45,7 +45,7 @@ async def execute_agent(request: Request, request_data: Dict[str, Any]):
logger.error(f"Failed to execute agent {agent_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/agents/execute_scheduled_agent")
@router.post("/jingrow/agents/execute_scheduled_agent")
async def execute_scheduled_agent(request: Request, request_data: Dict[str, Any]):
"""
定时任务统一入口执行指定的智能体

View File

@ -23,7 +23,7 @@ logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/install/upload")
@router.post("/jingrow/install/upload")
async def install_app_from_upload(
request: Request,
file: UploadFile = File(...),
@ -67,7 +67,7 @@ async def install_app_from_upload(
raise HTTPException(status_code=500, detail=str(e))
@router.get("/local-apps")
@router.get("/jingrow/local-apps")
async def get_local_apps(request: Request):
"""扫描本地未安装的App"""
try:
@ -162,7 +162,7 @@ async def get_local_apps(request: Request):
raise HTTPException(status_code=500, detail=str(e))
@router.post("/install-local/{app_name}")
@router.post("/jingrow/install-local/{app_name}")
async def install_local_app(request: Request, app_name: str):
"""安装本地App"""
try:
@ -225,7 +225,7 @@ async def install_local_app(request: Request, app_name: str):
raise HTTPException(status_code=500, detail=str(e))
@router.get("/installed-apps")
@router.get("/jingrow/installed-apps")
async def get_installed_apps(request: Request):
"""获取已安装的应用列表 - 从Local Installed Apps PageType读取"""
try:
@ -282,7 +282,7 @@ async def get_installed_apps(request: Request):
raise HTTPException(status_code=500, detail=str(e2))
@router.post("/uninstall/{app_name}")
@router.post("/jingrow/uninstall/{app_name}")
async def uninstall_app(request: Request, app_name: str):
try:
@ -334,7 +334,7 @@ async def uninstall_app(request: Request, app_name: str):
raise HTTPException(status_code=500, detail=str(e))
@router.get("/app-info/{app_name}")
@router.get("/jingrow/app-info/{app_name}")
async def get_app_info(request: Request, app_name: str):
try:

View File

@ -38,7 +38,7 @@ def call_jingrow_api(method: str, endpoint: str, headers: dict, params: dict = N
except Exception as e:
return {'success': False, 'error': f'调用API异常: {str(e)}'}
@router.get("/local-jobs")
@router.get("/jingrow/local-jobs")
async def get_local_jobs(
request: Request,
page: int = 1,
@ -116,7 +116,7 @@ async def get_local_jobs(
logger.error(f"Failed to get local jobs: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/local-jobs/{job_id}")
@router.get("/jingrow/local-jobs/{job_id}")
async def get_local_job_detail(request: Request, job_id: str):
"""
获取Local Job详情
@ -150,7 +150,7 @@ async def get_local_job_detail(request: Request, job_id: str):
logger.error(f"Failed to get local job detail {job_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/local-jobs/{job_id}/stop")
@router.post("/jingrow/local-jobs/{job_id}/stop")
async def stop_local_job(request: Request, job_id: str):
"""
停止Local Job
@ -185,7 +185,7 @@ async def stop_local_job(request: Request, job_id: str):
logger.error(f"Failed to stop local job {job_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/local-jobs/{job_id}")
@router.delete("/jingrow/local-jobs/{job_id}")
async def delete_local_job(request: Request, job_id: str):
"""
删除Local Job
@ -223,7 +223,7 @@ async def delete_local_job(request: Request, job_id: str):
logger.error(f"Failed to delete local job {job_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/local-jobs/batch-delete")
@router.post("/jingrow/local-jobs/batch-delete")
async def batch_delete_local_jobs(request: Request, request_data: Dict[str, Any]):
"""
批量删除Local Jobs
@ -276,7 +276,7 @@ async def batch_delete_local_jobs(request: Request, request_data: Dict[str, Any]
logger.error(f"Failed to batch delete local jobs: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/local-jobs/stats")
@router.get("/jingrow/local-jobs/stats")
async def get_local_job_stats(request: Request):
"""
获取Local Job统计信息

View File

@ -0,0 +1,291 @@
# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
Jingrow 标准 REST API 实现
提供对云端数据库的增删改查操作并支持本地钩子函数
"""
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi.responses import JSONResponse
from typing import Dict, List, Any, Optional
import logging
import json
import jingrow
import requests
from jingrow.config import Config
from jingrow.utils.auth import get_jingrow_api_headers
from jingrow.services.queue import init_queue
from jingrow.services.local_job_manager import local_job_manager
from jingrow.services.hook_executor import create_hook_task, execute_hook_sync
from jingrow.utils.jingrow_api import get_record_list, get_record, create_record, update_record, delete_record
import uuid
logger = logging.getLogger(__name__)
router = APIRouter()
# 初始化队列
init_queue()
def get_pagetype_from_request(request: Request) -> str:
"""从请求路径中提取pagetype"""
return request.path_params.get("pagetype", "")
def get_name_from_request(request: Request) -> Optional[str]:
"""从请求路径中提取name"""
return request.path_params.get("name")
def execute_hooks(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None):
"""执行钩子函数"""
return execute_hook_sync(pagetype, name, hook_name, data)
def execute_hooks_async(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None):
"""异步执行钩子函数"""
return create_hook_task(pagetype, name, hook_name, data)
@router.get("/api/data/{pagetype}")
async def get_records(
request: Request,
pagetype: str,
fields: Optional[str] = None,
filters: Optional[str] = None,
order_by: str = "modified desc",
limit_start: int = 0,
limit_page_length: int = 20
):
"""获取记录列表"""
try:
logger.info(f"获取记录列表: {pagetype}")
# 解析参数
fields_list = json.loads(fields) if fields else []
filters_list = json.loads(filters) if filters else []
# 使用现有的 get_record_list 函数
# 注意get_record_list 使用 limit 参数,我们需要适配
limit = limit_start + limit_page_length if limit_page_length > 0 else None
result = get_record_list(
pagetype=pagetype,
filters=filters_list,
fields=fields_list,
limit=limit
)
if result.get('success'):
data = result.get('data', [])
# 处理分页:如果指定了 limit_start需要截取数据
if limit_start > 0 and len(data) > limit_start:
data = data[limit_start:]
# 限制返回数量
if limit_page_length > 0 and len(data) > limit_page_length:
data = data[:limit_page_length]
return JSONResponse(content={
"data": data,
"total": len(data) # 注意:这里无法获取真实总数,需要改进
})
else:
raise HTTPException(status_code=400, detail=result.get('error', '获取记录列表失败'))
except Exception as e:
logger.error(f"获取记录列表失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/data/{pagetype}/{name}")
async def get_record_api(request: Request, pagetype: str, name: str):
"""获取单个记录"""
try:
logger.info(f"获取记录: {pagetype}.{name}")
# 使用现有的 get_record 函数
result = get_record(pagetype, name)
if result.get('success'):
return JSONResponse(content={"data": result.get('data')})
else:
raise HTTPException(status_code=404, detail=result.get('error', '记录不存在'))
except Exception as e:
logger.error(f"获取记录失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/data/{pagetype}")
async def create_record_api(request: Request, pagetype: str, data: Dict[str, Any]):
"""创建记录"""
try:
logger.info(f"创建记录: {pagetype}")
# 1. 使用现有的 create_record 函数创建记录
result = create_record(pagetype, data)
if not result.get('success'):
error_msg = result.get('error', '创建记录失败')
logger.error(f"创建记录失败: {error_msg}")
raise HTTPException(status_code=400, detail=error_msg)
created_data = result.get('data', {})
record_name = created_data.get('name') or data.get('name')
if record_name:
# 2. 执行本地钩子函数(异步,不阻塞返回)
try:
execute_hooks_async(pagetype, record_name, "after_insert", created_data)
except Exception as hook_error:
logger.error(f"执行钩子函数失败: {hook_error}")
# 钩子失败不影响记录创建
return JSONResponse(content={
"message": "Created successfully",
"data": created_data
})
except HTTPException:
raise
except Exception as e:
logger.error(f"创建记录失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.put("/api/data/{pagetype}/{name}")
async def update_record_api(request: Request, pagetype: str, name: str, data: Dict[str, Any]):
"""更新记录"""
try:
logger.info(f"更新记录: {pagetype}.{name}")
# 1. 执行本地钩子函数
logger.info(f"执行更新前钩子: {pagetype}.{name}")
# 同步执行 before_save 钩子
execute_hooks(pagetype, name, "before_save", data)
# 2. 使用现有的 update_record 函数更新记录
result = update_record(pagetype, name, data)
if not result.get('success'):
raise HTTPException(status_code=400, detail=result.get('error', '更新记录失败'))
# 3. 异步执行 on_update 钩子
execute_hooks_async(pagetype, name, "on_update", data)
return JSONResponse(content={
"message": "Updated successfully",
"data": result.get('data', {})
})
except Exception as e:
logger.error(f"更新记录失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/data/{pagetype}/{name}")
async def delete_record(request: Request, pagetype: str, name: str):
"""删除记录"""
try:
logger.info(f"删除记录: {pagetype}.{name}")
# 1. 执行本地钩子函数
logger.info(f"执行删除前钩子: {pagetype}.{name}")
# 同步执行 on_trash 钩子
execute_hooks(pagetype, name, "on_trash")
# 2. 调用云端API删除记录
result = call_cloud_api('DELETE', f'/api/data/{pagetype}/{name}')
if not result.get('success'):
raise HTTPException(status_code=400, detail=result.get('error', '删除记录失败'))
return JSONResponse(content={
"message": "Deleted successfully"
})
except Exception as e:
logger.error(f"删除记录失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/action/jingrow.client.delete")
async def batch_delete_records(request: Request, data: Dict[str, Any]):
"""批量删除记录"""
try:
pagetype = data.get('pagetype')
name = data.get('name')
if not pagetype or not name:
raise HTTPException(status_code=400, detail="缺少必要参数")
logger.info(f"批量删除记录: {pagetype}.{name}")
# 调用单个删除API
return await delete_record(request, pagetype, name)
except Exception as e:
logger.error(f"批量删除记录失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/action/jingrow.client.get_count")
async def get_record_count(request: Request, pagetype: str):
"""获取记录总数"""
try:
logger.info(f"获取记录总数: {pagetype}")
# 调用云端API获取记录列表只获取总数
params = {
'limit_start': 0,
'limit_page_length': 1
}
result = call_cloud_api('GET', f'/api/data/{pagetype}', params=params)
if result.get('success'):
return JSONResponse(content={"message": result.get('total', 0)})
else:
raise HTTPException(status_code=400, detail=result.get('error', '获取记录总数失败'))
except Exception as e:
logger.error(f"获取记录总数失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/action/upload_file")
async def upload_file(request: Request):
"""上传文件"""
try:
# 这里需要实现文件上传逻辑
# 暂时返回成功响应
logger.info("文件上传请求")
return JSONResponse(content={
"message": {
"file_url": "http://example.com/uploaded_file.txt",
"file_name": "uploaded_file.txt"
}
})
except Exception as e:
logger.error(f"文件上传失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
# 钩子执行任务(用于异步执行钩子)
@router.post("/api/hooks/execute")
async def execute_hook_task(request: Request, data: Dict[str, Any]):
"""执行钩子任务"""
try:
pagetype = data.get('pagetype')
name = data.get('name')
hook_name = data.get('hook_name')
if not all([pagetype, name, hook_name]):
raise HTTPException(status_code=400, detail="缺少必要参数")
logger.info(f"执行钩子任务: {pagetype}.{name}.{hook_name}")
# 执行钩子
success = execute_hooks(pagetype, name, hook_name, data.get('data'))
return JSONResponse(content={
"success": success,
"message": f"钩子 {hook_name} 执行{'成功' if success else '失败'}"
})
except Exception as e:
logger.error(f"执行钩子任务失败: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@ -58,8 +58,8 @@ def create_app():
allow_headers=["*"],
)
# 自动注册 Jingrow 框架的静态路由
include_routers_from_package(app, "jingrow.api", prefix="/jingrow")
# 自动注册 Jingrow 框架的静态路由(无前缀)
include_routers_from_package(app, "jingrow.api")
# 手动注册动态路由(最后注册,避免冲突)
app.include_router(router, prefix="/jingrow")

View File

@ -0,0 +1,151 @@
# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
钩子执行器 - 基于现有消息队列的钩子执行系统
"""
import json
import logging
import uuid
import dramatiq
from typing import Dict, Any
from jingrow.services.local_job_manager import local_job_manager
from jingrow.utils.jingrow_api import get_logged_user
logger = logging.getLogger(__name__)
@dramatiq.actor(max_retries=3, time_limit=30_000)
def execute_hook_task(hook_data: str) -> None:
"""执行钩子任务"""
try:
data = json.loads(hook_data)
pagetype = data.get('pagetype')
name = data.get('name')
hook_name = data.get('hook_name')
hook_args = data.get('data', {})
if not all([pagetype, name, hook_name]):
logger.error(f"钩子任务参数不完整: {data}")
return
logger.info(f"开始执行钩子任务: {pagetype}.{name}.{hook_name}")
# 更新任务状态为执行中
job_id = data.get('job_id', str(uuid.uuid4()))
local_job_manager.update_job(job_id, {
'status': 'started',
'started_at': __import__('datetime').datetime.now().isoformat()
})
# 获取Page对象并执行钩子
import jingrow
pg = jingrow.get_pg(pagetype, name)
if pg and hasattr(pg, hook_name):
hook_func = getattr(pg, hook_name)
if callable(hook_func):
# 执行钩子函数
result = hook_func(**hook_args)
# 更新任务状态为完成
local_job_manager.update_job(job_id, {
'status': 'finished',
'ended_at': __import__('datetime').datetime.now().isoformat(),
'result': str(result) if result is not None else 'None'
})
logger.info(f"钩子执行成功: {pagetype}.{name}.{hook_name}")
else:
logger.warning(f"钩子函数不可调用: {pagetype}.{name}.{hook_name}")
local_job_manager.update_job(job_id, {
'status': 'failed',
'ended_at': __import__('datetime').datetime.now().isoformat(),
'exc_info': f"钩子函数不可调用: {hook_name}"
})
else:
logger.warning(f"钩子函数不存在: {pagetype}.{name}.{hook_name}")
local_job_manager.update_job(job_id, {
'status': 'failed',
'ended_at': __import__('datetime').datetime.now().isoformat(),
'exc_info': f"钩子函数不存在: {hook_name}"
})
except Exception as e:
logger.error(f"执行钩子任务失败: {e}")
job_id = data.get('job_id', 'unknown')
local_job_manager.update_job(job_id, {
'status': 'failed',
'ended_at': __import__('datetime').datetime.now().isoformat(),
'exc_info': str(e)
})
def create_hook_task(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None, session_cookie: str = None) -> str:
"""创建钩子任务"""
try:
# 生成任务ID
job_id = str(uuid.uuid4())
# 构建任务数据
task_data = {
'job_id': job_id,
'pagetype': pagetype,
'name': name,
'hook_name': hook_name,
'data': data or {},
'session_cookie': session_cookie
}
# 获取当前用户
creator = get_logged_user(session_cookie) or 'system'
# 创建本地任务记录
local_job_manager.create_job({
'job_id': job_id,
'target_type': 'hook',
'pagetype': pagetype,
'name': name,
'hook_name': hook_name,
'status': 'queued',
'queue': 'default',
'job_name': f"Hook: {pagetype}.{name}.{hook_name}",
'arguments': json.dumps(task_data, indent=2, ensure_ascii=False),
'owner': creator,
'modified_by': creator
})
# 发送到消息队列
execute_hook_task.send(json.dumps(task_data))
logger.info(f"钩子任务已创建: {job_id} - {pagetype}.{name}.{hook_name}")
return job_id
except Exception as e:
logger.error(f"创建钩子任务失败: {e}")
return None
def execute_hook_sync(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None) -> bool:
"""同步执行钩子函数"""
try:
import jingrow
# 对于新记录创建,记录可能还不存在,需要特殊处理
if hook_name == "before_insert":
logger.warning(f"跳过 before_insert 钩子,因为记录 {pagetype}.{name} 还不存在")
return False
pg = jingrow.get_pg(pagetype, name)
if pg and hasattr(pg, hook_name):
hook_func = getattr(pg, hook_name)
if callable(hook_func):
logger.info(f"同步执行钩子: {pagetype}.{name}.{hook_name}")
hook_func(**(data or {}))
return True
logger.warning(f"钩子函数不存在或不可调用: {pagetype}.{name}.{hook_name}")
return False
except Exception as e:
logger.error(f"同步执行钩子失败: {e}")
return False

221
test_api.py Normal file
View File

@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""
测试本地版REST API和钩子功能
"""
import requests
import json
import time
# 配置
BASE_URL = "http://localhost:9001"
TEST_PAGETYPE = "Test Page"
TEST_NAME = "TEST-001"
def test_create_record():
"""测试创建记录"""
print("=== 测试创建记录 ===")
data = {
"name": TEST_NAME,
"title": "测试记录",
"status": "Active"
}
try:
response = requests.post(
f"{BASE_URL}/api/data/{TEST_PAGETYPE}",
json=data,
headers={"Content-Type": "application/json"}
)
print(f"状态码: {response.status_code}")
print(f"响应: {response.json()}")
if response.status_code == 200:
print("✅ 创建记录成功")
return True
else:
print("❌ 创建记录失败")
return False
except Exception as e:
print(f"❌ 创建记录异常: {e}")
return False
def test_update_record():
"""测试更新记录"""
print("\n=== 测试更新记录 ===")
data = {
"title": "更新后的标题",
"status": "Inactive"
}
try:
response = requests.put(
f"{BASE_URL}/api/data/{TEST_PAGETYPE}/{TEST_NAME}",
json=data,
headers={"Content-Type": "application/json"}
)
print(f"状态码: {response.status_code}")
print(f"响应: {response.json()}")
if response.status_code == 200:
print("✅ 更新记录成功")
return True
else:
print("❌ 更新记录失败")
return False
except Exception as e:
print(f"❌ 更新记录异常: {e}")
return False
def test_get_record():
"""测试获取记录"""
print("\n=== 测试获取记录 ===")
try:
response = requests.get(
f"{BASE_URL}/api/data/{TEST_PAGETYPE}/{TEST_NAME}"
)
print(f"状态码: {response.status_code}")
print(f"响应: {response.json()}")
if response.status_code == 200:
print("✅ 获取记录成功")
return True
else:
print("❌ 获取记录失败")
return False
except Exception as e:
print(f"❌ 获取记录异常: {e}")
return False
def test_get_records():
"""测试获取记录列表"""
print("\n=== 测试获取记录列表 ===")
try:
response = requests.get(
f"{BASE_URL}/api/data/{TEST_PAGETYPE}",
params={
"limit_page_length": 10
}
)
print(f"状态码: {response.status_code}")
print(f"响应: {response.json()}")
if response.status_code == 200:
print("✅ 获取记录列表成功")
return True
else:
print("❌ 获取记录列表失败")
return False
except Exception as e:
print(f"❌ 获取记录列表异常: {e}")
return False
def test_delete_record():
"""测试删除记录"""
print("\n=== 测试删除记录 ===")
try:
response = requests.delete(
f"{BASE_URL}/api/data/{TEST_PAGETYPE}/{TEST_NAME}"
)
print(f"状态码: {response.status_code}")
print(f"响应: {response.json()}")
if response.status_code == 200:
print("✅ 删除记录成功")
return True
else:
print("❌ 删除记录失败")
return False
except Exception as e:
print(f"❌ 删除记录异常: {e}")
return False
def test_hook_execution():
"""测试钩子执行"""
print("\n=== 测试钩子执行 ===")
data = {
"pagetype": TEST_PAGETYPE,
"name": TEST_NAME,
"hook_name": "on_update",
"data": {"test": "hook data"}
}
try:
response = requests.post(
f"{BASE_URL}/api/hooks/execute",
json=data,
headers={"Content-Type": "application/json"}
)
print(f"状态码: {response.status_code}")
print(f"响应: {response.json()}")
if response.status_code == 200:
print("✅ 钩子执行成功")
return True
else:
print("❌ 钩子执行失败")
return False
except Exception as e:
print(f"❌ 钩子执行异常: {e}")
return False
def main():
"""主测试函数"""
print("开始测试本地版REST API和钩子功能...")
print(f"测试目标: {BASE_URL}")
print(f"测试PageType: {TEST_PAGETYPE}")
print(f"测试记录名: {TEST_NAME}")
# 等待服务启动
print("\n等待服务启动...")
time.sleep(2)
# 执行测试
tests = [
test_create_record,
test_update_record,
test_get_record,
test_get_records,
test_hook_execution,
test_delete_record
]
passed = 0
total = len(tests)
for test in tests:
try:
if test():
passed += 1
except Exception as e:
print(f"❌ 测试异常: {e}")
print(f"\n=== 测试结果 ===")
print(f"通过: {passed}/{total}")
print(f"成功率: {passed/total*100:.1f}%")
if passed == total:
print("🎉 所有测试通过!")
else:
print("⚠️ 部分测试失败,请检查日志")
if __name__ == "__main__":
main()