585 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, HTTPException, Form
from typing import Dict, Any, List, Optional
from pathlib import Path
import json
import os
import uuid
import shutil
import subprocess
import logging
import traceback
import requests
from jingrow.utils.fs import atomic_write_json
from jingrow.utils.jingrow_api import get_record_id, create_record, update_record, get_record_list
from jingrow.utils.auth import get_jingrow_cloud_url, get_jingrow_cloud_api_headers
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/jingrow/node/export")
async def export_node_definition(payload: Dict[str, Any]):
    """Export a node definition (metadata + schema) to a JSON file.

    The document is written with ``atomic_write_json`` to
    ``<root>/ai/nodes/<type>/<type>.json``, where ``<root>`` is
    ``parents[1]`` of this file's resolved path and ``<type>`` is
    ``payload["metadata"]["type"]``.

    Args:
        payload: Request body. ``metadata`` must contain ``type``; the keys
            of the optional ``schema`` mapping are merged into the top level
            of the exported document next to ``metadata``.

    Returns:
        ``{"success": True, "path": <path of the written file>}``.

    Raises:
        HTTPException: 400 when ``metadata.type`` is missing, is not a plain
            name, or when any other error occurs while exporting.
    """
    try:
        metadata = payload.get("metadata") or {}
        schema = payload.get("schema") or {}
        node_type = metadata.get("type")
        if not node_type:
            raise ValueError("metadata.type is required")
        # node_type comes from the request body and is used as a directory and
        # file name below; refuse anything that could escape ai/nodes.
        if (
            not isinstance(node_type, str)
            or node_type in {".", ".."}
            or any(ch in node_type for ch in ("/", "\\", "\x00"))
        ):
            raise ValueError("metadata.type must be a plain name without path separators")

        export_data = {"metadata": metadata, **schema}
        jingrow_root = Path(__file__).resolve().parents[1]
        target = jingrow_root / "ai" / "nodes" / node_type / f"{node_type}.json"
        atomic_write_json(target, export_data)
        return {"success": True, "path": str(target)}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@router.post("/jingrow/node/import-local")
