修复应用无法完整安装的问题

This commit is contained in:
jingrow 2025-10-29 04:41:27 +08:00
parent 77cb098f80
commit 51cfebd621
2 changed files with 42 additions and 99 deletions

View File

@ -43,33 +43,28 @@ def _import_app_package_and_pagetypes(app_name: str, install_result: Dict[str, A
"""直接导入应用的 Package 和 PageTypes 到数据库"""
try:
# 从安装结果获取路径
backend_result = install_result.get('backend_result', {})
app_dir = backend_result.get('app_dir')
app_dir = install_result.get('install_result', {}).get('app_dir') or install_result.get('app_dir')
if not app_dir:
# 计算应用路径
current = Path(__file__).resolve()
root = current.parents[4]
apps_dir = root / "apps"
backend_dir = apps_dir / app_name / app_name
if not backend_dir.exists():
backend_dir = apps_dir / app_name
app_dir = str(backend_dir)
else:
# 构建后端代码目录路径
backend_dir = Path(app_dir) / app_name
if not backend_dir.exists():
backend_dir = Path(app_dir)
app_dir = str(backend_dir)
app_dir = apps_dir / app_name
if not os.path.exists(app_dir):
# 构建后端代码目录路径(可能是 app_name/app_name 或直接 app_name
backend_dir = Path(app_dir) / app_name
if not backend_dir.exists():
backend_dir = Path(app_dir)
if not os.path.exists(backend_dir):
return
try:
api_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.sync_app_files"
response = requests.post(
api_url,
json={'app_name': app_name, 'app_path': app_dir, 'force': True},
json={'app_name': app_name, 'app_path': str(backend_dir), 'force': True},
headers=get_jingrow_api_headers(),
timeout=60
)
@ -117,8 +112,7 @@ async def install_app_from_upload(
if result.get('success'):
app_name_result = result.get('app_name')
backend_result = result.get('backend_result', {})
app_dir = backend_result.get('app_dir')
app_dir = result.get('app_dir')
# 扩展包不添加到 Local Installed Apps返回时没有 app_dir
if not app_dir:
@ -570,25 +564,16 @@ async def install_from_git(repo_url: str = Form(...)):
if app_dir.exists():
shutil.rmtree(app_dir)
# 安装后端文件
backend_result = None
if package_info['data'].get('has_backend'):
backend_result = install_files(str(clone_dir), app_name, package_info['data'], is_backend=True)
if not backend_result.get('success'):
raise HTTPException(status_code=400, detail=backend_result.get('error'))
# 安装前端文件
frontend_result = None
if package_info['data'].get('has_frontend'):
frontend_result = install_files(str(clone_dir), app_name, package_info['data'], is_backend=False)
if not frontend_result.get('success'):
raise HTTPException(status_code=400, detail=frontend_result.get('error'))
# 安装文件(完整复制整个包结构)
install_result = install_files(str(clone_dir), app_name, package_info['data'])
if not install_result.get('success'):
raise HTTPException(status_code=400, detail=install_result.get('error'))
# 清理临时文件
shutil.rmtree(clone_dir, ignore_errors=True)
# 只有独立应用才需要注册到 Local Installed Apps
app_dir = backend_result.get('app_dir') if backend_result else None
app_dir = install_result.get('app_dir')
if app_dir:
try:
@ -609,7 +594,7 @@ async def install_from_git(repo_url: str = Form(...)):
await _update_local_installed_apps(apps_list)
# 同步文件到数据库
_import_app_package_and_pagetypes(app_name, {'app_name': app_name, 'backend_result': backend_result, 'frontend_result': frontend_result})
_import_app_package_and_pagetypes(app_name, {'install_result': install_result, 'app_dir': install_result.get('app_dir')})
# 确保创建 Package 与 Module Defcustom记录
ensure_package_and_module(app_name)
@ -621,8 +606,7 @@ async def install_from_git(repo_url: str = Form(...)):
'message': f'应用 {app_name} 安装成功',
'app_name': app_name,
'package_info': package_info['data'],
'backend_result': backend_result,
'frontend_result': frontend_result
'app_dir': install_result.get('app_dir')
}
except subprocess.TimeoutExpired:
@ -666,8 +650,7 @@ async def install_from_url(url: str = Form(...)):
if result.get('success'):
app_name_result = result.get('app_name')
backend_result = result.get('backend_result', {})
app_dir = backend_result.get('app_dir')
app_dir = result.get('app_dir')
# 扩展包不添加到 Local Installed Apps返回时没有 app_dir
if not app_dir:

View File

@ -156,64 +156,35 @@ def analyze_package(temp_dir: str) -> Dict[str, Any]:
@handle_errors
def install_files(temp_dir: str, app_name: str, package_info: Dict[str, Any], is_backend: bool) -> Dict[str, Any]:
"""安装文件到指定目录 - 直接复制整个目录结构,更简单高效"""
def install_files(temp_dir: str, app_name: str, package_info: Dict[str, Any]) -> Dict[str, Any]:
"""安装文件到指定目录 - 完整复制整个包结构(业内标准做法)"""
apps_dir, _ = get_app_directories()
app_dir = apps_dir / app_name
# 创建应用目录结构
if app_dir.exists():
shutil.rmtree(app_dir)
# 创建应用目录
app_dir.mkdir(parents=True, exist_ok=True)
# 获取根目录
# 获取根目录analyze_package 已经找到正确的根目录)
root_dir = package_info.get('root_dir', temp_dir)
# 直接复制整个目录结构
if package_info.get('has_backend', False):
backend_dir = app_dir / app_name
backend_dir.mkdir(parents=True, exist_ok=True)
# 直接复制整个包结构(不检查内层目录,因为导出时就包含了完整结构
for item in os.listdir(root_dir):
if item in ['__pycache__', '.git', '.DS_Store']:
continue
# 检查 root_dir 下是否有 app_name 子目录解压后可能有两层app_name
inner_app_dir = os.path.join(root_dir, app_name)
if os.path.exists(inner_app_dir) and os.path.isdir(inner_app_dir):
for item in os.listdir(inner_app_dir):
src = os.path.join(inner_app_dir, item)
dst = backend_dir / item
if os.path.isdir(src):
shutil.copytree(src, dst, dirs_exist_ok=True)
else:
shutil.copy2(src, dst)
src = os.path.join(root_dir, item)
dst = app_dir / item
if os.path.isdir(src):
if dst.exists():
shutil.rmtree(dst)
shutil.copytree(src, dst)
else:
for item in os.listdir(root_dir):
if item in ['__pycache__', '.git']:
continue
src = os.path.join(root_dir, item)
dst = backend_dir / item
if os.path.isdir(src):
shutil.copytree(src, dst, dirs_exist_ok=True)
else:
shutil.copy2(src, dst)
# 复制前端目录
if package_info.get('has_frontend', False):
frontend_dir = app_dir / "frontend"
frontend_dir.mkdir(parents=True, exist_ok=True)
frontend_src = os.path.join(root_dir, "frontend")
if os.path.exists(frontend_src):
for item in os.listdir(frontend_src):
src = os.path.join(frontend_src, item)
dst = frontend_dir / item
if os.path.isdir(src):
shutil.copytree(src, dst, dirs_exist_ok=True)
else:
shutil.copy2(src, dst)
shutil.copy2(src, dst)
return {
'success': True,
'app_dir': str(app_dir),
'copied_files': ['...'] # 目录复制不记录单个文件
'app_dir': str(app_dir)
}
@ -360,28 +331,18 @@ def install_app(uploaded_file_path: str, app_name: str = None) -> Dict[str, Any]
cleanup_temp_dir(temp_dir)
return {'success': False, 'error': f'删除旧应用失败: {str(e)}'}
# 安装后端文件
backend_result = None
if package_info.get('has_backend'):
backend_result = install_files(temp_dir, app_name, package_info, is_backend=True)
if not backend_result.get('success'):
cleanup_temp_dir(temp_dir)
return backend_result
# 安装前端文件
frontend_result = None
if package_info.get('has_frontend'):
frontend_result = install_files(temp_dir, app_name, package_info, is_backend=False)
if not frontend_result.get('success'):
cleanup_temp_dir(temp_dir)
return frontend_result
# 安装文件(完整复制整个包结构)
install_result = install_files(temp_dir, app_name, package_info)
if not install_result.get('success'):
cleanup_temp_dir(temp_dir)
return install_result
# 记录安装信息
record_installation(app_name, package_info, uploaded_file_path)
# 确保创建 Package 与 Module Defcustom记录
try:
create_package_and_module(app_name, package_info)
ensure_package_and_module(app_name)
except Exception:
# 忽略该步骤错误,不影响安装完成
pass
@ -393,8 +354,7 @@ def install_app(uploaded_file_path: str, app_name: str = None) -> Dict[str, Any]
'message': f'应用 {app_name} 安装成功',
'app_name': app_name,
'package_info': package_info,
'backend_result': backend_result,
'frontend_result': frontend_result
'app_dir': install_result.get('app_dir')
}
except Exception as e: