From 5a758b3a8fbbd6b40d114697c9be3ab3cdb69923 Mon Sep 17 00:00:00 2001 From: jingrow Date: Mon, 3 Nov 2025 03:12:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96node=5Fmanagement.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/jingrow/jingrow/api/node_management.py | 590 ++++++++++---------- 1 file changed, 290 insertions(+), 300 deletions(-) diff --git a/apps/jingrow/jingrow/api/node_management.py b/apps/jingrow/jingrow/api/node_management.py index c55d297..839bff8 100644 --- a/apps/jingrow/jingrow/api/node_management.py +++ b/apps/jingrow/jingrow/api/node_management.py @@ -13,6 +13,7 @@ from jingrow.utils.fs import atomic_write_json from jingrow.utils.jingrow_api import get_record_id, create_record, update_record, get_record_list from jingrow.utils.auth import get_jingrow_cloud_url, get_jingrow_cloud_api_headers from jingrow.config import Config +from jingrow.utils.path import get_root_path, get_apps_path logger = logging.getLogger(__name__) @@ -33,8 +34,7 @@ async def export_node_definition(payload: Dict[str, Any]): export_data = {"metadata": metadata, **(schema or {})} - current_file = Path(__file__).resolve() - jingrow_root = current_file.parents[1] # 修正路径层级 + jingrow_root = get_apps_path() / "jingrow" new_root = jingrow_root / "ai" / "nodes" target = new_root / node_type / f"{node_type}.json" atomic_write_json(target, export_data) @@ -49,8 +49,7 @@ async def import_local_node_definitions(): 扫描本地节点定义目录,按 metadata 去重后导入到 Local Ai Node。 """ try: - current_file = Path(__file__).resolve() - jingrow_root = current_file.parents[1] # 修正路径层级 + jingrow_root = get_apps_path() / "jingrow" nodes_root = jingrow_root / "ai" / "nodes" if not nodes_root.exists(): return {"success": True, "matched": 0, "imported": 0, "skipped_existing": 0} @@ -128,8 +127,7 @@ async def get_all_node_metadata(): 获取所有节点的元数据,用于流程编排界面 """ try: - current_file = Path(__file__).resolve() - jingrow_root = current_file.parents[1] # 修正路径层级 + jingrow_root = get_apps_path() / "jingrow" nodes_root = jingrow_root / "ai" / "nodes" if not nodes_root.exists(): @@ -177,8 +175,7 @@ async def get_node_schema(node_type: str): 获取指定节点类型的Schema配置 """ try: - current_file = Path(__file__).resolve() - jingrow_root = current_file.parents[1] + jingrow_root = get_apps_path() / "jingrow" nodes_root = jingrow_root / "ai" / "nodes" json_file = nodes_root / node_type / f"{node_type}.json" @@ -303,151 +300,148 @@ async def check_node_exists(node_type: str): @router.get("/jingrow/installed-node-types") async def get_installed_node_types(): - """ - 获取已安装的节点类型列表 - 通过扫描节点目录获取,这是最高效的方式(只需要列出目录名,不需要读取文件) - """ - try: - # 确定节点目录路径:apps/jingrow/jingrow/ai/nodes - current_file = Path(__file__).resolve() - jingrow_root = current_file.parents[1] # jingrow - nodes_root = jingrow_root / "ai" / "nodes" - - node_types = [] - - # 如果目录不存在,返回空列表 - if not nodes_root.exists(): - return {"success": True, "node_types": []} - - # 扫描目录,只获取目录名(最高效的方式) - for item in nodes_root.iterdir(): - if item.is_dir() and not item.name.startswith('.') and item.name != '__pycache__': - # 验证是否包含节点定义文件(可选,但更可靠) - json_file = item / f"{item.name}.json" - if json_file.exists(): - node_types.append(item.name) - - return {"success": True, "node_types": sorted(node_types)} - except Exception as e: - logger.error(f"获取已安装节点类型失败: {str(e)}") - logger.error(f"Traceback: {traceback.format_exc()}") - return {"success": True, "node_types": []} + """ + 获取已安装的节点类型列表 + 通过扫描节点目录获取,这是最高效的方式(只需要列出目录名,不需要读取文件) + """ + try: + # 确定节点目录路径:apps/jingrow/jingrow/ai/nodes + jingrow_root = get_apps_path() / "jingrow" + nodes_root = jingrow_root / "ai" / "nodes" + + node_types = [] + + # 如果目录不存在,返回空列表 + if not nodes_root.exists(): + return {"success": True, "node_types": []} + + # 扫描目录,只获取目录名(最高效的方式) + for item in nodes_root.iterdir(): + if item.is_dir() and not item.name.startswith('.') and item.name != '__pycache__': + # 验证是否包含节点定义文件(可选,但更可靠) + json_file = item / f"{item.name}.json" + if json_file.exists(): + node_types.append(item.name) + + return {"success": True, "node_types": sorted(node_types)} + except Exception as e: + logger.error(f"获取已安装节点类型失败: {str(e)}") + logger.error(f"Traceback: {traceback.format_exc()}") + return {"success": True, "node_types": []} @router.post("/jingrow/install-node-from-url") async def install_node_from_url(url: str = Form(...)): - """从URL安装节点""" - try: - # 下载文件 - current = Path(__file__).resolve() - root = current.parents[4] - tmp_dir = root / "tmp" - tmp_dir.mkdir(parents=True, exist_ok=True) - - # 创建临时文件 - temp_filename = f"node_download_{uuid.uuid4().hex[:8]}{Path(url).suffix}" - temp_file_path = tmp_dir / temp_filename - - # 下载文件 - response = requests.get(url, stream=True, timeout=300) - response.raise_for_status() - - with open(temp_file_path, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - - # 安装节点 - result = _install_node_from_file(str(temp_file_path)) - - # 清理临时文件 - if temp_file_path.exists(): - os.remove(temp_file_path) - - return result - - except Exception as e: - logger.error(f"从URL安装节点失败: {str(e)}") - logger.error(f"Traceback: {traceback.format_exc()}") - raise HTTPException(status_code=500, detail=f"安装节点失败: {str(e)}") + """从URL安装节点""" + try: + # 下载文件 + root = get_root_path() + tmp_dir = root / "tmp" + tmp_dir.mkdir(parents=True, exist_ok=True) + + # 创建临时文件 + temp_filename = f"node_download_{uuid.uuid4().hex[:8]}{Path(url).suffix}" + temp_file_path = tmp_dir / temp_filename + + # 下载文件 + response = requests.get(url, stream=True, timeout=300) + response.raise_for_status() + + with open(temp_file_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + # 安装节点 + result = _install_node_from_file(str(temp_file_path)) + + # 清理临时文件 + if temp_file_path.exists(): + os.remove(temp_file_path) + + return result + + except Exception as e: + logger.error(f"从URL安装节点失败: {str(e)}") + logger.error(f"Traceback: {traceback.format_exc()}") + raise HTTPException(status_code=500, detail=f"安装节点失败: {str(e)}") @router.post("/jingrow/install-node-from-git") async def install_node_from_git(repo_url: str = Form(...)): - """从git仓库克隆并安装节点""" - try: - current = Path(__file__).resolve() - root = current.parents[4] - tmp_dir = root / "tmp" - tmp_dir.mkdir(parents=True, exist_ok=True) - - # 创建临时目录用于克隆 - clone_dir = tmp_dir / f"node_git_clone_{uuid.uuid4().hex[:8]}" - - try: - # 使用 git clone 克隆仓库 - result = subprocess.run( - ['git', 'clone', repo_url, str(clone_dir)], - capture_output=True, - text=True, - timeout=300 - ) - - if result.returncode != 0: - raise HTTPException(status_code=400, detail=f"Git 克隆失败: {result.stderr}") - - # 查找节点目录(节点包应该包含一个或多个节点目录) - node_dirs = [] - for item in clone_dir.iterdir(): - if item.is_dir() and not item.name.startswith('.') and item.name != '__pycache__': - json_file = item / f"{item.name}.json" - if json_file.exists(): - node_dirs.append(item) - - if not node_dirs: - raise HTTPException(status_code=400, detail="仓库中没有找到节点定义文件") - - # 安装所有找到的节点 - installed_nodes = [] - errors = [] - - for node_dir in node_dirs: - try: - result = _install_single_node_directory(str(node_dir)) - if result.get('success'): - installed_nodes.append(node_dir.name) - else: - errors.append(f"{node_dir.name}: {result.get('error')}") - except Exception as e: - errors.append(f"{node_dir.name}: {str(e)}") - - # 清理临时目录 - shutil.rmtree(clone_dir, ignore_errors=True) - - if errors: - return { - 'success': len(installed_nodes) > 0, - 'installed': installed_nodes, - 'errors': errors, - 'message': f"成功安装 {len(installed_nodes)} 个节点,失败 {len(errors)} 个" - } - else: - return { - 'success': True, - 'installed': installed_nodes, - 'message': f"成功安装 {len(installed_nodes)} 个节点" - } - - finally: - # 确保清理临时目录 - if clone_dir.exists(): - shutil.rmtree(clone_dir, ignore_errors=True) - - except HTTPException: - raise - except Exception as e: - logger.error(f"从Git安装节点失败: {str(e)}") - logger.error(f"Traceback: {traceback.format_exc()}") - raise HTTPException(status_code=500, detail=f"安装节点失败: {str(e)}") + """从git仓库克隆并安装节点""" + try: + root = get_root_path() + tmp_dir = root / "tmp" + tmp_dir.mkdir(parents=True, exist_ok=True) + + # 创建临时目录用于克隆 + clone_dir = tmp_dir / f"node_git_clone_{uuid.uuid4().hex[:8]}" + + try: + # 使用 git clone 克隆仓库 + result = subprocess.run( + ['git', 'clone', repo_url, str(clone_dir)], + capture_output=True, + text=True, + timeout=300 + ) + + if result.returncode != 0: + raise HTTPException(status_code=400, detail=f"Git 克隆失败: {result.stderr}") + + # 查找节点目录(节点包应该包含一个或多个节点目录) + node_dirs = [] + for item in clone_dir.iterdir(): + if item.is_dir() and not item.name.startswith('.') and item.name != '__pycache__': + json_file = item / f"{item.name}.json" + if json_file.exists(): + node_dirs.append(item) + + if not node_dirs: + raise HTTPException(status_code=400, detail="仓库中没有找到节点定义文件") + + # 安装所有找到的节点 + installed_nodes = [] + errors = [] + + for node_dir in node_dirs: + try: + result = _install_single_node_directory(str(node_dir)) + if result.get('success'): + installed_nodes.append(node_dir.name) + else: + errors.append(f"{node_dir.name}: {result.get('error')}") + except Exception as e: + errors.append(f"{node_dir.name}: {str(e)}") + + # 清理临时目录 + shutil.rmtree(clone_dir, ignore_errors=True) + + if errors: + return { + 'success': len(installed_nodes) > 0, + 'installed': installed_nodes, + 'errors': errors, + 'message': f"成功安装 {len(installed_nodes)} 个节点,失败 {len(errors)} 个" + } + else: + return { + 'success': True, + 'installed': installed_nodes, + 'message': f"成功安装 {len(installed_nodes)} 个节点" + } + + finally: + # 确保清理临时目录 + if clone_dir.exists(): + shutil.rmtree(clone_dir, ignore_errors=True) + + except HTTPException: + raise + except Exception as e: + logger.error(f"从Git安装节点失败: {str(e)}") + logger.error(f"Traceback: {traceback.format_exc()}") + raise HTTPException(status_code=500, detail=f"安装节点失败: {str(e)}") def _install_node_from_file(file_path: str) -> Dict[str, Any]: @@ -511,165 +505,161 @@ def _install_node_from_file(file_path: str) -> Dict[str, Any]: def _install_single_node_directory(node_dir: str) -> Dict[str, Any]: - """安装单个节点目录到 ai/nodes 并导入数据库""" - try: - node_dir_path = Path(node_dir) - node_name = node_dir_path.name - - # 读取节点定义文件 - json_file = node_dir_path / f"{node_name}.json" - if not json_file.exists(): - return {'success': False, 'error': f'找不到节点定义文件: {json_file.name}'} - - with open(json_file, 'r', encoding='utf-8') as f: - node_data = json.load(f) - - if not isinstance(node_data, dict): - return {'success': False, 'error': '节点定义文件格式错误'} - - metadata = node_data.get("metadata") or {} - node_type = metadata.get("type") - if not node_type: - return {'success': False, 'error': '节点定义中缺少 metadata.type'} - - # 确定目标目录:apps/jingrow/jingrow/ai/nodes - current_file = Path(__file__).resolve() - # node_management.py 位于 jingrow/api/ - # parents[0] = jingrow/api, parents[1] = jingrow - jingrow_root = current_file.parents[1] # jingrow - nodes_root = jingrow_root / "ai" / "nodes" - nodes_root.mkdir(parents=True, exist_ok=True) - - target_node_dir = nodes_root / node_type - - # 如果目标目录已存在,先删除 - if target_node_dir.exists(): - shutil.rmtree(target_node_dir) - - # 复制整个节点目录 - shutil.copytree(node_dir_path, target_node_dir) - - # 导入到数据库 - # 检查是否已存在 - exists_res = get_record_id( - pagetype="Local Ai Node", - field="node_type", - value=node_type, - ) - - # 生成 schema(移除 metadata) - schema = dict(node_data) - schema.pop("metadata", None) - - payload = { - "node_type": node_type, - "node_label": metadata.get("label") or node_type, - "node_icon": metadata.get("icon") or "fa-cube", - "node_color": metadata.get("color") or "#6b7280", - "node_group": metadata.get("group") or "", - "node_component": metadata.get("component_type") or "GenericNode", - "node_description": metadata.get("description") or "", - "status": "Published", - "node_schema": schema, - } - - if exists_res.get("success"): - # 更新现有记录,使用 node_type 作为 name - res = update_record("Local Ai Node", node_type, payload) - else: - # 创建新记录 - res = create_record("Local Ai Node", payload) - - if res.get("success"): - return {'success': True, 'node_type': node_type, 'message': f'节点 {node_type} 安装成功'} - else: - return {'success': False, 'error': res.get('error', '导入数据库失败')} - - except Exception as e: - logger.error(f"安装节点目录失败: {str(e)}") - return {'success': False, 'error': str(e)} + """安装单个节点目录到 ai/nodes 并导入数据库""" + try: + node_dir_path = Path(node_dir) + node_name = node_dir_path.name + + # 读取节点定义文件 + json_file = node_dir_path / f"{node_name}.json" + if not json_file.exists(): + return {'success': False, 'error': f'找不到节点定义文件: {json_file.name}'} + + with open(json_file, 'r', encoding='utf-8') as f: + node_data = json.load(f) + + if not isinstance(node_data, dict): + return {'success': False, 'error': '节点定义文件格式错误'} + + metadata = node_data.get("metadata") or {} + node_type = metadata.get("type") + if not node_type: + return {'success': False, 'error': '节点定义中缺少 metadata.type'} + + # 确定目标目录:apps/jingrow/jingrow/ai/nodes + jingrow_root = get_apps_path() / "jingrow" + nodes_root = jingrow_root / "ai" / "nodes" + nodes_root.mkdir(parents=True, exist_ok=True) + + target_node_dir = nodes_root / node_type + + # 如果目标目录已存在,先删除 + if target_node_dir.exists(): + shutil.rmtree(target_node_dir) + + # 复制整个节点目录 + shutil.copytree(node_dir_path, target_node_dir) + + # 导入到数据库 + # 检查是否已存在 + exists_res = get_record_id( + pagetype="Local Ai Node", + field="node_type", + value=node_type, + ) + + # 生成 schema(移除 metadata) + schema = dict(node_data) + schema.pop("metadata", None) + + payload = { + "node_type": node_type, + "node_label": metadata.get("label") or node_type, + "node_icon": metadata.get("icon") or "fa-cube", + "node_color": metadata.get("color") or "#6b7280", + "node_group": metadata.get("group") or "", + "node_component": metadata.get("component_type") or "GenericNode", + "node_description": metadata.get("description") or "", + "status": "Published", + "node_schema": schema, + } + + if exists_res.get("success"): + # 更新现有记录,使用 node_type 作为 name + res = update_record("Local Ai Node", node_type, payload) + else: + # 创建新记录 + res = create_record("Local Ai Node", payload) + + if res.get("success"): + return {'success': True, 'node_type': node_type, 'message': f'节点 {node_type} 安装成功'} + else: + return {'success': False, 'error': res.get('error', '导入数据库失败')} + + except Exception as e: + logger.error(f"安装节点目录失败: {str(e)}") + return {'success': False, 'error': str(e)} @router.post("/jingrow/node/package/{node_type}") async def package_node(node_type: str): - """ - 打包节点文件夹为zip文件,返回文件路径用于后续上传 - """ - try: - from datetime import datetime - - current_file = Path(__file__).resolve() - jingrow_root = current_file.parents[1] - nodes_root = jingrow_root / "ai" / "nodes" - node_dir = nodes_root / node_type - - if not node_dir.exists(): - raise HTTPException(status_code=404, detail=f"节点目录不存在: {node_type}") - - # 创建临时打包目录 - root = current_file.parents[4] - tmp_dir = root / "tmp" - tmp_dir.mkdir(parents=True, exist_ok=True) - - # 创建临时目录用于打包,文件夹名称保持为节点类型(不加时间戳) - timestamp = datetime.now().strftime("%Y%m%d%H%M%S") - temp_package_dir = tmp_dir / node_type - if temp_package_dir.exists(): - # 如果临时目录已存在,先删除 - shutil.rmtree(temp_package_dir) - temp_package_dir.mkdir(parents=True, exist_ok=True) - - try: - # 复制节点目录内容(排除不必要的文件) - for item in node_dir.iterdir(): - if item.name in ['__pycache__', '.git', '.DS_Store', '.pytest_cache']: - continue - dst = temp_package_dir / item.name - if item.is_dir(): - shutil.copytree(item, dst, dirs_exist_ok=True) - else: - shutil.copy2(item, dst) - - # 打包为 ZIP,zip文件名格式:{node_type}-{timestamp}.zip,但内部文件夹名称保持为节点类型 - zip_filename = f"{node_type}-{timestamp}.zip" - zip_base_name = tmp_dir / f"{node_type}-{timestamp}" - # base_dir 使用 node_type,这样 zip 内部文件夹名称就是 node_type - shutil.make_archive(str(zip_base_name), 'zip', root_dir=str(tmp_dir), base_dir=node_type) - - zip_path = tmp_dir / f"{zip_filename}" - - if not zip_path.exists(): - raise HTTPException(status_code=500, detail="ZIP文件创建失败") - - # 读取文件内容 - with open(zip_path, 'rb') as f: - zip_content = f.read() - - # 清理临时文件 - if zip_path.exists(): - os.remove(zip_path) - - # 返回文件内容,前端可以直接使用 - return Response( - content=zip_content, - media_type="application/zip", - headers={ - "Content-Disposition": f"attachment; filename={zip_filename}", - "Content-Type": "application/zip" - } - ) - - finally: - # 清理临时目录 - if temp_package_dir.exists(): - shutil.rmtree(temp_package_dir, ignore_errors=True) - - except HTTPException: - raise - except Exception as e: - logger.error(f"打包节点失败: {str(e)}") - logger.error(f"Traceback: {traceback.format_exc()}") - raise HTTPException(status_code=500, detail=f"打包节点失败: {str(e)}") + """ + 打包节点文件夹为zip文件,返回文件路径用于后续上传 + """ + try: + from datetime import datetime + + jingrow_root = get_apps_path() / "jingrow" + nodes_root = jingrow_root / "ai" / "nodes" + node_dir = nodes_root / node_type + + if not node_dir.exists(): + raise HTTPException(status_code=404, detail=f"节点目录不存在: {node_type}") + + # 创建临时打包目录 + root = get_root_path() + tmp_dir = root / "tmp" + tmp_dir.mkdir(parents=True, exist_ok=True) + + # 创建临时目录用于打包,文件夹名称保持为节点类型(不加时间戳) + timestamp = datetime.now().strftime("%Y%m%d%H%M%S") + temp_package_dir = tmp_dir / node_type + if temp_package_dir.exists(): + # 如果临时目录已存在,先删除 + shutil.rmtree(temp_package_dir) + temp_package_dir.mkdir(parents=True, exist_ok=True) + + try: + # 复制节点目录内容(排除不必要的文件) + for item in node_dir.iterdir(): + if item.name in ['__pycache__', '.git', '.DS_Store', '.pytest_cache']: + continue + dst = temp_package_dir / item.name + if item.is_dir(): + shutil.copytree(item, dst, dirs_exist_ok=True) + else: + shutil.copy2(item, dst) + + # 打包为 ZIP,zip文件名格式:{node_type}-{timestamp}.zip,但内部文件夹名称保持为节点类型 + zip_filename = f"{node_type}-{timestamp}.zip" + zip_base_name = tmp_dir / f"{node_type}-{timestamp}" + # base_dir 使用 node_type,这样 zip 内部文件夹名称就是 node_type + shutil.make_archive(str(zip_base_name), 'zip', root_dir=str(tmp_dir), base_dir=node_type) + + zip_path = tmp_dir / f"{zip_filename}" + + if not zip_path.exists(): + raise HTTPException(status_code=500, detail="ZIP文件创建失败") + + # 读取文件内容 + with open(zip_path, 'rb') as f: + zip_content = f.read() + + # 清理临时文件 + if zip_path.exists(): + os.remove(zip_path) + + # 返回文件内容,前端可以直接使用 + return Response( + content=zip_content, + media_type="application/zip", + headers={ + "Content-Disposition": f"attachment; filename={zip_filename}", + "Content-Type": "application/zip" + } + ) + + finally: + # 清理临时目录 + if temp_package_dir.exists(): + shutil.rmtree(temp_package_dir, ignore_errors=True) + + except HTTPException: + raise + except Exception as e: + logger.error(f"打包节点失败: {str(e)}") + logger.error(f"Traceback: {traceback.format_exc()}") + raise HTTPException(status_code=500, detail=f"打包节点失败: {str(e)}") @router.post("/jingrow/node/publish")