初步实现Local Ai Agent事件触发,测试成功

This commit is contained in:
jingrow 2025-10-30 14:41:30 +08:00
parent b0b570b637
commit 89277d1f49
8 changed files with 297 additions and 18 deletions

View File

@ -29,6 +29,10 @@ def get_pg(pagetype: str, name: str = None, **kwargs):
def get_list(pagetype: str, *args, **kwargs):
return _adapter.get_list(pagetype, *args, **kwargs)
def get_all(pagetype: str, *args, **kwargs):
"""与 SaaS 版兼容的别名,返回列表。"""
return _adapter.get_list(pagetype, *args, **kwargs)
def new_pg(pagetype: str, **kwargs):
return _adapter.new_pg(pagetype, **kwargs)

View File

@ -6,6 +6,7 @@ Jingrow API 适配器, 通过 API 调用 Jingrow SaaS 版的服务
"""
from typing import Dict, List, Any, Optional, Union
import json
import logging
import requests
from jingrow.model.page import Page
@ -152,9 +153,11 @@ class ApiAdapter:
# 构建查询参数
params = {}
if fields is not None:
params['fields'] = fields
# API 期望 JSON 字符串
params['fields'] = json.dumps(fields) if not isinstance(fields, str) else fields
if filters is not None:
params['filters'] = filters
# API 期望 JSON 字符串
params['filters'] = json.dumps(filters) if not isinstance(filters, str) else filters
if group_by is not None:
params['group_by'] = group_by
if order_by is not None:

View File

@ -0,0 +1,157 @@
# Copyright (c) 2025, JINGROW
# License: MIT. See LICENSE
import jingrow
import logging
from jingrow.utils.jinja import render_template
from jingrow.utils.jingrow_api import get_record_list, get_page_meta
def _get_all_local_ai_agents():
"""获取所有启用的事件触发 Local Ai Agent支持按PageType和Module分组缓存返回。
通过本地 API 封装获取非适配器
"""
res = get_record_list(
"Local Ai Agent",
filters=[["enabled", "=", 1], ["trigger_mode", "=", "Event Trigger"]],
fields=[
"name",
"agent_name",
"trigger_mode",
"event_type",
"target_pagetype",
"target_module",
"condition",
],
)
data = res.get("data", []) if res.get("success") else []
agents: dict[str, list[dict]] = {}
for agent in data:
tp = (agent or {}).get("target_pagetype")
tm = (agent or {}).get("target_module")
if tp:
agents.setdefault(tp, []).append(agent)
elif tm:
key = f"module:{str(tm).strip().lower()}"
agents.setdefault(key, []).append(agent)
return agents
def run_agent(pg=None, method=None, event=None, page=None, **kwargs):
"""本地版 Local Ai Agent 事件驱动机制。
- 支持按 pagetype module 分组合并去重
- condition 支持 Jinja2 表达式
- 使用本地任务队列避免链式重复触发
"""
logger = logging.getLogger(__name__)
# 兼容多种调用方式:
pg = pg or page
method = event or method or kwargs.get("event")
logger.debug(f"LocalAIAgent.run_agent called: pagetype={getattr(pg,'pagetype',None)}, name={getattr(pg,'name',None)}, method={method}")
# 简化实现:不使用 flags始终尝试匹配
ai_agents = _get_all_local_ai_agents()
# pagetype 分组
agents_by_pagetype = ai_agents.get(pg.pagetype, []) or []
# module 分组
# 获取 pagetype 所属 module通过 API 获取 meta
module_name = None
try:
meta_res = get_page_meta(pg.pagetype)
if meta_res.get("success"):
module_name = (meta_res.get("data") or {}).get("module")
except Exception:
module_name = None
agents_by_module = []
if module_name:
key = f"module:{str(module_name).strip().lower()}"
agents_by_module = ai_agents.get(key, []) or []
# 合并去重
agents_for_pg = {a.get("name"): a for a in agents_by_pagetype + agents_by_module}.values()
logger.debug(
f"LocalAIAgent groups: pagetype_count={len(agents_by_pagetype)}, module_count={len(agents_by_module)}, merged={len(list(agents_for_pg))}"
)
if not agents_for_pg:
logger.debug("No Local Ai Agents matched for this page")
return
# 支持的事件(不依赖 flags
event_list = [
"on_update",
"after_insert",
"on_submit",
"on_cancel",
"on_trash",
"on_change",
"before_update_after_submit",
]
for agent in agents_for_pg:
event = method if method in event_list else None
if event and (agent.get("event_type") == event):
# 简化:不做最近去重缓存,直接执行
trigger = False
if not agent.get("condition"):
trigger = True
else:
try:
result = render_template(agent.get("condition"), {"pg": pg})
if str(result).strip().lower() in ("true", "1", "yes"):
trigger = True
except Exception:
logger.warning(f"Condition render error, skip agent: agent={agent.get('name')}")
trigger = False
if trigger:
logger.info(f"Run Local Ai Agent: agent={agent.get('name')}, event={event}, page={pg.pagetype}:{pg.name}")
enqueue_local_ai_agent(pg, agent.get('name'))
else:
logger.debug(f"Condition not met, skip agent: agent={agent.get('name')}, event={event}")
def _add_agent_to_queue(agent, pg):
"""向后兼容:直接调用入队函数(无本地事务队列)。"""
enqueue_local_ai_agent(pg, agent.get('name'))
def enqueue_local_ai_agent(pg, agent_name):
"""后台实际入队执行 Local Ai Agent。
此函数会读取本地智能体配置调用本地任务创建函数以执行远端/本地智能体
"""
try:
# 通过 API 获取 Local Ai Agent 详情
res = get_record_list("Local Ai Agent", filters=[["name", "=", agent_name]], fields=["name", "agent_name"], limit=1)
agent = (res.get("data") or [None])[0] if res.get("success") else None
if not agent:
return
# 无 agent_id 字段,统一使用 name 作为标识
agent_id = agent.get("name")
agent_name_value = agent.get("agent_name")
from .local_ai_agent import create_agent_job
# 将触发上下文透传为执行参数的一部分(由下游路由读取)
_ = create_agent_job(
agent_id=str(agent_id),
agent_name=agent_name_value,
# 使用统一的执行路由;会在本地队列处理器里转发
route="jingrow/agents/execute",
)
except Exception as e:
jingrow.log_error("Local Ai Agent执行失败", f"agent={agent_name}, error={str(e)}")
def clear_local_ai_agent_cache():
"""清空本地智能体缓存(简化实现:无缓存,函数保留占位)。"""
return None

View File

@ -33,7 +33,25 @@ def execute_hooks(pagetype: str, name: str, hook_name: str, data: Dict[str, Any]
try:
# 统一命名page.{pagetype}.{hook_name}
hook_full_name = f"page.{pagetype}.{hook_name}"
execute_hook(hook_full_name, sender=pagetype, page=jingrow.get_pg(pagetype, name), **(data or {}))
# 重要:不要展开 data避免与 HookRegistry.send(name=...) 参数名冲突
pg_obj = jingrow.get_pg(pagetype, name)
# 1) 精确钩子
execute_hook(
hook_full_name,
sender=pagetype,
page=pg_obj,
event=hook_name,
data=data or {},
)
# 2) 通配钩子(与 hooks.py 的 "*" 对齐)
wildcard_hook = f"page.*.{hook_name}"
execute_hook(
wildcard_hook,
sender=pagetype,
page=pg_obj,
event=hook_name,
data=data or {},
)
return True
except Exception as e:
logger.error(f"同步执行钩子失败: {e}")
@ -44,9 +62,25 @@ def execute_hooks_async(pagetype: str, name: str, hook_name: str, data: Dict[str
try:
hook_full_name = f"page.{pagetype}.{hook_name}"
pg = jingrow.get_pg(pagetype, name)
# 异步丢到事件循环
# 异步丢到事件循环(同样不要展开 data。同时发送精确与通配钩子
import asyncio
asyncio.create_task(execute_hook_async(hook_full_name, sender=pagetype, page=pg, **(data or {})))
async def _fire():
await execute_hook_async(
hook_full_name,
sender=pagetype,
page=pg,
event=hook_name,
data=data or {},
)
wildcard_hook = f"page.*.{hook_name}"
await execute_hook_async(
wildcard_hook,
sender=pagetype,
page=pg,
event=hook_name,
data=data or {},
)
asyncio.create_task(_fire())
return True
except Exception as e:
logger.error(f"异步执行钩子失败: {e}")

View File

@ -155,13 +155,16 @@ class HookLoader:
# 创建钩子名称:如 "page.User.on_update"
hook_name = f"page.{pagetype}.{event_name}"
signal = registry.register(hook_name)
# sender 处理:当 pagetype 为 "*" 时表示所有 sender
sender_value = None if pagetype == "*" else pagetype
# 注册处理器
if isinstance(handlers, list):
for handler_path in handlers:
self._register_handler(signal, handler_path, sender=pagetype)
self._register_handler(signal, handler_path, sender=sender_value)
elif isinstance(handlers, str):
self._register_handler(signal, handlers, sender=pagetype)
self._register_handler(signal, handlers, sender=sender_value)
def _register_scheduler_events(self, scheduler_events: Dict[str, Any]) -> None:
"""注册定时任务钩子"""

View File

@ -51,18 +51,27 @@ pg_events = {
],
"after_insert": [
# 页面插入后
"jingrow.ai.pagetype.local_ai_agent.run_agent",
],
"on_update": [
# 页面更新时
"jingrow.ai.pagetype.local_ai_agent.run_agent",
],
"on_submit": [
# 页面提交时
"jingrow.ai.pagetype.local_ai_agent.run_agent",
],
"on_change": [
# 页面字段变化时
"jingrow.ai.pagetype.local_ai_agent.run_agent",
],
"on_trash": [
# 页面删除时
"jingrow.ai.pagetype.local_ai_agent.run_agent",
],
"on_cancel": [
# 页面取消时
"jingrow.ai.pagetype.local_ai_agent.run_agent",
],
},
# 示例:特定页面类型的钩子

View File

@ -20,8 +20,8 @@ class Page():
self.pagetype = d.get("pagetype")
if "name" in d:
self.name = d["name"]
# flags 与数据
self.flags = jingrow._dict()
# flags 简化为普通 dict本地版不依赖 flags 机制)
self.flags = {}
self.update(d)
@property
@ -107,19 +107,32 @@ class Page():
def _execute_hook(self, event_name: str, **kwargs):
"""执行钩子(私有方法)"""
try:
import logging
from jingrow.core.hooks import execute_hook
# 构建钩子名称page.{pagetype}.{event_name}
hook_name = f"page.{self.pagetype}.{event_name}"
# 执行钩子
execute_hook(
hook_name,
logger = logging.getLogger(__name__)
# 精确钩子page.{pagetype}.{event_name}
specific_hook = f"page.{self.pagetype}.{event_name}"
results_specific = execute_hook(
specific_hook,
sender=self.pagetype,
page=self,
event=event_name,
**kwargs
)
logger.debug(f"Hook executed: {specific_hook}, handlers_return={len(results_specific)}")
# 通配钩子page.*.{event_name}(用于全局监听)
wildcard_hook = f"page.*.{event_name}"
results_wildcard = execute_hook(
wildcard_hook,
sender=self.pagetype,
page=self,
event=event_name,
**kwargs
)
logger.debug(f"Hook executed: {wildcard_hook}, handlers_return={len(results_wildcard)}")
except ImportError:
# 如果钩子系统未安装,静默失败(向后兼容)
pass

View File

@ -0,0 +1,56 @@
# Copyright (c) 2025, JINGROW
# License: MIT. See LICENSE
"""
仅实现最小必需的 `render_template`用于对字符串模板进行 Jinja2 渲染
最佳实践原则
- 复用单例 Environment启用 trim_blocks/lstrip_blocks 提升可读性
- 采用 StrictUndefined缺失变量时报错避免静默吞错
- 不启用 autoescape后端逻辑条件渲染不输出到 HTML
"""
from typing import Any, Dict
_env = None
def _get_env():
"""延迟初始化 Jinja2 Environment。"""
global _env
if _env is not None:
return _env
try:
from jinja2 import Environment, StrictUndefined
_env = Environment(
autoescape=False,
undefined=StrictUndefined,
trim_blocks=True,
lstrip_blocks=True,
)
return _env
except Exception as e:
# 将异常原样抛出,调用方可根据自身策略处理
raise e
def render_template(template_string: str, context: Dict[str, Any] | None = None) -> str:
"""渲染 Jinja2 字符串模板。
Args:
template_string: 模板字符串
context: 渲染上下文
Returns:
渲染后字符串
"""
if not template_string:
return ""
context = context or {}
env = _get_env()
template = env.from_string(str(template_string))
return template.render(**context)