重构create_record和update_record节点
This commit is contained in:
parent
aaac5da9a9
commit
be5b0bf55a
@ -1,11 +1,7 @@
|
|||||||
import json
|
import json
|
||||||
from typing import Dict, Any, Optional
|
from typing import Dict, Any, Optional
|
||||||
|
|
||||||
from jingrow.utils.jingrow_api import (
|
import jingrow
|
||||||
get_field_mapping_from_jingrow,
|
|
||||||
create_record,
|
|
||||||
map_fields_by_labels,
|
|
||||||
)
|
|
||||||
|
|
||||||
def execute(context=None, inputs=None, config=None):
|
def execute(context=None, inputs=None, config=None):
|
||||||
"""
|
"""
|
||||||
@ -38,7 +34,7 @@ def execute(context=None, inputs=None, config=None):
|
|||||||
field_map = config.get("field_map", [])
|
field_map = config.get("field_map", [])
|
||||||
|
|
||||||
# 从 Jingrow 获取字段 label->fieldname 映射
|
# 从 Jingrow 获取字段 label->fieldname 映射
|
||||||
label2field = get_field_mapping_from_jingrow(record_type) or {}
|
label2field = jingrow.get_field_mapping_from_jingrow(record_type) or {}
|
||||||
|
|
||||||
# 收集AI输出:只平铺同一分支的所有上游节点数据
|
# 收集AI输出:只平铺同一分支的所有上游节点数据
|
||||||
ai_outputs = {}
|
ai_outputs = {}
|
||||||
@ -82,7 +78,7 @@ def execute(context=None, inputs=None, config=None):
|
|||||||
|
|
||||||
# 1. 优先用field_map(from→to)
|
# 1. 优先用field_map(from→to)
|
||||||
if field_map:
|
if field_map:
|
||||||
record_data.update(map_fields_by_labels(field_map, ai_outputs, label2field))
|
record_data.update(jingrow.map_fields_by_labels(field_map, ai_outputs, label2field))
|
||||||
mapped_fields.update([m.get("from") for m in field_map if m.get("from")])
|
mapped_fields.update([m.get("from") for m in field_map if m.get("from")])
|
||||||
# 同时标记目标字段为已映射,防止被自动映射覆盖
|
# 同时标记目标字段为已映射,防止被自动映射覆盖
|
||||||
for mapping in field_map:
|
for mapping in field_map:
|
||||||
@ -101,11 +97,9 @@ def execute(context=None, inputs=None, config=None):
|
|||||||
mapped_fields.add(label)
|
mapped_fields.add(label)
|
||||||
|
|
||||||
# 调用 Jingrow 创建记录
|
# 调用 Jingrow 创建记录
|
||||||
api_res = create_record(record_type, record_data)
|
created = jingrow.create_pg(record_type, record_data)
|
||||||
if not api_res.get("success"):
|
if created is None:
|
||||||
return {"success": False, "error": api_res.get("error", "创建记录失败")}
|
return {"success": False, "error": "创建记录失败"}
|
||||||
|
|
||||||
created = api_res.get("data", {})
|
|
||||||
created_name = created.get("name") or record_data.get("name")
|
created_name = created.get("name") or record_data.get("name")
|
||||||
created_id = created.get("name") or created.get("id") # 兼容不同返回
|
created_id = created.get("name") or created.get("id") # 兼容不同返回
|
||||||
|
|
||||||
|
|||||||
@ -1,11 +1,7 @@
|
|||||||
import json
|
import json
|
||||||
from typing import Dict, Any, Optional
|
from typing import Dict, Any, Optional
|
||||||
|
|
||||||
from jingrow.utils.jingrow_api import (
|
import jingrow
|
||||||
get_field_mapping_from_jingrow,
|
|
||||||
get_field_value_from_jingrow,
|
|
||||||
update_record,
|
|
||||||
)
|
|
||||||
|
|
||||||
def execute(context=None, inputs=None, config=None):
|
def execute(context=None, inputs=None, config=None):
|
||||||
"""
|
"""
|
||||||
@ -80,7 +76,7 @@ def execute(context=None, inputs=None, config=None):
|
|||||||
}
|
}
|
||||||
|
|
||||||
# 获取 label->fieldname 映射
|
# 获取 label->fieldname 映射
|
||||||
label_to_fieldname = get_field_mapping_from_jingrow(record_type) or {}
|
label_to_fieldname = jingrow.get_field_mapping_from_jingrow(record_type) or {}
|
||||||
fieldname_set = set(label_to_fieldname.values())
|
fieldname_set = set(label_to_fieldname.values())
|
||||||
|
|
||||||
updated = False
|
updated = False
|
||||||
@ -139,7 +135,7 @@ def execute(context=None, inputs=None, config=None):
|
|||||||
# 在追加模式下,先检查是否重复
|
# 在追加模式下,先检查是否重复
|
||||||
if write_mode == "append":
|
if write_mode == "append":
|
||||||
# 从服务端查询当前值,避免并发覆盖
|
# 从服务端查询当前值,避免并发覆盖
|
||||||
current_value = get_field_value_from_jingrow(record_type, record_name, to_field) or ""
|
current_value = jingrow.get_field_value_from_jingrow(record_type, record_name, to_field) or ""
|
||||||
current_str = str(current_value).strip()
|
current_str = str(current_value).strip()
|
||||||
new_value_str = str(value).strip()
|
new_value_str = str(value).strip()
|
||||||
|
|
||||||
@ -164,10 +160,11 @@ def execute(context=None, inputs=None, config=None):
|
|||||||
|
|
||||||
# 调用 Jingrow 更新记录(仅当有更新)
|
# 调用 Jingrow 更新记录(仅当有更新)
|
||||||
if updated:
|
if updated:
|
||||||
api_res = update_record(record_type, record_name, mock_record)
|
server_record = jingrow.update_pg(record_type, record_name, mock_record)
|
||||||
if not api_res.get("success"):
|
if server_record is False:
|
||||||
return {"success": False, "error": api_res.get("error", "更新记录失败")}
|
return {"success": False, "error": "更新记录失败"}
|
||||||
server_record = api_res.get("data", {})
|
if server_record is True:
|
||||||
|
server_record = {}
|
||||||
else:
|
else:
|
||||||
server_record = {}
|
server_record = {}
|
||||||
|
|
||||||
|
|||||||
@ -60,7 +60,6 @@ async def get_local_jobs(
|
|||||||
"X-Requested-With": "XMLHttpRequest",
|
"X-Requested-With": "XMLHttpRequest",
|
||||||
})
|
})
|
||||||
|
|
||||||
logger.info(f"Getting local jobs with system auth")
|
|
||||||
|
|
||||||
# 构建请求参数
|
# 构建请求参数
|
||||||
params = {
|
params = {
|
||||||
@ -73,7 +72,6 @@ async def get_local_jobs(
|
|||||||
if filters:
|
if filters:
|
||||||
params['filters'] = filters
|
params['filters'] = filters
|
||||||
|
|
||||||
logger.info(f"Calling Jingrow API: /api/data/Local Job with params: {params}")
|
|
||||||
result = call_jingrow_api(
|
result = call_jingrow_api(
|
||||||
'GET',
|
'GET',
|
||||||
'/api/data/Local Job',
|
'/api/data/Local Job',
|
||||||
@ -81,7 +79,6 @@ async def get_local_jobs(
|
|||||||
params=params
|
params=params
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f"Jingrow API result: {result}")
|
|
||||||
|
|
||||||
if result.get('success'):
|
if result.get('success'):
|
||||||
data = result.get('data', {})
|
data = result.get('data', {})
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user