修复事件触发智能体时无法传入触发事件的记录的pagetype和name

This commit is contained in:
jingrow 2025-10-31 00:17:53 +08:00
parent db4cb24048
commit 6a0f04dc4b
5 changed files with 35 additions and 6 deletions

View File

@ -137,9 +137,12 @@ def enqueue_local_ai_agent(pg, agent_name):
from .local_ai_agent import create_agent_job
# 将触发上下文透传为执行参数的一部分(由下游路由读取)
event_inputs = {"pagetype": getattr(pg, 'pagetype', None), "name": getattr(pg, 'name', None)} if (hasattr(pg, 'pagetype') and hasattr(pg, 'name')) else {}
_ = create_agent_job(
agent_id=str(agent_id),
agent_name=agent_name_value,
inputs=event_inputs,
# 使用统一的执行路由;会在本地队列处理器里转发
route="jingrow/agents/execute",
)

View File

@ -28,7 +28,8 @@ def create_agent_job(
agent_name: Optional[str] = None,
session_cookie: Optional[str] = None,
route: str = "jingrow/agents/execute",
method: Optional[str] = None
method: Optional[str] = None,
inputs: Optional[Dict[str, Any]] = None
) -> str:
"""
创建智能体执行任务的通用函数
@ -58,6 +59,10 @@ def create_agent_job(
'job_id': job_id,
'route': route,
}
# 透传触发时的上下文输入(例如 pagetype/name
if inputs and isinstance(inputs, dict):
payload['inputs'] = inputs
if method:
payload['method'] = method

View File

@ -1,7 +1,7 @@
import json
import sys
import os
import jingrow
# 导入utils
from jingrow.utils.jingrow_api import get_record
@ -13,7 +13,7 @@ def execute(context=None, inputs=None, config=None, **kwargs):
if config is None:
config = kwargs.get("config", {})
# jingrow.log_error("input_record 节点 inputs汇总", f"inputs={inputs}, config={config}, context={context}")
jingrow.log_error(f"inputs={inputs}, config={config}, context={context}")
pagetype = inputs.get("pagetype") if isinstance(inputs, dict) else None
name = inputs.get("name") if isinstance(inputs, dict) else None

View File

@ -196,8 +196,15 @@ async def _execute_flow_job(payload: Dict[str, Any]) -> Dict[str, Any]:
from_id = (ref or {}).get("from")
field = (ref or {}).get("field")
inputs[key] = (context["node_results"].get(from_id) or {}).get(field)
# 可达性判断:无入边(起点)或至少有一条入边被激活
# 入口节点合并初始触发入参(入口节点直接使用 context["inputs"]
incoming_list = incoming_edges_by_target.get(node_id, [])
if not incoming_list:
initial_inputs = (context or {}).get("inputs") or {}
if isinstance(initial_inputs, dict) and initial_inputs:
# 后者优先,保留显式连线映射的结果
merged = {**initial_inputs, **inputs}
inputs = merged
# 可达性判断:无入边(起点)或至少有一条入边被激活
has_activated_incoming = any((e.get("id") in active_edge_ids) for e in incoming_list)
if incoming_list and not has_activated_incoming:
# 暂不可达,留待后轮
@ -265,10 +272,24 @@ async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]:
nodes = flow_data.get("nodes") or []
edges = flow_data.get("edges") or []
# 收集初始触发入参,入口节点可直接取用 context["inputs"]
initial_inputs: Dict[str, Any] = {}
incoming_inputs = payload.get("inputs") or {}
if isinstance(incoming_inputs, dict):
initial_inputs.update(incoming_inputs)
ctx_inputs = (payload.get("context") or {}).get("inputs")
if isinstance(ctx_inputs, dict):
initial_inputs.update(ctx_inputs)
# 兼容直传 pagetype/name 的场景
if payload.get("pagetype") and payload.get("name"):
initial_inputs.setdefault("pagetype", payload.get("pagetype"))
initial_inputs.setdefault("name", payload.get("name"))
context = {
"agent_name": agent.get("agent_name") or agent_id,
"agent_data": agent,
"session_cookie": session_cookie
"session_cookie": session_cookie,
"inputs": initial_inputs,
}
result = await _execute_flow_job({"nodes": nodes, "edges": edges, "context": context, "flow_id": agent_id})
return result

View File

@ -7,7 +7,7 @@ from pathlib import Path
@lru_cache(maxsize=1)
def get_root_path() -> Path:
"""返回项目根目录:从 utils/path.py 回退到仓库根 (../../..)。"""
return Path(__file__).resolve().parents[3]
return Path(__file__).resolve().parents[4]
@lru_cache(maxsize=1)