diff --git a/apps/jingrow/jingrow/utils/node_dependencies.py b/apps/jingrow/jingrow/utils/node_dependencies.py index bd010ed..328cce1 100644 --- a/apps/jingrow/jingrow/utils/node_dependencies.py +++ b/apps/jingrow/jingrow/utils/node_dependencies.py @@ -1,8 +1,10 @@ """ 节点依赖管理工具 支持节点独立管理自己的依赖,使用 uv pip install 直接安装 +支持异步并行安装,提升多节点场景下的性能 """ import os +import asyncio import hashlib import subprocess import logging @@ -49,6 +51,103 @@ def discover_nodes_with_dependencies() -> List[str]: return sorted(nodes_with_deps) +def _parse_node_dependencies(pyproject_path: Path) -> tuple[List[str], str]: + """ + 解析节点的依赖列表和 pyproject.toml 内容 + + Args: + pyproject_path: pyproject.toml 文件路径 + + Returns: + (dependencies列表, pyproject.toml内容) + """ + dependencies = [] + content = "" + try: + with open(pyproject_path, 'r', encoding='utf-8') as f: + content = f.read() + + in_dependencies = False + bracket_count = 0 + for line in content.split('\n'): + stripped = line.strip() + if stripped.startswith('dependencies') and '=' in stripped and '[' in stripped: + in_dependencies = True + bracket_count = stripped.count('[') - stripped.count(']') + if ']' in stripped: + in_dependencies = False + continue + elif in_dependencies: + bracket_count += stripped.count('[') - stripped.count(']') + if bracket_count <= 0 and ']' in stripped: + break + dep = stripped.strip('",\' ,') + if dep and not dep.startswith('#'): + dependencies.append(dep) + except Exception as e: + logger.warning(f"解析 pyproject.toml 失败: {str(e)}") + + return dependencies, content + + +def _build_install_command(dependencies: List[str], node_dir: Path) -> List[str]: + """ + 构建安装命令 + + Args: + dependencies: 依赖列表 + node_dir: 节点目录 + + Returns: + 安装命令列表 + """ + if dependencies: + return ["uv", "pip", "install"] + dependencies + else: + return ["uv", "pip", "install", "-e", str(node_dir)] + + +def _format_install_result(node_type: str, success: bool, stdout: str, stderr: str, + after_install_result: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """ + 格式化安装结果 + + Args: + node_type: 节点类型 + success: 是否成功 + stdout: 标准输出 + stderr: 标准错误 + after_install_result: 后安装脚本结果 + + Returns: + 格式化的结果字典 + """ + if success: + message = f"节点 {node_type} 依赖安装成功" + if after_install_result: + if after_install_result.get("success"): + message += f" ({after_install_result.get('message', '后安装脚本执行成功')})" + else: + logger.warning(f"节点 {node_type} 后安装脚本执行失败: {after_install_result.get('error', '未知错误')}") + message += f" (后安装脚本警告: {after_install_result.get('error', '未知错误')})" + + return { + "success": True, + "message": message, + "output": stdout, + "post_install": after_install_result + } + else: + error_msg = stderr or stdout + logger.error(f"安装节点 {node_type} 依赖失败: {error_msg}") + return { + "success": False, + "error": f"依赖安装失败: {error_msg}", + "stderr": stderr, + "stdout": stdout + } + + def install_node_dependencies(node_type: str, sync: bool = True) -> Dict[str, Any]: """ 安装节点的依赖 @@ -72,44 +171,15 @@ def install_node_dependencies(node_type: str, sync: bool = True) -> Dict[str, An # 节点没有独立依赖,返回成功(使用根依赖) return {"success": True, "message": "节点使用根依赖,无需安装"} - # 获取项目根目录(apps/jingrow) project_root = jingrow_root.parent - # 读取节点的 pyproject.toml 获取依赖列表,然后直接安装依赖 - dependencies = [] - try: - # 使用简单文本解析提取依赖(兼容 Python 3.10) - with open(pyproject_path, 'r', encoding='utf-8') as f: - content = f.read() - - # 提取 dependencies 列表 - in_dependencies = False - bracket_count = 0 - for line in content.split('\n'): - stripped = line.strip() - if stripped.startswith('dependencies') and '=' in stripped and '[' in stripped: - in_dependencies = True - bracket_count = stripped.count('[') - stripped.count(']') - if ']' in stripped: - in_dependencies = False - continue - elif in_dependencies: - bracket_count += stripped.count('[') - stripped.count(']') - if bracket_count <= 0 and ']' in stripped: - break - dep = stripped.strip('",\' ,') - if dep and not dep.startswith('#'): - dependencies.append(dep) - except Exception as e: - logger.warning(f"解析节点 {node_type} 的 pyproject.toml 失败: {str(e)}") + # 解析依赖 + dependencies, content = _parse_node_dependencies(pyproject_path) - if dependencies: - # 直接安装依赖包 - cmd = ["uv", "pip", "install"] + dependencies - else: - # 如果没有依赖或解析失败,尝试安装节点包本身(可编辑模式) - cmd = ["uv", "pip", "install", "-e", str(node_dir)] + # 构建安装命令 + cmd = _build_install_command(dependencies, node_dir) + # 执行安装 result = subprocess.run( cmd, cwd=project_root, @@ -118,33 +188,19 @@ def install_node_dependencies(node_type: str, sync: bool = True) -> Dict[str, An timeout=300 ) + # 执行后安装脚本 + after_install_result = None if result.returncode == 0: - # 检查是否有后安装脚本需要执行 after_install_result = _run_node_after_install(node_type, node_dir, project_root, content) - - message = f"节点 {node_type} 依赖安装成功" - if after_install_result: - if after_install_result.get("success"): - message += f" ({after_install_result.get('message', '后安装脚本执行成功')})" - else: - logger.warning(f"节点 {node_type} 后安装脚本执行失败: {after_install_result.get('error', '未知错误')}") - message += f" (后安装脚本警告: {after_install_result.get('error', '未知错误')})" - - return { - "success": True, - "message": message, - "output": result.stdout, - "post_install": after_install_result - } - else: - error_msg = result.stderr or result.stdout - logger.error(f"安装节点 {node_type} 依赖失败: {error_msg}") - return { - "success": False, - "error": f"依赖安装失败: {error_msg}", - "stderr": result.stderr, - "stdout": result.stdout - } + + # 格式化结果 + return _format_install_result( + node_type, + result.returncode == 0, + result.stdout, + result.stderr, + after_install_result + ) except subprocess.TimeoutExpired: return {"success": False, "error": "依赖安装超时"} @@ -428,3 +484,123 @@ def ensure_node_dependencies(node_type: str) -> Dict[str, Any]: # 如果检查失败或不确定,尝试安装 return install_node_dependencies(node_type) + +async def install_node_dependencies_async(node_type: str) -> Dict[str, Any]: + """ + 异步安装节点的依赖(内部使用,保持与同步版本相同的逻辑) + + Args: + node_type: 节点类型 + + Returns: + 安装结果字典 + """ + try: + jingrow_root = get_jingrow_root() + node_dir = jingrow_root / "ai" / "nodes" / node_type + + if not node_dir.exists(): + return {"success": False, "error": f"节点目录不存在: {node_type}"} + + pyproject_path = node_dir / "pyproject.toml" + if not pyproject_path.exists(): + return {"success": True, "message": "节点使用根依赖,无需安装"} + + project_root = jingrow_root.parent + + # 解析依赖 + dependencies, content = _parse_node_dependencies(pyproject_path) + + # 构建安装命令 + cmd = _build_install_command(dependencies, node_dir) + + # 使用异步 subprocess + process = await asyncio.create_subprocess_exec( + *cmd, + cwd=project_root, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300) + stdout_text = stdout.decode('utf-8') if stdout else '' + stderr_text = stderr.decode('utf-8') if stderr else '' + + # 执行后安装脚本(同步执行,因为可能涉及系统级操作) + after_install_result = None + if process.returncode == 0: + after_install_result = _run_node_after_install(node_type, node_dir, project_root, content) + + # 格式化结果 + return _format_install_result( + node_type, + process.returncode == 0, + stdout_text, + stderr_text, + after_install_result + ) + + except asyncio.TimeoutError: + return {"success": False, "error": "依赖安装超时"} + except Exception as e: + logger.error(f"安装节点 {node_type} 依赖时发生异常: {str(e)}") + return {"success": False, "error": str(e)} + + +def install_all_nodes_dependencies(max_concurrent: int = 5) -> List[Dict[str, Any]]: + """ + 并行安装所有节点的依赖(同步接口,内部使用异步) + + Args: + max_concurrent: 最大并发数,默认5,避免资源过载 + + Returns: + 所有节点的安装结果列表 + """ + nodes = discover_nodes_with_dependencies() + + if not nodes: + logger.info("未发现需要安装依赖的节点") + return [] + + logger.info(f"发现 {len(nodes)} 个节点需要安装依赖,使用 {max_concurrent} 个并发") + + async def install_all(): + semaphore = asyncio.Semaphore(max_concurrent) + + async def install_with_limit(node_type: str): + async with semaphore: + return await install_node_dependencies_async(node_type) + + tasks = [install_with_limit(node) for node in nodes] + return await asyncio.gather(*tasks, return_exceptions=True) + + # 运行异步函数 + try: + results = asyncio.run(install_all()) + + # 处理异常结果 + processed_results = [] + for i, result in enumerate(results): + if isinstance(result, Exception): + node_type = nodes[i] + logger.error(f"安装节点 {node_type} 依赖时发生异常: {str(result)}") + processed_results.append({ + "success": False, + "error": f"异常: {str(result)}", + "node_type": node_type + }) + else: + processed_results.append(result) + + # 统计结果 + success_count = sum(1 for r in processed_results if r.get("success")) + logger.info(f"节点依赖安装完成: {success_count}/{len(nodes)} 成功") + + return processed_results + except Exception as e: + logger.error(f"并行安装节点依赖时发生异常: {str(e)}") + # 如果异步安装失败,回退到同步安装 + logger.warning("回退到同步安装模式") + return [install_node_dependencies(node) for node in nodes] + diff --git a/dev.sh b/dev.sh index ca87943..51c5847 100755 --- a/dev.sh +++ b/dev.sh @@ -141,9 +141,9 @@ check_deps() { error "uv 同步失败" exit 1 } - # 安装所有节点的依赖(直接使用 uv pip install,不修改配置文件) + # 安装所有节点的依赖(使用异步并行安装,提升性能) info "安装节点依赖..." - (cd apps/jingrow && uv run python -c "from jingrow.utils.node_dependencies import discover_nodes_with_dependencies, install_node_dependencies; nodes = discover_nodes_with_dependencies(); [install_node_dependencies(node) for node in nodes]" 2>&1) || { + (cd apps/jingrow && uv run python -c "from jingrow.utils.node_dependencies import install_all_nodes_dependencies; install_all_nodes_dependencies(max_concurrent=5)" 2>&1) || { warn "安装节点依赖失败,但继续执行" } elif [ -f "apps/jingrow/requirements.txt" ]; then