2025-10-27 06:04:36 +08:00

447 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Jingrow 应用安装器
"""
import os
import json
import shutil
import tempfile
import zipfile
import tarfile
import uuid
from pathlib import Path
from typing import Dict, List, Any
from datetime import datetime
from functools import wraps
import logging
from jingrow.config import Config
from jingrow.utils.jingrow_api import get_jingrow_api_headers
logger = logging.getLogger(__name__)
def handle_errors(func):
    """Decorate ``func`` so exceptions become an error-result dict.

    The wrapper returns whatever ``func`` returns. If ``func`` raises an
    ``Exception``, the traceback is logged and
    ``{'success': False, 'error': str(exc)}`` is returned instead of the
    exception propagating to the caller.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Log before flattening to a string: the returned dict only
            # carries the message, so the traceback would otherwise be lost.
            logging.getLogger(getattr(func, '__module__', __name__)).exception(
                "%s failed", getattr(func, '__qualname__', repr(func))
            )
            return {'success': False, 'error': str(e)}
    return wrapper
def get_app_directories():
    """Return the apps directory as a 2-tuple, creating it when missing.

    The directory is ``apps`` under ``parents[4]`` of this file's resolved
    path. Both tuple slots hold the same ``Path`` object.
    """
    apps_root = Path(__file__).resolve().parents[4].joinpath("apps")
    apps_root.mkdir(parents=True, exist_ok=True)
    return apps_root, apps_root
def _validate_tar_members(tar_ref, dest_dir):
    """Raise ``ValueError`` if a tar member would be written outside ``dest_dir``."""
    dest_root = os.path.realpath(dest_dir)

    def _inside(path):
        return os.path.commonpath([dest_root, os.path.realpath(path)]) == dest_root

    for member in tar_ref.getmembers():
        # Device nodes and FIFOs have no place in an application package.
        if member.isdev():
            raise ValueError(f'压缩包包含不安全的条目: {member.name}')
        # os.path.join discards dest_root when member.name is absolute, so
        # absolute names are rejected by the same containment check.
        target = os.path.join(dest_root, member.name)
        if not _inside(target):
            raise ValueError(f'压缩包包含不安全的路径: {member.name}')
        if member.issym():
            # Symlink targets are relative to the directory holding the link.
            link_target = os.path.join(os.path.dirname(target), member.linkname)
        elif member.islnk():
            # Hard-link targets are relative to the archive root.
            link_target = os.path.join(dest_root, member.linkname)
        else:
            continue
        if not _inside(link_target):
            raise ValueError(f'压缩包包含不安全的链接: {member.name}')


@handle_errors
def extract_package(zip_path: str) -> Dict[str, Any]:
    """Extract a ZIP or gzip-compressed TAR package into a fresh temp directory.

    Args:
        zip_path: Path of the archive. The type is chosen from the file
            extension, compared case-insensitively: ``.tar.gz``, ``.tgz`` and
            ``.gz`` are opened as gzip tar archives, ``.zip`` as a ZIP file.

    Returns:
        ``{'success': True, 'temp_dir': <path>}`` on success, or
        ``{'success': False, 'error': <message>}`` for an unsupported
        extension. Exceptions raised while extracting are turned into an
        error dict by the ``handle_errors`` decorator.
    """
    # Decide the format before touching the filesystem so an unsupported
    # file does not leave an empty temp directory behind.
    lowered = zip_path.lower()
    is_tar = lowered.endswith(('.tar.gz', '.tgz', '.gz'))
    if not is_tar and not lowered.endswith('.zip'):
        return {'success': False, 'error': '不支持的文件格式'}

    # Temp directories live under <project_root>/tmp.
    project_root = Path(__file__).resolve().parents[4]
    tmp_dir = project_root / "tmp"
    tmp_dir.mkdir(parents=True, exist_ok=True)

    # Unique per-install directory.
    temp_dir = tmp_dir / f"app_install_{uuid.uuid4().hex[:8]}"
    temp_dir.mkdir(parents=True, exist_ok=True)

    try:
        if is_tar:
            with tarfile.open(zip_path, 'r:gz') as tar_ref:
                # The archive is an upload: refuse members that would escape
                # temp_dir before anything is written.
                _validate_tar_members(tar_ref, temp_dir)
                tar_ref.extractall(temp_dir)
        else:
            # zipfile itself strips absolute prefixes and ".." components
            # from member names during extraction.
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)
    except Exception:
        # Do not leak a half-extracted directory; the decorator reports
        # the error to the caller.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    return {'success': True, 'temp_dir': str(temp_dir)}
@handle_errors
def analyze_package(temp_dir: str) -> Dict[str, Any]:
    """Inspect an extracted package and describe its contents.

    Args:
        temp_dir: Directory the package was extracted into.

    Returns:
        ``{'success': False, 'error': ...}`` when ``temp_dir`` is missing or
        empty; otherwise ``{'success': True, 'data': package_info}`` where
        ``package_info`` holds ``app_name``, ``version``, ``description``,
        ``has_backend``, ``has_frontend``, ``has_hooks``, ``backend_files``,
        ``frontend_files`` and ``root_dir``.
    """
    import re

    if not os.path.exists(temp_dir):
        return {'success': False, 'error': f'目录不存在: {temp_dir}'}
    root_items = os.listdir(temp_dir)
    if not root_items:
        return {'success': False, 'error': '目录为空'}

    # Use the first sub-directory as the package root (e.g. the repository
    # folder of a git clone). Skip VCS metadata and the resource-fork folder
    # that macOS adds to ZIP files, neither of which is the package itself.
    root_dir = temp_dir
    for item in root_items:
        item_path = os.path.join(temp_dir, item)
        if os.path.isdir(item_path) and item not in ('.git', '__MACOSX'):
            root_dir = item_path
            break

    package_info = {
        'app_name': os.path.basename(root_dir),
        'version': '1.0.0',
        'description': '',
        'has_backend': False,
        'has_frontend': False,
        'backend_files': [],
        'frontend_files': [],
        'root_dir': root_dir,  # kept so later install steps copy from here
    }

    # Marker files at the package root.
    if os.path.exists(os.path.join(root_dir, 'setup.py')):
        package_info['has_backend'] = True

    package_json_path = os.path.join(root_dir, 'package.json')
    if os.path.exists(package_json_path):
        package_info['has_frontend'] = True
        try:
            # JSON is UTF-8; do not depend on the locale's default encoding.
            with open(package_json_path, 'r', encoding='utf-8') as f:
                pkg_data = json.load(f)
        except (OSError, ValueError):
            # Unreadable or malformed metadata is optional: keep defaults.
            # (JSONDecodeError and UnicodeDecodeError are ValueErrors.)
            pkg_data = None
        if isinstance(pkg_data, dict):
            package_info.update({
                'app_name': pkg_data.get('name', package_info['app_name']),
                'version': pkg_data.get('version', package_info['version']),
                'description': pkg_data.get('description', package_info['description'])
            })

    # A hooks.py anywhere in the tree marks a standalone application.
    hooks_path = os.path.join(root_dir, 'hooks.py')
    if not os.path.exists(hooks_path):
        for root, dirs, files in os.walk(root_dir):
            if 'hooks.py' in files:
                hooks_path = os.path.join(root, 'hooks.py')
                break

    # An app_name assignment in hooks.py overrides every earlier guess.
    if os.path.exists(hooks_path):
        try:
            with open(hooks_path, 'r', encoding='utf-8') as f:
                content = f.read()
        except (OSError, ValueError):
            # Best effort: an unreadable hooks.py keeps the earlier name.
            content = ''
        name_match = re.search(r'app_name\s*=\s*["\']([^"\']+)["\']', content)
        if name_match:
            package_info['app_name'] = name_match.group(1)

    package_info['has_hooks'] = os.path.exists(hooks_path)

    # Classify every file by suffix; paths are recorded relative to root_dir.
    for root, dirs, files in os.walk(root_dir):
        for file in files:
            rel_path = os.path.relpath(os.path.join(root, file), root_dir)
            if file.endswith(('.py', '.txt', '.cfg', '.json', '.md', '.LICENSE', '.yaml', '.yml')):
                package_info['backend_files'].append(rel_path)
                package_info['has_backend'] = True
            elif file.endswith(('.vue', '.ts', '.js', '.css')):
                package_info['frontend_files'].append(rel_path)
                package_info['has_frontend'] = True

    return {'success': True, 'data': package_info}
