jingrow-framework/apps/jingrow/jingrow/api/local_app_installer.py

774 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Jingrow Local 应用安装 API
提供本地应用安装、管理等功能
"""
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Request
from typing import Dict, Any, List, Optional
import logging
import tempfile
import os
import uuid
from pathlib import Path
import requests
import json
import shutil
import re
from jingrow.utils.app_installer import install_app, get_installed_apps as get_apps, get_app_directories
from jingrow.utils.jingrow_api import get_jingrow_api_headers
from jingrow.utils.auth import get_jingrow_cloud_url, get_jingrow_cloud_api_headers, get_jingrow_cloud_api_url, get_jingrow_api_headers
from jingrow.config import Config
logger = logging.getLogger(__name__)
router = APIRouter()
# 常量定义
JINGROW_APP_NAME = 'jingrow'
DEFAULT_VERSION = '1.0.0'
DEFAULT_BRANCH = 'main'
SYSTEM_APP_TYPE = 'system'
INSTALLED_APP_TYPE = 'installed'
# 全局缓存,避免重复检查
_jingrow_registered_cache = None
def _import_app_package_and_pagetypes(app_name: str, install_result: Dict[str, Any]) -> None:
"""直接导入应用的 Package 和 PageTypes 到数据库"""
try:
# 从安装结果获取路径
backend_result = install_result.get('backend_result', {})
app_dir = backend_result.get('app_dir')
if not app_dir:
# 计算应用路径
current = Path(__file__).resolve()
root = current.parents[4]
apps_dir = root / "apps"
backend_dir = apps_dir / app_name / app_name
if not backend_dir.exists():
backend_dir = apps_dir / app_name
app_dir = str(backend_dir)
else:
# 构建后端代码目录路径
backend_dir = Path(app_dir) / app_name
if not backend_dir.exists():
backend_dir = Path(app_dir)
app_dir = str(backend_dir)
if not os.path.exists(app_dir):
return
try:
api_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.sync_app_files"
response = requests.post(
api_url,
json={'app_name': app_name, 'app_path': app_dir, 'force': True},
headers=get_jingrow_api_headers(),
timeout=60
)
except Exception:
pass
except Exception:
pass
@router.post("/jingrow/install/upload")
async def install_app_from_upload(
file: UploadFile = File(...),
app_name: Optional[str] = Form(None)
):
try:
filename_lower = file.filename.lower()
# 支持zip、tar.gz、tgz、gz格式
if not (filename_lower.endswith('.zip') or
filename_lower.endswith('.tar.gz') or
filename_lower.endswith('.tgz') or
filename_lower.endswith('.gz')):
raise HTTPException(status_code=400, detail="只支持ZIP、TAR.GZ、TGZ、GZ格式的安装包")
# 获取项目根目录
current = Path(__file__).resolve()
root = current.parents[4] # apps/jingrow/jingrow/api/ -> apps/ -> root
tmp_dir = root / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
# 创建唯一的文件名
temp_filename = f"upload_{uuid.uuid4().hex[:8]}{Path(filename_lower).suffix}"
temp_file_path = tmp_dir / temp_filename
# 保存上传的文件到项目tmp目录
content = await file.read()
with open(temp_file_path, 'wb') as f:
f.write(content)
# 统一使用 install_app 函数处理所有格式
try:
result = install_app(str(temp_file_path), app_name)
if result.get('success'):
app_name_result = result.get('app_name')
backend_result = result.get('backend_result', {})
app_dir = backend_result.get('app_dir')
# 判断是否为扩展包安装(没有 app_dir 说明是扩展包)
if not app_dir:
return result
# 对齐扫描安装的执行链 - 只针对独立应用安装
try:
# 1. 添加到 Local Installed Apps PageType
from jingrow.utils.jingrow_api import get_single_pagetype
pagetype_result = get_single_pagetype("Local Installed Apps")
if pagetype_result.get('success'):
config = pagetype_result.get('config', {})
apps_list = config.get('local_installed_apps', [])
else:
apps_list = []
# 检查是否已存在,如果存在则更新,否则添加
app_exists = False
for app in apps_list:
if app.get('app_name', '') == app_name_result:
app['app_version'] = '1.0.0'
app['git_branch'] = 'main'
app_exists = True
break
if not app_exists:
apps_list.append({
'app_name': app_name_result,
'app_version': '1.0.0',
'git_branch': 'main'
})
# 更新数据库
await _update_local_installed_apps(apps_list)
# 2. 调用 sync_app_files 同步文件到数据库
if app_dir:
current = Path(__file__).resolve()
root = current.parents[4]
apps_dir = root / "apps"
backend_dir = apps_dir / app_name_result / app_name_result
if not backend_dir.exists():
backend_dir = apps_dir / app_name_result
if backend_dir.exists():
try:
api_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.sync_app_files"
requests.post(
api_url,
json={'app_name': app_name_result, 'app_path': str(backend_dir), 'force': True},
headers=get_jingrow_api_headers(),
timeout=60
)
except Exception:
pass
except Exception:
pass
return result
else:
raise HTTPException(status_code=400, detail=result.get('error', '安装失败'))
finally:
# 清理上传的文件
try:
if temp_file_path.exists():
os.unlink(temp_file_path)
except Exception:
pass
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/jingrow/local-apps")
async def get_local_apps(request: Request):
"""扫描本地未安装的App"""
try:
current = Path(__file__).resolve()
root = current.parents[4]
apps_dir = root / "apps"
# 获取已安装的App列表 - 通过get_single API获取
from jingrow.utils.jingrow_api import get_single_pagetype
result = get_single_pagetype("Local Installed Apps")
installed_names = set()
if result.get('success'):
config = result.get('config', {})
local_installed_apps = config.get('local_installed_apps', [])
installed_names = {app.get('app_name', '') for app in local_installed_apps}
# 系统默认App列表需要排除
system_apps = {
'jingrow', 'local_app_installer', 'local_jobs',
'local_apps', 'local_app', 'local_app_installer'
}
# 扫描apps目录
local_apps = []
if apps_dir.exists():
for app_dir in apps_dir.iterdir():
if (app_dir.is_dir() and
app_dir.name not in installed_names and
app_dir.name not in system_apps):
# 检查是否有hooks.py文件
hooks_file = app_dir / app_dir.name / 'hooks.py'
if hooks_file.exists():
try:
# 读取hooks.py获取App信息
with open(hooks_file, 'r', encoding='utf-8') as f:
content = f.read()
app_info = {
'name': app_dir.name,
'path': str(app_dir),
'type': 'both' if (app_dir / "frontend").exists() else app_dir.name,
'title': app_dir.name.replace('_', ' ').title(),
'description': '',
'publisher': '',
'version': '1.0.0'
}
# 提取App信息
import re
title_match = re.search(r'app_title\s*=\s*["\']([^"\']+)["\']', content)
if title_match:
app_info['title'] = title_match.group(1)
desc_match = re.search(r'app_description\s*=\s*["\']([^"\']+)["\']', content)
if desc_match:
app_info['description'] = desc_match.group(1)
pub_match = re.search(r'app_publisher\s*=\s*["\']([^"\']+)["\']', content)
if pub_match:
app_info['publisher'] = pub_match.group(1)
local_apps.append(app_info)
except Exception as e:
# 如果读取失败,仍然添加基本信息
local_apps.append({
'name': app_dir.name,
'path': str(app_dir),
'type': app_dir.name,
'title': app_dir.name.replace('_', ' ').title(),
'description': '',
'publisher': '',
'version': '1.0.0'
})
return {
'success': True,
'data': {
'apps': local_apps,
'total': len(local_apps)
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/jingrow/install-local/{app_name}")
async def install_local_app(request: Request, app_name: str):
"""安装本地App"""
try:
# 获取应用目录路径
current = Path(__file__).resolve()
root = current.parents[4]
apps_dir = root / "apps"
app_dir = apps_dir / app_name
backend_dir = app_dir / app_name # 后端代码目录
if not app_dir.exists():
raise HTTPException(status_code=404, detail=f"App '{app_name}' not found")
if not backend_dir.exists():
backend_dir = app_dir
# 检查是否已经安装 - 通过get_single API检查
from jingrow.utils.jingrow_api import get_single_pagetype
result = get_single_pagetype("Local Installed Apps")
if result.get('success'):
config = result.get('config', {})
local_installed_apps = config.get('local_installed_apps', [])
for app in local_installed_apps:
if app.get('app_name', '') == app_name:
raise HTTPException(status_code=400, detail=f"App '{app_name}' is already installed")
# 将App信息添加到Local Installed Apps PageType
try:
from jingrow.utils.jingrow_api import get_single_pagetype
# 获取当前应用列表
result = get_single_pagetype("Local Installed Apps")
if result.get('success'):
config = result.get('config', {})
apps_list = config.get('local_installed_apps', [])
else:
apps_list = []
for app in apps_list:
if app.get('app_name', '') == app_name:
break
else:
new_app = {
'app_name': app_name,
'app_version': "1.0.0",
'git_branch': "main"
}
apps_list.append(new_app)
if await _update_local_installed_apps(apps_list):
try:
api_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.sync_app_files"
requests.post(
api_url,
json={'app_name': app_name, 'app_path': str(backend_dir), 'force': True},
headers=get_jingrow_api_headers(),
timeout=60
)
except Exception:
pass
except Exception:
pass
return {
'success': True,
'message': f"App '{app_name}' installed successfully",
'app_name': app_name
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def _update_local_installed_apps(apps_list):
"""更新Local Installed Apps数据库记录"""
import requests
from jingrow.utils.auth import get_jingrow_api_headers
from jingrow.config import Config
api_url = f"{Config.jingrow_server_url}/api/data/Local Installed Apps/Local Installed Apps"
payload = {"local_installed_apps": apps_list}
response = requests.put(api_url, json=payload, headers=get_jingrow_api_headers(), timeout=10)
return response.status_code == 200
async def ensure_jingrow_registered():
"""确保jingrow应用已注册到数据库中带缓存"""
global _jingrow_registered_cache
# 如果已经检查过且成功,直接返回
if _jingrow_registered_cache is True:
return
try:
from jingrow.utils.jingrow_api import get_single_pagetype
result = get_single_pagetype("Local Installed Apps")
if not result.get('success'):
return
apps = result.get('config', {}).get('local_installed_apps', [])
if not any(app.get('app_name') == JINGROW_APP_NAME for app in apps):
apps.append({'app_name': JINGROW_APP_NAME, 'app_version': DEFAULT_VERSION, 'git_branch': DEFAULT_BRANCH})
await _update_local_installed_apps(apps)
_jingrow_registered_cache = True
else:
_jingrow_registered_cache = True
except Exception:
_jingrow_registered_cache = False
@router.get("/jingrow/installed-apps")
async def get_installed_apps(request: Request):
"""获取已安装的应用列表 - 通过get_single API获取"""
try:
# 确保jingrow应用已注册
await ensure_jingrow_registered()
# 通过get_single API获取Local Installed Apps数据
from jingrow.utils.jingrow_api import get_single_pagetype
result = get_single_pagetype("Local Installed Apps")
if not result.get('success'):
return {
'success': True,
'data': {
'apps': [],
'total': 0
}
}
config = result.get('config', {})
local_installed_apps = config.get('local_installed_apps', [])
apps = [
{
'name': app.get('app_name', ''),
'version': app.get('app_version', '1.0.0'),
'git_branch': app.get('git_branch', 'main'),
'type': SYSTEM_APP_TYPE if app.get('app_name') == JINGROW_APP_NAME else INSTALLED_APP_TYPE,
'uninstallable': app.get('app_name') != JINGROW_APP_NAME
}
for app in local_installed_apps
]
return {
'success': True,
'data': {
'apps': apps,
'total': len(apps)
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/jingrow/uninstall/{app_name}")
async def uninstall_app(request: Request, app_name: str):
"""卸载应用 - 直接删除整个app目录"""
try:
if app_name == JINGROW_APP_NAME:
raise HTTPException(status_code=403, detail=f"系统应用 '{JINGROW_APP_NAME}' 不允许卸载")
apps_dir, _ = get_app_directories()
app_dir = apps_dir / app_name
# 检查应用是否存在
if not app_dir.exists():
raise HTTPException(status_code=404, detail=f"应用 {app_name} 不存在")
try:
api_url = f"{Config.jingrow_server_url}/api/action/jingrow.core.pagetype.package.package.uninstall_package"
response = requests.post(
api_url,
json={'package_name': app_name},
headers=get_jingrow_api_headers(),
timeout=60
)
except Exception:
pass
shutil.rmtree(app_dir)
await _remove_from_database(app_name)
return {
'success': True,
'message': f'应用 {app_name} 卸载成功'
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/jingrow/app-info/{app_name}")
async def get_app_info(request: Request, app_name: str):
try:
backend_dir, frontend_dir = get_app_directories()
backend_app_dir = backend_dir / app_name / app_name
frontend_app_dir = frontend_dir / app_name / "frontend"
app_info = {
'name': app_name,
'has_backend': backend_app_dir.exists(),
'has_frontend': frontend_app_dir.exists(),
'backend_path': str(backend_app_dir) if backend_app_dir.exists() else None,
'frontend_path': str(frontend_app_dir) if frontend_app_dir.exists() else None
}
# 尝试读取应用信息
if backend_app_dir.exists():
setup_py = backend_app_dir / 'setup.py'
if setup_py.exists():
try:
with open(setup_py, 'r', encoding='utf-8') as f:
content = f.read()
# 简单提取版本信息
version_match = re.search(r'version\s*=\s*["\']([^"\']+)["\']', content)
if version_match:
app_info['version'] = version_match.group(1)
except:
pass
if frontend_app_dir.exists():
package_json = frontend_app_dir / 'package.json'
if package_json.exists():
try:
with open(package_json, 'r', encoding='utf-8') as f:
pkg_data = json.load(f)
app_info['version'] = pkg_data.get('version', app_info.get('version'))
app_info['description'] = pkg_data.get('description')
except:
pass
return {
'success': True,
'data': app_info
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def _remove_from_database(app_name: str) -> Dict[str, Any]:
"""从数据库中删除应用记录"""
try:
# 使用API方式操作数据库
from jingrow.utils.jingrow_api import get_single_pagetype
# 获取当前数据
result = get_single_pagetype("Local Installed Apps")
if not result.get('success'):
return {'success': True, 'message': '未找到Local Installed Apps记录'}
config = result.get('config', {})
local_installed_apps = config.get('local_installed_apps', [])
original_count = len(local_installed_apps)
local_installed_apps = [app for app in local_installed_apps if app.get('app_name', '') != app_name]
if len(local_installed_apps) < original_count:
if await _update_local_installed_apps(local_installed_apps):
return {'success': True, 'message': '数据库记录删除成功'}
else:
return {'success': False, 'error': '更新数据库失败'}
else:
return {'success': True, 'message': '未找到要删除的应用记录'}
except Exception as e:
return {'success': False, 'error': str(e)}
@router.get("/jingrow/app-marketplace")
async def get_app_marketplace(
search: Optional[str] = None,
page: int = 1,
page_size: int = 20,
sort_by: Optional[str] = None
):
"""获取应用市场数据,支持搜索、分页和排序"""
try:
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.local_app.get_local_apps"
# 构建过滤条件
filters = {"public": 1}
if search:
filters["title"] = ["like", f"%{search}%"]
# 1. 先获取总数(不分页)
total_params = {
'filters': json.dumps(filters, ensure_ascii=False),
'limit_start': 0,
'limit_page_length': 0 # 不限制数量,获取所有数据来计算总数
}
headers = get_jingrow_cloud_api_headers()
total_response = requests.get(url, params=total_params, headers=headers, timeout=20)
total_count = 0
if total_response.status_code == 200:
total_data = total_response.json()
total_count = len(total_data.get('message', []))
# 2. 获取分页数据
params = {
'filters': json.dumps(filters, ensure_ascii=False)
}
# 排序参数
if sort_by:
params['order_by'] = sort_by
# 分页参数
limit_start = (page - 1) * page_size
params['limit_start'] = limit_start
params['limit_page_length'] = page_size
response = requests.get(url, params=params, headers=headers, timeout=20)
if response.status_code == 200:
data = response.json()
apps = data.get('message', [])
# 返回分页格式的数据
return {
"items": apps,
"total": total_count,
"page": page,
"page_size": page_size
}
else:
raise HTTPException(status_code=response.status_code, detail="获取应用市场数据失败")
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取应用市场数据失败: {str(e)}")
@router.get("/jingrow/app-marketplace/{app_name}")
async def get_app_detail(app_name: str):
"""获取应用详情"""
try:
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.local_app.get_local_app"
params = {"name": app_name}
headers = get_jingrow_cloud_api_headers()
response = requests.get(url, params=params, headers=headers, timeout=20)
if response.status_code == 200:
data = response.json()
return data.get('message')
else:
raise HTTPException(status_code=404, detail="应用不存在")
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取应用详情失败: {str(e)}")
@router.get("/jingrow/app-marketplace-meta")
async def get_app_meta():
"""获取Local App元数据"""
try:
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.local_app.get_local_app_meta"
headers = get_jingrow_cloud_api_headers()
response = requests.get(url, headers=headers, timeout=20)
if response.status_code == 200:
data = response.json()
return data.get('message')
else:
raise HTTPException(status_code=500, detail="获取元数据失败")
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取元数据失败: {str(e)}")
@router.post("/jingrow/upload-image")
async def upload_image(file: UploadFile = File(...)):
"""上传应用图片"""
try:
# 验证文件类型
if not file.content_type or not file.content_type.startswith('image/'):
raise HTTPException(status_code=400, detail="只支持图片格式")
# 验证文件大小 (5MB限制)
if file.size and file.size > 5 * 1024 * 1024:
raise HTTPException(status_code=400, detail="图片大小不能超过5MB")
# 读取文件内容
file_content = await file.read()
url = f"{get_jingrow_cloud_url()}/api/action/upload_file"
headers = get_jingrow_cloud_api_headers()
headers.pop('Content-Type', None)
files = {'file': (file.filename, file_content, file.content_type)}
data = {'file_name': file.filename, 'is_private': 0, 'folder': 'Home/Attachments'}
response = requests.post(url, files=files, data=data, headers=headers, timeout=30)
if response.status_code != 200:
error_detail = response.json().get('detail', f"HTTP {response.status_code}") if response.headers.get('content-type', '').startswith('application/json') else f"HTTP {response.status_code}"
raise HTTPException(status_code=response.status_code, detail=error_detail)
result = response.json()
# 检查错误
if isinstance(result, dict) and result.get('error'):
raise HTTPException(status_code=400, detail=result['error'])
# 处理成功响应
message = result.get('message', {})
if isinstance(message, dict) and message.get('file_url'):
file_url = message.get('file_url')
# 确保URL格式正确
if not file_url.startswith('/files/'):
file_url = f"/files/{file_url.lstrip('/')}"
return {
"success": True,
"url": file_url,
"file_name": message.get('file_name'),
"message": "图片上传成功"
}
raise HTTPException(status_code=400, detail="API响应格式错误")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/jingrow/create-app")
async def create_app(
title: str = Form(...),
app_name: str = Form(...),
subtitle: str = Form(None),
category: str = Form(None),
description: str = Form(None),
repository_url: str = Form(None),
file_url: str = Form(None),
app_image: str = Form(None)
):
try:
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.local_app.create_local_app"
headers = get_jingrow_cloud_api_headers()
headers['Content-Type'] = 'application/json'
response = requests.post(url, json={
"app_data": {
"app_name": app_name,
"title": title,
"subtitle": subtitle,
"description": description,
"category": category,
"repository_url": repository_url,
"file_url": file_url,
"app_image": app_image
}
}, headers=headers, timeout=20)
if response.status_code != 200:
error_detail = response.json().get('detail', f"HTTP {response.status_code}") if response.headers.get('content-type', '').startswith('application/json') else f"HTTP {response.status_code}"
raise HTTPException(status_code=response.status_code, detail=error_detail)
result = response.json()
# 检查错误
if isinstance(result, dict) and result.get('error'):
raise HTTPException(status_code=400, detail=result['error'])
message = result.get('message', {})
if isinstance(message, dict) and message.get('error'):
raise HTTPException(status_code=400, detail=message['error'])
# 成功响应
app_name = message.get('name', 'unknown') if isinstance(message, dict) else result.get('message', 'unknown')
return {"success": True, "message": f"应用发布成功,应用名称: {app_name}"}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"发布失败: {str(e)}")