diff --git a/apps/jingrow/jingrow/utils/app_installer.py b/apps/jingrow/jingrow/utils/app_installer.py index 3131105..81e9483 100644 --- a/apps/jingrow/jingrow/utils/app_installer.py +++ b/apps/jingrow/jingrow/utils/app_installer.py @@ -1,5 +1,5 @@ """ -Jingrow Local 应用安装器 +Jingrow 应用安装器 """ import os @@ -366,119 +366,6 @@ def install_app(uploaded_file_path: str, app_name: str = None) -> Dict[str, Any] return {'success': False, 'error': str(e)} -def install_extension_package(package_path: str) -> Dict[str, Any]: - """ - 直接安装扩展包到数据库(不走文件系统复制) - - Args: - package_path: 扩展包文件路径(tar.gz) - - Returns: - Dict: 安装结果 - """ - try: - - # 验证文件 - if not os.path.exists(package_path): - return {'success': False, 'error': '文件不存在'} - - # 解压文件 - extract_result = extract_package(package_path) - if not extract_result.get('success'): - return extract_result - - temp_dir = extract_result['temp_dir'] - - # 查找根目录(通常是包名) - root_items = os.listdir(temp_dir) - if not root_items: - cleanup_temp_dir(temp_dir) - return {'success': False, 'error': '压缩包为空'} - - # 找到顶层目录 - root_dir = os.path.join(temp_dir, root_items[0]) if len(root_items) == 1 and os.path.isdir(os.path.join(temp_dir, root_items[0])) else temp_dir - package_name = os.path.basename(root_dir) - - # 查找 package.json 文件 - package_json_path = os.path.join(root_dir, f"{package_name}.json") - if not os.path.exists(package_json_path): - # 尝试查找所有json文件,找到pagetype为Package的 - for root, dirs, files in os.walk(root_dir): - for file in files: - if file.endswith('.json'): - try: - with open(os.path.join(root, file), 'r') as f: - data = json.load(f) - if data.get('pagetype') == 'Package': - package_json_path = os.path.join(root, file) - break - except: - pass - if os.path.exists(package_json_path): - break - - if not os.path.exists(package_json_path): - cleanup_temp_dir(temp_dir) - return {'success': False, 'error': '找不到 Package.json 文件'} - - # 导入 Package - try: - import jingrow - from jingrow.modules.import_file import import_pg, import_file_by_path - from jingrow.model.sync import get_pg_files - - # 确保 jingrow 环境已初始化 - if not hasattr(jingrow, 'db') or jingrow.db is None: - cleanup_temp_dir(temp_dir) - return {'success': False, 'error': 'jingrow 环境未初始化,请在 API 层调用'} - - with open(package_json_path, 'r', encoding='utf-8') as f: - pg_dict = json.load(f) - - # 验证 Package 数据 - if not pg_dict.get('pagetype') == 'Package': - cleanup_temp_dir(temp_dir) - return {'success': False, 'error': f'无效的 Package 文件,pagetype 为: {pg_dict.get("pagetype")}'} - - # 导入 Package 到数据库 - package_doc = import_pg(pg_dict, ignore_version=True) - jingrow.flags.package = package_doc - - # 收集所有 pagetype 文件 - files = [] - for module in os.listdir(root_dir): - module_path = os.path.join(root_dir, module) - if os.path.isdir(module_path): - files = get_pg_files(files, module_path) - - - # 导入所有文件 - imported_files = [] - for file in files: - try: - import_file_by_path(file, force=True, ignore_version=True) - imported_files.append(file) - except Exception: - pass - - # 清理临时文件 - cleanup_temp_dir(temp_dir) - return { - 'success': True, - 'message': f'扩展包 {package_name} 安装成功', - 'package_name': package_name, - 'imported_files': imported_files, - 'file_count': len(imported_files) - } - - except Exception as e: - cleanup_temp_dir(temp_dir) - return {'success': False, 'error': str(e)} - - except Exception as e: - return {'success': False, 'error': str(e)} - - @handle_errors def install_package(temp_dir: str, package_info: Dict[str, Any]) -> Dict[str, Any]: """将扩展包安装到 jingrow 应用内部并同步到数据库"""