def _copy_dir_contents(src_dir, dst_dir, skip=()):
    """Copy each entry of ``src_dir`` into ``dst_dir``, merging into existing directories."""
    for item in os.listdir(src_dir):
        if item in skip:
            continue
        src = os.path.join(src_dir, item)
        dst = os.path.join(dst_dir, item)
        if os.path.isdir(src):
            shutil.copytree(src, dst, dirs_exist_ok=True)
        else:
            shutil.copy2(src, dst)


@handle_errors
def install_files(temp_dir: str, app_name: str, package_info: Dict[str, Any], is_backend: bool) -> Dict[str, Any]:
    """Copy an extracted package into ``<apps_dir>/<app_name>``.

    Any existing ``<apps_dir>/<app_name>`` is removed first. Backend files go
    to ``<app_dir>/<app_name>`` and the package's ``frontend`` directory, if
    present, goes to ``<app_dir>/frontend``.

    Args:
        temp_dir: Extraction directory; used as the source when
            ``package_info`` has no ``root_dir``.
        app_name: Name of the application; must be a single path component.
        package_info: Result of ``analyze_package``; ``root_dir``,
            ``has_backend`` and ``has_frontend`` are read.
        is_backend: Accepted for interface compatibility; this function does
            not read it. What gets copied depends only on ``package_info``.

    Returns:
        ``{'success': True, 'app_dir': ..., 'copied_files': ['...']}`` or an
        error dict.
    """
    # app_name can originate from package metadata. It is joined to apps_dir
    # and handed to rmtree, so it must not be able to leave apps_dir.
    if not app_name or app_name in ('.', '..') or Path(app_name).name != app_name:
        return {'success': False, 'error': f'非法的应用名称: {app_name}'}

    apps_dir, _ = get_app_directories()
    app_dir = apps_dir / app_name

    # Start from a clean application directory.
    if app_dir.exists():
        shutil.rmtree(app_dir)
    app_dir.mkdir(parents=True, exist_ok=True)

    root_dir = package_info.get('root_dir', temp_dir)

    if package_info.get('has_backend', False):
        backend_dir = app_dir / app_name
        backend_dir.mkdir(parents=True, exist_ok=True)

        # The archive may contain two nested app_name levels; when it does,
        # the inner directory holds the backend code.
        inner_app_dir = os.path.join(root_dir, app_name)
        if os.path.isdir(inner_app_dir):
            _copy_dir_contents(inner_app_dir, backend_dir)
        else:
            _copy_dir_contents(root_dir, backend_dir, skip=('__pycache__', '.git'))

    if package_info.get('has_frontend', False):
        frontend_dir = app_dir / "frontend"
        frontend_dir.mkdir(parents=True, exist_ok=True)
        frontend_src = os.path.join(root_dir, "frontend")
        if os.path.exists(frontend_src):
            _copy_dir_contents(frontend_src, frontend_dir)

    return {
        'success': True,
        'app_dir': str(app_dir),
        'copied_files': ['...']  # placeholder: whole-directory copies are not itemised
    }
@handle_errors
def record_installation(app_name: str, package_info: Dict[str, Any], upload_path: str) -> Dict[str, Any]:
    """POST an installation record to the Jingrow server.

    Args:
        app_name: Name of the installed application.
        package_info: Package metadata; ``version``, ``description``,
            ``has_backend`` and ``has_frontend`` are read, with defaults.
        upload_path: Path of the uploaded archive, stored in the record.

    Returns:
        ``{'success': True, 'message': ...}`` when the server answers HTTP
        200, otherwise ``{'success': False, 'error': ...}``.
    """
    endpoint = f"{Config.jingrow_server_url}/api/action/jingrow.core.pagetype.local_installed_apps.local_installed_apps.insert"

    # NOTE(review): this local import shadows the module-level
    # get_jingrow_api_headers imported from jingrow.utils.jingrow_api.
    # Confirm which module is the intended source.
    from jingrow.utils.auth import get_jingrow_api_headers
    auth_headers = get_jingrow_api_headers()

    import requests

    record = {
        'app_name': app_name,
        'version': package_info.get('version', '1.0.0'),
        'description': package_info.get('description', ''),
        'user_site': Config.jingrow_site or 'local',
        'installation_date': datetime.now().isoformat(),
        'upload_path': upload_path,
        'has_backend': package_info.get('has_backend', False),
        'has_frontend': package_info.get('has_frontend', False)
    }

    resp = requests.post(endpoint, json=record, headers=auth_headers, timeout=30)
    if resp.status_code != 200:
        return {'success': False, 'error': f'记录失败: HTTP {resp.status_code}'}
    return {'success': True, 'message': '记录成功'}
