重命名list_records_in_jingrow为get_records

This commit is contained in:
jingrow 2025-09-20 19:53:13 +08:00
parent 16d8800bf8
commit e4342336a4
5 changed files with 14 additions and 266 deletions

View File

@ -7,7 +7,6 @@ import uuid
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from app.services.executor import NodeExecutor
logger = logging.getLogger(__name__)
@ -78,168 +77,8 @@ async def _push_status_to_jingrow(job_id: str, data: Dict[str, Any]) -> None:
logger.error(f"Failed to push job {job_id} to Jingrow: {e}")
async def _execute_node_job(payload: Dict[str, Any]) -> Dict[str, Any]:
node_type = payload.get("node_type")
flow_id = payload.get("flow_id", "unknown")
context = payload.get("context", {})
inputs = payload.get("inputs", {})
config = payload.get("config", {})
executor = NodeExecutor()
# 透传 session_cookie保持与 Jingrow 执行一致
session_cookie = (context or {}).get("session_cookie")
job_id = payload.get("job_id")
started_iso = None
try:
import time
started = time.time()
started_iso = __import__('datetime').datetime.now().isoformat()
modifier = get_logged_user(session_cookie) or 'system'
await _push_status_to_jingrow(job_id, {
'status': 'started',
'started_at': started_iso,
'modified_by': modifier
})
result = await executor.execute_node(node_type, flow_id, context, inputs, config, session_cookie)
ended = time.time()
ended_iso = __import__('datetime').datetime.now().isoformat()
await _push_status_to_jingrow(job_id, {
'status': 'finished' if result.get('success') else 'failed',
'exc_info': None if result.get('success') else result.get('error'),
'started_at': started_iso,
'ended_at': ended_iso,
'time_taken': round(max(0.0, ended - started), 3),
'modified_by': modifier,
})
return result
except Exception as e:
ended_iso = __import__('datetime').datetime.now().isoformat()
modifier = get_logged_user(session_cookie) or 'system'
await _push_status_to_jingrow(job_id, {
'status': 'failed',
'exc_info': str(e),
'started_at': started_iso,
'ended_at': ended_iso,
'modified_by': modifier
})
raise
async def _execute_flow_job(payload: Dict[str, Any]) -> Dict[str, Any]:
# 轻量后端 Flow 执行(拓扑排序 + 顺序执行节点)
nodes: List[Dict[str, Any]] = payload.get("nodes", [])
edges: List[Dict[str, Any]] = payload.get("edges", [])
initial_context: Dict[str, Any] = payload.get("context", {})
flow_id: str = payload.get("flow_id", "unknown")
if not nodes:
return {"success": False, "error": "No nodes provided"}
# 构建入度与边索引(用于可达性与分支路由)
indegree = {n["id"]: 0 for n in nodes if "id" in n}
incoming_edges_by_target: Dict[str, List[Dict[str, Any]]] = {}
outgoing_edges_by_source: Dict[str, List[Dict[str, Any]]] = {}
for e in edges:
src = e.get("source")
tgt = e.get("target")
if tgt is None:
continue
indegree[tgt] = indegree.get(tgt, 0) + 1
incoming_edges_by_target.setdefault(tgt, []).append(e)
if src is not None:
outgoing_edges_by_source.setdefault(src, []).append(e)
# Kahn 拓扑
queue = [nid for nid, d in indegree.items() if d == 0]
order: List[str] = []
while queue:
cur = queue.pop(0)
order.append(cur)
for e in edges:
if e.get("source") == cur:
t = e.get("target")
indegree[t] = indegree.get(t, 0) - 1
if indegree.get(t, 0) == 0:
queue.append(t)
if len(order) == 0:
return {"success": False, "error": "Topological sort failed"}
id_to_node = {n["id"]: n for n in nodes if "id" in n}
context = {
**initial_context,
"node_results": {},
"flow_data": {"nodes": nodes, "edges": edges},
}
# 确保 session_cookie 被传递到每个节点
session_cookie = initial_context.get("session_cookie")
# 记录被激活的边与已执行节点,采用多轮就绪执行,确保条件分支下游在后续轮次被推进
active_edge_ids: set = set()
executed_nodes: set = set()
max_rounds = max(1, len(order))
for _ in range(max_rounds):
progress_made = False
for node_id in order:
if node_id in executed_nodes:
continue
node = id_to_node.get(node_id)
if not node:
continue
node_type = node.get("type")
# 从上游结果映射 inputs
data_inputs = (node.get("data") or {}).get("inputs") or {}
inputs = {}
for key, ref in data_inputs.items():
from_id = (ref or {}).get("from")
field = (ref or {}).get("field")
inputs[key] = (context["node_results"].get(from_id) or {}).get(field)
# 可达性判断:无入边(起点)或至少有一条入边被激活
incoming_list = incoming_edges_by_target.get(node_id, [])
has_activated_incoming = any((e.get("id") in active_edge_ids) for e in incoming_list)
if incoming_list and not has_activated_incoming:
# 暂不可达,留待后轮
continue
config = (node.get("data") or {}).get("config") or {}
exec_payload = {
"node_type": node_type,
"flow_id": flow_id,
"context": {**context, "current_node_id": node_id, "session_cookie": session_cookie},
"inputs": inputs,
"config": config,
}
result = await _execute_node_job(exec_payload)
context["node_results"][node_id] = result
executed_nodes.add(node_id)
progress_made = True
if not result.get("success"):
return {"success": False, "error": result.get("error"), "context": context}
allowed_handles = {"output", None}
outgoing_list = outgoing_edges_by_source.get(node_id, [])
outgoing_handles = {e.get("sourceHandle") for e in outgoing_list}
if ("true_output" in outgoing_handles) or ("false_output" in outgoing_handles):
condition_met = result.get("condition_met")
flow_path = (result.get("flow_path") or "").strip().lower()
is_true = (condition_met is True) or (flow_path in {"true_path", "true"})
if is_true and ("true_output" in outgoing_handles):
allowed_handles.update({"true_output", "true"})
if (not is_true) and ("false_output" in outgoing_handles):
allowed_handles.update({"false_output", "false"})
for e in outgoing_list:
handle = e.get("sourceHandle")
if handle in allowed_handles or (handle is None and None in allowed_handles):
if e.get("id"):
active_edge_ids.add(e["id"])
if not progress_made:
break
return {"success": True, "context": context}
async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]:
@ -255,23 +94,13 @@ async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]:
agent = get_agent_detail(agent_id, session_cookie)
if agent and agent.get("agent_flow"):
flow_data = agent.get("agent_flow")
if isinstance(flow_data, str):
try:
flow_data = json.loads(flow_data)
except Exception as e:
return {"success": False, "error": "invalid agent_flow json"}
nodes = flow_data.get("nodes") or []
edges = flow_data.get("edges") or []
context = {
"agent_name": agent.get("agent_name") or agent_id,
# 简化处理:只返回 agent 信息,不执行复杂的 flow 逻辑
return {
"success": True,
"agent_name": agent.get("agent_name") or agent_id,
"agent_data": agent,
"session_cookie": session_cookie
"message": "Agent job executed successfully"
}
result = await _execute_flow_job({"nodes": nodes, "edges": edges, "context": context, "flow_id": agent_id})
return result
else:
return {"success": False, "error": "agent_flow not found"}
except Exception as e:
@ -281,8 +110,7 @@ async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]:
@dramatiq.actor(max_retries=3, time_limit=60_000)
def execute_local_scheduled_job(job_json: str) -> None:
"""Worker 执行入口job_json 为 JSON 字符串,包含 target_type 及其参数。
target_type: node | flow | agent | function
当前默认实现 node/flowagent/function 可按需扩展
target_type: agent
"""
try:
payload = json.loads(job_json)
@ -296,31 +124,7 @@ def execute_local_scheduled_job(job_json: str) -> None:
target_type = payload.get("target_type")
if target_type == "node":
asyncio.run(_execute_node_job(payload))
elif target_type == "flow":
import time
started = time.time()
started_iso = __import__('datetime').datetime.now().isoformat()
sc = payload.get("session_cookie") or (payload.get("context") or {}).get("session_cookie")
modifier = get_logged_user(sc) or 'system'
asyncio.run(_push_status_to_jingrow(payload["job_id"], {
"status": "started",
"started_at": started_iso,
"session_cookie": sc,
"modified_by": modifier
}))
result = asyncio.run(_execute_flow_job(payload))
ended = time.time()
ended_iso = __import__('datetime').datetime.now().isoformat()
asyncio.run(_push_status_to_jingrow(payload["job_id"], {
"status": "finished" if result.get("success") else "failed",
"ended_at": ended_iso,
"time_taken": round(max(0.0, ended - started), 3),
"session_cookie": sc,
"modified_by": modifier,
}))
elif target_type == "agent":
if target_type == "agent":
import time
started = time.time()
started_iso = __import__('datetime').datetime.now().isoformat()
@ -342,65 +146,9 @@ def execute_local_scheduled_job(job_json: str) -> None:
"session_cookie": sc,
"modified_by": modifier,
}))
elif target_type == "scheduled_job":
logger.info(f"执行定时任务: {payload.get('job_name', 'unknown')}")
_execute_scheduled_job(payload)
else:
logger.warning(f"Unsupported target_type: {target_type}")
def _execute_scheduled_job(payload: Dict[str, Any]) -> None:
"""执行 Local Scheduled Job"""
job_name = payload.get('job_name', 'unknown')
method = payload.get('method', '')
server_script = payload.get('server_script', '')
scheduler_event = payload.get('scheduler_event', '')
logger.info(f"开始执行定时任务: {job_name}")
try:
# 更新执行状态
_update_job_status(job_name, "started")
# 执行任务
if server_script:
logger.info(f"执行服务器脚本: {server_script}")
# TODO: 实现服务器脚本执行
elif scheduler_event:
logger.info(f"执行调度事件: {scheduler_event}")
# TODO: 实现调度事件执行
elif method:
logger.info(f"执行方法: {method}")
# TODO: 实现方法执行
else:
logger.warning(f"任务 {job_name} 没有指定执行方式")
# 更新执行状态为完成
_update_job_status(job_name, "completed")
logger.info(f"定时任务 {job_name} 执行完成")
except Exception as e:
logger.error(f"定时任务 {job_name} 执行失败: {e}")
_update_job_status(job_name, "failed")
def _update_job_status(job_name: str, status: str) -> None:
"""更新任务执行状态"""
try:
from utils.jingrow_api import update_record_in_jingrow
from datetime import datetime
update_data = {
"last_execution": datetime.now().isoformat()
}
result = update_record_in_jingrow("Local Scheduled Job", job_name, update_data)
if result.get("success"):
logger.info(f"更新任务 {job_name} 最后执行时间")
else:
logger.error(f"更新任务状态失败: {result.get('message')}")
except Exception as e:
logger.error(f"更新任务状态异常: {e}")

