update jlocal.py

This commit is contained in:
jingrow 2026-03-13 00:55:51 +08:00
parent 1c39ef2efe
commit 5191a10fa7
2 changed files with 215 additions and 4 deletions

View File

@ -5,7 +5,7 @@ import { api } from './common'
// 获取节点Schema字段
export const getNodeSchemaFields = async (nodeType: string): Promise<any[]> => {
try {
const res = await api.call('jingrow.ai.pagetype.ai_node.ai_node.get_node_schema_fields', {
const res = await api.call('jingrow.ai.utils.jlocal.get_node_schema_fields', {
node_type: nodeType
})
// Jingrow whitelist 函数返回值被包装在 message 字段中
@ -19,7 +19,7 @@ export const getNodeSchemaFields = async (nodeType: string): Promise<any[]> => {
// 一键导入本地节点(由后端扫描并创建)
export const importLocalNodes = async (): Promise<{ success: boolean; matched: number; imported: number; skipped_existing: number; errors?: string[] }> => {
try {
const res = await api.call('jingrow.ai.pagetype.ai_node.ai_node.import_local_nodes')
const res = await api.call('jingrow.ai.utils.jlocal.import_local_nodes')
const data = res?.message || res
return data || { success: false, matched: 0, imported: 0, skipped_existing: 0 }
} catch (error) {
@ -31,7 +31,7 @@ export const importLocalNodes = async (): Promise<{ success: boolean; matched: n
// 打包节点为zip文件
export const packageNode = async (nodeType: string): Promise<{ blob: Blob; filename: string }> => {
try {
const res = await api.call('jingrow.ai.pagetype.ai_node.ai_node.package_node', {
const res = await api.call('jingrow.ai.utils.jlocal.package_node', {
node_type: nodeType
})

View File

@ -7,9 +7,14 @@ jlocal 相关白名单函数 - 转发到 SaaS 端
import json
import logging
import os
import shutil
import tempfile
from pathlib import Path
from typing import Dict, Any
import requests
from fastapi import HTTPException
from pathlib import Path
import jingrow
from jingrow.config import Config
@ -109,3 +114,209 @@ def get_node_schema(node_type: str = None):
except Exception as e:
return {"success": False, "error": str(e)}
@jingrow.whitelist()
def get_node_schema_fields(node_type: str) -> Dict[str, Any]:
"""获取指定节点类型的Schema字段列表"""
if not node_type:
return {"success": True, "fields": []}
try:
base_path = jingrow.get_app_path("jingrow")
json_file = os.path.join(base_path, "ai", "nodes", node_type, f"{node_type}.json")
if not os.path.exists(json_file):
return {"success": True, "fields": []}
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# 移除metadata获取schema
schema = dict(data)
schema.pop('metadata', None)
# 提取字段列表
fields = []
if schema.get('properties'):
for name, config in schema['properties'].items():
fields.append({
'name': name,
'label': config.get('title', name),
'type': config.get('type', 'string'),
'description': config.get('description', '')
})
return {"success": True, "fields": fields}
except FileNotFoundError:
return {"success": True, "fields": []}
except Exception as e:
jingrow.log_error(f"获取节点字段失败: {node_type}", str(e))
return {"success": True, "fields": []}
@jingrow.whitelist()
def import_local_nodes() -> Dict[str, Any]:
"""一键导入本地节点到Ai Node"""
try:
# 扫描本地节点目录
base_path = jingrow.get_app_path("jingrow")
nodes_path = os.path.join(base_path, "ai", "nodes")
if not os.path.exists(nodes_path):
return {
"success": True,
"matched": 0,
"imported": 0,
"skipped_existing": 0,
"errors": []
}
matched = 0
imported = 0
skipped_existing = 0
errors = []
# 遍历节点目录
for item in os.listdir(nodes_path):
item_path = os.path.join(nodes_path, item)
if not os.path.isdir(item_path):
continue
# 检查是否有对应的JSON配置文件
json_file = os.path.join(item_path, f"{item}.json")
if not os.path.exists(json_file):
continue
matched += 1
try:
# 读取节点配置
with open(json_file, 'r', encoding='utf-8') as f:
node_config = json.load(f)
# 提取元数据
metadata = node_config.get('metadata', {})
node_type = item
node_label = metadata.get('label', item)
node_group = metadata.get('group', '')
node_component = metadata.get('component', 'GenericNode')
node_icon = metadata.get('icon', 'fa-cube')
node_color = metadata.get('color', '#6b7280')
node_description = metadata.get('description', '')
# 检查节点是否已存在
existing = jingrow.db.exists('Ai Node', {'node_type': node_type})
if existing:
skipped_existing += 1
continue
# 创建新节点
node_doc = jingrow.get_pg({
'pagetype': 'Ai Node',
'node_type': node_type,
'node_label': node_label,
'node_group': node_group,
'node_component': node_component,
'node_icon': node_icon,
'node_color': node_color,
'node_description': node_description,
'node_schema': node_config,
'status': 'Published'
})
node_doc.insert()
imported += 1
except Exception as e:
error_msg = f"导入节点 {item} 失败: {str(e)}"
jingrow.log_error("导入本地节点失败", error_msg)
errors.append(error_msg)
# 提交事务
jingrow.db.commit()
return {
"success": True,
"matched": matched,
"imported": imported,
"skipped_existing": skipped_existing,
"errors": errors
}
except Exception as e:
jingrow.db.rollback()
jingrow.log_error("导入本地节点失败", str(e))
return {
"success": False,
"error": str(e),
"matched": 0,
"imported": 0,
"skipped_existing": 0
}
@jingrow.whitelist()
def package_node(node_type: str) -> Dict[str, Any]:
"""打包节点为zip文件"""
if not node_type:
jingrow.throw("参数错误", "node_type不能为空")
try:
import base64
from datetime import datetime
base_path = jingrow.get_app_path("jingrow")
node_dir = os.path.join(base_path, "ai", "nodes", node_type)
if not os.path.exists(node_dir):
jingrow.throw("节点不存在", f"节点目录不存在: {node_type}")
# 创建临时目录
temp_dir = tempfile.mkdtemp()
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
temp_package_dir = os.path.join(temp_dir, node_type)
try:
# 复制节点目录内容(排除不必要的文件)
os.makedirs(temp_package_dir, exist_ok=True)
for item in os.listdir(node_dir):
if item in ['__pycache__', '.git', '.DS_Store', '.pytest_cache']:
continue
src = os.path.join(node_dir, item)
dst = os.path.join(temp_package_dir, item)
if os.path.isdir(src):
shutil.copytree(src, dst)
else:
shutil.copy2(src, dst)
# 打包为ZIP
zip_filename = f"{node_type}-{timestamp}"
zip_path = shutil.make_archive(
os.path.join(temp_dir, zip_filename),
'zip',
root_dir=temp_dir,
base_dir=node_type
)
# 读取文件内容
with open(zip_path, 'rb') as f:
zip_content = f.read()
# 返回文件内容base64编码
return {
"success": True,
"filename": f"{zip_filename}.zip",
"content": base64.b64encode(zip_content).decode('utf-8')
}
finally:
# 清理临时目录
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
jingrow.log_error(f"打包节点失败: {node_type}", str(e))
jingrow.throw("打包失败", str(e))