def cleanup_temp_dir(temp_dir: str):
    """Remove ``temp_dir`` and its contents, ignoring any ``Exception``.

    Does nothing when the path does not exist. Always returns ``None``.
    """
    try:
        if not os.path.exists(temp_dir):
            return
        shutil.rmtree(temp_dir)
    except Exception:
        # Cleanup is best effort; a failure here is deliberately ignored.
        return
def is_app_installed(app_name: str) -> bool:
    """Return whether ``<apps_dir>/<app_name>`` exists on disk."""
    apps_root = get_app_directories()[0]
    return (apps_root / app_name).exists()
def get_installed_apps() -> List[Dict[str, Any]]:
    """List the applications found in the apps directory.

    Returns:
        One dict per sub-directory of the apps directory with the keys
        ``name``, ``type``, ``path`` and ``exists``. ``type`` is ``'both'``,
        ``'backend'``, ``'frontend'`` or ``'unknown'``, depending on whether
        ``<app>/<app>`` and ``<app>/frontend`` exist.
    """
    apps = []
    apps_dir, _ = get_app_directories()

    if apps_dir.exists():
        for app_dir in apps_dir.iterdir():
            if not app_dir.is_dir():
                continue

            # The backend lives in a directory named after the app itself.
            has_backend = (app_dir / app_dir.name).exists()
            has_frontend = (app_dir / "frontend").exists()

            if has_backend and has_frontend:
                app_type = 'both'
            elif has_backend:
                # Previously the app's own name was reported here, which is
                # not a category like the other three values.
                app_type = 'backend'
            elif has_frontend:
                app_type = 'frontend'
            else:
                app_type = 'unknown'

            apps.append({
                'name': app_dir.name,
                'type': app_type,
                'path': str(app_dir),
                'exists': True
            })

    return apps
def install_app(uploaded_file_path: str, app_name: str = None) -> Dict[str, Any]:
    """Install an uploaded package; this is the main entry point.

    A package without ``hooks.py`` is installed as an extension through
    ``install_package``. A package with ``hooks.py`` is installed as a
    standalone application under the apps directory, replacing any version
    that is already installed.

    Args:
        uploaded_file_path: Path of the uploaded archive (``.zip``,
            ``.tar.gz``, ``.tgz`` or ``.gz``, case-insensitive).
        app_name: Optional application name; defaults to the name detected
            by ``analyze_package``.

    Returns:
        A result dict with ``success`` and either ``error`` or the
        installation details. Exceptions are reported as an error dict.
    """
    temp_dir = None
    try:
        # Validate the upload.
        if not os.path.exists(uploaded_file_path):
            return {'success': False, 'error': '文件不存在'}

        if not uploaded_file_path.lower().endswith(('.zip', '.tar.gz', '.tgz', '.gz')):
            return {'success': False, 'error': '不支持的文件格式支持ZIP、TAR.GZ、TGZ、GZ格式'}

        # Extract.
        extract_result = extract_package(uploaded_file_path)
        if not extract_result.get('success'):
            return extract_result
        temp_dir = extract_result['temp_dir']

        # Analyse the package layout.
        analyze_result = analyze_package(temp_dir)
        if not analyze_result.get('success'):
            return analyze_result
        package_info = analyze_result['data']

        # No hooks.py: install as an extension inside the jingrow app.
        if not package_info.get('has_hooks', False):
            return install_package(temp_dir, package_info)

        # Standalone application: settle on its name.
        if not app_name:
            app_name = package_info.get('app_name')
        if not app_name:
            return {'success': False, 'error': '无法识别应用名称'}

        # The name may come from package metadata and is about to be joined
        # to apps_dir and passed to rmtree, so it must be a single component.
        if app_name in ('.', '..') or Path(app_name).name != app_name:
            return {'success': False, 'error': f'非法的应用名称: {app_name}'}

        # Overwrite install: remove the previous version first.
        if is_app_installed(app_name):
            apps_dir, _ = get_app_directories()
            app_dir = apps_dir / app_name
            try:
                if app_dir.exists():
                    shutil.rmtree(app_dir)
            except Exception as e:
                return {'success': False, 'error': f'删除旧应用失败: {str(e)}'}

        # Install backend files.
        backend_result = None
        if package_info.get('has_backend'):
            backend_result = install_files(temp_dir, app_name, package_info, is_backend=True)
            if not backend_result.get('success'):
                return backend_result

        # Install frontend files.
        frontend_result = None
        if package_info.get('has_frontend'):
            frontend_result = install_files(temp_dir, app_name, package_info, is_backend=False)
            if not frontend_result.get('success'):
                return frontend_result

        # Recording is best effort and does not fail the install, but a
        # failure should at least be visible in the logs.
        record_result = record_installation(app_name, package_info, uploaded_file_path)
        if not record_result.get('success'):
            logging.getLogger(__name__).warning(
                "Failed to record installation of %s: %s",
                app_name, record_result.get('error')
            )

        return {
            'success': True,
            'message': f'应用 {app_name} 安装成功',
            'app_name': app_name,
            'package_info': package_info,
            'backend_result': backend_result,
            'frontend_result': frontend_result
        }
    except Exception as e:
        return {'success': False, 'error': str(e)}
    finally:
        # One cleanup point for every exit path, including exceptions.
        if temp_dir:
            cleanup_temp_dir(temp_dir)
