jingrow-framework/apps/jingrow/jingrow/api/local_app_installer.py

1390 lines
53 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Jingrow Local 应用安装 API
提供本地应用安装、管理等功能
"""
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Request
from typing import Dict, Any, List, Optional
import logging
import tempfile
import os
import uuid
import re
import json
import shutil
import subprocess
import unicodedata
import traceback
from pathlib import Path
from datetime import datetime
import requests
from jingrow.utils.app_installer import install_app, get_installed_apps as get_apps, get_app_directories, ensure_package_and_module
from jingrow.utils.jingrow_api import get_jingrow_api_headers, get_logged_user
from jingrow.utils.auth import get_jingrow_cloud_url, get_jingrow_cloud_api_headers, get_jingrow_cloud_api_url, get_jingrow_api_headers
from jingrow.utils.export_app_package import export_app_package_from_local
from jingrow.config import Config
from jingrow.utils.app_manager import update_apps_txt
logger = logging.getLogger(__name__)
router = APIRouter()
# 常量定义
JINGROW_APP_NAME = 'jingrow'
DEFAULT_VERSION = '1.0.0'
DEFAULT_BRANCH = 'main'
SYSTEM_APP_TYPE = 'system'
INSTALLED_APP_TYPE = 'installed'
# 全局缓存,避免重复检查
_jingrow_registered_cache = None
def _import_app_package_and_pagetypes(app_name: str, install_result: Dict[str, Any]) -> None:
"""直接导入应用的 Package 和 PageTypes 到数据库"""
try:
# 从安装结果获取路径
app_dir = install_result.get('install_result', {}).get('app_dir') or install_result.get('app_dir')
if not app_dir:
# 计算应用路径
current = Path(__file__).resolve()
root = current.parents[4]
apps_dir = root / "apps"
app_dir = apps_dir / app_name
# 构建后端代码目录路径(可能是 app_name/app_name 或直接 app_name
backend_dir = Path(app_dir) / app_name
if not backend_dir.exists():
backend_dir = Path(app_dir)
if not os.path.exists(backend_dir):
return
try:
api_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.sync_app_files"
response = requests.post(
api_url,
json={'app_name': app_name, 'app_path': str(backend_dir), 'force': True},
headers=get_jingrow_api_headers(),
timeout=60
)
except Exception:
pass
except Exception:
pass
@router.post("/jingrow/install/upload")
async def install_app_from_upload(
file: UploadFile = File(...),
app_name: Optional[str] = Form(None)
):
try:
filename_lower = file.filename.lower()
# 支持zip、tar.gz、tgz、gz格式
if not (filename_lower.endswith('.zip') or
filename_lower.endswith('.tar.gz') or
filename_lower.endswith('.tgz') or
filename_lower.endswith('.gz')):
raise HTTPException(status_code=400, detail="只支持ZIP、TAR.GZ、TGZ、GZ格式的安装包")
# 获取项目根目录
current = Path(__file__).resolve()
root = current.parents[4] # apps/jingrow/jingrow/api/ -> apps/ -> root
tmp_dir = root / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
# 创建唯一的文件名
temp_filename = f"upload_{uuid.uuid4().hex[:8]}{Path(filename_lower).suffix}"
temp_file_path = tmp_dir / temp_filename
# 保存上传的文件到项目tmp目录
content = await file.read()
with open(temp_file_path, 'wb') as f:
f.write(content)
# 统一使用 install_app 函数处理所有格式
try:
result = install_app(str(temp_file_path), app_name)
if result.get('success'):
app_name_result = result.get('app_name')
app_dir = result.get('app_dir')
# 扩展包不添加到 Local Installed Apps返回时没有 app_dir
if not app_dir:
return result
# 对齐扫描安装的执行链
try:
# 1. 添加到 Local Installed Apps PageType
from jingrow.utils.jingrow_api import get_single_pagetype
pagetype_result = get_single_pagetype("Local Installed Apps")
if pagetype_result.get('success'):
config = pagetype_result.get('config', {})
apps_list = config.get('local_installed_apps', [])
else:
apps_list = []
# 检查是否已存在,如果存在则更新,否则添加
app_exists = False
for app in apps_list:
if app.get('app_name', '') == app_name_result:
app['app_version'] = '1.0.0'
app['git_branch'] = 'main'
app_exists = True
break
if not app_exists:
apps_list.append({
'app_name': app_name_result,
'app_version': '1.0.0',
'git_branch': 'main'
})
# 更新数据库
await _update_local_installed_apps(apps_list)
# 额外:确保创建 Package 与 Module Defcustom记录上传安装
ensure_package_and_module(app_name_result)
# 更新 apps.txt确保路由生效
update_apps_txt(app_name_result, add=True)
# 2. 调用 sync_app_files 同步文件到数据库
if app_dir:
current = Path(__file__).resolve()
root = current.parents[4]
apps_dir = root / "apps"
backend_dir = apps_dir / app_name_result / app_name_result
if not backend_dir.exists():
backend_dir = apps_dir / app_name_result
if backend_dir.exists():
try:
api_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.sync_app_files"
requests.post(
api_url,
json={'app_name': app_name_result, 'app_path': str(backend_dir), 'force': True},
headers=get_jingrow_api_headers(),
timeout=60
)
except Exception:
pass
except Exception:
pass
return result
else:
raise HTTPException(status_code=400, detail=result.get('error', '安装失败'))
finally:
# 清理上传的文件
try:
if temp_file_path.exists():
os.unlink(temp_file_path)
except Exception:
pass
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/jingrow/check-app/{app_name}")
async def check_app_exists(app_name: str):
"""检查应用或扩展包是否已安装"""
try:
from jingrow.utils.jingrow_api import get_single_pagetype, get_record_list
# 1. 检查应用是否已安装
result = get_single_pagetype("Local Installed Apps")
if result.get('success'):
config = result.get('config', {})
local_installed_apps = config.get('local_installed_apps', [])
for app in local_installed_apps:
if app.get('app_name', '') == app_name:
return {'exists': True, 'installed': True, 'type': 'app'}
# 2. 检查扩展包是否已安装通过查询Package PageType
package_result = get_record_list("Package", filters=[["name", "=", app_name]], limit=1)
if package_result.get('success') and package_result.get('data'):
# 如果找到了记录
if len(package_result.get('data', [])) > 0:
return {'exists': True, 'installed': False, 'type': 'package'}
return {'exists': False, 'installed': False}
except Exception as e:
return {'exists': False, 'installed': False, 'error': str(e)}
@router.get("/jingrow/installed-app-names")
async def get_installed_app_names():
"""获取所有已安装的应用名称列表"""
try:
from jingrow.utils.jingrow_api import get_single_pagetype, get_record_list
installed = set()
# 从 Local Installed Apps 获取应用列表
result = get_single_pagetype("Local Installed Apps")
if result.get('success'):
config = result.get('config', {})
local_installed_apps = config.get('local_installed_apps', [])
for app in local_installed_apps:
app_name = app.get('app_name', '')
if app_name:
installed.add(app_name.lower())
# 从 Package PageType 获取扩展包列表
package_result = get_record_list("Package", fields=["name"], limit=1000)
if package_result.get('success') and package_result.get('data'):
for pkg in package_result.get('data', []):
pkg_name = pkg.get('name', '')
if pkg_name:
installed.add(pkg_name.lower())
return {'success': True, 'apps': list(installed)}
except Exception as e:
return {'success': False, 'error': str(e), 'apps': []}
@router.get("/jingrow/local-apps")
async def get_local_installed_apps(request: Request):
"""扫描本地未安装的App"""
try:
current = Path(__file__).resolve()
root = current.parents[4]
apps_dir = root / "apps"
# 获取已安装的App列表 - 通过get_single API获取
from jingrow.utils.jingrow_api import get_single_pagetype
result = get_single_pagetype("Local Installed Apps")
installed_names = set()
if result.get('success'):
config = result.get('config', {})
local_installed_apps = config.get('local_installed_apps', [])
installed_names = {app.get('app_name', '') for app in local_installed_apps}
# 系统默认App列表需要排除
system_apps = {
'jingrow', 'local_app_installer', 'local_jobs',
'local_apps', 'local_app', 'local_app_installer'
}
# 扫描apps目录
local_apps = []
if apps_dir.exists():
for app_dir in apps_dir.iterdir():
if (app_dir.is_dir() and
app_dir.name not in installed_names and
app_dir.name not in system_apps):
# 检查是否有hooks.py文件
hooks_file = app_dir / app_dir.name / 'hooks.py'
if hooks_file.exists():
try:
# 读取hooks.py获取App信息
with open(hooks_file, 'r', encoding='utf-8') as f:
content = f.read()
app_info = {
'name': app_dir.name,
'path': str(app_dir),
'type': 'both' if (app_dir / "frontend").exists() else app_dir.name,
'title': app_dir.name.replace('_', ' ').title(),
'description': '',
'publisher': '',
'version': '1.0.0'
}
# 提取App信息
import re
title_match = re.search(r'app_title\s*=\s*["\']([^"\']+)["\']', content)
if title_match:
app_info['title'] = title_match.group(1)
desc_match = re.search(r'app_description\s*=\s*["\']([^"\']+)["\']', content)
if desc_match:
app_info['description'] = desc_match.group(1)
pub_match = re.search(r'app_publisher\s*=\s*["\']([^"\']+)["\']', content)
if pub_match:
app_info['publisher'] = pub_match.group(1)
local_apps.append(app_info)
except Exception as e:
# 如果读取失败,仍然添加基本信息
local_apps.append({
'name': app_dir.name,
'path': str(app_dir),
'type': app_dir.name,
'title': app_dir.name.replace('_', ' ').title(),
'description': '',
'publisher': '',
'version': '1.0.0'
})
return {
'success': True,
'data': {
'apps': local_apps,
'total': len(local_apps)
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/jingrow/install-local/{app_name}")
async def install_local_app(request: Request, app_name: str):
"""安装本地App"""
try:
# 获取应用目录路径
current = Path(__file__).resolve()
root = current.parents[4]
apps_dir = root / "apps"
app_dir = apps_dir / app_name
backend_dir = app_dir / app_name # 后端代码目录
if not app_dir.exists():
raise HTTPException(status_code=404, detail=f"App '{app_name}' not found")
if not backend_dir.exists():
backend_dir = app_dir
# 检查是否已经安装 - 通过get_single API检查
from jingrow.utils.jingrow_api import get_single_pagetype
result = get_single_pagetype("Local Installed Apps")
if result.get('success'):
config = result.get('config', {})
local_installed_apps = config.get('local_installed_apps', [])
for app in local_installed_apps:
if app.get('app_name', '') == app_name:
raise HTTPException(status_code=400, detail=f"App '{app_name}' is already installed")
# 将App信息添加到Local Installed Apps PageType
try:
from jingrow.utils.jingrow_api import get_single_pagetype
# 获取当前应用列表
result = get_single_pagetype("Local Installed Apps")
if result.get('success'):
config = result.get('config', {})
apps_list = config.get('local_installed_apps', [])
else:
apps_list = []
for app in apps_list:
if app.get('app_name', '') == app_name:
break
else:
new_app = {
'app_name': app_name,
'app_version': "1.0.0",
'git_branch': "main"
}
apps_list.append(new_app)
if await _update_local_installed_apps(apps_list):
# 确保创建 Package 与 Module Defcustom记录本地扫描安装
ensure_package_and_module(app_name)
# 更新 apps.txt确保路由生效
update_apps_txt(app_name, add=True)
try:
api_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.sync_app_files"
requests.post(
api_url,
json={'app_name': app_name, 'app_path': str(backend_dir), 'force': True},
headers=get_jingrow_api_headers(),
timeout=60
)
except Exception:
pass
except Exception:
pass
return {
'success': True,
'message': f"App '{app_name}' installed successfully",
'app_name': app_name
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def _update_local_installed_apps(apps_list):
"""更新Local Installed Apps数据库记录"""
import requests
from jingrow.utils.auth import get_jingrow_api_headers
from jingrow.config import Config
api_url = f"{Config.jingrow_server_url}/api/data/Local Installed Apps/Local Installed Apps"
payload = {"local_installed_apps": apps_list}
response = requests.put(api_url, json=payload, headers=get_jingrow_api_headers(), timeout=10)
return response.status_code == 200
async def ensure_jingrow_registered():
"""确保jingrow应用已注册到数据库中带缓存"""
global _jingrow_registered_cache
# 如果已经检查过且成功,直接返回
if _jingrow_registered_cache is True:
return
try:
from jingrow.utils.jingrow_api import get_single_pagetype
result = get_single_pagetype("Local Installed Apps")
if not result.get('success'):
return
apps = result.get('config', {}).get('local_installed_apps', [])
if not any(app.get('app_name') == JINGROW_APP_NAME for app in apps):
apps.append({'app_name': JINGROW_APP_NAME, 'app_version': DEFAULT_VERSION, 'git_branch': DEFAULT_BRANCH})
await _update_local_installed_apps(apps)
_jingrow_registered_cache = True
else:
_jingrow_registered_cache = True
except Exception:
_jingrow_registered_cache = False
@router.get("/jingrow/installed-apps")
async def get_installed_apps(request: Request):
"""获取已安装的应用列表 - 通过get_single API获取"""
try:
# 确保jingrow应用已注册
await ensure_jingrow_registered()
# 通过get_single API获取Local Installed Apps数据
from jingrow.utils.jingrow_api import get_single_pagetype
result = get_single_pagetype("Local Installed Apps")
if not result.get('success'):
return {
'success': True,
'data': {
'apps': [],
'total': 0
}
}
config = result.get('config', {})
local_installed_apps = config.get('local_installed_apps', [])
apps = [
{
'name': app.get('app_name', ''),
'version': app.get('app_version', '1.0.0'),
'git_branch': app.get('git_branch', 'main'),
'type': SYSTEM_APP_TYPE if app.get('app_name') == JINGROW_APP_NAME else INSTALLED_APP_TYPE,
'uninstallable': app.get('app_name') != JINGROW_APP_NAME
}
for app in local_installed_apps
]
return {
'success': True,
'data': {
'apps': apps,
'total': len(apps)
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/jingrow/install-from-git")
async def install_from_git(repo_url: str = Form(...)):
"""从 git 仓库克隆并安装应用或扩展包"""
import subprocess
current = Path(__file__).resolve()
root = current.parents[4]
tmp_dir = root / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
# 创建临时目录用于克隆
clone_dir = tmp_dir / f"git_clone_{uuid.uuid4().hex[:8]}"
try:
# 使用 git clone 克隆仓库(使用仓库默认分支)
result = subprocess.run(
['git', 'clone', repo_url, str(clone_dir)],
capture_output=True,
text=True,
timeout=300
)
if result.returncode != 0:
logger.error(f"Git clone failed: {result.stderr}")
raise HTTPException(status_code=400, detail=f"Git 克隆失败: {result.stderr}")
# 直接使用克隆的目录进行分析和安装
from jingrow.utils.app_installer import analyze_package, install_files, is_app_installed, install_package
# 分析包结构
package_info = analyze_package(str(clone_dir))
if not package_info.get('success'):
raise HTTPException(status_code=400, detail=package_info.get('error', '无法识别应用'))
has_hooks = package_info['data'].get('has_hooks', False)
app_name = package_info['data'].get('app_name')
if not app_name:
raise HTTPException(status_code=400, detail='无法识别应用名称')
# 判断是扩展包还是独立应用
if not has_hooks:
# 作为扩展包安装到 jingrow 应用内部
# 注意package_info['data']['root_dir'] 已经是 git clone 后进入仓库目录的路径
# 但我们传给 install_package 的 temp_dir 应该是仓库内容的实际路径
result = install_package(package_info['data']['root_dir'], package_info['data'])
shutil.rmtree(clone_dir, ignore_errors=True)
return result
else:
# 独立应用安装
# 如果应用已安装,先删除旧版本
if is_app_installed(app_name):
apps_dir, _ = get_app_directories()
app_dir = apps_dir / app_name
if app_dir.exists():
shutil.rmtree(app_dir)
# 安装文件(完整复制整个包结构)
install_result = install_files(str(clone_dir), app_name, package_info['data'])
if not install_result.get('success'):
shutil.rmtree(clone_dir, ignore_errors=True)
raise HTTPException(status_code=400, detail=install_result.get('error'))
# 清理临时文件
shutil.rmtree(clone_dir, ignore_errors=True)
# 只有独立应用才需要注册到 Local Installed Apps
app_dir = install_result.get('app_dir')
if app_dir:
try:
from jingrow.utils.jingrow_api import get_single_pagetype
pagetype_result = get_single_pagetype("Local Installed Apps")
apps_list = pagetype_result.get('config', {}).get('local_installed_apps', []) if pagetype_result.get('success') else []
app_exists = False
for app in apps_list:
if app.get('app_name', '') == app_name:
app.update({'app_version': '1.0.0', 'git_repo': repo_url})
app_exists = True
break
if not app_exists:
apps_list.append({'app_name': app_name, 'app_version': '1.0.0', 'git_repo': repo_url})
await _update_local_installed_apps(apps_list)
# 同步文件到数据库
_import_app_package_and_pagetypes(app_name, {'install_result': install_result, 'app_dir': install_result.get('app_dir')})
# 确保创建 Package 与 Module Defcustom记录
ensure_package_and_module(app_name)
# 更新 apps.txt确保路由生效
update_apps_txt(app_name, add=True)
except Exception as e:
logger.error(f"Failed to register app: {e}")
return {
'success': True,
'message': f'应用 {app_name} 安装成功',
'app_name': app_name,
'package_info': package_info['data'],
'app_dir': install_result.get('app_dir')
}
except subprocess.TimeoutExpired:
if clone_dir.exists():
shutil.rmtree(clone_dir, ignore_errors=True)
raise HTTPException(status_code=400, detail="Git 克隆超时")
except Exception as e:
if clone_dir.exists():
shutil.rmtree(clone_dir, ignore_errors=True)
raise HTTPException(status_code=500, detail=f"安装失败: {str(e)}")
@router.post("/jingrow/install-from-url")
async def install_from_url(url: str = Form(...)):
"""从URL安装应用或扩展包"""
try:
# 下载文件
import tempfile
import requests
current = Path(__file__).resolve()
root = current.parents[4]
tmp_dir = root / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
# 创建临时文件
temp_filename = f"download_{uuid.uuid4().hex[:8]}{Path(url).suffix}"
temp_file_path = tmp_dir / temp_filename
# 下载文件
response = requests.get(url, stream=True, timeout=300)
response.raise_for_status()
with open(temp_file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# 使用现有的 install_app 函数安装
try:
result = install_app(str(temp_file_path), None)
if result.get('success'):
app_name_result = result.get('app_name')
app_dir = result.get('app_dir')
# 扩展包不添加到 Local Installed Apps返回时没有 app_dir
if not app_dir:
return result
# 对齐扫描安装的执行链
try:
# 1. 添加到 Local Installed Apps PageType
from jingrow.utils.jingrow_api import get_single_pagetype
pagetype_result = get_single_pagetype("Local Installed Apps")
if pagetype_result.get('success'):
config = pagetype_result.get('config', {})
apps_list = config.get('local_installed_apps', [])
else:
apps_list = []
# 检查是否已存在,如果存在则更新,否则添加
app_exists = False
for app in apps_list:
if app.get('app_name', '') == app_name_result:
app['app_version'] = '1.0.0'
app['git_branch'] = 'main'
app_exists = True
break
if not app_exists:
apps_list.append({
'app_name': app_name_result,
'app_version': '1.0.0',
'git_branch': 'main'
})
# 更新数据库
await _update_local_installed_apps(apps_list)
# 额外:确保创建 Package 与 Module Defcustom记录URL安装
ensure_package_and_module(app_name_result)
# 更新 apps.txt确保路由生效
update_apps_txt(app_name_result, add=True)
# 2. 调用 sync_app_files 同步文件到数据库
if app_dir:
current = Path(__file__).resolve()
root = current.parents[4]
apps_dir = root / "apps"
backend_dir = apps_dir / app_name_result / app_name_result
if not backend_dir.exists():
backend_dir = apps_dir / app_name_result
if backend_dir.exists():
try:
api_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.sync_app_files"
requests.post(
api_url,
json={'app_name': app_name_result, 'app_path': str(backend_dir), 'force': True},
headers=get_jingrow_api_headers(),
timeout=60
)
except Exception:
pass
except Exception:
pass
return result
else:
raise HTTPException(status_code=400, detail=result.get('error', '安装失败'))
finally:
# 清理下载的文件
try:
if temp_file_path.exists():
os.unlink(temp_file_path)
except:
pass
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/jingrow/uninstall-extension/{package_name}")
async def uninstall_extension(request: Request, package_name: str):
"""卸载扩展包 - 先获取模块列表,卸载数据库,再删除本地目录"""
try:
# 获取 jingrow 应用目录
apps_dir, _ = get_app_directories()
jingrow_backend_dir = apps_dir / "jingrow" / "jingrow"
if not jingrow_backend_dir.exists():
return {'success': False, 'error': '找不到 jingrow 应用目录'}
# 1. 先从数据库获取模块列表
modules = []
try:
deps_url = f"{Config.jingrow_server_url}/api/action/jingrow.core.pagetype.package.package.get_package_dependencies"
response = requests.post(
deps_url,
json={'package_name': package_name},
headers=get_jingrow_api_headers(),
timeout=60
)
if response.status_code == 200:
result = response.json()
if result.get('message', {}).get('status') == 'success':
modules = result['message'].get('modules', [])
except Exception as e:
pass
# 2. 调用云端API卸载数据库记录
try:
uninstall_url = f"{Config.jingrow_server_url}/api/action/jingrow.core.pagetype.package.package.uninstall_package"
requests.post(
uninstall_url,
json={'package_name': package_name},
headers=get_jingrow_api_headers(),
timeout=60
)
except Exception:
pass
# 3. 删除本地模块目录
removed_count = 0
for module_name in modules:
module_dir_name = module_name.lower().replace(" ", "_").replace("-", "_")
module_dir = jingrow_backend_dir / module_dir_name
if module_dir.exists():
shutil.rmtree(module_dir)
removed_count += 1
return {
'success': True,
'message': f'扩展包卸载成功,删除了 {removed_count} 个本地模块目录',
'removed_count': removed_count
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/jingrow/uninstall/{app_name}")
async def uninstall_app(request: Request, app_name: str):
"""卸载应用 - 直接删除整个app目录"""
try:
if app_name == JINGROW_APP_NAME:
raise HTTPException(status_code=403, detail=f"系统应用 '{JINGROW_APP_NAME}' 不允许卸载")
apps_dir, _ = get_app_directories()
app_dir = apps_dir / app_name
# 检查应用是否存在
if not app_dir.exists():
raise HTTPException(status_code=404, detail=f"应用 {app_name} 不存在")
try:
api_url = f"{Config.jingrow_server_url}/api/action/jingrow.core.pagetype.package.package.uninstall_package"
response = requests.post(
api_url,
json={'package_name': app_name},
headers=get_jingrow_api_headers(),
timeout=60
)
except Exception:
pass
shutil.rmtree(app_dir)
await _remove_from_database(app_name)
# 更新 apps.txt确保路由生效
update_apps_txt(app_name, add=False)
return {
'success': True,
'message': f'应用 {app_name} 卸载成功'
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/jingrow/app-info/{app_name}")
async def get_app_info(request: Request, app_name: str):
try:
backend_dir, frontend_dir = get_app_directories()
backend_app_dir = backend_dir / app_name / app_name
frontend_app_dir = frontend_dir / app_name / "frontend"
app_info = {
'name': app_name,
'has_backend': backend_app_dir.exists(),
'has_frontend': frontend_app_dir.exists(),
'backend_path': str(backend_app_dir) if backend_app_dir.exists() else None,
'frontend_path': str(frontend_app_dir) if frontend_app_dir.exists() else None
}
# 尝试读取应用信息
if backend_app_dir.exists():
setup_py = backend_app_dir / 'setup.py'
if setup_py.exists():
try:
with open(setup_py, 'r', encoding='utf-8') as f:
content = f.read()
# 简单提取版本信息
version_match = re.search(r'version\s*=\s*["\']([^"\']+)["\']', content)
if version_match:
app_info['version'] = version_match.group(1)
except:
pass
if frontend_app_dir.exists():
package_json = frontend_app_dir / 'package.json'
if package_json.exists():
try:
with open(package_json, 'r', encoding='utf-8') as f:
pkg_data = json.load(f)
app_info['version'] = pkg_data.get('version', app_info.get('version'))
app_info['description'] = pkg_data.get('description')
except:
pass
return {
'success': True,
'data': app_info
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def _remove_from_database(app_name: str) -> Dict[str, Any]:
"""从数据库中删除应用记录"""
try:
# 使用API方式操作数据库
from jingrow.utils.jingrow_api import get_single_pagetype
# 获取当前数据
result = get_single_pagetype("Local Installed Apps")
if not result.get('success'):
return {'success': True, 'message': '未找到Local Installed Apps记录'}
config = result.get('config', {})
local_installed_apps = config.get('local_installed_apps', [])
original_count = len(local_installed_apps)
local_installed_apps = [app for app in local_installed_apps if app.get('app_name', '') != app_name]
if len(local_installed_apps) < original_count:
if await _update_local_installed_apps(local_installed_apps):
return {'success': True, 'message': '数据库记录删除成功'}
else:
return {'success': False, 'error': '更新数据库失败'}
else:
return {'success': True, 'message': '未找到要删除的应用记录'}
except Exception as e:
return {'success': False, 'error': str(e)}
@router.get("/jingrow/app-marketplace")
async def get_app_marketplace(
search: Optional[str] = None,
page: int = 1,
page_size: int = 20,
sort_by: Optional[str] = None
):
"""获取应用市场数据,支持搜索、分页和排序"""
try:
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.jlocal.get_local_app_list"
# 构建过滤条件
filters = {"public": 1}
if search:
filters["title"] = ["like", f"%{search}%"]
# 1. 先获取总数(不分页)
total_params = {
'filters': json.dumps(filters, ensure_ascii=False),
'limit_start': 0,
'limit_page_length': 0 # 不限制数量,获取所有数据来计算总数
}
headers = get_jingrow_cloud_api_headers()
total_response = requests.get(url, params=total_params, headers=headers, timeout=20)
total_count = 0
if total_response.status_code == 200:
total_data = total_response.json()
total_count = len(total_data.get('message', []))
# 2. 获取分页数据
params = {
'filters': json.dumps(filters, ensure_ascii=False)
}
# 排序参数
if sort_by:
params['order_by'] = sort_by
# 分页参数
limit_start = (page - 1) * page_size
params['limit_start'] = limit_start
params['limit_page_length'] = page_size
response = requests.get(url, params=params, headers=headers, timeout=20)
if response.status_code == 200:
data = response.json()
apps = data.get('message', [])
# 返回分页格式的数据
return {
"items": apps,
"total": total_count,
"page": page,
"page_size": page_size
}
else:
raise HTTPException(status_code=response.status_code, detail="获取应用市场数据失败")
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取应用市场数据失败: {str(e)}")
@router.get("/jingrow/app-marketplace/{app_name}")
async def get_app_detail(app_name: str):
"""获取应用详情"""
try:
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.jlocal.get_local_app"
params = {"name": app_name}
headers = get_jingrow_cloud_api_headers()
response = requests.get(url, params=params, headers=headers, timeout=20)
if response.status_code == 200:
data = response.json()
return data.get('message')
else:
raise HTTPException(status_code=404, detail="应用不存在")
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取应用详情失败: {str(e)}")
@router.get("/jingrow/my-published-apps")
async def get_my_published_apps(
request: Request,
search: Optional[str] = None,
page: int = 1,
page_size: int = 20,
sort_by: Optional[str] = None
):
"""获取当前用户已发布的应用列表,支持搜索、分页和排序"""
session_cookie = request.cookies.get('sid')
if not session_cookie:
raise HTTPException(status_code=401, detail="未提供认证信息")
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.jlocal.get_my_local_app_list"
# 构建参数
params = {
'order_by': sort_by or "app_name asc",
'limit_start': (page - 1) * page_size,
'limit_page_length': page_size
}
if search:
params['filters'] = json.dumps({"title": ["like", f"%{search}%"]}, ensure_ascii=False)
# 获取总数
total_params = params.copy()
total_params['limit_start'] = 0
total_params['limit_page_length'] = 0
headers = get_jingrow_cloud_api_headers()
headers['Cookie'] = f'sid={session_cookie}'
total_response = requests.get(url, params=total_params, headers=headers, timeout=20)
total_response.raise_for_status()
total_count = len(total_response.json().get('message', []))
# 获取分页数据
response = requests.get(url, params=params, headers=headers, timeout=20)
response.raise_for_status()
apps = response.json().get('message', [])
return {
"items": apps,
"total": total_count,
"page": page,
"page_size": page_size
}
@router.post("/jingrow/delete-published-app")
async def delete_published_app(request: Request, payload: Dict[str, Any]):
"""删除已发布的应用根据记录的name字段删除"""
session_cookie = request.cookies.get('sid')
if not session_cookie:
raise HTTPException(status_code=401, detail="未提供认证信息")
# 使用记录的name字段不是app_name字段
record_name = payload.get('name')
if not record_name:
raise HTTPException(status_code=400, detail="记录名称不能为空")
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.jlocal.delete_local_app"
headers = get_jingrow_cloud_api_headers()
headers['Cookie'] = f'sid={session_cookie}'
# 传递记录的name字段到云端API
response = requests.post(url, json={'name': record_name}, headers=headers, timeout=20)
response.raise_for_status()
data = response.json()
result = data.get('message', data)
if result.get('success'):
return {"success": True, "message": result.get('message', '应用删除成功')}
else:
raise HTTPException(status_code=400, detail=result.get('message', '删除失败'))
@router.get("/jingrow/my-published-nodes")
async def get_my_published_nodes(
request: Request,
search: Optional[str] = None,
page: int = 1,
page_size: int = 20,
sort_by: Optional[str] = None
):
"""获取当前用户已发布的节点列表,支持搜索、分页和排序"""
session_cookie = request.cookies.get('sid')
if not session_cookie:
raise HTTPException(status_code=401, detail="未提供认证信息")
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.jlocal.get_my_local_node_list"
# 构建参数
params = {
'order_by': sort_by or "name asc",
'limit_start': (page - 1) * page_size,
'limit_page_length': page_size
}
if search:
params['filters'] = json.dumps({"title": ["like", f"%{search}%"]}, ensure_ascii=False)
# 获取总数
total_params = params.copy()
total_params['limit_start'] = 0
total_params['limit_page_length'] = 0
headers = get_jingrow_cloud_api_headers()
headers['Cookie'] = f'sid={session_cookie}'
total_response = requests.get(url, params=total_params, headers=headers, timeout=20)
total_response.raise_for_status()
total_count = len(total_response.json().get('message', []))
# 获取分页数据
response = requests.get(url, params=params, headers=headers, timeout=20)
response.raise_for_status()
nodes = response.json().get('message', [])
return {
"items": nodes,
"total": total_count,
"page": page,
"page_size": page_size
}
@router.get("/jingrow/my-published-agents")
async def get_my_published_agents(
request: Request,
search: Optional[str] = None,
page: int = 1,
page_size: int = 20,
sort_by: Optional[str] = None
):
"""获取当前用户已发布的智能体列表,支持搜索、分页和排序"""
session_cookie = request.cookies.get('sid')
if not session_cookie:
raise HTTPException(status_code=401, detail="未提供认证信息")
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.jlocal.get_my_local_agent_list"
# 构建参数
params = {
'order_by': sort_by or "name asc",
'limit_start': (page - 1) * page_size,
'limit_page_length': page_size
}
if search:
params['filters'] = json.dumps({"title": ["like", f"%{search}%"]}, ensure_ascii=False)
# 获取总数
total_params = params.copy()
total_params['limit_start'] = 0
total_params['limit_page_length'] = 0
headers = get_jingrow_cloud_api_headers()
headers['Cookie'] = f'sid={session_cookie}'
total_response = requests.get(url, params=total_params, headers=headers, timeout=20)
total_response.raise_for_status()
total_count = len(total_response.json().get('message', []))
# 获取分页数据
response = requests.get(url, params=params, headers=headers, timeout=20)
response.raise_for_status()
agents = response.json().get('message', [])
return {
"items": agents,
"total": total_count,
"page": page,
"page_size": page_size
}
@router.post("/jingrow/delete-published-node")
async def delete_published_node(request: Request, payload: Dict[str, Any]):
"""删除已发布的节点根据记录的name字段删除"""
session_cookie = request.cookies.get('sid')
if not session_cookie:
raise HTTPException(status_code=401, detail="未提供认证信息")
# 使用记录的name字段
record_name = payload.get('name')
if not record_name:
raise HTTPException(status_code=400, detail="记录名称不能为空")
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.jlocal.delete_local_node"
headers = get_jingrow_cloud_api_headers()
headers['Cookie'] = f'sid={session_cookie}'
# 传递记录的name字段到云端API
response = requests.post(url, json={'name': record_name}, headers=headers, timeout=20)
response.raise_for_status()
data = response.json()
result = data.get('message', data)
if result.get('success'):
return {"success": True, "message": result.get('message', '节点删除成功')}
else:
raise HTTPException(status_code=400, detail=result.get('message', '删除失败'))
@router.post("/jingrow/delete-published-agent")
async def delete_published_agent(request: Request, payload: Dict[str, Any]):
"""删除已发布的智能体根据记录的name字段删除"""
session_cookie = request.cookies.get('sid')
if not session_cookie:
raise HTTPException(status_code=401, detail="未提供认证信息")
# 使用记录的name字段
record_name = payload.get('name')
if not record_name:
raise HTTPException(status_code=400, detail="记录名称不能为空")
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.jlocal.delete_local_agent"
headers = get_jingrow_cloud_api_headers()
headers['Cookie'] = f'sid={session_cookie}'
# 传递记录的name字段到云端API
response = requests.post(url, json={'name': record_name}, headers=headers, timeout=20)
response.raise_for_status()
data = response.json()
result = data.get('message', data)
if result.get('success'):
return {"success": True, "message": result.get('message', '智能体删除成功')}
else:
raise HTTPException(status_code=400, detail=result.get('message', '删除失败'))
@router.post("/jingrow/upload-image")
async def upload_image(file: UploadFile = File(...)):
"""上传应用图片"""
try:
# 验证文件类型
if not file.content_type or not file.content_type.startswith('image/'):
raise HTTPException(status_code=400, detail="只支持图片格式")
# 验证文件大小 (5MB限制)
if file.size and file.size > 5 * 1024 * 1024:
raise HTTPException(status_code=400, detail="图片大小不能超过5MB")
# 读取文件内容
file_content = await file.read()
url = f"{get_jingrow_cloud_url()}/api/action/upload_file"
headers = get_jingrow_cloud_api_headers()
headers.pop('Content-Type', None)
files = {'file': (file.filename, file_content, file.content_type)}
data = {'file_name': file.filename, 'is_private': 0, 'folder': 'Home/Attachments'}
response = requests.post(url, files=files, data=data, headers=headers, timeout=30)
if response.status_code != 200:
error_detail = response.json().get('detail', f"HTTP {response.status_code}") if response.headers.get('content-type', '').startswith('application/json') else f"HTTP {response.status_code}"
raise HTTPException(status_code=response.status_code, detail=error_detail)
result = response.json()
# 检查错误
if isinstance(result, dict) and result.get('error'):
raise HTTPException(status_code=400, detail=result['error'])
# 处理成功响应
message = result.get('message', {})
if isinstance(message, dict) and message.get('file_url'):
file_url = message.get('file_url')
# 确保URL格式正确
if not file_url.startswith('/files/'):
file_url = f"/files/{file_url.lstrip('/')}"
return {
"success": True,
"url": file_url,
"file_name": message.get('file_name'),
"message": "图片上传成功"
}
raise HTTPException(status_code=400, detail="API响应格式错误")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/jingrow/create-app")
async def create_app(
title: str = Form(...),
app_name: str = Form(...),
subtitle: str = Form(None),
category: str = Form(None),
description: str = Form(None),
repository_url: str = Form(None),
file_url: str = Form(None),
app_image: str = Form(None)
):
try:
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.jlocal.create_local_app"
headers = get_jingrow_cloud_api_headers()
headers['Content-Type'] = 'application/json'
response = requests.post(url, json={
"app_data": {
"app_name": app_name,
"title": title,
"subtitle": subtitle,
"description": description,
"category": category,
"repository_url": repository_url,
"file_url": file_url,
"app_image": app_image
}
}, headers=headers, timeout=20)
if response.status_code != 200:
error_detail = response.json().get('detail', f"HTTP {response.status_code}") if response.headers.get('content-type', '').startswith('application/json') else f"HTTP {response.status_code}"
raise HTTPException(status_code=response.status_code, detail=error_detail)
result = response.json()
# 检查错误
if isinstance(result, dict) and result.get('error'):
raise HTTPException(status_code=400, detail=result['error'])
message = result.get('message', {})
if isinstance(message, dict) and message.get('error'):
raise HTTPException(status_code=400, detail=message['error'])
# 成功响应
app_name = message.get('name', 'unknown') if isinstance(message, dict) else result.get('message', 'unknown')
return {"success": True, "message": f"应用发布成功,应用名称: {app_name}"}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"发布失败: {str(e)}")
@router.post("/jingrow/export-app-package/{app_name}")
async def export_app_package(app_name: str):
"""导出应用安装包 - 直接打包本地应用源码"""
try:
logger.info(f"开始导出应用安装包: {app_name}")
# 获取应用目录
current = Path(__file__).resolve()
root = current.parents[4]
apps_dir = root / "apps"
# 调用辅助函数
result = export_app_package_from_local(app_name, apps_dir)
if not result.get("success"):
raise HTTPException(status_code=400, detail=result.get("error", "导出失败"))
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"导出应用安装包失败: {str(e)}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"导出失败: {str(e)}")