diff --git a/apps/jingrow/jingrow/__init__.py b/apps/jingrow/jingrow/__init__.py index 77f053c..3af92ce 100644 --- a/apps/jingrow/jingrow/__init__.py +++ b/apps/jingrow/jingrow/__init__.py @@ -7,7 +7,7 @@ import inspect import logging import os from contextvars import ContextVar -from jingrow.model.page import Page +from jingrow.model.page import Page, get_page_instance from jingrow.config import Config from jingrow.utils.jingrow_api import upload_file_to_jingrow, get_record_count @@ -36,7 +36,7 @@ for _lvl in list(_ANSI.keys()): def get_pg(pagetype: str, name: str): """获取单条记录并转为可属性访问的对象,失败返回 None。""" - pg = Page(pagetype) + pg = get_page_instance(pagetype) res = pg.get(name) if not isinstance(res, dict) or not res.get('success'): return None @@ -46,7 +46,7 @@ def get_pg(pagetype: str, name: str): def create_pg(pagetype: str, data: Dict[str, Any]): """创建记录,返回创建后的数据对象或 None。""" - pg = Page(pagetype) + pg = get_page_instance(pagetype) res = pg.create(data) if not isinstance(res, dict) or not res.get('success'): return None @@ -56,7 +56,7 @@ def create_pg(pagetype: str, data: Dict[str, Any]): def update_pg(pagetype: str, name: str, data: Dict[str, Any]): """更新记录,成功返回更新后的数据对象或 True,失败返回 False。""" - pg = Page(pagetype) + pg = get_page_instance(pagetype) res = pg.update(name, data) if not isinstance(res, dict) or not res.get('success'): return False @@ -101,14 +101,14 @@ def get_hook_source() -> Optional[str]: def delete_pg(pagetype: str, name: str) -> bool: - pg = Page(pagetype) + pg = get_page_instance(pagetype) res = pg.delete(name) return bool(isinstance(res, dict) and res.get('success')) def get_list(pagetype: str, filters: List[List[Any]] = None, fields: List[str] = None, limit: int = None): """获取记录列表,返回对象列表;失败返回空列表。""" - pg = Page(pagetype) + pg = get_page_instance(pagetype) res = pg.list(filters=filters, fields=fields, limit=limit) if not isinstance(res, dict) or not res.get('success'): return [] @@ -135,7 +135,7 @@ def get_single(pagetype: str): def get_module_app(pagetype: str): """获取指定 pagetype 的模块应用信息,返回后端适配器的原始结果结构。""" - return Page(pagetype).get_module_app() + return get_page_instance(pagetype).get_module_app() def get_pg_id( @@ -146,22 +146,22 @@ def get_pg_id( site: Optional[str] = None, ): """根据过滤条件或字段值获取文档 ID,返回后端适配器的原始结果结构。""" - return Page(pagetype).get_pg_id(filters=filters, field=field, value=value, site=site) + return get_page_instance(pagetype).get_pg_id(filters=filters, field=field, value=value, site=site) def get_meta(pagetype: str): """获取 pagetype 的元数据,返回后端适配器的原始结果结构。""" - return Page(pagetype).get_meta() + return get_page_instance(pagetype).get_meta() def get_field_mapping_from_jingrow(pagetype: str): """获取字段的label到fieldname的映射,返回 {label: fieldname} 的映射字典。""" - return Page(pagetype).get_field_mapping_from_jingrow() + return get_page_instance(pagetype).get_field_mapping_from_jingrow() def get_field_value_from_jingrow(pagetype: str, name: str, fieldname: str): """从Jingrow获取字段的当前值,返回字段的当前值,如果为空则返回None。""" - return Page(pagetype).get_field_value_from_jingrow(name, fieldname) + return get_page_instance(pagetype).get_field_value_from_jingrow(name, fieldname) def upload_file(file_data: bytes, filename: str, attached_to_pagetype: Optional[str] = None, attached_to_name: Optional[str] = None, attached_to_field: Optional[str] = None): diff --git a/apps/jingrow/jingrow/ai/pagetype/local_ai_node/local_ai_node.py b/apps/jingrow/jingrow/ai/pagetype/local_ai_node/local_ai_node.py new file mode 100644 index 0000000..f20cb6b --- /dev/null +++ b/apps/jingrow/jingrow/ai/pagetype/local_ai_node/local_ai_node.py @@ -0,0 +1,123 @@ +# Copyright (c) 2025, jingrow and contributors +# For license information, please see license.txt + +from typing import Dict, Any +from pathlib import Path +import json +import jingrow +from jingrow.model.page import Page +from jingrow.utils.path import get_jingrow_root +from jingrow.utils.fs import atomic_write_json + + +class LocalAiNode(Page): + """ + Local Ai Node 页面类型 + 在保存时自动同步节点数据到 JSON 文件 + """ + + def create(self, data: Dict[str, Any]) -> Dict[str, Any]: + """创建节点记录,并同步到 JSON 文件""" + result = super().create(data) + if result.get('success'): + created_data = result.get('data', {}) + if created_data.get('node_type'): + _sync_node_to_json_file(created_data) + return result + + def update(self, name: str, data: Dict[str, Any]) -> Dict[str, Any]: + """更新节点记录,并同步到 JSON 文件""" + result = super().update(name, data) + if result.get('success'): + # 获取更新后的完整数据 + updated_res = self.get(name) + if updated_res.get('success'): + updated_data = updated_res.get('data', {}) + if updated_data.get('node_type'): + _sync_node_to_json_file(updated_data) + return result + + +def _sync_node_to_json_file(node_data: Dict[str, Any]) -> bool: + """ + 将节点数据同步到 JSON 文件(LocalAiNode 类的辅助函数) + 当保存 Local Ai Node 时,自动更新对应的 JSON 文件 + + Args: + node_data: 节点数据字典,包含 node_type, node_label, node_icon 等字段 + + Returns: + bool: 同步是否成功 + """ + try: + node_type = node_data.get('node_type') + if not node_type: + return False + + # 获取节点目录路径 + jingrow_root = get_jingrow_root() + nodes_root = jingrow_root / "ai" / "nodes" + node_dir = nodes_root / node_type + json_file = node_dir / f"{node_type}.json" + + # 确保目录存在 + node_dir.mkdir(parents=True, exist_ok=True) + + # 解析 node_schema(可能是字典或 JSON 字符串) + node_schema = node_data.get('node_schema') or {} + if isinstance(node_schema, str): + try: + node_schema = json.loads(node_schema) + except (json.JSONDecodeError, TypeError): + node_schema = {} + if not isinstance(node_schema, dict): + node_schema = {} + + # 确保 node_schema 中不包含 metadata(如果存在则移除) + node_schema = {k: v for k, v in node_schema.items() if k != 'metadata'} + + # 构建完整的 JSON 文件内容 + json_content = { + "metadata": { + "type": node_type, + "label": node_data.get('node_label') or node_type, + "icon": node_data.get('node_icon') or "fa-cube", + "color": node_data.get('node_color') or "#6b7280", + "description": node_data.get('node_description') or "", + "group": node_data.get('node_group') or "其他", + "component_type": node_data.get('node_component') or "GenericNode" + } + } + + # 合并 node_schema 的所有字段(properties, required, _layout 等) + # 确保包含完整的 schema 配置 + json_content.update(node_schema) + + # 确保必需的字段存在(即使 node_schema 中没有) + json_content.setdefault("properties", {}) + json_content.setdefault("required", []) + json_content.setdefault("_layout", { + "tabs": [{ + "id": "tab_1", + "label": "基本设置", + "sections": [{ + "id": "section_1", + "label": "", + "columns": [{ + "id": "column_1", + "label": "", + "fields": [] + }] + }] + }], + "activeTab": "tab_1" + }) + + # 使用原子写入确保数据完整性 + atomic_write_json(json_file, json_content) + + return True + except Exception as e: + # 静默失败,不影响主流程 + jingrow.log_error("同步节点到 JSON 文件失败", str(e), exc=e) + return False diff --git a/apps/jingrow/jingrow/model/page.py b/apps/jingrow/jingrow/model/page.py index 9b0553b..244eb9b 100644 --- a/apps/jingrow/jingrow/model/page.py +++ b/apps/jingrow/jingrow/model/page.py @@ -7,6 +7,39 @@ from jingrow.core.hooks import execute_hook, execute_hook_async import asyncio +def get_page_instance(pagetype: str) -> 'Page': + """ + 获取 Page 实例,支持使用自定义 Page 子类 + + 自动发现并加载 pagetype 对应的 Page 子类: + - 命名规则:pagetype 转换为类名,如 "Local Ai Node" -> "LocalAiNode" + - 模块路径:jingrow.ai.pagetype.{pagetype_snake_case}.{pagetype_snake_case} + - 如果找不到子类,回退到默认 Page 类 + + Args: + pagetype: 页面类型名称 + + Returns: + Page 实例(可能是自定义子类或默认 Page 类) + """ + # 转换命名: "Local Ai Node" -> "LocalAiNode" + class_name = pagetype.replace(" ", "").replace("_", "") + + # 尝试从 pagetype 目录导入 + try: + pagetype_snake = pagetype.lower().replace(' ', '_') + module_path = f"jingrow.ai.pagetype.{pagetype_snake}.{pagetype_snake}" + module = __import__(module_path, fromlist=[class_name]) + page_class = getattr(module, class_name, None) + if page_class and issubclass(page_class, Page): + return page_class(pagetype) + except (ImportError, AttributeError, TypeError): + pass + + # 如果找不到子类,使用默认 Page + return Page(pagetype) + + class Page: """基于适配器的通用 Page 模型,提供常用 CRUD 与钩子机制。 钩子事件命名与 API 对齐:after_insert, on_update, on_trash 等。