From 89277d1f4974535ce83c0f8c93732048027ad303 Mon Sep 17 00:00:00 2001 From: jingrow Date: Thu, 30 Oct 2025 14:41:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=9E=E7=8E=B0Local=20Ai?= =?UTF-8?q?=20Agent=E4=BA=8B=E4=BB=B6=E8=A7=A6=E5=8F=91=EF=BC=8C=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E6=88=90=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/jingrow/jingrow/__init__.py | 4 + apps/jingrow/jingrow/adapters/api_adapter.py | 7 +- .../ai/pagetype/local_ai_agent/__init__.py | 157 ++++++++++++++++++ apps/jingrow/jingrow/api/page.py | 40 ++++- apps/jingrow/jingrow/core/hooks/loader.py | 9 +- apps/jingrow/jingrow/hooks.py | 9 + apps/jingrow/jingrow/model/page.py | 33 ++-- apps/jingrow/jingrow/utils/jinja.py | 56 +++++++ 8 files changed, 297 insertions(+), 18 deletions(-) create mode 100644 apps/jingrow/jingrow/ai/pagetype/local_ai_agent/__init__.py create mode 100644 apps/jingrow/jingrow/utils/jinja.py diff --git a/apps/jingrow/jingrow/__init__.py b/apps/jingrow/jingrow/__init__.py index 2fbeba2..771222e 100644 --- a/apps/jingrow/jingrow/__init__.py +++ b/apps/jingrow/jingrow/__init__.py @@ -29,6 +29,10 @@ def get_pg(pagetype: str, name: str = None, **kwargs): def get_list(pagetype: str, *args, **kwargs): return _adapter.get_list(pagetype, *args, **kwargs) +def get_all(pagetype: str, *args, **kwargs): + """与 SaaS 版兼容的别名,返回列表。""" + return _adapter.get_list(pagetype, *args, **kwargs) + def new_pg(pagetype: str, **kwargs): return _adapter.new_pg(pagetype, **kwargs) diff --git a/apps/jingrow/jingrow/adapters/api_adapter.py b/apps/jingrow/jingrow/adapters/api_adapter.py index c5aa03b..6ac970f 100644 --- a/apps/jingrow/jingrow/adapters/api_adapter.py +++ b/apps/jingrow/jingrow/adapters/api_adapter.py @@ -6,6 +6,7 @@ Jingrow API 适配器, 通过 API 调用 Jingrow SaaS 版的服务 """ from typing import Dict, List, Any, Optional, Union +import json import logging import requests from jingrow.model.page import Page @@ -152,9 +153,11 @@ class ApiAdapter: # 构建查询参数 params = {} if fields is not None: - params['fields'] = fields + # API 期望 JSON 字符串 + params['fields'] = json.dumps(fields) if not isinstance(fields, str) else fields if filters is not None: - params['filters'] = filters + # API 期望 JSON 字符串 + params['filters'] = json.dumps(filters) if not isinstance(filters, str) else filters if group_by is not None: params['group_by'] = group_by if order_by is not None: diff --git a/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/__init__.py b/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/__init__.py new file mode 100644 index 0000000..6c58b8b --- /dev/null +++ b/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/__init__.py @@ -0,0 +1,157 @@ +# Copyright (c) 2025, JINGROW +# License: MIT. See LICENSE + +import jingrow +import logging +from jingrow.utils.jinja import render_template +from jingrow.utils.jingrow_api import get_record_list, get_page_meta + + +def _get_all_local_ai_agents(): + """获取所有启用的事件触发 Local Ai Agent,支持按PageType和Module分组缓存返回。 + 通过本地 API 封装获取(非适配器)。 + """ + res = get_record_list( + "Local Ai Agent", + filters=[["enabled", "=", 1], ["trigger_mode", "=", "Event Trigger"]], + fields=[ + "name", + "agent_name", + "trigger_mode", + "event_type", + "target_pagetype", + "target_module", + "condition", + ], + ) + data = res.get("data", []) if res.get("success") else [] + + agents: dict[str, list[dict]] = {} + for agent in data: + tp = (agent or {}).get("target_pagetype") + tm = (agent or {}).get("target_module") + if tp: + agents.setdefault(tp, []).append(agent) + elif tm: + key = f"module:{str(tm).strip().lower()}" + agents.setdefault(key, []).append(agent) + return agents + + +def run_agent(pg=None, method=None, event=None, page=None, **kwargs): + """本地版 Local Ai Agent 事件驱动机制。 + + - 支持按 pagetype 与 module 分组、合并去重 + - condition 支持 Jinja2 表达式 + - 使用本地任务队列,避免链式重复触发 + """ + logger = logging.getLogger(__name__) + # 兼容多种调用方式: + pg = pg or page + method = event or method or kwargs.get("event") + logger.debug(f"LocalAIAgent.run_agent called: pagetype={getattr(pg,'pagetype',None)}, name={getattr(pg,'name',None)}, method={method}") + + # 简化实现:不使用 flags,始终尝试匹配 + + ai_agents = _get_all_local_ai_agents() + + # pagetype 分组 + agents_by_pagetype = ai_agents.get(pg.pagetype, []) or [] + + # module 分组 + # 获取 pagetype 所属 module(通过 API 获取 meta) + module_name = None + try: + meta_res = get_page_meta(pg.pagetype) + if meta_res.get("success"): + module_name = (meta_res.get("data") or {}).get("module") + except Exception: + module_name = None + agents_by_module = [] + if module_name: + key = f"module:{str(module_name).strip().lower()}" + agents_by_module = ai_agents.get(key, []) or [] + + # 合并去重 + agents_for_pg = {a.get("name"): a for a in agents_by_pagetype + agents_by_module}.values() + logger.debug( + f"LocalAIAgent groups: pagetype_count={len(agents_by_pagetype)}, module_count={len(agents_by_module)}, merged={len(list(agents_for_pg))}" + ) + if not agents_for_pg: + logger.debug("No Local Ai Agents matched for this page") + return + + # 支持的事件(不依赖 flags) + event_list = [ + "on_update", + "after_insert", + "on_submit", + "on_cancel", + "on_trash", + "on_change", + "before_update_after_submit", + ] + + for agent in agents_for_pg: + event = method if method in event_list else None + if event and (agent.get("event_type") == event): + # 简化:不做最近去重缓存,直接执行 + + trigger = False + if not agent.get("condition"): + trigger = True + else: + try: + result = render_template(agent.get("condition"), {"pg": pg}) + if str(result).strip().lower() in ("true", "1", "yes"): + trigger = True + except Exception: + logger.warning(f"Condition render error, skip agent: agent={agent.get('name')}") + trigger = False + + if trigger: + logger.info(f"Run Local Ai Agent: agent={agent.get('name')}, event={event}, page={pg.pagetype}:{pg.name}") + enqueue_local_ai_agent(pg, agent.get('name')) + else: + logger.debug(f"Condition not met, skip agent: agent={agent.get('name')}, event={event}") + + +def _add_agent_to_queue(agent, pg): + """向后兼容:直接调用入队函数(无本地事务队列)。""" + enqueue_local_ai_agent(pg, agent.get('name')) + + +def enqueue_local_ai_agent(pg, agent_name): + """后台实际入队执行 Local Ai Agent。 + + 此函数会读取本地智能体配置,调用本地任务创建函数以执行远端/本地智能体。 + """ + try: + # 通过 API 获取 Local Ai Agent 详情 + res = get_record_list("Local Ai Agent", filters=[["name", "=", agent_name]], fields=["name", "agent_name"], limit=1) + agent = (res.get("data") or [None])[0] if res.get("success") else None + if not agent: + return + + # 无 agent_id 字段,统一使用 name 作为标识 + agent_id = agent.get("name") + agent_name_value = agent.get("agent_name") + + from .local_ai_agent import create_agent_job + + # 将触发上下文透传为执行参数的一部分(由下游路由读取) + _ = create_agent_job( + agent_id=str(agent_id), + agent_name=agent_name_value, + # 使用统一的执行路由;会在本地队列处理器里转发 + route="jingrow/agents/execute", + ) + except Exception as e: + jingrow.log_error("Local Ai Agent执行失败", f"agent={agent_name}, error={str(e)}") + + +def clear_local_ai_agent_cache(): + """清空本地智能体缓存(简化实现:无缓存,函数保留占位)。""" + return None + + diff --git a/apps/jingrow/jingrow/api/page.py b/apps/jingrow/jingrow/api/page.py index 8dd9b48..1a17b8e 100644 --- a/apps/jingrow/jingrow/api/page.py +++ b/apps/jingrow/jingrow/api/page.py @@ -33,7 +33,25 @@ def execute_hooks(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] try: # 统一命名:page.{pagetype}.{hook_name} hook_full_name = f"page.{pagetype}.{hook_name}" - execute_hook(hook_full_name, sender=pagetype, page=jingrow.get_pg(pagetype, name), **(data or {})) + # 重要:不要展开 data,避免与 HookRegistry.send(name=...) 参数名冲突 + pg_obj = jingrow.get_pg(pagetype, name) + # 1) 精确钩子 + execute_hook( + hook_full_name, + sender=pagetype, + page=pg_obj, + event=hook_name, + data=data or {}, + ) + # 2) 通配钩子(与 hooks.py 的 "*" 对齐) + wildcard_hook = f"page.*.{hook_name}" + execute_hook( + wildcard_hook, + sender=pagetype, + page=pg_obj, + event=hook_name, + data=data or {}, + ) return True except Exception as e: logger.error(f"同步执行钩子失败: {e}") @@ -44,9 +62,25 @@ def execute_hooks_async(pagetype: str, name: str, hook_name: str, data: Dict[str try: hook_full_name = f"page.{pagetype}.{hook_name}" pg = jingrow.get_pg(pagetype, name) - # 异步丢到事件循环 + # 异步丢到事件循环(同样不要展开 data)。同时发送精确与通配钩子 import asyncio - asyncio.create_task(execute_hook_async(hook_full_name, sender=pagetype, page=pg, **(data or {}))) + async def _fire(): + await execute_hook_async( + hook_full_name, + sender=pagetype, + page=pg, + event=hook_name, + data=data or {}, + ) + wildcard_hook = f"page.*.{hook_name}" + await execute_hook_async( + wildcard_hook, + sender=pagetype, + page=pg, + event=hook_name, + data=data or {}, + ) + asyncio.create_task(_fire()) return True except Exception as e: logger.error(f"异步执行钩子失败: {e}") diff --git a/apps/jingrow/jingrow/core/hooks/loader.py b/apps/jingrow/jingrow/core/hooks/loader.py index 7d0bc24..7df40c5 100644 --- a/apps/jingrow/jingrow/core/hooks/loader.py +++ b/apps/jingrow/jingrow/core/hooks/loader.py @@ -155,13 +155,16 @@ class HookLoader: # 创建钩子名称:如 "page.User.on_update" hook_name = f"page.{pagetype}.{event_name}" signal = registry.register(hook_name) - + + # sender 处理:当 pagetype 为 "*" 时表示所有 sender + sender_value = None if pagetype == "*" else pagetype + # 注册处理器 if isinstance(handlers, list): for handler_path in handlers: - self._register_handler(signal, handler_path, sender=pagetype) + self._register_handler(signal, handler_path, sender=sender_value) elif isinstance(handlers, str): - self._register_handler(signal, handlers, sender=pagetype) + self._register_handler(signal, handlers, sender=sender_value) def _register_scheduler_events(self, scheduler_events: Dict[str, Any]) -> None: """注册定时任务钩子""" diff --git a/apps/jingrow/jingrow/hooks.py b/apps/jingrow/jingrow/hooks.py index e7e9bae..5e3f9a3 100644 --- a/apps/jingrow/jingrow/hooks.py +++ b/apps/jingrow/jingrow/hooks.py @@ -51,18 +51,27 @@ pg_events = { ], "after_insert": [ # 页面插入后 + "jingrow.ai.pagetype.local_ai_agent.run_agent", ], "on_update": [ # 页面更新时 + "jingrow.ai.pagetype.local_ai_agent.run_agent", + ], + "on_submit": [ + # 页面提交时 + "jingrow.ai.pagetype.local_ai_agent.run_agent", ], "on_change": [ # 页面字段变化时 + "jingrow.ai.pagetype.local_ai_agent.run_agent", ], "on_trash": [ # 页面删除时 + "jingrow.ai.pagetype.local_ai_agent.run_agent", ], "on_cancel": [ # 页面取消时 + "jingrow.ai.pagetype.local_ai_agent.run_agent", ], }, # 示例:特定页面类型的钩子 diff --git a/apps/jingrow/jingrow/model/page.py b/apps/jingrow/jingrow/model/page.py index 921df8a..680765d 100644 --- a/apps/jingrow/jingrow/model/page.py +++ b/apps/jingrow/jingrow/model/page.py @@ -20,8 +20,8 @@ class Page(): self.pagetype = d.get("pagetype") if "name" in d: self.name = d["name"] - # flags 与数据 - self.flags = jingrow._dict() + # flags 简化为普通 dict(本地版不依赖 flags 机制) + self.flags = {} self.update(d) @property @@ -107,19 +107,32 @@ class Page(): def _execute_hook(self, event_name: str, **kwargs): """执行钩子(私有方法)""" try: + import logging from jingrow.core.hooks import execute_hook - - # 构建钩子名称:page.{pagetype}.{event_name} - hook_name = f"page.{self.pagetype}.{event_name}" - - # 执行钩子 - execute_hook( - hook_name, + logger = logging.getLogger(__name__) + + # 精确钩子:page.{pagetype}.{event_name} + specific_hook = f"page.{self.pagetype}.{event_name}" + results_specific = execute_hook( + specific_hook, sender=self.pagetype, page=self, + event=event_name, **kwargs ) - + logger.debug(f"Hook executed: {specific_hook}, handlers_return={len(results_specific)}") + + # 通配钩子:page.*.{event_name}(用于全局监听) + wildcard_hook = f"page.*.{event_name}" + results_wildcard = execute_hook( + wildcard_hook, + sender=self.pagetype, + page=self, + event=event_name, + **kwargs + ) + logger.debug(f"Hook executed: {wildcard_hook}, handlers_return={len(results_wildcard)}") + except ImportError: # 如果钩子系统未安装,静默失败(向后兼容) pass diff --git a/apps/jingrow/jingrow/utils/jinja.py b/apps/jingrow/jingrow/utils/jinja.py new file mode 100644 index 0000000..ffc8e8b --- /dev/null +++ b/apps/jingrow/jingrow/utils/jinja.py @@ -0,0 +1,56 @@ +# Copyright (c) 2025, JINGROW +# License: MIT. See LICENSE + +""" +仅实现最小必需的 `render_template`,用于对字符串模板进行 Jinja2 渲染。 + +最佳实践原则: +- 复用单例 Environment,启用 trim_blocks/lstrip_blocks 提升可读性; +- 采用 StrictUndefined,缺失变量时报错,避免静默吞错; +- 不启用 autoescape(后端逻辑条件渲染,不输出到 HTML)。 +""" + +from typing import Any, Dict + + +_env = None + + +def _get_env(): + """延迟初始化 Jinja2 Environment。""" + global _env + if _env is not None: + return _env + try: + from jinja2 import Environment, StrictUndefined + _env = Environment( + autoescape=False, + undefined=StrictUndefined, + trim_blocks=True, + lstrip_blocks=True, + ) + return _env + except Exception as e: + # 将异常原样抛出,调用方可根据自身策略处理 + raise e + + +def render_template(template_string: str, context: Dict[str, Any] | None = None) -> str: + """渲染 Jinja2 字符串模板。 + + Args: + template_string: 模板字符串 + context: 渲染上下文 + + Returns: + 渲染后字符串 + """ + if not template_string: + return "" + + context = context or {} + env = _get_env() + template = env.from_string(str(template_string)) + return template.render(**context) + +