更新get_my_published_apps函数,修复无法获取已发布应用的问题

This commit is contained in:
jingrow 2025-11-02 22:34:51 +08:00
parent 96adb215e0
commit 3784da1eb7

View File

@ -1046,50 +1046,78 @@ async def get_my_published_apps(
if not user:
raise HTTPException(status_code=401, detail="认证失败")
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.local_app.get_local_apps"
# 调用云端API的get_my_local_app_list函数
url = f"{get_jingrow_cloud_url()}/api/action/jcloud.api.local_app.get_my_local_app_list"
# 构建过滤条件:过滤当前用户发布的应用
filters = {"owner": user}
# 构建过滤条件
filters = {}
if search:
filters["title"] = ["like", f"%{search}%"]
# 1. 先获取总数(不分页)
total_params = {
'filters': json.dumps(filters, ensure_ascii=False),
'limit_start': 0,
'limit_page_length': 0 # 不限制数量,获取所有数据来计算总数
}
headers = get_jingrow_cloud_api_headers()
# 添加session cookie以获取用户的应用
headers['Cookie'] = f'sid={session_cookie}'
total_response = requests.get(url, params=total_params, headers=headers, timeout=20)
total_count = 0
if total_response.status_code == 200:
total_data = total_response.json()
total_count = len(total_data.get('message', []))
# 2. 获取分页数据
params = {
'filters': json.dumps(filters, ensure_ascii=False)
}
# 构建请求参数
params = {}
if filters:
params['filters'] = json.dumps(filters, ensure_ascii=False)
# 排序参数
if sort_by:
params['order_by'] = sort_by
else:
params['order_by'] = "app_name asc" # 默认排序
# 分页参数
limit_start = (page - 1) * page_size
params['limit_start'] = limit_start
params['limit_page_length'] = page_size
headers = get_jingrow_cloud_api_headers()
# 添加session cookie以获取用户的团队信息
headers['Cookie'] = f'sid={session_cookie}'
# 1. 先获取总数(不分页)
total_params = params.copy()
total_params['limit_start'] = 0
total_params['limit_page_length'] = 0 # 不限制数量,获取所有数据来计算总数
total_response = requests.get(url, params=total_params, headers=headers, timeout=20)
total_count = 0
if total_response.status_code == 200:
total_data = total_response.json()
# get_my_local_app_list返回的是列表不是message格式
if isinstance(total_data, dict) and 'message' in total_data:
total_count = len(total_data.get('message', []))
elif isinstance(total_data, list):
total_count = len(total_data)
elif isinstance(total_data, dict) and total_data.get('success') is False:
# 如果没有团队信息,返回空列表
return {
"items": [],
"total": 0,
"page": page,
"page_size": page_size
}
# 2. 获取分页数据
response = requests.get(url, params=params, headers=headers, timeout=20)
if response.status_code == 200:
data = response.json()
apps = data.get('message', [])
# 处理不同的响应格式
if isinstance(data, dict) and 'message' in data:
apps = data.get('message', [])
elif isinstance(data, list):
apps = data
elif isinstance(data, dict) and data.get('success') is False:
# 如果没有团队信息,返回空列表
apps = []
else:
apps = []
# 如果没有获取到总数,使用当前返回的数据长度
if total_count == 0 and apps:
total_count = len(apps)
# 返回分页格式的数据
return {
@ -1099,11 +1127,19 @@ async def get_my_published_apps(
"page_size": page_size
}
else:
raise HTTPException(status_code=response.status_code, detail="获取已发布应用数据失败")
error_detail = "获取已发布应用数据失败"
if response.headers.get('content-type', '').startswith('application/json'):
try:
error_data = response.json()
error_detail = error_data.get('detail') or error_data.get('message') or error_detail
except:
pass
raise HTTPException(status_code=response.status_code, detail=error_detail)
except HTTPException:
raise
except Exception as e:
logger.error(f"获取已发布应用数据失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取已发布应用数据失败: {str(e)}")