From 3059c81cc9bd8367a9ab19145af8889c7addb809 Mon Sep 17 00:00:00 2001 From: jingrow Date: Thu, 30 Oct 2025 02:42:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=86=97=E4=BD=99=E7=9A=84?= =?UTF-8?q?=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/jingrow/jingrow/api/page.py | 26 ++- apps/jingrow/jingrow/model/base_page.py | 159 ------------------ apps/jingrow/jingrow/model/page.py | 57 +++---- .../jingrow/jingrow/services/hook_executor.py | 151 ----------------- 4 files changed, 46 insertions(+), 347 deletions(-) delete mode 100644 apps/jingrow/jingrow/model/base_page.py delete mode 100644 apps/jingrow/jingrow/services/hook_executor.py diff --git a/apps/jingrow/jingrow/api/page.py b/apps/jingrow/jingrow/api/page.py index ecf8937..8dd9b48 100644 --- a/apps/jingrow/jingrow/api/page.py +++ b/apps/jingrow/jingrow/api/page.py @@ -16,7 +16,7 @@ from jingrow.config import Config from jingrow.utils.auth import get_jingrow_api_headers from jingrow.services.queue import init_queue from jingrow.services.local_job_manager import local_job_manager -from jingrow.services.hook_executor import create_hook_task, execute_hook_sync +from jingrow.core.hooks import execute_hook, execute_hook_async from jingrow.utils.jingrow_api import get_record_list, get_record, create_record, update_record, delete_record, get_single_pagetype import uuid @@ -29,12 +29,28 @@ init_queue() def execute_hooks(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None): - """执行钩子函数""" - return execute_hook_sync(pagetype, name, hook_name, data) + """执行钩子函数(同步)""" + try: + # 统一命名:page.{pagetype}.{hook_name} + hook_full_name = f"page.{pagetype}.{hook_name}" + execute_hook(hook_full_name, sender=pagetype, page=jingrow.get_pg(pagetype, name), **(data or {})) + return True + except Exception as e: + logger.error(f"同步执行钩子失败: {e}") + return False def execute_hooks_async(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None): - """异步执行钩子函数""" - return create_hook_task(pagetype, name, hook_name, data) + """异步执行钩子函数(fire-and-forget)""" + try: + hook_full_name = f"page.{pagetype}.{hook_name}" + pg = jingrow.get_pg(pagetype, name) + # 异步丢到事件循环 + import asyncio + asyncio.create_task(execute_hook_async(hook_full_name, sender=pagetype, page=pg, **(data or {}))) + return True + except Exception as e: + logger.error(f"异步执行钩子失败: {e}") + return False @router.get("/api/data/{pagetype}") diff --git a/apps/jingrow/jingrow/model/base_page.py b/apps/jingrow/jingrow/model/base_page.py deleted file mode 100644 index 8ef2cdc..0000000 --- a/apps/jingrow/jingrow/model/base_page.py +++ /dev/null @@ -1,159 +0,0 @@ -# Copyright (c) 2025, JINGROW -# License: MIT. See LICENSE - -from __future__ import annotations - -import importlib -import weakref -from functools import cached_property -from typing import Any, Iterable, TYPE_CHECKING, TypeVar - -import jingrow -from jingrow.modules import load_pagetype_module - -if TYPE_CHECKING: - from jingrow.model.page import Page - -D = TypeVar("D", bound="Page") - - -def get_controller(pagetype: str): - """ - 返回给定 PageType 的类对象(带本地缓存)。 - 本地版不依赖数据库元数据,直接走模块加载规则。 - """ - if not pagetype: - raise ImportError("Empty pagetype") - - site_controllers = jingrow.controllers.setdefault(jingrow.local.site, {}) - if pagetype not in site_controllers: - site_controllers[pagetype] = import_controller(pagetype) - - return site_controllers[pagetype] - - -def import_controller(pagetype: str): - """ - 解析并加载 pagetype 对应的 Python 模块与类。 - 规则对齐云端版: - - 模块路径:jingrow..pagetype./.py - - 类名:去空格/连字符后的驼峰。如 "Local Ai Agent" → "LocalAIAgent" - - 支持 hooks:override_pagetype_class(可选) - """ - module_name = "Core" - - # hooks 覆盖(兼容云端约定) - get_hooks = getattr(jingrow, "get_hooks", lambda *a, **k: {}) - class_overrides = get_hooks("override_pagetype_class") - module_path = None - classname = pagetype.replace(" ", "").replace("-", "") - - if class_overrides and class_overrides.get(pagetype): - import_path = class_overrides[pagetype][-1] - module_path, classname = import_path.rsplit(".", 1) - get_module = getattr(jingrow, "get_module", importlib.import_module) - module = get_module(module_path) - else: - module = load_pagetype_module(pagetype, module_name) - - class_ = getattr(module, classname, None) - if class_ is None: - raise ImportError( - pagetype - if module_path is None - else f"{pagetype}: {classname} does not exist in module {module_path}" - ) - - # 为了保持接口一致,这里不强制继承 BasePage,但推荐继承。 - return class_ - - -class BasePage: - """本地版最小可用 BasePage,保持与云端关键接口一致。 - - 仅实现 pagetype 机制需要的通用数据容器与子表能力,复杂校验与数据库操作 - 交由现有本地框架(jingrow.save_pg / get_pg 等)处理。 - """ - - _reserved_keywords = frozenset( - ( - "pagetype", - "meta", - "flags", - "parent_pg", - "_table_fieldnames", - "_reserved_keywords", - "permitted_fieldnames", - ) - ) - - def __init__(self, d: dict[str, Any]): - if d.get("pagetype"): - self.pagetype = d["pagetype"] - - self.flags = jingrow._dict() - self._table_fieldnames = set() - self.update(d) - - if hasattr(self, "__setup__"): - self.__setup__() - - @cached_property - def meta(self): - # 需要元数据时,走本地 jingrow.get_meta - return jingrow.get_meta(self.pagetype) - - def update(self, d: dict[str, Any]): - if "name" in d: - self.name = d["name"] - for key, value in d.items(): - self.set(key, value) - return self - - def get(self, key: str, default: Any = None): - return self.__dict__.get(key, default) - - def set(self, key: str, value: Any): - if key in self._reserved_keywords: - return - self.__dict__[key] = value - - def append(self, key: str, value: D | dict | None = None) -> D: - if value is None: - value = {} - if (table := self.__dict__.get(key)) is None: - self.__dict__[key] = table = [] - d = self._init_child(value, key) - table.append(d) - d.parent_pg = weakref.ref(self) - return d - - def extend(self, key: str, value: Iterable[dict | D]): - for v in value: - self.append(key, v) - - def _init_child(self, value: dict | D, key: str): - if not isinstance(value, BasePage): - if not (pagetype := self.get_table_field_pagetype(key)): - raise AttributeError(key) - value["pagetype"] = pagetype - child_cls = get_controller(pagetype) - value = child_cls(value) - - value.parent = getattr(self, "name", None) - value.parenttype = self.pagetype - value.parentfield = key - if not getattr(value, "idx", None): - table = getattr(self, key, None) or [] - value.idx = len(table) + 1 - if not getattr(value, "name", None): - value.__dict__["__islocal"] = 1 - return value - - def get_table_field_pagetype(self, fieldname: str) -> str | None: - try: - return self.meta.get_field(fieldname).options - except Exception: - return None - - diff --git a/apps/jingrow/jingrow/model/page.py b/apps/jingrow/jingrow/model/page.py index 814b6d9..921df8a 100644 --- a/apps/jingrow/jingrow/model/page.py +++ b/apps/jingrow/jingrow/model/page.py @@ -6,45 +6,38 @@ from __future__ import annotations from typing import Any, Iterable import jingrow -from jingrow.model.base_page import BasePage, get_controller -def get_pg(*args, **kwargs): - """本地版 get_pg:兼容云端签名。 +class Page(): + """最小可用 Page:本地数据容器 + 钩子触发""" - 支持: - - get_pg("PageType", name) - - get_pg({"pagetype": "PageType", ...}) - - get_pg(pagetype="PageType", field=...) - """ - if args: - if isinstance(args[0], BasePage): - return args[0] - elif isinstance(args[0], str): - pagetype = args[0] - elif isinstance(args[0], dict): - kwargs = args[0] - else: - raise ValueError("First non keyword argument must be a string or dict") + def __init__(self, d: dict[str, Any] | None = None): + if d is None: + d = {} + if not isinstance(d, dict): + raise ValueError("Page expects a dict payload") + # 基础字段 + self.pagetype = d.get("pagetype") + if "name" in d: + self.name = d["name"] + # flags 与数据 + self.flags = jingrow._dict() + self.update(d) - if len(args) < 2 and kwargs: - if "pagetype" in kwargs: - pagetype = kwargs["pagetype"] - else: - raise ValueError('"pagetype" is a required key') + @property + def meta(self): + return jingrow.get_meta(self.pagetype) - controller = get_controller(pagetype) - if controller: - return controller(*args, **kwargs) + def update(self, d: dict[str, Any]): + for key, value in d.items(): + self.set(key, value) + return self - raise ImportError(pagetype) + def get(self, key: str, default: Any = None): + return self.__dict__.get(key, default) - -class Page(BasePage): - """最小可用 Page,复用 jingrow 本地数据库 API(save_pg等) - 仅保留 pagetype 生命周期接口:insert/save/delete/run_method。 - 集成现代化的 hooks 系统。 - """ + def set(self, key: str, value: Any): + self.__dict__[key] = value def insert(self, **kwargs) -> "Page": """插入页面,触发相关钩子""" diff --git a/apps/jingrow/jingrow/services/hook_executor.py b/apps/jingrow/jingrow/services/hook_executor.py deleted file mode 100644 index 828b3e7..0000000 --- a/apps/jingrow/jingrow/services/hook_executor.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright (c) 2025, JINGROW and contributors -# For license information, please see license.txt - -""" -钩子执行器 - 基于现有消息队列的钩子执行系统 -""" - -import json -import logging -import uuid -import dramatiq -from typing import Dict, Any -from jingrow.services.local_job_manager import local_job_manager -from jingrow.utils.jingrow_api import get_logged_user - -logger = logging.getLogger(__name__) - -@dramatiq.actor(max_retries=3, time_limit=30_000) -def execute_hook_task(hook_data: str) -> None: - """执行钩子任务""" - try: - data = json.loads(hook_data) - pagetype = data.get('pagetype') - name = data.get('name') - hook_name = data.get('hook_name') - hook_args = data.get('data', {}) - - if not all([pagetype, name, hook_name]): - logger.error(f"钩子任务参数不完整: {data}") - return - - logger.info(f"开始执行钩子任务: {pagetype}.{name}.{hook_name}") - - # 更新任务状态为执行中 - job_id = data.get('job_id', str(uuid.uuid4())) - local_job_manager.update_job(job_id, { - 'status': 'started', - 'started_at': __import__('datetime').datetime.now().isoformat() - }) - - # 获取Page对象并执行钩子 - import jingrow - pg = jingrow.get_pg(pagetype, name) - - if pg and hasattr(pg, hook_name): - hook_func = getattr(pg, hook_name) - if callable(hook_func): - # 执行钩子函数 - result = hook_func(**hook_args) - - # 更新任务状态为完成 - local_job_manager.update_job(job_id, { - 'status': 'finished', - 'ended_at': __import__('datetime').datetime.now().isoformat(), - 'result': str(result) if result is not None else 'None' - }) - - logger.info(f"钩子执行成功: {pagetype}.{name}.{hook_name}") - else: - logger.warning(f"钩子函数不可调用: {pagetype}.{name}.{hook_name}") - local_job_manager.update_job(job_id, { - 'status': 'failed', - 'ended_at': __import__('datetime').datetime.now().isoformat(), - 'exc_info': f"钩子函数不可调用: {hook_name}" - }) - else: - logger.warning(f"钩子函数不存在: {pagetype}.{name}.{hook_name}") - local_job_manager.update_job(job_id, { - 'status': 'failed', - 'ended_at': __import__('datetime').datetime.now().isoformat(), - 'exc_info': f"钩子函数不存在: {hook_name}" - }) - - except Exception as e: - logger.error(f"执行钩子任务失败: {e}") - job_id = data.get('job_id', 'unknown') - local_job_manager.update_job(job_id, { - 'status': 'failed', - 'ended_at': __import__('datetime').datetime.now().isoformat(), - 'exc_info': str(e) - }) - -def create_hook_task(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None, session_cookie: str = None) -> str: - """创建钩子任务""" - try: - # 生成任务ID - job_id = str(uuid.uuid4()) - - # 构建任务数据 - task_data = { - 'job_id': job_id, - 'pagetype': pagetype, - 'name': name, - 'hook_name': hook_name, - 'data': data or {}, - 'session_cookie': session_cookie - } - - # 获取当前用户 - creator = get_logged_user(session_cookie) or 'system' - - # 创建本地任务记录 - local_job_manager.create_job({ - 'job_id': job_id, - 'target_type': 'hook', - 'pagetype': pagetype, - 'name': name, - 'hook_name': hook_name, - 'status': 'queued', - 'queue': 'default', - 'job_name': f"Hook: {pagetype}.{name}.{hook_name}", - 'arguments': json.dumps(task_data, indent=2, ensure_ascii=False), - 'owner': creator, - 'modified_by': creator - }) - - # 发送到消息队列 - execute_hook_task.send(json.dumps(task_data)) - - logger.info(f"钩子任务已创建: {job_id} - {pagetype}.{name}.{hook_name}") - return job_id - - except Exception as e: - logger.error(f"创建钩子任务失败: {e}") - return None - -def execute_hook_sync(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None) -> bool: - """同步执行钩子函数""" - try: - import jingrow - - # 对于新记录创建,记录可能还不存在,需要特殊处理 - if hook_name == "before_insert": - logger.warning(f"跳过 before_insert 钩子,因为记录 {pagetype}.{name} 还不存在") - return False - - pg = jingrow.get_pg(pagetype, name) - - if pg and hasattr(pg, hook_name): - hook_func = getattr(pg, hook_name) - if callable(hook_func): - logger.info(f"同步执行钩子: {pagetype}.{name}.{hook_name}") - hook_func(**(data or {})) - return True - - logger.warning(f"钩子函数不存在或不可调用: {pagetype}.{name}.{hook_name}") - return False - - except Exception as e: - logger.error(f"同步执行钩子失败: {e}") - return False