修复Page类

This commit is contained in:
jingrow 2025-10-31 19:11:45 +08:00
parent 7b992b63ba
commit c73bacff48
2 changed files with 80 additions and 182 deletions

View File

@ -2,75 +2,17 @@
# For license information, please see license.txt
"""
提供对云端数据库的增删改查操作并支持本地钩子函数
提供对云端数据库的增删改查操作
"""
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import JSONResponse
from typing import Dict, List, Any, Optional
import logging
from typing import Dict, Any, Optional
import json
import jingrow
import requests
from jingrow.config import Config
from jingrow.utils.auth import get_jingrow_api_headers
from jingrow.services.queue import init_queue
from jingrow.services.local_job_manager import local_job_manager
from jingrow.core.hooks import execute_hook, execute_hook_async
from jingrow.utils.jingrow_api import get_record_list, get_record, create_record, update_record, delete_record, get_single_pagetype
import uuid
logger = logging.getLogger(__name__)
router = APIRouter()
# 初始化队列
init_queue()
def execute_hooks(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None):
"""执行钩子函数(同步)"""
try:
# 统一命名page.{pagetype}.{hook_name}
hook_full_name = f"page.{pagetype}.{hook_name}"
# 重要:不要展开 data避免与 HookRegistry.send(name=...) 参数名冲突
pg_obj = jingrow.get_pg(pagetype, name)
# 仅发通配钩子对齐SaaS通配注册+统一分派)
wildcard_hook = f"page.*.{hook_name}"
execute_hook(
wildcard_hook,
sender=pagetype,
page=pg_obj,
event=hook_name,
data=data or {},
)
return True
except Exception as e:
logger.error(f"同步执行钩子失败: {e}")
return False
def execute_hooks_async(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None):
"""异步执行钩子函数fire-and-forget"""
try:
hook_full_name = f"page.{pagetype}.{hook_name}"
pg = jingrow.get_pg(pagetype, name)
# 异步丢到事件循环(同样不要展开 data。同时发送精确与通配钩子
import asyncio
async def _fire():
wildcard_hook = f"page.*.{hook_name}"
await execute_hook_async(
wildcard_hook,
sender=pagetype,
page=pg,
event=hook_name,
data=data or {},
)
asyncio.create_task(_fire())
return True
except Exception as e:
logger.error(f"异步执行钩子失败: {e}")
return False
@router.get("/api/data/{pagetype}")
async def get_records(
@ -84,80 +26,71 @@ async def get_records(
):
"""获取记录列表"""
try:
fields_list = json.loads(fields) if fields else []
filters_list = json.loads(filters) if filters else []
try:
fields_list = json.loads(fields) if fields else []
except Exception:
raise
try:
filters_list = json.loads(filters) if filters else []
except Exception:
raise
if limit_page_length == 0:
limit = None
else:
limit = limit_start + limit_page_length
result = get_record_list(
pagetype=pagetype,
filters=filters_list,
fields=fields_list,
limit=limit
)
if result.get('success'):
data = result.get('data', [])
if limit_page_length > 0:
if limit_start > 0:
data = data[limit_start:]
data = data[:limit_page_length]
try:
data = jingrow.get_list(
pagetype=pagetype,
filters=filters_list,
fields=fields_list,
limit=limit
)
except Exception:
raise
if not isinstance(data, list):
raise HTTPException(status_code=400, detail='获取记录列表失败')
if limit_page_length > 0:
if limit_start > 0:
data = data[limit_start:]
data = data[:limit_page_length]
estimated_total = limit_start + len(data)
if limit_page_length > 0 and len(data) < limit_page_length:
estimated_total = limit_start + len(data)
if limit_page_length > 0 and len(data) < limit_page_length:
estimated_total = limit_start + len(data)
elif limit_page_length > 0:
estimated_total = limit_start + limit_page_length + 1
return JSONResponse(content={
"data": data,
"total": estimated_total
})
else:
raise HTTPException(status_code=400, detail=result.get('error', '获取记录列表失败'))
elif limit_page_length > 0:
estimated_total = limit_start + limit_page_length + 1
return JSONResponse(content={
"data": data,
"total": estimated_total
})
except Exception as e:
logger.error(f"获取记录列表失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/data/{pagetype}/{name}")
async def get_record_api(request: Request, pagetype: str, name: str):
"""获取单个记录"""
try:
result = get_record(pagetype, name)
if result.get('success'):
return JSONResponse(content={"data": result.get('data')})
else:
raise HTTPException(status_code=404, detail=result.get('error', '记录不存在'))
data = jingrow.get_pg(pagetype, name)
if data is None:
raise HTTPException(status_code=404, detail='记录不存在')
return JSONResponse(content={"data": data})
except Exception as e:
logger.error(f"获取记录失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/data/{pagetype}")
async def create_record_api(request: Request, pagetype: str, data: Dict[str, Any]):
"""创建记录"""
try:
result = create_record(pagetype, data)
if not result.get('success'):
error_msg = result.get('error', '创建记录失败')
logger.error(f"创建记录失败: {error_msg}")
raise HTTPException(status_code=400, detail=error_msg)
created_data = result.get('data', {})
record_name = created_data.get('name') or data.get('name')
if record_name:
try:
execute_hooks_async(pagetype, record_name, "after_insert", created_data)
except Exception as hook_error:
logger.error(f"执行钩子函数失败: {hook_error}")
created_data = jingrow.create_pg(pagetype, data) or {}
if not isinstance(created_data, dict):
raise HTTPException(status_code=400, detail='创建记录失败')
return JSONResponse(content={
"message": "Created successfully",
@ -167,46 +100,36 @@ async def create_record_api(request: Request, pagetype: str, data: Dict[str, Any
except HTTPException:
raise
except Exception as e:
logger.error(f"创建记录失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.put("/api/data/{pagetype}/{name}")
async def update_record_api(request: Request, pagetype: str, name: str, data: Dict[str, Any]):
"""更新记录"""
try:
result = update_record(pagetype, name, data)
if not result.get('success'):
raise HTTPException(status_code=400, detail=result.get('error', '更新记录失败'))
execute_hooks_async(pagetype, name, "on_update", data)
updated = jingrow.update_pg(pagetype, name, data)
if updated is False:
raise HTTPException(status_code=400, detail='更新记录失败')
return JSONResponse(content={
"message": "Updated successfully",
"data": result.get('data', {})
"data": (updated or {}) if isinstance(updated, dict) else {}
})
except Exception as e:
logger.error(f"更新记录失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/data/{pagetype}/{name}")
async def delete_record_api(request: Request, pagetype: str, name: str):
"""删除记录"""
try:
execute_hooks(pagetype, name, "on_trash")
result = delete_record(pagetype, name)
if not result.get('success'):
raise HTTPException(status_code=400, detail=result.get('error', '删除记录失败'))
ok = jingrow.delete_pg(pagetype, name)
if not ok:
raise HTTPException(status_code=400, detail='删除记录失败')
return JSONResponse(content={
"message": "Deleted successfully"
})
except Exception as e:
logger.error(f"删除记录失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/action/jingrow.client.delete")
@ -222,31 +145,22 @@ async def batch_delete_records(request: Request, data: Dict[str, Any]):
return await delete_record_api(request, pagetype, name)
except Exception as e:
logger.error(f"批量删除记录失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/action/jingrow.client.get_count")
async def get_record_count(request: Request, pagetype: str):
"""获取记录总数(估算)"""
try:
# 获取少量数据来估算总数
result = get_record_list(pagetype=pagetype, limit=100)
if result.get('success'):
data = result.get('data', [])
# 如果返回的数据少于100条说明总数就是实际数量
if len(data) < 100:
estimated_count = len(data)
else:
# 如果返回了100条可能还有更多返回一个估算值
estimated_count = len(data) + 1
return JSONResponse(content={"message": estimated_count})
data = jingrow.get_list(pagetype=pagetype, limit=100)
if not isinstance(data, list):
raise HTTPException(status_code=400, detail='获取记录总数失败')
if len(data) < 100:
estimated_count = len(data)
else:
raise HTTPException(status_code=400, detail=result.get('error', '获取记录总数失败'))
estimated_count = len(data) + 1
return JSONResponse(content={"message": estimated_count})
except Exception as e:
logger.error(f"获取记录总数失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@ -254,35 +168,12 @@ async def get_record_count(request: Request, pagetype: str):
async def get_single(request: Request, pagetype: str):
"""获取single类型pagetype记录"""
try:
result = get_single_pagetype(pagetype)
if result.get('success'):
result = jingrow.get_single(pagetype)
if isinstance(result, dict) and result.get('success'):
return JSONResponse(content={"data": result.get('config', {})})
else:
raise HTTPException(status_code=404, detail=result.get('error', '配置不存在'))
raise HTTPException(status_code=404, detail=(result or {}).get('error', '配置不存在') if isinstance(result, dict) else '配置不存在')
except Exception as e:
logger.error(f"获取single配置失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/hooks/execute")
async def execute_hook_task(request: Request, data: Dict[str, Any]):
"""执行钩子任务"""
try:
pagetype = data.get('pagetype')
name = data.get('name')
hook_name = data.get('hook_name')
if not all([pagetype, name, hook_name]):
raise HTTPException(status_code=400, detail="缺少必要参数")
success = execute_hooks(pagetype, name, hook_name, data.get('data'))
return JSONResponse(content={
"success": success,
"message": f"钩子 {hook_name} 执行{'成功' if success else '失败'}"
})
except Exception as e:
logger.error(f"执行钩子任务失败: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@ -5,7 +5,6 @@ from typing import Any, Dict, List, Optional
from jingrow.services.runtime import get_adapter
from jingrow.core.hooks import execute_hook, execute_hook_async
import asyncio
import jingrow
class Page:
@ -24,7 +23,11 @@ class Page:
def _execute_hook(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool:
try:
wildcard_hook = f"page.*.{name}"
pg_obj = jingrow.get_pg(self.pagetype, record_name)
res = self.get(record_name)
if isinstance(res, dict) and res.get('success'):
pg_obj = res.get('data') or {}
else:
pg_obj = None
execute_hook(
wildcard_hook,
sender=self.pagetype,
@ -39,7 +42,11 @@ class Page:
def _execute_hook_async(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool:
try:
wildcard_hook = f"page.*.{name}"
pg_obj = jingrow.get_pg(self.pagetype, record_name)
res = self.get(record_name)
if isinstance(res, dict) and res.get('success'):
pg_obj = res.get('data') or {}
else:
pg_obj = None
async def _run():
await execute_hook_async(
@ -57,15 +64,15 @@ class Page:
# ---------------------- CRUD ----------------------
def get(self, name: str) -> Dict[str, Any]:
return self.adapter.get_record(self.pagetype, name)
return self.adapter.get_pg(self.pagetype, name)
def list(self, filters: Optional[List[List[Any]]] = None,
fields: Optional[List[str]] = None,
limit: Optional[int] = None) -> Dict[str, Any]:
return self.adapter.get_record_list(self.pagetype, filters=filters, fields=fields, limit=limit)
return self.adapter.get_pg_list(self.pagetype, filters=filters, fields=fields, limit=limit)
def create(self, data: Dict[str, Any]) -> Dict[str, Any]:
result = self.adapter.create_record(self.pagetype, data)
result = self.adapter.create_pg(self.pagetype, data)
if result.get('success'):
created = result.get('data', {})
record_name = created.get('name') or data.get('name')
@ -74,7 +81,7 @@ class Page:
return result
def update(self, name: str, data: Dict[str, Any]) -> Dict[str, Any]:
result = self.adapter.update_record(self.pagetype, name, data)
result = self.adapter.update_pg(self.pagetype, name, data)
if result.get('success'):
self._execute_hook_async('on_update', name, data)
return result
@ -82,7 +89,7 @@ class Page:
def delete(self, name: str) -> Dict[str, Any]:
# 先发同步 on_trash再删除
self._execute_hook('on_trash', name)
return self.adapter.delete_record(self.pagetype, name)
return self.adapter.delete_pg(self.pagetype, name)
# ---------------------- 其他便捷能力 ----------------------
def get_meta(self) -> Dict[str, Any]:
@ -91,10 +98,10 @@ class Page:
def get_module_app(self) -> Dict[str, Any]:
return self.adapter.get_pagetype_module_app(self.pagetype)
def get_record_id(self, filters: Optional[List[List[Any]]] = None,
def get_pg_id(self, filters: Optional[List[List[Any]]] = None,
field: Optional[str] = None, value: Optional[str] = None,
site: Optional[str] = None) -> Dict[str, Any]:
return self.adapter.get_record_id(self.pagetype, filters=filters, field=field, value=value, site=site)
return self.adapter.get_pg_id(self.pagetype, filters=filters, field=field, value=value, site=site)
def get_single(self) -> Dict[str, Any]:
return self.adapter.get_single_pagetype(self.pagetype)