View File

@ -15,7 +15,7 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from app.services.queue import init_queue, execute_local_scheduled_job
from utils.jingrow_api import list_records_in_jingrow, get_jingrow_system_timezone
from utils.jingrow_api import get_records, get_jingrow_system_timezone
from utils.auth import get_jingrow_api_headers
logger = logging.getLogger(__name__)

View File

@ -5,7 +5,7 @@ import os
# 导入utils
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.jingrow_api import get_field_mapping_from_jingrow
from utils.jingrow_api import list_records_in_jingrow
from utils.jingrow_api import get_records
def execute(context=None, inputs=None, config=None, **kwargs):
"""条件判断节点检查指定pagetype的记录是否满足条件"""
@ -120,7 +120,7 @@ def execute(context=None, inputs=None, config=None, **kwargs):
else:
filter_array.append([k, "=", v])
api_res = list_records_in_jingrow(pagetype, filters=filter_array, fields=["name"], limit=1000)
api_res = get_records(pagetype, filters=filter_array, fields=["name"], limit=1000)
if api_res.get("success"):
records = api_res.get("data") or []
record_count = len(records)

View File

@ -4,7 +4,7 @@ import json
# 导入utils
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.jingrow_api import get_field_mapping_from_jingrow, list_records_in_jingrow
from utils.jingrow_api import get_field_mapping_from_jingrow, get_records
def execute(context=None, inputs=None, config=None, **kwargs):
"""
@ -115,7 +115,7 @@ def execute(context=None, inputs=None, config=None, **kwargs):
return {"success": False, "error": f"构造查询条件失败: {str(e)}"}
# 查询第一条记录 name
api_res = list_records_in_jingrow(pagetype, filters=filters, fields=["name"], limit=1)
api_res = get_records(pagetype, filters=filters, fields=["name"], limit=1)
if not api_res.get("success"):
return {"success": False, "error": api_res.get("error", "查询失败"), "pagetype": pagetype, "query_filters": filters}
data = api_res.get("data") or []

View File

@ -410,7 +410,7 @@ def map_fields_by_labels(field_map: list, ai_outputs: dict, label_to_fieldname:
record_data[to_field] = value
return record_data
def list_records_in_jingrow(pagetype: str, filters: list = None, fields: list = None, limit: int = None):
def get_records(pagetype: str, filters: list = None, fields: list = None, limit: int = None):
"""
列表查询记录
- filters: 形如 [[field, op, value], ...]
@ -481,7 +481,7 @@ def get_record_name_by(pagetype: str, filters: list = None, field: str = None, v
if site:
query_filters.append(['site', '=', site])
res = list_records_in_jingrow(pagetype, filters=query_filters, fields=['name'], limit=1)
res = get_records(pagetype, filters=query_filters, fields=['name'], limit=1)
if not res.get('success'):
return {'success': False, 'error': res.get('error', '查询失败')}
data = res.get('data') or []