jcloud/jcloud/api/local_app.py

293 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# Copyright (c) 2024, JINGROW
# For license information, please see license.txt
import jingrow
from jcloud.api.client import dashboard_whitelist
from jcloud.utils import get_current_team
def _get_team_user_names(apps):
    """Map each app's team to the name of the user linked to that team.

    Two bulk lookups (Team, then User) are issued instead of one query per
    app, which avoids the N+1 query pattern.

    Args:
        apps: Iterable of records exposing a ``team`` attribute. May be
            empty or ``None``.

    Returns:
        dict: ``{team name: user name}`` for every team that has a ``user``
        set and whose user was returned by the User lookup. Empty when
        there is nothing to resolve.
    """
    if not apps:
        return {}

    # Teams referenced by the given apps (falsy team values are ignored).
    team_ids = [record.team for record in apps if record.team]
    if not team_ids:
        return {}

    team_rows = jingrow.get_all(
        "Team",
        filters={"name": ["in", team_ids]},
        fields=["name", "user"],
    )

    # Users referenced by those teams.
    user_ids = [row.user for row in team_rows if row.user]
    if not user_ids:
        return {}

    user_rows = jingrow.get_all(
        "User",
        filters={"name": ["in", user_ids]},
        fields=["name"],
    )

    # Only "name" is fetched, so the lookup maps each user's name to itself;
    # in effect it records which of the linked users actually exist.
    known_users = {row.name: row.name for row in user_rows}
    owner_by_team = {row.name: row.user for row in team_rows if row.user}

    return {
        team_id: known_users[user_id]
        for team_id, user_id in owner_by_team.items()
        if user_id in known_users
    }
@jingrow.whitelist(allow_guest=True)
def get_local_apps(filters=None, order_by=None, limit_start=None, limit_page_length=None):
    """Return the list of enabled Local Apps.

    Args:
        filters: Optional extra filters, either a dict or a JSON string
            encoding one. They are merged into the query, but cannot lift
            the ``enabled = 1`` restriction.
        order_by: Optional ordering clause; defaults to ``"app_name asc"``.
        limit_start: Optional pagination offset, passed through to the query.
        limit_page_length: Optional page size, passed through to the query.

    Returns:
        list[dict]: One dict per app. The ``team`` entry holds the user name
        resolved for the app's team by ``_get_team_user_names`` (``None``
        when it could not be resolved), not the team identifier itself.
    """
    query_filters = {}
    if filters:
        if isinstance(filters, str):
            import json
            filters = json.loads(filters)
        # ``or {}`` tolerates a JSON string that decodes to null.
        query_filters.update(filters or {})

    # Applied AFTER the caller-supplied filters: the endpoint is whitelisted
    # with allow_guest=True, so a caller must not be able to pass
    # {"enabled": 0} and list disabled apps.
    query_filters["enabled"] = 1

    # NOTE(review): order_by is caller-controlled and forwarded as-is;
    # confirm that jingrow.get_all sanitizes it.
    if not order_by:
        order_by = "app_name asc"

    fields = [
        "name", "app_name", "title", "subtitle", "category",
        "enabled", "public", "team", "status", "repository_url",
        "file_url", "app_image", "creation", "modified",
    ]
    apps = jingrow.get_all(
        "Local App",
        filters=query_filters,
        fields=fields,
        order_by=order_by,
        limit_start=limit_start,
        limit_page_length=limit_page_length,
    )

    # One bulk lookup for all rows instead of a query per app.
    team_user_names = _get_team_user_names(apps)

    result = []
    for app in apps:
        app_data = {field: getattr(app, field) for field in fields}
        # Expose the team's user name rather than the raw team identifier.
        app_data["team"] = team_user_names.get(app.team)
        result.append(app_data)
    return result
@jingrow.whitelist(allow_guest=True)
def get_local_app(name):
    """Return the details of a single Local App.

    Args:
        name: Name (identifier) of the Local App record.

    Returns:
        dict: The app's public fields. ``team`` holds the user name resolved
        for the app's team by ``_get_team_user_names`` (``None`` when it
        could not be resolved).

    Raises:
        Whatever ``jingrow.throw`` raises when no such record exists.
    """
    record_exists = jingrow.db.exists("Local App", name)
    if not record_exists:
        jingrow.throw("Local App not found")

    app = jingrow.get_pg("Local App", name)

    # The helper works on a collection, so wrap the single document.
    resolved_team = _get_team_user_names([app]).get(app.team)

    exposed_fields = (
        "name", "app_name", "title", "subtitle", "description",
        "category", "enabled", "public", "team", "status",
        "repository_url", "file_url", "app_image", "creation", "modified",
    )
    details = {}
    for field in exposed_fields:
        if field == "team":
            # Report the team's user name instead of the team identifier.
            details[field] = resolved_team
        else:
            details[field] = getattr(app, field)
    return details
@dashboard_whitelist()
def create_local_app(app_data):
    """Create a Local App owned by the current team.

    Args:
        app_data: Dict (or JSON string encoding one) with the required keys
            ``app_name`` and ``title`` and the optional keys ``subtitle``,
            ``description``, ``category``, ``repository_url``, ``file_url``
            and ``app_image``.

    Returns:
        dict: ``{"error": ...}`` when an app with the same ``app_name``
        already exists; otherwise ``{"success": True, "name": ..., "message": ...}``.

    Raises:
        KeyError: If ``app_name`` or ``title`` is missing from ``app_data``.
    """
    # The owning team always comes from the session, never from the payload.
    team = get_current_team()

    if isinstance(app_data, str):
        import json
        app_data = json.loads(app_data)

    app_name = app_data["app_name"]

    # Reject duplicate application names before inserting anything.
    if jingrow.db.exists("Local App", {"app_name": app_name}):
        return {"error": "应用名称已存在,请使用其他名称"}

    payload = {
        "pagetype": "Local App",
        "app_name": app_name,
        "title": app_data["title"],
        "subtitle": app_data.get("subtitle", ""),
        "description": app_data.get("description", ""),
        "category": app_data.get("category"),
        "team": team,
    }
    # Optional link/asset fields default to None when absent.
    for optional_key in ("repository_url", "file_url", "app_image"):
        payload[optional_key] = app_data.get(optional_key)

    app = jingrow.get_pg(payload)
    app.insert()

    return {"success": True, "name": app.name, "message": "应用创建成功"}
