From c73bacff486c3f8bf1fe16fc7d4728852fcd92c9 Mon Sep 17 00:00:00 2001 From: jingrow Date: Fri, 31 Oct 2025 19:11:45 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DPage=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/jingrow/jingrow/api/page.py | 235 ++++++++--------------------- apps/jingrow/jingrow/model/page.py | 27 ++-- 2 files changed, 80 insertions(+), 182 deletions(-) diff --git a/apps/jingrow/jingrow/api/page.py b/apps/jingrow/jingrow/api/page.py index dad1054..6fc21f7 100644 --- a/apps/jingrow/jingrow/api/page.py +++ b/apps/jingrow/jingrow/api/page.py @@ -2,75 +2,17 @@ # For license information, please see license.txt """ -提供对云端数据库的增删改查操作,并支持本地钩子函数 +提供对云端数据库的增删改查操作 """ -from fastapi import APIRouter, HTTPException, Request, Depends +from fastapi import APIRouter, HTTPException, Request from fastapi.responses import JSONResponse -from typing import Dict, List, Any, Optional -import logging +from typing import Dict, Any, Optional import json import jingrow -import requests -from jingrow.config import Config -from jingrow.utils.auth import get_jingrow_api_headers -from jingrow.services.queue import init_queue -from jingrow.services.local_job_manager import local_job_manager -from jingrow.core.hooks import execute_hook, execute_hook_async -from jingrow.utils.jingrow_api import get_record_list, get_record, create_record, update_record, delete_record, get_single_pagetype -import uuid - -logger = logging.getLogger(__name__) router = APIRouter() -# 初始化队列 -init_queue() - - -def execute_hooks(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None): - """执行钩子函数(同步)""" - try: - # 统一命名:page.{pagetype}.{hook_name} - hook_full_name = f"page.{pagetype}.{hook_name}" - # 重要:不要展开 data,避免与 HookRegistry.send(name=...) 参数名冲突 - pg_obj = jingrow.get_pg(pagetype, name) - # 仅发通配钩子(对齐SaaS:通配注册+统一分派) - wildcard_hook = f"page.*.{hook_name}" - execute_hook( - wildcard_hook, - sender=pagetype, - page=pg_obj, - event=hook_name, - data=data or {}, - ) - return True - except Exception as e: - logger.error(f"同步执行钩子失败: {e}") - return False - -def execute_hooks_async(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None): - """异步执行钩子函数(fire-and-forget)""" - try: - hook_full_name = f"page.{pagetype}.{hook_name}" - pg = jingrow.get_pg(pagetype, name) - # 异步丢到事件循环(同样不要展开 data)。同时发送精确与通配钩子 - import asyncio - async def _fire(): - wildcard_hook = f"page.*.{hook_name}" - await execute_hook_async( - wildcard_hook, - sender=pagetype, - page=pg, - event=hook_name, - data=data or {}, - ) - asyncio.create_task(_fire()) - return True - except Exception as e: - logger.error(f"异步执行钩子失败: {e}") - return False - @router.get("/api/data/{pagetype}") async def get_records( @@ -84,80 +26,71 @@ async def get_records( ): """获取记录列表""" try: - fields_list = json.loads(fields) if fields else [] - filters_list = json.loads(filters) if filters else [] + try: + fields_list = json.loads(fields) if fields else [] + except Exception: + raise + try: + filters_list = json.loads(filters) if filters else [] + except Exception: + raise if limit_page_length == 0: limit = None else: limit = limit_start + limit_page_length - result = get_record_list( - pagetype=pagetype, - filters=filters_list, - fields=fields_list, - limit=limit - ) - - if result.get('success'): - data = result.get('data', []) - - if limit_page_length > 0: - if limit_start > 0: - data = data[limit_start:] - data = data[:limit_page_length] - + try: + data = jingrow.get_list( + pagetype=pagetype, + filters=filters_list, + fields=fields_list, + limit=limit + ) + except Exception: + raise + + if not isinstance(data, list): + raise HTTPException(status_code=400, detail='获取记录列表失败') + + if limit_page_length > 0: + if limit_start > 0: + data = data[limit_start:] + data = data[:limit_page_length] + + estimated_total = limit_start + len(data) + if limit_page_length > 0 and len(data) < limit_page_length: estimated_total = limit_start + len(data) - if limit_page_length > 0 and len(data) < limit_page_length: - estimated_total = limit_start + len(data) - elif limit_page_length > 0: - estimated_total = limit_start + limit_page_length + 1 - - return JSONResponse(content={ - "data": data, - "total": estimated_total - }) - else: - raise HTTPException(status_code=400, detail=result.get('error', '获取记录列表失败')) + elif limit_page_length > 0: + estimated_total = limit_start + limit_page_length + 1 + + return JSONResponse(content={ + "data": data, + "total": estimated_total + }) except Exception as e: - logger.error(f"获取记录列表失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/api/data/{pagetype}/{name}") async def get_record_api(request: Request, pagetype: str, name: str): """获取单个记录""" try: - result = get_record(pagetype, name) - - if result.get('success'): - return JSONResponse(content={"data": result.get('data')}) - else: - raise HTTPException(status_code=404, detail=result.get('error', '记录不存在')) + data = jingrow.get_pg(pagetype, name) + if data is None: + raise HTTPException(status_code=404, detail='记录不存在') + return JSONResponse(content={"data": data}) except Exception as e: - logger.error(f"获取记录失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/data/{pagetype}") async def create_record_api(request: Request, pagetype: str, data: Dict[str, Any]): """创建记录""" try: - result = create_record(pagetype, data) - - if not result.get('success'): - error_msg = result.get('error', '创建记录失败') - logger.error(f"创建记录失败: {error_msg}") - raise HTTPException(status_code=400, detail=error_msg) - - created_data = result.get('data', {}) - record_name = created_data.get('name') or data.get('name') - - if record_name: - try: - execute_hooks_async(pagetype, record_name, "after_insert", created_data) - except Exception as hook_error: - logger.error(f"执行钩子函数失败: {hook_error}") + created_data = jingrow.create_pg(pagetype, data) or {} + if not isinstance(created_data, dict): + raise HTTPException(status_code=400, detail='创建记录失败') return JSONResponse(content={ "message": "Created successfully", @@ -167,46 +100,36 @@ async def create_record_api(request: Request, pagetype: str, data: Dict[str, Any except HTTPException: raise except Exception as e: - logger.error(f"创建记录失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @router.put("/api/data/{pagetype}/{name}") async def update_record_api(request: Request, pagetype: str, name: str, data: Dict[str, Any]): """更新记录""" try: - result = update_record(pagetype, name, data) - - if not result.get('success'): - raise HTTPException(status_code=400, detail=result.get('error', '更新记录失败')) - - execute_hooks_async(pagetype, name, "on_update", data) - + updated = jingrow.update_pg(pagetype, name, data) + if updated is False: + raise HTTPException(status_code=400, detail='更新记录失败') return JSONResponse(content={ "message": "Updated successfully", - "data": result.get('data', {}) + "data": (updated or {}) if isinstance(updated, dict) else {} }) except Exception as e: - logger.error(f"更新记录失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.delete("/api/data/{pagetype}/{name}") async def delete_record_api(request: Request, pagetype: str, name: str): """删除记录""" try: - execute_hooks(pagetype, name, "on_trash") - - result = delete_record(pagetype, name) - - if not result.get('success'): - raise HTTPException(status_code=400, detail=result.get('error', '删除记录失败')) + ok = jingrow.delete_pg(pagetype, name) + if not ok: + raise HTTPException(status_code=400, detail='删除记录失败') return JSONResponse(content={ "message": "Deleted successfully" }) except Exception as e: - logger.error(f"删除记录失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/action/jingrow.client.delete") @@ -222,31 +145,22 @@ async def batch_delete_records(request: Request, data: Dict[str, Any]): return await delete_record_api(request, pagetype, name) except Exception as e: - logger.error(f"批量删除记录失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/action/jingrow.client.get_count") async def get_record_count(request: Request, pagetype: str): """获取记录总数(估算)""" try: - # 获取少量数据来估算总数 - result = get_record_list(pagetype=pagetype, limit=100) - - if result.get('success'): - data = result.get('data', []) - # 如果返回的数据少于100条,说明总数就是实际数量 - if len(data) < 100: - estimated_count = len(data) - else: - # 如果返回了100条,可能还有更多,返回一个估算值 - estimated_count = len(data) + 1 - - return JSONResponse(content={"message": estimated_count}) + data = jingrow.get_list(pagetype=pagetype, limit=100) + if not isinstance(data, list): + raise HTTPException(status_code=400, detail='获取记录总数失败') + if len(data) < 100: + estimated_count = len(data) else: - raise HTTPException(status_code=400, detail=result.get('error', '获取记录总数失败')) + estimated_count = len(data) + 1 + return JSONResponse(content={"message": estimated_count}) except Exception as e: - logger.error(f"获取记录总数失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @@ -254,35 +168,12 @@ async def get_record_count(request: Request, pagetype: str): async def get_single(request: Request, pagetype: str): """获取single类型pagetype记录""" try: - result = get_single_pagetype(pagetype) - - if result.get('success'): + result = jingrow.get_single(pagetype) + if isinstance(result, dict) and result.get('success'): return JSONResponse(content={"data": result.get('config', {})}) - else: - raise HTTPException(status_code=404, detail=result.get('error', '配置不存在')) + raise HTTPException(status_code=404, detail=(result or {}).get('error', '配置不存在') if isinstance(result, dict) else '配置不存在') except Exception as e: - logger.error(f"获取single配置失败: {e}") raise HTTPException(status_code=500, detail=str(e)) -@router.post("/api/hooks/execute") -async def execute_hook_task(request: Request, data: Dict[str, Any]): - """执行钩子任务""" - try: - pagetype = data.get('pagetype') - name = data.get('name') - hook_name = data.get('hook_name') - - if not all([pagetype, name, hook_name]): - raise HTTPException(status_code=400, detail="缺少必要参数") - - success = execute_hooks(pagetype, name, hook_name, data.get('data')) - - return JSONResponse(content={ - "success": success, - "message": f"钩子 {hook_name} 执行{'成功' if success else '失败'}" - }) - - except Exception as e: - logger.error(f"执行钩子任务失败: {e}") - raise HTTPException(status_code=500, detail=str(e)) + diff --git a/apps/jingrow/jingrow/model/page.py b/apps/jingrow/jingrow/model/page.py index 25aa071..44684bb 100644 --- a/apps/jingrow/jingrow/model/page.py +++ b/apps/jingrow/jingrow/model/page.py @@ -5,7 +5,6 @@ from typing import Any, Dict, List, Optional from jingrow.services.runtime import get_adapter from jingrow.core.hooks import execute_hook, execute_hook_async import asyncio -import jingrow class Page: @@ -24,7 +23,11 @@ class Page: def _execute_hook(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool: try: wildcard_hook = f"page.*.{name}" - pg_obj = jingrow.get_pg(self.pagetype, record_name) + res = self.get(record_name) + if isinstance(res, dict) and res.get('success'): + pg_obj = res.get('data') or {} + else: + pg_obj = None execute_hook( wildcard_hook, sender=self.pagetype, @@ -39,7 +42,11 @@ class Page: def _execute_hook_async(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool: try: wildcard_hook = f"page.*.{name}" - pg_obj = jingrow.get_pg(self.pagetype, record_name) + res = self.get(record_name) + if isinstance(res, dict) and res.get('success'): + pg_obj = res.get('data') or {} + else: + pg_obj = None async def _run(): await execute_hook_async( @@ -57,15 +64,15 @@ class Page: # ---------------------- CRUD ---------------------- def get(self, name: str) -> Dict[str, Any]: - return self.adapter.get_record(self.pagetype, name) + return self.adapter.get_pg(self.pagetype, name) def list(self, filters: Optional[List[List[Any]]] = None, fields: Optional[List[str]] = None, limit: Optional[int] = None) -> Dict[str, Any]: - return self.adapter.get_record_list(self.pagetype, filters=filters, fields=fields, limit=limit) + return self.adapter.get_pg_list(self.pagetype, filters=filters, fields=fields, limit=limit) def create(self, data: Dict[str, Any]) -> Dict[str, Any]: - result = self.adapter.create_record(self.pagetype, data) + result = self.adapter.create_pg(self.pagetype, data) if result.get('success'): created = result.get('data', {}) record_name = created.get('name') or data.get('name') @@ -74,7 +81,7 @@ class Page: return result def update(self, name: str, data: Dict[str, Any]) -> Dict[str, Any]: - result = self.adapter.update_record(self.pagetype, name, data) + result = self.adapter.update_pg(self.pagetype, name, data) if result.get('success'): self._execute_hook_async('on_update', name, data) return result @@ -82,7 +89,7 @@ class Page: def delete(self, name: str) -> Dict[str, Any]: # 先发同步 on_trash,再删除 self._execute_hook('on_trash', name) - return self.adapter.delete_record(self.pagetype, name) + return self.adapter.delete_pg(self.pagetype, name) # ---------------------- 其他便捷能力 ---------------------- def get_meta(self) -> Dict[str, Any]: @@ -91,10 +98,10 @@ class Page: def get_module_app(self) -> Dict[str, Any]: return self.adapter.get_pagetype_module_app(self.pagetype) - def get_record_id(self, filters: Optional[List[List[Any]]] = None, + def get_pg_id(self, filters: Optional[List[List[Any]]] = None, field: Optional[str] = None, value: Optional[str] = None, site: Optional[str] = None) -> Dict[str, Any]: - return self.adapter.get_record_id(self.pagetype, filters=filters, field=field, value=value, site=site) + return self.adapter.get_pg_id(self.pagetype, filters=filters, field=field, value=value, site=site) def get_single(self) -> Dict[str, Any]: return self.adapter.get_single_pagetype(self.pagetype)