优化node_management.py

This commit is contained in:
jingrow 2025-11-03 03:12:22 +08:00
parent d348a656c0
commit 5a758b3a8f

View File

@ -13,6 +13,7 @@ from jingrow.utils.fs import atomic_write_json
from jingrow.utils.jingrow_api import get_record_id, create_record, update_record, get_record_list
from jingrow.utils.auth import get_jingrow_cloud_url, get_jingrow_cloud_api_headers
from jingrow.config import Config
from jingrow.utils.path import get_root_path, get_apps_path
logger = logging.getLogger(__name__)
@ -33,8 +34,7 @@ async def export_node_definition(payload: Dict[str, Any]):
export_data = {"metadata": metadata, **(schema or {})}
current_file = Path(__file__).resolve()
jingrow_root = current_file.parents[1] # 修正路径层级
jingrow_root = get_apps_path() / "jingrow"
new_root = jingrow_root / "ai" / "nodes"
target = new_root / node_type / f"{node_type}.json"
atomic_write_json(target, export_data)
@ -49,8 +49,7 @@ async def import_local_node_definitions():
扫描本地节点定义目录 metadata 去重后导入到 Local Ai Node
"""
try:
current_file = Path(__file__).resolve()
jingrow_root = current_file.parents[1] # 修正路径层级
jingrow_root = get_apps_path() / "jingrow"
nodes_root = jingrow_root / "ai" / "nodes"
if not nodes_root.exists():
return {"success": True, "matched": 0, "imported": 0, "skipped_existing": 0}
@ -128,8 +127,7 @@ async def get_all_node_metadata():
获取所有节点的元数据用于流程编排界面
"""
try:
current_file = Path(__file__).resolve()
jingrow_root = current_file.parents[1] # 修正路径层级
jingrow_root = get_apps_path() / "jingrow"
nodes_root = jingrow_root / "ai" / "nodes"
if not nodes_root.exists():
@ -177,8 +175,7 @@ async def get_node_schema(node_type: str):
获取指定节点类型的Schema配置
"""
try:
current_file = Path(__file__).resolve()
jingrow_root = current_file.parents[1]
jingrow_root = get_apps_path() / "jingrow"
nodes_root = jingrow_root / "ai" / "nodes"
json_file = nodes_root / node_type / f"{node_type}.json"
@ -303,151 +300,148 @@ async def check_node_exists(node_type: str):
@router.get("/jingrow/installed-node-types")
async def get_installed_node_types():
"""
获取已安装的节点类型列表
通过扫描节点目录获取这是最高效的方式只需要列出目录名不需要读取文件
"""
try:
# 确定节点目录路径apps/jingrow/jingrow/ai/nodes
current_file = Path(__file__).resolve()
jingrow_root = current_file.parents[1] # jingrow
nodes_root = jingrow_root / "ai" / "nodes"
node_types = []
# 如果目录不存在,返回空列表
if not nodes_root.exists():
return {"success": True, "node_types": []}
# 扫描目录,只获取目录名(最高效的方式)
for item in nodes_root.iterdir():
if item.is_dir() and not item.name.startswith('.') and item.name != '__pycache__':
# 验证是否包含节点定义文件(可选,但更可靠)
json_file = item / f"{item.name}.json"
if json_file.exists():
node_types.append(item.name)
return {"success": True, "node_types": sorted(node_types)}
except Exception as e:
logger.error(f"获取已安装节点类型失败: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
return {"success": True, "node_types": []}
"""
获取已安装的节点类型列表
通过扫描节点目录获取这是最高效的方式只需要列出目录名不需要读取文件
"""
try:
# 确定节点目录路径apps/jingrow/jingrow/ai/nodes
jingrow_root = get_apps_path() / "jingrow"
nodes_root = jingrow_root / "ai" / "nodes"
node_types = []
# 如果目录不存在,返回空列表
if not nodes_root.exists():
return {"success": True, "node_types": []}
# 扫描目录,只获取目录名(最高效的方式)
for item in nodes_root.iterdir():
if item.is_dir() and not item.name.startswith('.') and item.name != '__pycache__':
# 验证是否包含节点定义文件(可选,但更可靠)
json_file = item / f"{item.name}.json"
if json_file.exists():
node_types.append(item.name)
return {"success": True, "node_types": sorted(node_types)}
except Exception as e:
logger.error(f"获取已安装节点类型失败: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
return {"success": True, "node_types": []}
@router.post("/jingrow/install-node-from-url")
async def install_node_from_url(url: str = Form(...)):
"""从URL安装节点"""
try:
# 下载文件
current = Path(__file__).resolve()
root = current.parents[4]
tmp_dir = root / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
# 创建临时文件
temp_filename = f"node_download_{uuid.uuid4().hex[:8]}{Path(url).suffix}"
temp_file_path = tmp_dir / temp_filename
# 下载文件
response = requests.get(url, stream=True, timeout=300)
response.raise_for_status()
with open(temp_file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# 安装节点
result = _install_node_from_file(str(temp_file_path))
# 清理临时文件
if temp_file_path.exists():
os.remove(temp_file_path)
return result
except Exception as e:
logger.error(f"从URL安装节点失败: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"安装节点失败: {str(e)}")
"""从URL安装节点"""
try:
# 下载文件
root = get_root_path()
tmp_dir = root / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
# 创建临时文件
temp_filename = f"node_download_{uuid.uuid4().hex[:8]}{Path(url).suffix}"
temp_file_path = tmp_dir / temp_filename
# 下载文件
response = requests.get(url, stream=True, timeout=300)
response.raise_for_status()
with open(temp_file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# 安装节点
result = _install_node_from_file(str(temp_file_path))
# 清理临时文件
if temp_file_path.exists():
os.remove(temp_file_path)
return result
except Exception as e:
logger.error(f"从URL安装节点失败: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"安装节点失败: {str(e)}")
@router.post("/jingrow/install-node-from-git")
async def install_node_from_git(repo_url: str = Form(...)):
"""从git仓库克隆并安装节点"""
try:
current = Path(__file__).resolve()
root = current.parents[4]
tmp_dir = root / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
# 创建临时目录用于克隆
clone_dir = tmp_dir / f"node_git_clone_{uuid.uuid4().hex[:8]}"
try:
# 使用 git clone 克隆仓库
result = subprocess.run(
['git', 'clone', repo_url, str(clone_dir)],
capture_output=True,
text=True,
timeout=300
)
if result.returncode != 0:
raise HTTPException(status_code=400, detail=f"Git 克隆失败: {result.stderr}")
# 查找节点目录(节点包应该包含一个或多个节点目录)
node_dirs = []
for item in clone_dir.iterdir():
if item.is_dir() and not item.name.startswith('.') and item.name != '__pycache__':
json_file = item / f"{item.name}.json"
if json_file.exists():
node_dirs.append(item)
if not node_dirs:
raise HTTPException(status_code=400, detail="仓库中没有找到节点定义文件")
# 安装所有找到的节点
installed_nodes = []
errors = []
for node_dir in node_dirs:
try:
result = _install_single_node_directory(str(node_dir))
if result.get('success'):
installed_nodes.append(node_dir.name)
else:
errors.append(f"{node_dir.name}: {result.get('error')}")
except Exception as e:
errors.append(f"{node_dir.name}: {str(e)}")
# 清理临时目录
shutil.rmtree(clone_dir, ignore_errors=True)
if errors:
return {
'success': len(installed_nodes) > 0,
'installed': installed_nodes,
'errors': errors,
'message': f"成功安装 {len(installed_nodes)} 个节点,失败 {len(errors)}"
}
else:
return {
'success': True,
'installed': installed_nodes,
'message': f"成功安装 {len(installed_nodes)} 个节点"
}
finally:
# 确保清理临时目录
if clone_dir.exists():
shutil.rmtree(clone_dir, ignore_errors=True)
except HTTPException:
raise
except Exception as e:
logger.error(f"从Git安装节点失败: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"安装节点失败: {str(e)}")
"""从git仓库克隆并安装节点"""
try:
root = get_root_path()
tmp_dir = root / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
# 创建临时目录用于克隆
clone_dir = tmp_dir / f"node_git_clone_{uuid.uuid4().hex[:8]}"
try:
# 使用 git clone 克隆仓库
result = subprocess.run(
['git', 'clone', repo_url, str(clone_dir)],
capture_output=True,
text=True,
timeout=300
)
if result.returncode != 0:
raise HTTPException(status_code=400, detail=f"Git 克隆失败: {result.stderr}")
# 查找节点目录(节点包应该包含一个或多个节点目录)
node_dirs = []
for item in clone_dir.iterdir():
if item.is_dir() and not item.name.startswith('.') and item.name != '__pycache__':
json_file = item / f"{item.name}.json"
if json_file.exists():
node_dirs.append(item)
if not node_dirs:
raise HTTPException(status_code=400, detail="仓库中没有找到节点定义文件")
# 安装所有找到的节点
installed_nodes = []
errors = []
for node_dir in node_dirs:
try:
result = _install_single_node_directory(str(node_dir))
if result.get('success'):
installed_nodes.append(node_dir.name)
else:
errors.append(f"{node_dir.name}: {result.get('error')}")
except Exception as e:
errors.append(f"{node_dir.name}: {str(e)}")
# 清理临时目录
shutil.rmtree(clone_dir, ignore_errors=True)
if errors:
return {
'success': len(installed_nodes) > 0,
'installed': installed_nodes,
'errors': errors,
'message': f"成功安装 {len(installed_nodes)} 个节点,失败 {len(errors)}"
}
else:
return {
'success': True,
'installed': installed_nodes,
'message': f"成功安装 {len(installed_nodes)} 个节点"
}
finally:
# 确保清理临时目录
if clone_dir.exists():
shutil.rmtree(clone_dir, ignore_errors=True)
except HTTPException:
raise
except Exception as e:
logger.error(f"从Git安装节点失败: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"安装节点失败: {str(e)}")
def _install_node_from_file(file_path: str) -> Dict[str, Any]:
@ -511,165 +505,161 @@ def _install_node_from_file(file_path: str) -> Dict[str, Any]:
def _install_single_node_directory(node_dir: str) -> Dict[str, Any]:
"""安装单个节点目录到 ai/nodes 并导入数据库"""
try:
node_dir_path = Path(node_dir)
node_name = node_dir_path.name
# 读取节点定义文件
json_file = node_dir_path / f"{node_name}.json"
if not json_file.exists():
return {'success': False, 'error': f'找不到节点定义文件: {json_file.name}'}
with open(json_file, 'r', encoding='utf-8') as f:
node_data = json.load(f)
if not isinstance(node_data, dict):
return {'success': False, 'error': '节点定义文件格式错误'}
metadata = node_data.get("metadata") or {}
node_type = metadata.get("type")
if not node_type:
return {'success': False, 'error': '节点定义中缺少 metadata.type'}
# 确定目标目录apps/jingrow/jingrow/ai/nodes
current_file = Path(__file__).resolve()
# node_management.py 位于 jingrow/api/
# parents[0] = jingrow/api, parents[1] = jingrow
jingrow_root = current_file.parents[1] # jingrow
nodes_root = jingrow_root / "ai" / "nodes"
nodes_root.mkdir(parents=True, exist_ok=True)
target_node_dir = nodes_root / node_type
# 如果目标目录已存在,先删除
if target_node_dir.exists():
shutil.rmtree(target_node_dir)
# 复制整个节点目录
shutil.copytree(node_dir_path, target_node_dir)
# 导入到数据库
# 检查是否已存在
exists_res = get_record_id(
pagetype="Local Ai Node",
field="node_type",
value=node_type,
)
# 生成 schema移除 metadata
schema = dict(node_data)
schema.pop("metadata", None)
payload = {
"node_type": node_type,
"node_label": metadata.get("label") or node_type,
"node_icon": metadata.get("icon") or "fa-cube",
"node_color": metadata.get("color") or "#6b7280",
"node_group": metadata.get("group") or "",
"node_component": metadata.get("component_type") or "GenericNode",
"node_description": metadata.get("description") or "",
"status": "Published",
"node_schema": schema,
}
if exists_res.get("success"):
# 更新现有记录,使用 node_type 作为 name
res = update_record("Local Ai Node", node_type, payload)
else:
# 创建新记录
res = create_record("Local Ai Node", payload)
if res.get("success"):
return {'success': True, 'node_type': node_type, 'message': f'节点 {node_type} 安装成功'}
else:
return {'success': False, 'error': res.get('error', '导入数据库失败')}
except Exception as e:
logger.error(f"安装节点目录失败: {str(e)}")
return {'success': False, 'error': str(e)}
"""安装单个节点目录到 ai/nodes 并导入数据库"""
try:
node_dir_path = Path(node_dir)
node_name = node_dir_path.name
# 读取节点定义文件
json_file = node_dir_path / f"{node_name}.json"
if not json_file.exists():
return {'success': False, 'error': f'找不到节点定义文件: {json_file.name}'}
with open(json_file, 'r', encoding='utf-8') as f:
node_data = json.load(f)
if not isinstance(node_data, dict):
return {'success': False, 'error': '节点定义文件格式错误'}
metadata = node_data.get("metadata") or {}
node_type = metadata.get("type")
if not node_type:
return {'success': False, 'error': '节点定义中缺少 metadata.type'}
# 确定目标目录apps/jingrow/jingrow/ai/nodes
jingrow_root = get_apps_path() / "jingrow"
nodes_root = jingrow_root / "ai" / "nodes"
nodes_root.mkdir(parents=True, exist_ok=True)
target_node_dir = nodes_root / node_type
# 如果目标目录已存在,先删除
if target_node_dir.exists():
shutil.rmtree(target_node_dir)
# 复制整个节点目录
shutil.copytree(node_dir_path, target_node_dir)
# 导入到数据库
# 检查是否已存在
exists_res = get_record_id(
pagetype="Local Ai Node",
field="node_type",
value=node_type,
)
# 生成 schema移除 metadata
schema = dict(node_data)
schema.pop("metadata", None)
payload = {
"node_type": node_type,
"node_label": metadata.get("label") or node_type,
"node_icon": metadata.get("icon") or "fa-cube",
"node_color": metadata.get("color") or "#6b7280",
"node_group": metadata.get("group") or "",
"node_component": metadata.get("component_type") or "GenericNode",
"node_description": metadata.get("description") or "",
"status": "Published",
"node_schema": schema,
}
if exists_res.get("success"):
# 更新现有记录,使用 node_type 作为 name
res = update_record("Local Ai Node", node_type, payload)
else:
# 创建新记录
res = create_record("Local Ai Node", payload)
if res.get("success"):
return {'success': True, 'node_type': node_type, 'message': f'节点 {node_type} 安装成功'}
else:
return {'success': False, 'error': res.get('error', '导入数据库失败')}
except Exception as e:
logger.error(f"安装节点目录失败: {str(e)}")
return {'success': False, 'error': str(e)}
@router.post("/jingrow/node/package/{node_type}")
async def package_node(node_type: str):
"""
打包节点文件夹为zip文件返回文件路径用于后续上传
"""
try:
from datetime import datetime
current_file = Path(__file__).resolve()
jingrow_root = current_file.parents[1]
nodes_root = jingrow_root / "ai" / "nodes"
node_dir = nodes_root / node_type
if not node_dir.exists():
raise HTTPException(status_code=404, detail=f"节点目录不存在: {node_type}")
# 创建临时打包目录
root = current_file.parents[4]
tmp_dir = root / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
# 创建临时目录用于打包,文件夹名称保持为节点类型(不加时间戳)
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
temp_package_dir = tmp_dir / node_type
if temp_package_dir.exists():
# 如果临时目录已存在,先删除
shutil.rmtree(temp_package_dir)
temp_package_dir.mkdir(parents=True, exist_ok=True)
try:
# 复制节点目录内容(排除不必要的文件)
for item in node_dir.iterdir():
if item.name in ['__pycache__', '.git', '.DS_Store', '.pytest_cache']:
continue
dst = temp_package_dir / item.name
if item.is_dir():
shutil.copytree(item, dst, dirs_exist_ok=True)
else:
shutil.copy2(item, dst)
# 打包为 ZIPzip文件名格式{node_type}-{timestamp}.zip但内部文件夹名称保持为节点类型
zip_filename = f"{node_type}-{timestamp}.zip"
zip_base_name = tmp_dir / f"{node_type}-{timestamp}"
# base_dir 使用 node_type这样 zip 内部文件夹名称就是 node_type
shutil.make_archive(str(zip_base_name), 'zip', root_dir=str(tmp_dir), base_dir=node_type)
zip_path = tmp_dir / f"{zip_filename}"
if not zip_path.exists():
raise HTTPException(status_code=500, detail="ZIP文件创建失败")
# 读取文件内容
with open(zip_path, 'rb') as f:
zip_content = f.read()
# 清理临时文件
if zip_path.exists():
os.remove(zip_path)
# 返回文件内容,前端可以直接使用
return Response(
content=zip_content,
media_type="application/zip",
headers={
"Content-Disposition": f"attachment; filename={zip_filename}",
"Content-Type": "application/zip"
}
)
finally:
# 清理临时目录
if temp_package_dir.exists():
shutil.rmtree(temp_package_dir, ignore_errors=True)
except HTTPException:
raise
except Exception as e:
logger.error(f"打包节点失败: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"打包节点失败: {str(e)}")
"""
打包节点文件夹为zip文件返回文件路径用于后续上传
"""
try:
from datetime import datetime
jingrow_root = get_apps_path() / "jingrow"
nodes_root = jingrow_root / "ai" / "nodes"
node_dir = nodes_root / node_type
if not node_dir.exists():
raise HTTPException(status_code=404, detail=f"节点目录不存在: {node_type}")
# 创建临时打包目录
root = get_root_path()
tmp_dir = root / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
# 创建临时目录用于打包,文件夹名称保持为节点类型(不加时间戳)
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
temp_package_dir = tmp_dir / node_type
if temp_package_dir.exists():
# 如果临时目录已存在,先删除
shutil.rmtree(temp_package_dir)
temp_package_dir.mkdir(parents=True, exist_ok=True)
try:
# 复制节点目录内容(排除不必要的文件)
for item in node_dir.iterdir():
if item.name in ['__pycache__', '.git', '.DS_Store', '.pytest_cache']:
continue
dst = temp_package_dir / item.name
if item.is_dir():
shutil.copytree(item, dst, dirs_exist_ok=True)
else:
shutil.copy2(item, dst)
# 打包为 ZIPzip文件名格式{node_type}-{timestamp}.zip但内部文件夹名称保持为节点类型
zip_filename = f"{node_type}-{timestamp}.zip"
zip_base_name = tmp_dir / f"{node_type}-{timestamp}"
# base_dir 使用 node_type这样 zip 内部文件夹名称就是 node_type
shutil.make_archive(str(zip_base_name), 'zip', root_dir=str(tmp_dir), base_dir=node_type)
zip_path = tmp_dir / f"{zip_filename}"
if not zip_path.exists():
raise HTTPException(status_code=500, detail="ZIP文件创建失败")
# 读取文件内容
with open(zip_path, 'rb') as f:
zip_content = f.read()
# 清理临时文件
if zip_path.exists():
os.remove(zip_path)
# 返回文件内容,前端可以直接使用
return Response(
content=zip_content,
media_type="application/zip",
headers={
"Content-Disposition": f"attachment; filename={zip_filename}",
"Content-Type": "application/zip"
}
)
finally:
# 清理临时目录
if temp_package_dir.exists():
shutil.rmtree(temp_package_dir, ignore_errors=True)
except HTTPException:
raise
except Exception as e:
logger.error(f"打包节点失败: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"打包节点失败: {str(e)}")
@router.post("/jingrow/node/publish")