jingrow-framework/apps/jingrow/jingrow/api/local_app_installer.py

907 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Jingrow Local 应用安装 API
提供本地应用安装、管理等功能
"""
from fastapi import APIRouter, HTTPException, Request, UploadFile, File, Form
from typing import Dict, Any, List, Optional
import logging
import tempfile
import os
from pathlib import Path
import requests
import json
import shutil
import re
from jingrow.utils.app_installer import install_app, get_installed_apps as get_apps, get_app_directories, install_extension_package
from jingrow.utils.jingrow_api import log_info, log_error
from jingrow.utils.auth import get_jingrow_cloud_url, get_jingrow_cloud_api_headers, get_jingrow_cloud_api_url, get_jingrow_api_headers
from jingrow.config import Config
logger = logging.getLogger(__name__)
router = APIRouter()
# Constant definitions
JINGROW_APP_NAME = 'jingrow'
DEFAULT_VERSION = '1.0.0'
DEFAULT_BRANCH = 'main'
SYSTEM_APP_TYPE = 'system'
INSTALLED_APP_TYPE = 'installed'
# Module-level cache so the registration check is not repeated on every request
_jingrow_registered_cache = None
def _import_app_package_and_pagetypes(app_name: str, install_result: Dict[str, Any]) -> None:
    """Sync an installed app's Package and PageTypes into the database.

    The directory to sync is ``install_result['backend_result']['app_dir']``
    when the installer reported one, otherwise ``<root>/apps/<app_name>``
    derived from this file's location. In both cases the nested
    ``<app_name>`` sub-directory is preferred when it exists.

    All failures are logged through ``log_error``; nothing is raised.
    """
    try:
        reported_dir = install_result.get('backend_result', {}).get('app_dir')
        if reported_dir:
            base_dir = Path(reported_dir)
        else:
            # Installer gave no path: fall back to <root>/apps/<app_name>.
            base_dir = Path(__file__).resolve().parents[4] / "apps" / app_name

        # Prefer the inner package directory, fall back to the base directory.
        nested_dir = base_dir / app_name
        backend_dir = nested_dir if nested_dir.exists() else base_dir
        app_dir = str(backend_dir)

        if not os.path.exists(app_dir):
            log_info(f"应用目录不存在: {app_dir},跳过导入")
            return

        # Ask the jingrow whitelisted API to import the app files.
        try:
            response = requests.post(
                f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.sync_app_files",
                json={'app_name': app_name, 'app_path': app_dir, 'force': True},
                headers=get_jingrow_api_headers(),
                timeout=60
            )
            if response.status_code != 200:
                log_error(f"导入失败: HTTP {response.status_code}")
            else:
                result_data = response.json()
                log_info(f"导入结果: {result_data}")
        except Exception as api_error:
            log_error(f"调用导入 API 失败: {str(api_error)}")
    except Exception as e:
        log_error(f"导入应用数据失败: {str(e)}")
@router.post("/jingrow/install/upload")
async def install_app_from_upload(
    request: Request,
    file: UploadFile = File(...),
    app_name: Optional[str] = Form(None)
):
    """Install an application from an uploaded ZIP archive.

    The upload is written to a temporary ``.zip`` file, handed to
    ``install_app`` and, on success, the app's Package and PageTypes are
    synced into the database (best effort: a sync failure is only logged).

    Args:
        request: Incoming request (unused, kept for the route signature).
        file: Uploaded archive; its filename must end in ``.zip``.
        app_name: Optional app name forwarded to ``install_app``.

    Returns:
        The result dict produced by ``install_app`` when it reports success.

    Raises:
        HTTPException: 400 for a missing filename, a non-ZIP upload or a
            failed installation; 500 for any unexpected error.
    """
    try:
        log_info(f"开始处理上传的应用包: {file.filename}")
        # A multipart part may carry no filename; reject it explicitly
        # instead of failing with AttributeError on ``None.lower()``.
        if not file.filename:
            raise HTTPException(status_code=400, detail="文件名不能为空")
        # Validate the file type.
        if not file.filename.lower().endswith('.zip'):
            raise HTTPException(status_code=400, detail="只支持ZIP格式的安装包")
        # Persist the upload to a temporary file that outlives the handle.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
        try:
            # The context manager closes the handle even if reading or
            # writing fails, so the cleanup below can always unlink it.
            with temp_file:
                temp_file.write(await file.read())
            # Install the application.
            result = install_app(temp_file.name, app_name)
            if not result.get('success'):
                raise HTTPException(status_code=400, detail=result.get('error'))
            app_name_result = result.get('app_name')
            log_info(f"应用安装成功: {app_name_result}")
            # Import Package and PageTypes into the database.
            try:
                _import_app_package_and_pagetypes(app_name_result, result)
                log_info(f"应用 {app_name_result} 的 Package 和 PageTypes 已导入")
            except Exception as import_error:
                # Does not invalidate the installation; only a warning.
                log_error(f"导入 Package 和 PageTypes 失败: {str(import_error)}")
            return result
        finally:
            # Remove the temporary file; failure here must not mask the result.
            try:
                os.unlink(temp_file.name)
            except OSError as cleanup_error:
                log_error(f"清理临时文件失败: {str(cleanup_error)}")
    except HTTPException:
        raise
    except Exception as e:
        log_error(f"安装应用失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/jingrow/local-apps")
async def get_local_apps(request: Request):
    """List apps present in the local ``apps`` directory but not installed.

    A directory qualifies when it is not in the "Local Installed Apps"
    record, not in the built-in exclusion set, and contains
    ``<name>/hooks.py``. Title, description and publisher are read from
    ``hooks.py`` when available; if reading fails a basic entry is returned.

    Returns:
        ``{'success': True, 'data': {'apps': [...], 'total': <int>}}``.

    Raises:
        HTTPException: 500 when the scan fails unexpectedly.
    """
    try:
        apps_dir = Path(__file__).resolve().parents[4] / "apps"

        # Names already registered as installed (via the get_single API).
        from jingrow.utils.jingrow_api import get_single_pagetype
        lookup = get_single_pagetype("Local Installed Apps")
        installed_names = set()
        if lookup.get('success'):
            registered = lookup.get('config', {}).get('local_installed_apps', [])
            installed_names = {entry.get('app_name', '') for entry in registered}

        # Built-in apps that must never be offered for installation.
        system_apps = {
            'jingrow', 'local_app_installer', 'local_jobs',
            'local_apps', 'local_app', 'local_app_installer'
        }

        # hooks.py assignments to extract, keyed by the output field.
        hook_patterns = {
            'title': r'app_title\s*=\s*["\']([^"\']+)["\']',
            'description': r'app_description\s*=\s*["\']([^"\']+)["\']',
            'publisher': r'app_publisher\s*=\s*["\']([^"\']+)["\']',
        }

        local_apps = []
        candidates = apps_dir.iterdir() if apps_dir.exists() else ()
        for app_dir in candidates:
            name = app_dir.name
            if not app_dir.is_dir() or name in installed_names or name in system_apps:
                continue
            # Only directories shipping a hooks.py count as apps.
            hooks_file = app_dir / name / 'hooks.py'
            if not hooks_file.exists():
                continue
            try:
                content = hooks_file.read_text(encoding='utf-8')
                app_info = {
                    'name': name,
                    'path': str(app_dir),
                    'type': 'both' if (app_dir / "frontend").exists() else name,
                    'title': name.replace('_', ' ').title(),
                    'description': '',
                    'publisher': '',
                    'version': '1.0.0'
                }
                for field, pattern in hook_patterns.items():
                    found = re.search(pattern, content)
                    if found:
                        app_info[field] = found.group(1)
                local_apps.append(app_info)
            except Exception:
                # Reading failed: still expose the app with basic info.
                local_apps.append({
                    'name': name,
                    'path': str(app_dir),
                    'type': name,
                    'title': name.replace('_', ' ').title(),
                    'description': '',
                    'publisher': '',
                    'version': '1.0.0'
                })

        return {
            'success': True,
            'data': {
                'apps': local_apps,
                'total': len(local_apps)
            }
        }
    except Exception as e:
        log_error(f"扫描本地App失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/jingrow/install-local/{app_name}")
async def install_local_app(request: Request, app_name: str):
    """Register an app that already exists under the local ``apps`` directory.

    The app is appended to the "Local Installed Apps" record and its files
    are then synced into the database. Registration and sync failures are
    logged but do not fail the request.

    Args:
        request: Incoming request (unused, kept for the route signature).
        app_name: Directory name of the app under ``apps``.

    Returns:
        ``{'success': True, 'message': ..., 'app_name': app_name}``.

    Raises:
        HTTPException: 400 for an invalid name or an already installed app,
            404 when the app directory is missing, 500 on unexpected errors.
    """
    try:
        # app_name comes straight from the URL and is joined into a
        # filesystem path below; refuse anything that could leave apps/.
        if app_name in ('.', '..') or '/' in app_name or '\\' in app_name:
            raise HTTPException(status_code=400, detail=f"Invalid app name '{app_name}'")

        # Resolve the application directory.
        apps_dir = Path(__file__).resolve().parents[4] / "apps"
        app_dir = apps_dir / app_name
        backend_dir = app_dir / app_name  # backend code directory
        if not app_dir.exists():
            raise HTTPException(status_code=404, detail=f"App '{app_name}' not found")
        if not backend_dir.exists():
            log_info(f"后端目录不存在: {backend_dir},跳过导入")
            backend_dir = app_dir  # fall back to the app root directory

        # Fetch the installed list once: it serves both the "already
        # installed" check and as the base list for registration.
        from jingrow.utils.jingrow_api import get_single_pagetype
        result = get_single_pagetype("Local Installed Apps")
        apps_list = []
        if result.get('success'):
            apps_list = result.get('config', {}).get('local_installed_apps', [])
            if any(app.get('app_name', '') == app_name for app in apps_list):
                raise HTTPException(status_code=400, detail=f"App '{app_name}' is already installed")

        # Add the app to the Local Installed Apps PageType.
        try:
            apps_list.append({
                'app_name': app_name,
                'app_version': DEFAULT_VERSION,
                'git_branch': DEFAULT_BRANCH
            })
            if await _update_local_installed_apps(apps_list):
                log_info(f"应用 {app_name} 已注册到数据库")
                # Sync the app into the database (Package and PageTypes).
                try:
                    api_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.sync_app_files"
                    response = requests.post(
                        api_url,
                        json={'app_name': app_name, 'app_path': str(backend_dir), 'force': True},
                        headers=get_jingrow_api_headers(),
                        timeout=60
                    )
                    if response.status_code == 200:
                        log_info(f"应用 {app_name} 的 Package 和 PageTypes 已成功同步到数据库")
                    else:
                        log_error(f"同步应用 {app_name} 失败: HTTP {response.status_code}")
                except Exception as sync_error:
                    # A sync failure is recorded but does not fail the install.
                    log_error(f"同步应用 {app_name} 时出错: {str(sync_error)}")
            else:
                log_error("更新数据库失败")
        except Exception as e:
            # Database problems must not block the install, only be logged.
            log_error(f"注册应用到数据库失败: {str(e)}")

        return {
            'success': True,
            'message': f"App '{app_name}' installed successfully",
            'app_name': app_name
        }
    except HTTPException:
        raise
    except Exception as e:
        log_error(f"安装本地App失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
async def _update_local_installed_apps(apps_list):
    """Overwrite the "Local Installed Apps" record with ``apps_list``.

    Sends a PUT to the jingrow data API with a 10 second timeout.

    Returns:
        ``True`` when the server answers HTTP 200, ``False`` otherwise.
    """
    endpoint = f"{Config.jingrow_server_url}/api/data/Local Installed Apps/Local Installed Apps"
    reply = requests.put(
        endpoint,
        json={"local_installed_apps": apps_list},
        headers=get_jingrow_api_headers(),
        timeout=10,
    )
    return reply.status_code == 200
async def ensure_jingrow_registered():
    """Make sure the ``jingrow`` app is listed in "Local Installed Apps".

    The outcome is remembered in ``_jingrow_registered_cache``; once it is
    ``True`` later calls return immediately. If the record cannot be
    fetched the cache is left untouched. Errors are logged, never raised.
    """
    global _jingrow_registered_cache
    # A previous successful check makes this call a no-op.
    if _jingrow_registered_cache is True:
        return
    try:
        from jingrow.utils.jingrow_api import get_single_pagetype
        lookup = get_single_pagetype("Local Installed Apps")
        if not lookup.get('success'):
            return
        apps = lookup.get('config', {}).get('local_installed_apps', [])

        already_present = any(entry.get('app_name') == JINGROW_APP_NAME for entry in apps)
        if already_present:
            log_info("jingrow应用已在数据库中")
            _jingrow_registered_cache = True
            return

        # Not listed yet: append it and push the updated list.
        apps.append({'app_name': JINGROW_APP_NAME, 'app_version': DEFAULT_VERSION, 'git_branch': DEFAULT_BRANCH})
        if await _update_local_installed_apps(apps):
            log_info("jingrow应用已自动注册到数据库")
            _jingrow_registered_cache = True
        else:
            log_error("注册jingrow应用失败")
            _jingrow_registered_cache = False
    except Exception as e:
        log_error(f"检查jingrow应用注册状态失败: {str(e)}")
        _jingrow_registered_cache = False
@router.get("/jingrow/installed-apps")
async def get_installed_apps(request: Request):
    """Return the apps listed in the "Local Installed Apps" record.

    ``ensure_jingrow_registered`` is awaited first. When the record cannot
    be fetched an empty list is returned rather than an error.

    Returns:
        ``{'success': True, 'data': {'apps': [...], 'total': <int>}}`` where
        each app has ``name``, ``version``, ``git_branch``, ``type`` and
        ``uninstallable``.

    Raises:
        HTTPException: 500 on unexpected errors.
    """
    try:
        # Make sure the jingrow app itself is registered.
        await ensure_jingrow_registered()

        from jingrow.utils.jingrow_api import get_single_pagetype
        lookup = get_single_pagetype("Local Installed Apps")

        apps = []
        if lookup.get('success'):
            for entry in lookup.get('config', {}).get('local_installed_apps', []):
                is_system = entry.get('app_name') == JINGROW_APP_NAME
                apps.append({
                    'name': entry.get('app_name', ''),
                    'version': entry.get('app_version', '1.0.0'),
                    'git_branch': entry.get('git_branch', 'main'),
                    'type': SYSTEM_APP_TYPE if is_system else INSTALLED_APP_TYPE,
                    'uninstallable': not is_system
                })

        return {
            'success': True,
            'data': {
                'apps': apps,
                'total': len(apps)
            }
        }
    except Exception as e:
        log_error(f"获取已安装应用列表失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/jingrow/uninstall/{app_name}")
async def uninstall_app(request: Request, app_name: str):
    """Uninstall an app by deleting its whole directory and its database record.

    Args:
        request: Incoming request (unused, kept for the route signature).
        app_name: Directory name of the app under the apps directory.

    Returns:
        ``{'success': True, 'message': ...}`` once the directory is removed.
        A failure to remove the database record is logged, not raised.

    Raises:
        HTTPException: 400 for an invalid name, 403 for the system app,
            404 when the app directory does not exist, 500 on other errors.
    """
    try:
        log_info(f"开始卸载应用: {app_name}")
        # app_name comes from the URL and feeds shutil.rmtree below; a value
        # such as ".." would otherwise delete the parent of the apps directory.
        if app_name in ('.', '..') or '/' in app_name or '\\' in app_name:
            raise HTTPException(status_code=400, detail=f"无效的应用名称: {app_name}")
        # The system app can never be uninstalled.
        if app_name == JINGROW_APP_NAME:
            raise HTTPException(status_code=403, detail=f"系统应用 '{JINGROW_APP_NAME}' 不允许卸载")
        # Locate the app directory.
        apps_dir, _ = get_app_directories()
        app_dir = apps_dir / app_name
        if not app_dir.exists():
            raise HTTPException(status_code=404, detail=f"应用 {app_name} 不存在")
        # Delete the whole app directory.
        shutil.rmtree(app_dir)
        log_info(f"应用目录删除成功: {app_dir}")
        # Remove the record from the database; surface a failure in the log
        # instead of discarding the result silently.
        db_result = await _remove_from_database(app_name)
        if not db_result.get('success'):
            log_error(f"应用 {app_name} 的数据库记录删除失败: {db_result.get('error')}")
        return {
            'success': True,
            'message': f'应用 {app_name} 卸载成功'
        }
    except HTTPException:
        raise
    except Exception as e:
        log_error(f"卸载应用 {app_name} 失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/jingrow/app-info/{app_name}")
async def get_app_info(request: Request, app_name: str):
    """Report which parts of an app exist on disk plus basic metadata.

    ``version`` is read from ``setup.py`` in the backend directory when
    present and is overridden by ``package.json`` in the frontend directory,
    which also supplies ``description``. Metadata files that cannot be read
    or parsed are logged and skipped.

    Returns:
        ``{'success': True, 'data': app_info}``.

    Raises:
        HTTPException: 500 on unexpected errors.
    """
    try:
        backend_dir, frontend_dir = get_app_directories()
        backend_app_dir = backend_dir / app_name / app_name
        frontend_app_dir = frontend_dir / app_name / "frontend"
        app_info = {
            'name': app_name,
            'has_backend': backend_app_dir.exists(),
            'has_frontend': frontend_app_dir.exists(),
            'backend_path': str(backend_app_dir) if backend_app_dir.exists() else None,
            'frontend_path': str(frontend_app_dir) if frontend_app_dir.exists() else None
        }
        # Try to read metadata from the backend.
        if backend_app_dir.exists():
            setup_py = backend_app_dir / 'setup.py'
            if setup_py.exists():
                try:
                    with open(setup_py, 'r', encoding='utf-8') as f:
                        content = f.read()
                    # Simple textual extraction of the version.
                    version_match = re.search(r'version\s*=\s*["\']([^"\']+)["\']', content)
                    if version_match:
                        app_info['version'] = version_match.group(1)
                except Exception as read_error:
                    # Best effort: metadata is optional, but leave a trace.
                    log_error(f"读取 {setup_py} 失败: {str(read_error)}")
        if frontend_app_dir.exists():
            package_json = frontend_app_dir / 'package.json'
            if package_json.exists():
                try:
                    with open(package_json, 'r', encoding='utf-8') as f:
                        pkg_data = json.load(f)
                    # package.json wins over setup.py when it has a version.
                    app_info['version'] = pkg_data.get('version', app_info.get('version'))
                    app_info['description'] = pkg_data.get('description')
                except Exception as read_error:
                    # Best effort: metadata is optional, but leave a trace.
                    log_error(f"读取 {package_json} 失败: {str(read_error)}")
        return {
            'success': True,
            'data': app_info
        }
    except Exception as e:
        log_error(f"获取应用信息失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
async def _install_extension_package_in_api(package_path: str, original_filename: str) -> Dict[str, Any]:
    """Copy an extension package into ``public/files`` and install it.

    The package is copied to ``<bench>/sites/test001/public/files`` and the
    ``jlocal.install_package`` action is then called with its ``/files/...``
    URL.

    Args:
        package_path: Path of the package file to copy.
        original_filename: Name to store the file under; only its final path
            component is used.

    Returns:
        A dict with ``success`` and either install details or ``error``.
        If calling the install API raises, the file is still reported as
        saved (``success: True``) with a note asking for a manual import.
    """
    try:
        log_info(f"保存扩展包文件: {original_filename}")
        # The name is joined into a filesystem path and a URL, so keep only
        # the final component to prevent escaping the target directory.
        safe_name = Path(original_filename).name
        if safe_name in ('', '.', '..'):
            return {'success': False, 'error': f'保存文件失败: 无效的文件名 {original_filename}'}

        current = Path(__file__).resolve()
        # NOTE(review): both the bench path and the site name "test001" are
        # hard-coded; presumably these should come from configuration.
        if 'jingrow-framework' in str(current):
            # Running from the framework checkout: use the fixed bench path.
            jingrow_bench_path = Path('/home/jingrow/jingrow-bench')
        else:
            # Otherwise walk up from this file to the bench root.
            project_root = current.parents[6] if current.parts.count('apps') > 1 else current.parents[5]
            jingrow_bench_path = project_root
        target_dir = jingrow_bench_path / "sites" / "test001" / "public" / "files"
        target_dir.mkdir(parents=True, exist_ok=True)
        target_path = target_dir / safe_name
        # Copy the file into place.
        shutil.copy2(package_path, target_path)
        log_info(f"文件已保存到: {target_path}")

        file_url = f"/files/{safe_name}"
        # Call the jlocal API right away to install the package.
        try:
            from jingrow.utils.jingrow_api import get_jingrow_api_headers
            headers = get_jingrow_api_headers()
            api_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.install_package"
            response = requests.post(
                api_url,
                json={'package_file_url': file_url},
                headers=headers,
                timeout=60
            )
            if response.status_code != 200:
                log_error(f"API调用失败: HTTP {response.status_code}")
                return {'success': False, 'error': f'API调用失败: HTTP {response.status_code}'}

            message = response.json().get('message', {})
            if message.get('success'):
                log_info(f"扩展包安装成功: {message.get('package_name')}")
                return {
                    'success': True,
                    'message': '扩展包安装成功',
                    'package_name': message.get('package_name'),
                    'file_url': file_url,
                    'imported_files': message.get('imported_files', []),
                    'file_count': message.get('file_count', 0)
                }
            error_msg = message.get('error', '未知错误')
            log_error(f"安装失败: {error_msg}")
            return {'success': False, 'error': error_msg}
        except Exception as api_error:
            # The file is already in place, so report it as saved and let
            # the user import it manually.
            log_error(f"调用安装API失败: {str(api_error)}")
            return {
                'success': True,
                'message': '扩展包已保存到 public/files 目录',
                'file_url': file_url,
                'file_path': str(target_path),
                'note': '文件已上传,请手动在 jingrow 应用中使用 Package Import 功能导入'
            }
    except Exception as e:
        log_error(f"保存扩展包失败: {str(e)}")
        return {'success': False, 'error': f'保存文件失败: {str(e)}'}
async def _remove_from_database(app_name: str) -> Dict[str, Any]:
    """Drop ``app_name`` from the "Local Installed Apps" record.

    Returns:
        ``{'success': True, 'message': ...}`` when the entry was removed,
        when it was not present, or when the record could not be fetched;
        ``{'success': False, 'error': ...}`` when the update fails or an
        exception occurs.
    """
    try:
        # Database access goes through the API.
        from jingrow.utils.jingrow_api import get_single_pagetype
        lookup = get_single_pagetype("Local Installed Apps")
        if not lookup.get('success'):
            return {'success': True, 'message': '未找到Local Installed Apps记录'}

        local_installed_apps = lookup.get('config', {}).get('local_installed_apps', [])
        log_info(f"当前应用列表: {local_installed_apps}")

        # Keep every entry except the one being removed.
        remaining = [entry for entry in local_installed_apps if entry.get('app_name', '') != app_name]
        if len(remaining) >= len(local_installed_apps):
            return {'success': True, 'message': '未找到要删除的应用记录'}

        if not await _update_local_installed_apps(remaining):
            return {'success': False, 'error': '更新数据库失败'}
        log_info(f"应用 {app_name} 已从Local Installed Apps删除")
        return {'success': True, 'message': '数据库记录删除成功'}
    except Exception as e:
        log_error(f"从数据库删除应用记录失败: {str(e)}")
        return {'success': False, 'error': f'数据库操作异常: {str(e)}'}
@router.get("/jingrow/app-marketplace")
async def get_app_marketplace(
    search: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    sort_by: Optional[str] = None
):
    """Fetch marketplace apps from Jingrow Cloud with search, paging and sorting.

    Args:
        search: Optional substring matched against the app title.
        page: 1-based page number.
        page_size: Number of items per page.
        sort_by: Optional ``order_by`` value forwarded to the cloud API.

    Returns:
        ``{"items": [...], "total": <int>, "page": page, "page_size": page_size}``.

    Raises:
        HTTPException: the upstream status code when the page request fails,
            500 on unexpected errors.
    """
    try:
        url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.local_app.get_local_apps"
        # Build the filter conditions.
        filters = {"public": 1}
        if search:
            filters["title"] = ["like", f"%{search}%"]
        encoded_filters = json.dumps(filters, ensure_ascii=False)
        headers = get_jingrow_cloud_api_headers()

        # 1. Total count: request the unpaged list and count its entries.
        total_params = {
            'filters': encoded_filters,
            'limit_start': 0,
            'limit_page_length': 0  # 0 = no limit, so every match is returned
        }
        total_response = requests.get(url, params=total_params, headers=headers, timeout=20)
        total_count = 0
        if total_response.status_code == 200:
            total_count = len(total_response.json().get('message', []))

        # 2. The requested page.
        params = {'filters': encoded_filters}
        if sort_by:
            params['order_by'] = sort_by
        params['limit_start'] = (page - 1) * page_size
        params['limit_page_length'] = page_size
        response = requests.get(url, params=params, headers=headers, timeout=20)
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail="获取应用市场数据失败")

        return {
            "items": response.json().get('message', []),
            "total": total_count,
            "page": page,
            "page_size": page_size
        }
    except HTTPException:
        # Keep the upstream status code instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取应用市场数据失败: {str(e)}")
@router.get("/jingrow/app-marketplace/{app_name}")
async def get_app_detail(app_name: str):
    """Fetch the marketplace details of one app from Jingrow Cloud.

    Returns:
        The ``message`` field of the cloud API response.

    Raises:
        HTTPException: 404 when the cloud API does not answer HTTP 200,
            500 on unexpected errors.
    """
    try:
        url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.local_app.get_local_app"
        params = {"name": app_name}
        headers = get_jingrow_cloud_api_headers()
        response = requests.get(url, params=params, headers=headers, timeout=20)
        if response.status_code != 200:
            raise HTTPException(status_code=404, detail="应用不存在")
        return response.json().get('message')
    except HTTPException:
        # Without this the 404 above would be caught below and become a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取应用详情失败: {str(e)}")
@router.get("/jingrow/app-marketplace-meta")
async def get_app_meta():
    """Fetch the Local App metadata from Jingrow Cloud.

    Returns:
        The ``message`` field of the cloud API response.

    Raises:
        HTTPException: 500 when the cloud API does not answer HTTP 200 or
            an unexpected error occurs.
    """
    try:
        url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.local_app.get_local_app_meta"
        headers = get_jingrow_cloud_api_headers()
        response = requests.get(url, headers=headers, timeout=20)
        if response.status_code != 200:
            raise HTTPException(status_code=500, detail="获取元数据失败")
        return response.json().get('message')
    except HTTPException:
        # Pass through unchanged so the detail message is not wrapped twice.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取元数据失败: {str(e)}")
@router.post("/jingrow/install-extension")
async def install_extension_package_api(request: Request, file: UploadFile = File(...)):
    """Upload an extension package and install it into the database.

    The archive is uploaded through ``upload_file_to_jingrow`` and the
    resulting file URL is passed to the ``jlocal.install_package`` action.

    Args:
        request: Incoming request (unused, kept for the route signature).
        file: Uploaded archive; the name must end in ``.tar.gz``, ``.tgz``
            or ``.gz``.

    Returns:
        ``{'success': True, 'package_name': ..., 'file_url': ..., 'file_count': ...}``.

    Raises:
        HTTPException: 400 for invalid input, a failed upload or a failed
            install; 500 when the install API call or anything else fails.
    """
    try:
        log_info(f"开始处理上传的扩展包: {file.filename}")
        if not file.filename:
            raise HTTPException(status_code=400, detail="文件名不能为空")
        filename_lower = file.filename.lower()
        if not filename_lower.endswith(('.tar.gz', '.tgz', '.gz')):
            raise HTTPException(status_code=400, detail=f"只支持TAR.GZ格式的扩展包当前文件: {filename_lower}")
        content = await file.read()
        if not content:
            raise HTTPException(status_code=400, detail="文件内容为空")
        log_info(f"读取文件完成,大小: {len(content)} 字节")

        # Use the standard file upload interface.
        from jingrow.utils.jingrow_api import upload_file_to_jingrow, get_jingrow_api_headers
        upload_result = upload_file_to_jingrow(content, file.filename)
        if not upload_result.get('success'):
            raise HTTPException(status_code=400, detail=upload_result.get('error', '文件上传失败'))
        file_url = upload_result['file_url']
        log_info(f"文件上传成功: {file_url}")

        # Call the install_package API.
        api_url = f"{Config.jingrow_server_url}/api/action/jingrow.ai.utils.jlocal.install_package"
        headers = get_jingrow_api_headers()
        response = requests.post(
            api_url,
            json={'package_file_url': file_url},
            headers=headers,
            timeout=60
        )
        if response.status_code != 200:
            log_error(f"API调用失败: HTTP {response.status_code}")
            raise HTTPException(status_code=500, detail=f'安装API调用失败: HTTP {response.status_code}')

        message = response.json().get('message', {})
        if not message.get('success'):
            error_msg = message.get('error', '未知错误')
            log_error(f"安装失败: {error_msg}")
            raise HTTPException(status_code=400, detail=error_msg)

        # Read package_name with .get so a successful install that omits it
        # does not fail with KeyError and turn into a 500.
        package_name = message.get('package_name')
        log_info(f"扩展包安装成功: {package_name}")
        return {
            'success': True,
            'package_name': package_name,
            'file_url': file_url,
            'file_count': message.get('file_count', 0)
        }
    except HTTPException:
        raise
    except Exception as e:
        log_error(f"安装扩展包失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"安装扩展包失败: {str(e)}")
@router.post("/jingrow/upload-image")
async def upload_image(file: UploadFile = File(...)):
    """Upload an app image to Jingrow Cloud.

    Args:
        file: Uploaded image; its content type must start with ``image/``
            and it must not exceed 5MB.

    Returns:
        ``{"success": True, "url": ..., "file_name": ..., "message": ...}``.

    Raises:
        HTTPException: 400 for a non-image, an oversized file or an
            unexpected response shape; the upstream status code when the
            cloud API fails; 500 on unexpected errors.
    """
    max_bytes = 5 * 1024 * 1024
    try:
        # Validate the file type.
        if not file.content_type or not file.content_type.startswith('image/'):
            raise HTTPException(status_code=400, detail="只支持图片格式")
        # Validate the size (5MB limit). The declared size may be missing,
        # so it only serves as an early rejection.
        declared_size = getattr(file, 'size', None)
        if declared_size and declared_size > max_bytes:
            raise HTTPException(status_code=400, detail="图片大小不能超过5MB")
        file_content = await file.read()
        # Enforce the limit on the bytes actually received.
        if len(file_content) > max_bytes:
            raise HTTPException(status_code=400, detail="图片大小不能超过5MB")

        url = f"{get_jingrow_cloud_url()}/api/action/upload_file"
        headers = get_jingrow_cloud_api_headers()
        # Let requests set the multipart Content-Type with its boundary.
        headers.pop('Content-Type', None)
        files = {'file': (file.filename, file_content, file.content_type)}
        data = {'file_name': file.filename, 'is_private': 0, 'folder': 'Home/Attachments'}
        response = requests.post(url, files=files, data=data, headers=headers, timeout=30)
        if response.status_code != 200:
            if response.headers.get('content-type', '').startswith('application/json'):
                error_detail = response.json().get('detail', f"HTTP {response.status_code}")
            else:
                error_detail = f"HTTP {response.status_code}"
            raise HTTPException(status_code=response.status_code, detail=error_detail)

        result = response.json()
        # Check for an error reported in the body.
        if isinstance(result, dict) and result.get('error'):
            raise HTTPException(status_code=400, detail=result['error'])
        # Handle the success response.
        message = result.get('message', {})
        if isinstance(message, dict) and message.get('file_url'):
            file_url = message.get('file_url')
            # Make sure the URL is rooted at /files/.
            if not file_url.startswith('/files/'):
                file_url = f"/files/{file_url.lstrip('/')}"
            return {
                "success": True,
                "url": file_url,
                "file_name": message.get('file_name'),
                "message": "图片上传成功"
            }
        raise HTTPException(status_code=400, detail="API响应格式错误")
    except HTTPException:
        raise
    except Exception as e:
        log_error(f"图片上传失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"图片上传失败: {str(e)}")
@router.post("/jingrow/create-app")
async def create_app(
    title: str = Form(...),
    app_name: str = Form(...),
    subtitle: str = Form(None),
    category: str = Form(None),
    description: str = Form(None),
    repository_url: str = Form(None),
    file_url: str = Form(None),
    app_image: str = Form(None)
):
    """Publish an app to Jingrow Cloud via ``create_local_app``.

    The form fields are forwarded as the ``app_data`` JSON object.

    Returns:
        ``{"success": True, "message": ...}`` naming the published app.

    Raises:
        HTTPException: the upstream status code when the cloud API fails,
            400 when the response carries an error, 500 on unexpected errors.
    """
    try:
        url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.local_app.create_local_app"
        headers = get_jingrow_cloud_api_headers()
        headers['Content-Type'] = 'application/json'
        app_data = {
            "app_name": app_name,
            "title": title,
            "subtitle": subtitle,
            "description": description,
            "category": category,
            "repository_url": repository_url,
            "file_url": file_url,
            "app_image": app_image
        }
        response = requests.post(url, json={"app_data": app_data}, headers=headers, timeout=20)

        if response.status_code != 200:
            if response.headers.get('content-type', '').startswith('application/json'):
                error_detail = response.json().get('detail', f"HTTP {response.status_code}")
            else:
                error_detail = f"HTTP {response.status_code}"
            raise HTTPException(status_code=response.status_code, detail=error_detail)

        result = response.json()
        # An error may be reported at the top level or inside "message".
        if isinstance(result, dict) and result.get('error'):
            raise HTTPException(status_code=400, detail=result['error'])
        message = result.get('message', {})
        if isinstance(message, dict) and message.get('error'):
            raise HTTPException(status_code=400, detail=message['error'])

        # Success: take the name from the message dict when there is one.
        if isinstance(message, dict):
            published_name = message.get('name', 'unknown')
        else:
            published_name = result.get('message', 'unknown')
        return {"success": True, "message": f"应用发布成功,应用名称: {published_name}"}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"发布失败: {str(e)}")