add async parallel node dependency installation with after_install hook

This commit is contained in:
jingrow 2025-11-14 02:24:54 +08:00
parent 0933f37ab3
commit deb2b3295c
2 changed files with 237 additions and 61 deletions

View File

@ -1,8 +1,10 @@
"""
节点依赖管理工具
支持节点独立管理自己的依赖使用 uv pip install 直接安装
支持异步并行安装提升多节点场景下的性能
"""
import os
import asyncio
import hashlib
import subprocess
import logging
@ -49,6 +51,103 @@ def discover_nodes_with_dependencies() -> List[str]:
return sorted(nodes_with_deps)
def _parse_node_dependencies(pyproject_path: Path) -> tuple[List[str], str]:
"""
解析节点的依赖列表和 pyproject.toml 内容
Args:
pyproject_path: pyproject.toml 文件路径
Returns:
(dependencies列表, pyproject.toml内容)
"""
dependencies = []
content = ""
try:
with open(pyproject_path, 'r', encoding='utf-8') as f:
content = f.read()
in_dependencies = False
bracket_count = 0
for line in content.split('\n'):
stripped = line.strip()
if stripped.startswith('dependencies') and '=' in stripped and '[' in stripped:
in_dependencies = True
bracket_count = stripped.count('[') - stripped.count(']')
if ']' in stripped:
in_dependencies = False
continue
elif in_dependencies:
bracket_count += stripped.count('[') - stripped.count(']')
if bracket_count <= 0 and ']' in stripped:
break
dep = stripped.strip('",\' ,')
if dep and not dep.startswith('#'):
dependencies.append(dep)
except Exception as e:
logger.warning(f"解析 pyproject.toml 失败: {str(e)}")
return dependencies, content
def _build_install_command(dependencies: List[str], node_dir: Path) -> List[str]:
"""
构建安装命令
Args:
dependencies: 依赖列表
node_dir: 节点目录
Returns:
安装命令列表
"""
if dependencies:
return ["uv", "pip", "install"] + dependencies
else:
return ["uv", "pip", "install", "-e", str(node_dir)]
def _format_install_result(node_type: str, success: bool, stdout: str, stderr: str,
after_install_result: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
格式化安装结果
Args:
node_type: 节点类型
success: 是否成功
stdout: 标准输出
stderr: 标准错误
after_install_result: 后安装脚本结果
Returns:
格式化的结果字典
"""
if success:
message = f"节点 {node_type} 依赖安装成功"
if after_install_result:
if after_install_result.get("success"):
message += f" ({after_install_result.get('message', '后安装脚本执行成功')})"
else:
logger.warning(f"节点 {node_type} 后安装脚本执行失败: {after_install_result.get('error', '未知错误')}")
message += f" (后安装脚本警告: {after_install_result.get('error', '未知错误')})"
return {
"success": True,
"message": message,
"output": stdout,
"post_install": after_install_result
}
else:
error_msg = stderr or stdout
logger.error(f"安装节点 {node_type} 依赖失败: {error_msg}")
return {
"success": False,
"error": f"依赖安装失败: {error_msg}",
"stderr": stderr,
"stdout": stdout
}
def install_node_dependencies(node_type: str, sync: bool = True) -> Dict[str, Any]:
"""
安装节点的依赖
@ -72,44 +171,15 @@ def install_node_dependencies(node_type: str, sync: bool = True) -> Dict[str, An
# 节点没有独立依赖,返回成功(使用根依赖)
return {"success": True, "message": "节点使用根依赖,无需安装"}
# 获取项目根目录apps/jingrow
project_root = jingrow_root.parent
# 读取节点的 pyproject.toml 获取依赖列表,然后直接安装依赖
dependencies = []
try:
# 使用简单文本解析提取依赖(兼容 Python 3.10
with open(pyproject_path, 'r', encoding='utf-8') as f:
content = f.read()
# 提取 dependencies 列表
in_dependencies = False
bracket_count = 0
for line in content.split('\n'):
stripped = line.strip()
if stripped.startswith('dependencies') and '=' in stripped and '[' in stripped:
in_dependencies = True
bracket_count = stripped.count('[') - stripped.count(']')
if ']' in stripped:
in_dependencies = False
continue
elif in_dependencies:
bracket_count += stripped.count('[') - stripped.count(']')
if bracket_count <= 0 and ']' in stripped:
break
dep = stripped.strip('",\' ,')
if dep and not dep.startswith('#'):
dependencies.append(dep)
except Exception as e:
logger.warning(f"解析节点 {node_type} 的 pyproject.toml 失败: {str(e)}")
# 解析依赖
dependencies, content = _parse_node_dependencies(pyproject_path)
if dependencies:
# 直接安装依赖包
cmd = ["uv", "pip", "install"] + dependencies
else:
# 如果没有依赖或解析失败,尝试安装节点包本身(可编辑模式)
cmd = ["uv", "pip", "install", "-e", str(node_dir)]
# 构建安装命令
cmd = _build_install_command(dependencies, node_dir)
# 执行安装
result = subprocess.run(
cmd,
cwd=project_root,
@ -118,33 +188,19 @@ def install_node_dependencies(node_type: str, sync: bool = True) -> Dict[str, An
timeout=300
)
# 执行后安装脚本
after_install_result = None
if result.returncode == 0:
# 检查是否有后安装脚本需要执行
after_install_result = _run_node_after_install(node_type, node_dir, project_root, content)
message = f"节点 {node_type} 依赖安装成功"
if after_install_result:
if after_install_result.get("success"):
message += f" ({after_install_result.get('message', '后安装脚本执行成功')})"
else:
logger.warning(f"节点 {node_type} 后安装脚本执行失败: {after_install_result.get('error', '未知错误')}")
message += f" (后安装脚本警告: {after_install_result.get('error', '未知错误')})"
return {
"success": True,
"message": message,
"output": result.stdout,
"post_install": after_install_result
}
else:
error_msg = result.stderr or result.stdout
logger.error(f"安装节点 {node_type} 依赖失败: {error_msg}")
return {
"success": False,
"error": f"依赖安装失败: {error_msg}",
"stderr": result.stderr,
"stdout": result.stdout
}
# 格式化结果
return _format_install_result(
node_type,
result.returncode == 0,
result.stdout,
result.stderr,
after_install_result
)
except subprocess.TimeoutExpired:
return {"success": False, "error": "依赖安装超时"}
@ -428,3 +484,123 @@ def ensure_node_dependencies(node_type: str) -> Dict[str, Any]:
# 如果检查失败或不确定,尝试安装
return install_node_dependencies(node_type)
async def install_node_dependencies_async(node_type: str) -> Dict[str, Any]:
"""
异步安装节点的依赖内部使用保持与同步版本相同的逻辑
Args:
node_type: 节点类型
Returns:
安装结果字典
"""
try:
jingrow_root = get_jingrow_root()
node_dir = jingrow_root / "ai" / "nodes" / node_type
if not node_dir.exists():
return {"success": False, "error": f"节点目录不存在: {node_type}"}
pyproject_path = node_dir / "pyproject.toml"
if not pyproject_path.exists():
return {"success": True, "message": "节点使用根依赖,无需安装"}
project_root = jingrow_root.parent
# 解析依赖
dependencies, content = _parse_node_dependencies(pyproject_path)
# 构建安装命令
cmd = _build_install_command(dependencies, node_dir)
# 使用异步 subprocess
process = await asyncio.create_subprocess_exec(
*cmd,
cwd=project_root,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
stdout_text = stdout.decode('utf-8') if stdout else ''
stderr_text = stderr.decode('utf-8') if stderr else ''
# 执行后安装脚本(同步执行,因为可能涉及系统级操作)
after_install_result = None
if process.returncode == 0:
after_install_result = _run_node_after_install(node_type, node_dir, project_root, content)
# 格式化结果
return _format_install_result(
node_type,
process.returncode == 0,
stdout_text,
stderr_text,
after_install_result
)
except asyncio.TimeoutError:
return {"success": False, "error": "依赖安装超时"}
except Exception as e:
logger.error(f"安装节点 {node_type} 依赖时发生异常: {str(e)}")
return {"success": False, "error": str(e)}
def install_all_nodes_dependencies(max_concurrent: int = 5) -> List[Dict[str, Any]]:
"""
并行安装所有节点的依赖同步接口内部使用异步
Args:
max_concurrent: 最大并发数默认5避免资源过载
Returns:
所有节点的安装结果列表
"""
nodes = discover_nodes_with_dependencies()
if not nodes:
logger.info("未发现需要安装依赖的节点")
return []
logger.info(f"发现 {len(nodes)} 个节点需要安装依赖,使用 {max_concurrent} 个并发")
async def install_all():
semaphore = asyncio.Semaphore(max_concurrent)
async def install_with_limit(node_type: str):
async with semaphore:
return await install_node_dependencies_async(node_type)
tasks = [install_with_limit(node) for node in nodes]
return await asyncio.gather(*tasks, return_exceptions=True)
# 运行异步函数
try:
results = asyncio.run(install_all())
# 处理异常结果
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
node_type = nodes[i]
logger.error(f"安装节点 {node_type} 依赖时发生异常: {str(result)}")
processed_results.append({
"success": False,
"error": f"异常: {str(result)}",
"node_type": node_type
})
else:
processed_results.append(result)
# 统计结果
success_count = sum(1 for r in processed_results if r.get("success"))
logger.info(f"节点依赖安装完成: {success_count}/{len(nodes)} 成功")
return processed_results
except Exception as e:
logger.error(f"并行安装节点依赖时发生异常: {str(e)}")
# 如果异步安装失败,回退到同步安装
logger.warning("回退到同步安装模式")
return [install_node_dependencies(node) for node in nodes]

4
dev.sh
View File

@ -141,9 +141,9 @@ check_deps() {
error "uv 同步失败"
exit 1
}
# 安装所有节点的依赖(直接使用 uv pip install不修改配置文件
# 安装所有节点的依赖(使用异步并行安装,提升性能
info "安装节点依赖..."
(cd apps/jingrow && uv run python -c "from jingrow.utils.node_dependencies import discover_nodes_with_dependencies, install_node_dependencies; nodes = discover_nodes_with_dependencies(); [install_node_dependencies(node) for node in nodes]" 2>&1) || {
(cd apps/jingrow && uv run python -c "from jingrow.utils.node_dependencies import install_all_nodes_dependencies; install_all_nodes_dependencies(max_concurrent=5)" 2>&1) || {
warn "安装节点依赖失败,但继续执行"
}
elif [ -f "apps/jingrow/requirements.txt" ]; then