@handle_errors
def install_package(temp_dir: str, package_info: Dict[str, Any]) -> Dict[str, Any]:
    """Install an extension package into the jingrow application.

    First POSTs to the server's ``sync_app_files`` action, then copies the
    package's entries into ``<apps_dir>/jingrow/jingrow``.

    Args:
        temp_dir: Extraction directory; used as the source when
            ``package_info`` has no ``root_dir``.
        package_info: Package metadata; ``root_dir`` and ``app_name`` are read.

    Returns:
        ``{'success': True, 'message': ..., 'app_name': ...}`` or an error
        dict when the target directory is missing or the sync request fails.
    """
    # The package root is the copy source as-is: no extra directory level.
    source_dir = package_info.get('root_dir', temp_dir)
    app_name = package_info.get('app_name')

    # Locate the jingrow application's backend directory.
    apps_root, _ = get_app_directories()
    jingrow_backend_dir = apps_root / "jingrow" / "jingrow"
    if not jingrow_backend_dir.exists():
        return {'success': False, 'error': '找不到 jingrow 应用目录'}

    # Step 1: sync through the server before touching any files.
    try:
        from jingrow.utils.jingrow_api import get_jingrow_api_headers
        from jingrow.config import Config
        import requests

        sync_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.sync_app_files"
        sync_body = {'app_name': app_name, 'app_path': source_dir, 'force': True}
        resp = requests.post(
            sync_url,
            json=sync_body,
            headers=get_jingrow_api_headers(),
            timeout=60
        )
        if resp.status_code != 200:
            return {'success': False, 'error': f'同步到数据库失败: HTTP {resp.status_code}'}
    except Exception as e:
        return {'success': False, 'error': f'同步到数据库失败: {str(e)}'}

    # Step 2: copy the remaining entries, skipping caches, VCS data, the
    # frontend folder, JSON files and the listed licence/readme files.
    skipped_names = {'__pycache__', '.git', 'frontend', 'LICENSE.md', 'README.md', 'README'}
    for entry in os.listdir(source_dir):
        if entry in skipped_names or entry.endswith('.json'):
            continue
        src_path = os.path.join(source_dir, entry)
        dst_path = jingrow_backend_dir / entry
        if not os.path.isdir(src_path):
            shutil.copy2(src_path, dst_path)
            continue
        # Directories are replaced wholesale rather than merged.
        if dst_path.exists():
            shutil.rmtree(dst_path)
        shutil.copytree(src_path, dst_path)

    return {'success': True, 'message': '扩展包已安装到 jingrow 应用', 'app_name': app_name}