async def import_local_node_definitions():
    """Import on-disk node definitions into ``Local Ai Node`` records.

    Scans ``<root>/ai/nodes`` (``<root>`` is ``parents[1]`` of this file's
    resolved path) for sub-directories that contain ``<dirname>.json``. A
    record is created for every definition whose ``metadata.type`` is not
    found by ``get_record_id``; definitions that are already present are
    counted as skipped.

    Returns:
        A dict with ``success``, ``matched`` (definition files found),
        ``imported``, ``skipped_existing`` and ``errors`` (one message per
        file or record that failed). The same keys are returned when the
        nodes directory does not exist.

    Raises:
        HTTPException: 500 when the scan itself fails.
    """
    try:
        jingrow_root = Path(__file__).resolve().parents[1]
        nodes_root = jingrow_root / "ai" / "nodes"
        if not nodes_root.exists():
            # Same response shape as the normal path so callers can always
            # read "errors".
            return {
                "success": True,
                "matched": 0,
                "imported": 0,
                "skipped_existing": 0,
                "errors": [],
            }

        matched: int = 0
        imported: int = 0
        skipped: int = 0
        errors: List[str] = []

        for node_dir in nodes_root.iterdir():
            if not node_dir.is_dir():
                continue
            json_file = node_dir / f"{node_dir.name}.json"
            if not json_file.exists():
                continue
            matched += 1
            try:
                with open(json_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                if not isinstance(data, dict):
                    continue
                metadata = data.get("metadata") or {}
                node_type = metadata.get("type")
                if not node_type:
                    continue

                # De-duplicate: skip when a record with this node_type exists.
                exists_res = get_record_id(
                    pagetype="Local Ai Node",
                    field="node_type",
                    value=node_type,
                )
                if exists_res.get("success"):
                    skipped += 1
                    continue

                # The schema is everything in the file except "metadata".
                schema = dict(data)
                schema.pop("metadata", None)

                payload = {
                    "node_type": node_type,
                    "node_label": metadata.get("label") or node_type,
                    "node_icon": metadata.get("icon") or "fa-cube",
                    "node_color": metadata.get("color") or "#6b7280",
                    "node_group": metadata.get("group") or "",
                    "node_component": metadata.get("component_type") or "GenericNode",
                    "node_description": metadata.get("description") or "",
                    "status": "Published",
                    "node_schema": schema,
                }
                res = create_record("Local Ai Node", payload)
                if res.get("success"):
                    imported += 1
                else:
                    errors.append(f"{node_type}: {res.get('error')}")
            except Exception as e:
                # One broken definition must not abort the whole import.
                errors.append(f"{json_file.name}: {str(e)}")

        return {
            "success": True,
            "matched": matched,
            "imported": imported,
            "skipped_existing": skipped,
            "errors": errors,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/jingrow/node/metadata")
async def get_all_node_metadata():
    """Return the metadata of every node definition found on disk.

    Scans ``<root>/ai/nodes`` (``<root>`` is ``parents[1]`` of this file's
    resolved path) for sub-directories that contain ``<dirname>.json`` and
    collects the ``metadata`` section of each file, filling in defaults for
    missing display fields.

    Returns:
        ``{"success": True, "data": {<node type>: {...}}}``. Each entry has
        the keys ``type``, ``label``, ``icon``, ``color``, ``description``,
        ``group`` and ``component``. Files that cannot be read or parsed are
        skipped (and logged); files without ``metadata.type`` are skipped.

    Raises:
        HTTPException: 500 when the scan itself fails.
    """
    try:
        jingrow_root = Path(__file__).resolve().parents[1]
        nodes_root = jingrow_root / "ai" / "nodes"
        if not nodes_root.exists():
            return {"success": True, "data": {}}

        metadata_map = {}
        for node_dir in nodes_root.iterdir():
            if not node_dir.is_dir():
                continue
            json_file = node_dir / f"{node_dir.name}.json"
            if not json_file.exists():
                continue
            try:
                with open(json_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                metadata = data.get("metadata") or {}
                node_type = metadata.get("type")
                if not node_type:
                    continue
                metadata_map[node_type] = {
                    "type": node_type,
                    "label": metadata.get("label") or node_type,
                    "icon": metadata.get("icon") or "fa-cube",
                    "color": metadata.get("color") or "#6b7280",
                    "description": metadata.get("description") or "",
                    "group": metadata.get("group") or "其他",
                    "component": metadata.get("component_type") or "GenericNode",
                }
            except Exception as e:
                # Still best-effort (one bad file must not hide the others),
                # but leave a trace instead of swallowing the error silently.
                logger.warning("跳过无法解析的节点定义 %s: %s", json_file, e)
                continue

        return {"success": True, "data": metadata_map}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/jingrow/node/schema/{node_type}")
async def get_node_schema(node_type: str):
    """Return the schema section of one node definition file.

    Reads ``<root>/ai/nodes/<node_type>/<node_type>.json`` (``<root>`` is
    ``parents[1]`` of this file's resolved path) and returns its top-level
    content with the ``metadata`` key removed.

    Args:
        node_type: Node type taken from the URL path.

    Returns:
        ``{"success": True, "data": <definition without "metadata">}``.

    Raises:
        HTTPException: 404 when the definition file does not exist or
            ``node_type`` is not a plain name; 500 for any other failure.
    """
    try:
        # node_type is caller-controlled and becomes a path component; treat
        # anything path-like as an unknown node instead of touching the disk.
        if node_type in {".", ".."} or any(ch in node_type for ch in ("/", "\\", "\x00")):
            raise HTTPException(status_code=404, detail=f"节点类型 {node_type} 不存在")

        jingrow_root = Path(__file__).resolve().parents[1]
        nodes_root = jingrow_root / "ai" / "nodes"
        json_file = nodes_root / node_type / f"{node_type}.json"
        if not json_file.exists():
            raise HTTPException(status_code=404, detail=f"节点类型 {node_type} 不存在")

        with open(json_file, "r", encoding="utf-8") as f:
            data = json.load(f)

        schema = dict(data)
        schema.pop("metadata", None)
        return {"success": True, "data": schema}
    except HTTPException:
        # HTTPException is an Exception subclass: without this clause the
        # 404s raised above would be caught below and turned into a 500.
        raise
    except FileNotFoundError:
        # The file disappeared between the exists() check and open().
        raise HTTPException(status_code=404, detail=f"节点类型 {node_type} 不存在")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ==================== 节点市场 API ====================
@router.get("/jingrow/node-marketplace")
async def get_node_marketplace(
    search: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    sort_by: Optional[str] = None
):
    """Fetch one page of public nodes from the cloud node marketplace.

    Two GET requests are sent to the cloud ``get_local_nodes`` action: the
    first (with ``limit_page_length`` 0) is used to count the matching nodes,
    the second fetches the requested page.

    Args:
        search: Optional text; when given, ``like`` filters on ``title`` and
            ``node_type`` are added.
        page: 1-based page number.
        page_size: Number of items per page.
        sort_by: Optional value forwarded as ``order_by``.

    Returns:
        ``{"items": [...], "total": <count>, "page": page, "page_size":
        page_size}``. Any failure (non-200 page response, request error,
        unexpected exception) is logged and yields an empty page with
        ``total`` 0 instead of an error response.
    """

    def _empty_page() -> Dict[str, Any]:
        # Fallback payload shared by every failure path.
        return {
            "items": [],
            "total": 0,
            "page": page,
            "page_size": page_size
        }

    try:
        endpoint = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.local_node.get_local_nodes"

        # Filter conditions
        criteria = {"public": 1}
        if search:
            criteria.update(
                title=["like", f"%{search}%"],
                node_type=["like", f"%{search}%"],
            )
        encoded_filters = json.dumps(criteria, ensure_ascii=False)

        count_query = {
            'filters': encoded_filters,
            'limit_start': 0,
            'limit_page_length': 0
        }
        auth_headers = get_jingrow_cloud_api_headers()

        try:
            # Step 1: total number of matching nodes
            count_reply = requests.get(endpoint, params=count_query, headers=auth_headers, timeout=20)
            if count_reply.status_code == 200:
                total_count = len(count_reply.json().get('message', []))
            else:
                total_count = 0

            # Step 2: the requested page (filters, optional ordering, paging)
            page_query = {'filters': encoded_filters}
            if sort_by:
                page_query['order_by'] = sort_by
            page_query['limit_start'] = (page - 1) * page_size
            page_query['limit_page_length'] = page_size

            page_reply = requests.get(endpoint, params=page_query, headers=auth_headers, timeout=20)
            if page_reply.status_code != 200:
                # Missing or failing API: answer with an empty list
                logger.warning(f"获取节点市场数据失败: HTTP {page_reply.status_code}, 返回空列表")
                return _empty_page()

            return {
                "items": page_reply.json().get('message', []),
                "total": total_count,
                "page": page,
                "page_size": page_size
            }
        except requests.exceptions.RequestException as e:
            # Network error or missing API: answer with an empty list
            logger.warning(f"节点市场API请求失败: {str(e)}, 返回空列表")
            return _empty_page()
    except Exception as e:
        logger.error(f"获取节点市场数据异常: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        # Even on unexpected errors return an empty list rather than a 500
        return _empty_page()
@router.get("/jingrow/check-node/{node_type}")
async def check_node_exists(node_type: str):
    """Report whether a ``Local Ai Node`` record exists for ``node_type``.

    Returns:
        ``{"exists": <"success" value of the get_record_id lookup>}``
        (``False`` when the lookup result has no ``success`` key).

    Raises:
        HTTPException: 500 when the lookup raises.
    """
    try:
        lookup = get_record_id(
            pagetype="Local Ai Node",
            field="node_type",
            value=node_type,
        )
        installed = lookup.get("success", False)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"检查节点失败: {str(exc)}")
    return {"exists": installed}
@router.get("/jingrow/installed-node-types")
async def get_installed_node_types():
    """List the ``node_type`` values of the installed ``Local Ai Node`` records.

    At most 1000 records are requested.

    Returns:
        ``{"success": True, "node_types": [...]}``. The list is empty when
        the lookup reports failure or raises (the exception is logged).
    """
    no_types = {"success": True, "node_types": []}
    try:
        listing = get_record_list(
            pagetype="Local Ai Node",
            fields=["node_type"],
            filters=[],
            limit_page_length=1000
        )
        if not listing.get("success"):
            return no_types

        rows = listing.get("data", {}).get("data", [])
        found = []
        for row in rows:
            type_name = row.get("node_type")
            if type_name:
                found.append(type_name)
        return {"success": True, "node_types": found}
    except Exception as e:
        logger.error(f"获取已安装节点类型失败: {str(e)}")
        return no_types
@router.post("/jingrow/install-node-from-url")
async def install_node_from_url(url: str = Form(...)):
    """Download a node package from ``url`` and install the nodes it contains.

    The package is streamed to a uniquely named file under ``<root>/tmp``
    (``<root>`` is ``parents[4]`` of this file's resolved path), handed to
    ``_install_node_from_file`` and removed afterwards, whether or not the
    download or the installation succeeded.

    Args:
        url: Location of the package archive (form field).

    Returns:
        The result dict produced by ``_install_node_from_file``.

    Raises:
        HTTPException: 500 when the download or the installation raises.
    """
    # Local import: only this endpoint needs it.
    from urllib.parse import urlparse

    # NOTE(review): url is caller-supplied and fetched as-is, so this endpoint
    # can be pointed at any address reachable from the server. Restrict the
    # allowed hosts if the endpoint is exposed to untrusted users.
    temp_file_path = None
    try:
        root = Path(__file__).resolve().parents[4]
        tmp_dir = root / "tmp"
        tmp_dir.mkdir(parents=True, exist_ok=True)

        # Take the extension from the URL *path* only, so a query string or
        # fragment never ends up in the file name, and keep the compound
        # ".tar.gz" (Path.suffix alone would reduce it to ".gz").
        remote_name = Path(urlparse(url).path).name
        if remote_name.lower().endswith(".tar.gz"):
            suffix = ".tar.gz"
        else:
            suffix = Path(remote_name).suffix
        temp_file_path = tmp_dir / f"node_download_{uuid.uuid4().hex[:8]}{suffix}"

        # The context manager releases the streamed connection even when the
        # download is interrupted.
        with requests.get(url, stream=True, timeout=300) as response:
            response.raise_for_status()
            with open(temp_file_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        return _install_node_from_file(str(temp_file_path))
    except Exception as e:
        logger.error(f"从URL安装节点失败: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"安装节点失败: {str(e)}")
    finally:
        # Remove the download on every exit path, including partial files
        # left behind by a failed transfer.
        if temp_file_path is not None:
            try:
                if temp_file_path.exists():
                    os.remove(temp_file_path)
            except OSError as cleanup_error:
                logger.warning("清理临时文件失败 %s: %s", temp_file_path, cleanup_error)
@router.post("/jingrow/install-node-from-git")
async def install_node_from_git(repo_url: str = Form(...)):
    """Clone a git repository and install every node directory it contains.

    The repository is cloned into a uniquely named directory under
    ``<root>/tmp`` (``<root>`` is ``parents[4]`` of this file's resolved
    path). Every top-level directory ``<name>`` that contains
    ``<name>.json`` (hidden directories and ``__pycache__`` excluded) is
    passed to ``_install_single_node_directory``. The clone is always
    removed afterwards.

    Args:
        repo_url: Repository location handed to ``git clone`` (form field).

    Returns:
        A dict with ``success``, ``installed`` (names of the installed
        directories) and ``message``, plus ``errors`` when at least one
        directory failed. With failures, ``success`` is true only if at
        least one node was installed.

    Raises:
        HTTPException: 400 when the clone fails or the repository contains
            no node definition; 500 for any other error.
    """
    try:
        root = Path(__file__).resolve().parents[4]
        tmp_dir = root / "tmp"
        tmp_dir.mkdir(parents=True, exist_ok=True)

        # Temporary directory that receives the clone
        clone_dir = tmp_dir / f"node_git_clone_{uuid.uuid4().hex[:8]}"
        try:
            # "--" ends git's option parsing, so a caller-supplied repo_url
            # that starts with "-" is treated as a repository, never as a
            # command-line option.
            clone_proc = subprocess.run(
                ['git', 'clone', '--', repo_url, str(clone_dir)],
                capture_output=True,
                text=True,
                timeout=300
            )
            if clone_proc.returncode != 0:
                raise HTTPException(status_code=400, detail=f"Git 克隆失败: {clone_proc.stderr}")

            # A node package holds one or more directories, each with a
            # definition file named after the directory.
            node_dirs = []
            for item in clone_dir.iterdir():
                if item.is_dir() and not item.name.startswith('.') and item.name != '__pycache__':
                    json_file = item / f"{item.name}.json"
                    if json_file.exists():
                        node_dirs.append(item)

            if not node_dirs:
                raise HTTPException(status_code=400, detail="仓库中没有找到节点定义文件")

            # Install every node found; one failure does not stop the rest.
            installed_nodes = []
            errors = []
            for node_dir in node_dirs:
                try:
                    install_result = _install_single_node_directory(str(node_dir))
                    if install_result.get('success'):
                        installed_nodes.append(node_dir.name)
                    else:
                        errors.append(f"{node_dir.name}: {install_result.get('error')}")
                except Exception as e:
                    errors.append(f"{node_dir.name}: {str(e)}")

            if errors:
                return {
                    'success': len(installed_nodes) > 0,
                    'installed': installed_nodes,
                    'errors': errors,
                    'message': f"成功安装 {len(installed_nodes)} 个节点,失败 {len(errors)} 个"
                }
            return {
                'success': True,
                'installed': installed_nodes,
                'message': f"成功安装 {len(installed_nodes)} 个节点"
            }
        finally:
            # Single cleanup point: runs on success, on HTTP errors and on
            # unexpected exceptions alike.
            if clone_dir.exists():
                shutil.rmtree(clone_dir, ignore_errors=True)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"从Git安装节点失败: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"安装节点失败: {str(e)}")
def _install_node_from_file(file_path: str) -> Dict[str, Any]:
    """Extract a node package archive and install every node directory in it.

    The archive is unpacked with ``extract_package``; each top-level
    directory ``<name>`` of the extracted tree that contains ``<name>.json``
    (hidden directories and ``__pycache__`` excluded) is passed to
    ``_install_single_node_directory``. The extraction directory is removed
    with ``cleanup_temp_dir`` afterwards.

    Args:
        file_path: Path of the downloaded archive.

    Returns:
        The failed ``extract_package`` result as-is, or a dict with
        ``success``, ``installed`` and ``message`` (plus ``errors`` when at
        least one directory failed), or ``{'success': False, 'error': ...}``
        when nothing installable was found or an exception occurred.
    """
    try:
        from jingrow.utils.app_installer import extract_package, cleanup_temp_dir

        unpacked = extract_package(file_path)
        if not unpacked.get('success'):
            return unpacked

        workdir = unpacked['temp_dir']
        try:
            # Candidate node directories: <name>/<name>.json must exist
            candidates = [
                entry
                for entry in Path(workdir).iterdir()
                if entry.is_dir()
                and not entry.name.startswith('.')
                and entry.name != '__pycache__'
                and (entry / f"{entry.name}.json").exists()
            ]
            if not candidates:
                return {'success': False, 'error': '压缩包中没有找到节点定义文件'}

            # Install each candidate, collecting successes and failures
            succeeded = []
            failures = []
            for candidate in candidates:
                try:
                    outcome = _install_single_node_directory(str(candidate))
                    if outcome.get('success'):
                        succeeded.append(candidate.name)
                    else:
                        failures.append(f"{candidate.name}: {outcome.get('error')}")
                except Exception as e:
                    failures.append(f"{candidate.name}: {str(e)}")

            if not failures:
                return {
                    'success': True,
                    'installed': succeeded,
                    'message': f"成功安装 {len(succeeded)} 个节点"
                }
            return {
                'success': len(succeeded) > 0,
                'installed': succeeded,
                'errors': failures,
                'message': f"成功安装 {len(succeeded)} 个节点,失败 {len(failures)} 个"
            }
        finally:
            cleanup_temp_dir(workdir)
    except Exception as e:
        logger.error(f"从文件安装节点失败: {str(e)}")
        return {'success': False, 'error': str(e)}
def _install_single_node_directory(node_dir: str) -> Dict[str, Any]:
    """Install one node directory into ``ai/nodes`` and upsert its record.

    Reads ``<node_dir>/<dirname>.json``, copies the whole directory to
    ``<root>/ai/nodes/<metadata.type>`` (``<root>`` is ``parents[1]`` of this
    file's resolved path), replacing an existing directory of that name, and
    then creates or updates the matching ``Local Ai Node`` record.

    Args:
        node_dir: Path of the extracted or cloned node directory.

    Returns:
        ``{'success': True, 'node_type': ..., 'message': ...}`` on success,
        otherwise ``{'success': False, 'error': ...}``. Exceptions are logged
        and reported through the error dict rather than raised.
    """
    try:
        node_dir_path = Path(node_dir)
        node_name = node_dir_path.name

        # Read the node definition file
        json_file = node_dir_path / f"{node_name}.json"
        if not json_file.exists():
            return {'success': False, 'error': f'找不到节点定义文件: {json_file.name}'}

        with open(json_file, 'r', encoding='utf-8') as f:
            node_data = json.load(f)

        if not isinstance(node_data, dict):
            return {'success': False, 'error': '节点定义文件格式错误'}

        metadata = node_data.get("metadata") or {}
        node_type = metadata.get("type")
        if not node_type:
            return {'success': False, 'error': '节点定义中缺少 metadata.type'}

        # metadata.type comes from a downloaded/cloned package and is used as
        # a directory name that is removed and re-created below. Reject
        # anything path-like so a package cannot delete or overwrite
        # directories outside ai/nodes.
        if (
            not isinstance(node_type, str)
            or node_type in {".", ".."}
            or any(ch in node_type for ch in ("/", "\\", "\x00"))
        ):
            return {'success': False, 'error': f'非法的节点类型: {node_type}'}

        # Target directory: <root>/ai/nodes/<node_type>
        jingrow_root = Path(__file__).resolve().parents[1]
        nodes_root = jingrow_root / "ai" / "nodes"
        nodes_root.mkdir(parents=True, exist_ok=True)

        target_node_dir = nodes_root / node_type

        # Replace an existing installation of the same node type
        if target_node_dir.exists():
            shutil.rmtree(target_node_dir)

        # Copy the whole node directory
        shutil.copytree(node_dir_path, target_node_dir)

        # Import into the database: look for an existing record first
        exists_res = get_record_id(
            pagetype="Local Ai Node",
            field="node_type",
            value=node_type,
        )

        # The schema is everything in the definition except "metadata"
        schema = dict(node_data)
        schema.pop("metadata", None)

        payload = {
            "node_type": node_type,
            "node_label": metadata.get("label") or node_type,
            "node_icon": metadata.get("icon") or "fa-cube",
            "node_color": metadata.get("color") or "#6b7280",
            "node_group": metadata.get("group") or "",
            "node_component": metadata.get("component_type") or "GenericNode",
            "node_description": metadata.get("description") or "",
            "status": "Published",
            "node_schema": schema,
        }

        if exists_res.get("success"):
            # Update the existing record, addressed by node_type as its name
            res = update_record("Local Ai Node", node_type, payload)
        else:
            # Create a new record
            res = create_record("Local Ai Node", payload)

        if res.get("success"):
            return {'success': True, 'node_type': node_type, 'message': f'节点 {node_type} 安装成功'}
        return {'success': False, 'error': res.get('error', '导入数据库失败')}
    except Exception as e:
        logger.error(f"安装节点目录失败: {str(e)}")
        return {'success': False, 'error': str(e)}