From 6a0f04dc4b873df4c4813ff4864a4402599f2858 Mon Sep 17 00:00:00 2001 From: jingrow Date: Fri, 31 Oct 2025 00:17:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BA=8B=E4=BB=B6=E8=A7=A6?= =?UTF-8?q?=E5=8F=91=E6=99=BA=E8=83=BD=E4=BD=93=E6=97=B6=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E4=BC=A0=E5=85=A5=E8=A7=A6=E5=8F=91=E4=BA=8B=E4=BB=B6=E7=9A=84?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E7=9A=84pagetype=E5=92=8Cname?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ai/pagetype/local_ai_agent/__init__.py | 3 +++ .../pagetype/local_ai_agent/local_ai_agent.py | 7 +++++- .../nodes/input_record/input_record.py | 4 +-- apps/jingrow/jingrow/services/queue.py | 25 +++++++++++++++++-- apps/jingrow/jingrow/utils/path.py | 2 +- 5 files changed, 35 insertions(+), 6 deletions(-) diff --git a/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/__init__.py b/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/__init__.py index d1960f2..b7a8166 100644 --- a/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/__init__.py +++ b/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/__init__.py @@ -137,9 +137,12 @@ def enqueue_local_ai_agent(pg, agent_name): from .local_ai_agent import create_agent_job # 将触发上下文透传为执行参数的一部分(由下游路由读取) + event_inputs = {"pagetype": getattr(pg, 'pagetype', None), "name": getattr(pg, 'name', None)} if (hasattr(pg, 'pagetype') and hasattr(pg, 'name')) else {} + _ = create_agent_job( agent_id=str(agent_id), agent_name=agent_name_value, + inputs=event_inputs, # 使用统一的执行路由;会在本地队列处理器里转发 route="jingrow/agents/execute", ) diff --git a/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/local_ai_agent.py b/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/local_ai_agent.py index 40e3ccf..fb4e51f 100644 --- a/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/local_ai_agent.py +++ b/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/local_ai_agent.py @@ -28,7 +28,8 @@ def create_agent_job( agent_name: Optional[str] = None, session_cookie: Optional[str] = None, route: str = "jingrow/agents/execute", - method: Optional[str] = None + method: Optional[str] = None, + inputs: Optional[Dict[str, Any]] = None ) -> str: """ 创建智能体执行任务的通用函数 @@ -58,6 +59,10 @@ def create_agent_job( 'job_id': job_id, 'route': route, } + + # 透传触发时的上下文输入(例如 pagetype/name) + if inputs and isinstance(inputs, dict): + payload['inputs'] = inputs if method: payload['method'] = method diff --git a/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/nodes/input_record/input_record.py b/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/nodes/input_record/input_record.py index 11a6f0e..04921ab 100644 --- a/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/nodes/input_record/input_record.py +++ b/apps/jingrow/jingrow/ai/pagetype/local_ai_agent/nodes/input_record/input_record.py @@ -1,7 +1,7 @@ import json import sys import os - +import jingrow # 导入utils from jingrow.utils.jingrow_api import get_record @@ -13,7 +13,7 @@ def execute(context=None, inputs=None, config=None, **kwargs): if config is None: config = kwargs.get("config", {}) - # jingrow.log_error("input_record 节点 inputs汇总", f"inputs={inputs}, config={config}, context={context}") + jingrow.log_error(f"inputs={inputs}, config={config}, context={context}") pagetype = inputs.get("pagetype") if isinstance(inputs, dict) else None name = inputs.get("name") if isinstance(inputs, dict) else None diff --git a/apps/jingrow/jingrow/services/queue.py b/apps/jingrow/jingrow/services/queue.py index baa1d0c..d4e39b8 100644 --- a/apps/jingrow/jingrow/services/queue.py +++ b/apps/jingrow/jingrow/services/queue.py @@ -196,8 +196,15 @@ async def _execute_flow_job(payload: Dict[str, Any]) -> Dict[str, Any]: from_id = (ref or {}).get("from") field = (ref or {}).get("field") inputs[key] = (context["node_results"].get(from_id) or {}).get(field) - # 可达性判断:无入边(起点)或至少有一条入边被激活 + # 入口节点合并初始触发入参(入口节点直接使用 context["inputs"]) incoming_list = incoming_edges_by_target.get(node_id, []) + if not incoming_list: + initial_inputs = (context or {}).get("inputs") or {} + if isinstance(initial_inputs, dict) and initial_inputs: + # 后者优先,保留显式连线映射的结果 + merged = {**initial_inputs, **inputs} + inputs = merged + # 可达性判断:无入边(起点)或至少有一条入边被激活 has_activated_incoming = any((e.get("id") in active_edge_ids) for e in incoming_list) if incoming_list and not has_activated_incoming: # 暂不可达,留待后轮 @@ -265,10 +272,24 @@ async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]: nodes = flow_data.get("nodes") or [] edges = flow_data.get("edges") or [] + # 收集初始触发入参,入口节点可直接取用 context["inputs"] + initial_inputs: Dict[str, Any] = {} + incoming_inputs = payload.get("inputs") or {} + if isinstance(incoming_inputs, dict): + initial_inputs.update(incoming_inputs) + ctx_inputs = (payload.get("context") or {}).get("inputs") + if isinstance(ctx_inputs, dict): + initial_inputs.update(ctx_inputs) + # 兼容直传 pagetype/name 的场景 + if payload.get("pagetype") and payload.get("name"): + initial_inputs.setdefault("pagetype", payload.get("pagetype")) + initial_inputs.setdefault("name", payload.get("name")) + context = { "agent_name": agent.get("agent_name") or agent_id, "agent_data": agent, - "session_cookie": session_cookie + "session_cookie": session_cookie, + "inputs": initial_inputs, } result = await _execute_flow_job({"nodes": nodes, "edges": edges, "context": context, "flow_id": agent_id}) return result diff --git a/apps/jingrow/jingrow/utils/path.py b/apps/jingrow/jingrow/utils/path.py index f597f4c..17258df 100644 --- a/apps/jingrow/jingrow/utils/path.py +++ b/apps/jingrow/jingrow/utils/path.py @@ -7,7 +7,7 @@ from pathlib import Path @lru_cache(maxsize=1) def get_root_path() -> Path: """返回项目根目录:从 utils/path.py 回退到仓库根 (../../..)。""" - return Path(__file__).resolve().parents[3] + return Path(__file__).resolve().parents[4] @lru_cache(maxsize=1)