@dashboard_whitelist()
def update_local_app(name, app_data):
    """Update fields of an existing Local App.

    Args:
        name: Name (identifier) of the Local App record to update.
        app_data: Dict (or JSON string encoding one) of ``field: value``
            pairs. Keys that do not correspond to a writable data attribute
            of the document are ignored.

    Returns:
        The name of the saved Local App.

    Raises:
        Whatever ``jingrow.throw`` raises when no such record exists.
    """
    if not jingrow.db.exists("Local App", name):
        jingrow.throw("Local App not found")

    app = jingrow.get_pg("Local App", name)

    if isinstance(app_data, str):
        import json
        app_data = json.loads(app_data)

    # Identity / bookkeeping attributes that a request payload must never
    # overwrite. "modified" is intentionally not listed: presumably the
    # framework uses it for stale-write detection on save (TODO confirm).
    protected_fields = {
        "name", "pagetype", "owner", "creation",
        "modified_by", "docstatus", "idx",
    }

    for field, value in app_data.items():
        if field in protected_fields or field.startswith("_"):
            continue
        if not hasattr(app, field):
            continue
        # hasattr() is also true for methods; without this guard a payload
        # key such as "save" would replace the bound method and make the
        # app.save() call below fail.
        if callable(getattr(app, field)):
            continue
        setattr(app, field, value)

    # NOTE(review): no check that the app belongs to the caller's team is
    # visible here, and "team" itself stays writable; confirm that
    # dashboard_whitelist (or the document's own validation) enforces it.
    app.save()
    return app.name
@dashboard_whitelist()
def delete_local_app(name):
    """Delete a Local App.

    Args:
        name: Name (identifier) of the Local App record to delete.

    Returns:
        str: A fixed confirmation message.

    Raises:
        Whatever ``jingrow.throw`` raises when no such record exists.
    """
    record_exists = jingrow.db.exists("Local App", name)
    if not record_exists:
        jingrow.throw("Local App not found")

    # NOTE(review): no team ownership check is visible in this function;
    # confirm that dashboard_whitelist enforces it.
    target = jingrow.get_pg("Local App", name)
    target.delete()

    return "Local App deleted successfully"
@jingrow.whitelist(allow_guest=True)
def get_local_app_meta():
    """Return the metadata of the "Local App" type as a dict."""
    return jingrow.get_meta("Local App").as_dict()
@jingrow.whitelist(allow_guest=True)
def get_local_app_categories():
    """Return the list of categories available to Local Apps.

    Returns:
        list[dict]: One dict per "Marketplace App Category" record with the
        keys ``name``, ``description`` and ``slug``, ordered by name
        ascending.
    """
    rows = jingrow.get_all(
        "Marketplace App Category",
        fields=["name", "description", "slug"],
        order_by="name asc",
    )
    # Re-pack each row into a plain dict holding only the exposed keys.
    return [
        {
            "name": row.name,
            "description": row.description,
            "slug": row.slug,
        }
        for row in rows
    ]
@jingrow.whitelist(allow_guest=True)
def search_local_apps(query, filters=None, limit=20):
    """Search enabled Local Apps whose ``app_name`` contains ``query``.

    Args:
        query: Substring to look for in ``app_name`` (used in a LIKE match).
        filters: Optional extra filters, either a dict or a JSON string
            encoding one. They narrow the search but cannot replace the
            ``enabled`` or ``app_name`` conditions.
        limit: Maximum number of results; defaults to 20.

    Returns:
        list[dict]: One dict per matching app, ordered by ``app_name``
        ascending. The ``team`` entry holds the user name resolved for the
        app's team by ``_get_team_user_names`` (``None`` when it could not
        be resolved).
    """
    search_filters = {}
    if filters:
        if isinstance(filters, str):
            import json
            filters = json.loads(filters)
        # ``or {}`` tolerates a JSON string that decodes to null.
        search_filters.update(filters or {})

    # The mandatory conditions are applied AFTER the caller-supplied
    # filters: the endpoint is whitelisted with allow_guest=True, so a
    # caller must not be able to override them and either list disabled
    # apps or silently discard the search term.
    search_filters["enabled"] = 1
    search_filters["app_name"] = ["like", f"%{query}%"]

    fields = [
        "name", "app_name", "title", "subtitle", "category",
        "enabled", "public", "team", "status", "repository_url",
        "file_url", "app_image", "creation", "modified",
    ]
    apps = jingrow.get_all(
        "Local App",
        filters=search_filters,
        fields=fields,
        order_by="app_name asc",
        limit=limit,
    )

    # One bulk lookup for all rows instead of a query per app.
    team_user_names = _get_team_user_names(apps)

    result = []
    for app in apps:
        app_data = {field: getattr(app, field) for field in fields}
        # Expose the team's user name rather than the raw team identifier.
        app_data["team"] = team_user_names.get(app.team)
        result.append(app_data)
    return result