308 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from typing import Dict, Any, List, Optional
from urllib.parse import quote

import requests
from fastapi import APIRouter, HTTPException, Request
from fastapi.concurrency import run_in_threadpool

from jingrow.config import Config
from jingrow.utils.auth import get_jingrow_api_headers

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which every Local Job endpoint in this module is registered.
router = APIRouter()
def call_jingrow_api(
    method: str,
    endpoint: str,
    headers: dict,
    params: Optional[dict] = None,
    json_data: Optional[dict] = None,
):
    """Send one HTTP request to the Jingrow server and normalise the outcome.

    Args:
        method: HTTP verb, case-insensitive. GET, POST, PUT and DELETE are
            supported.
        endpoint: Path appended verbatim to ``Config.jingrow_server_url``.
        headers: Headers sent with the request.
        params: Query-string parameters; only sent for GET.
        json_data: JSON body; only sent for POST and PUT.

    Returns:
        ``{'success': True, 'data': ...}`` on HTTP 200. For a JSON object
        body, ``data`` is its ``data`` field, else its ``message`` field,
        else the whole body; any other JSON value is returned as-is.
        ``{'success': False, 'error': str}`` for a non-200 status, a body
        flagged ``session_expired``, an unsupported method, or any exception
        raised while performing the call. Exceptions are reported through
        the error dict rather than propagated.
    """
    try:
        url = f"{Config.jingrow_server_url}{endpoint}"
        verb = method.upper()
        if verb == 'GET':
            response = requests.get(url, headers=headers, params=params, timeout=30)
        elif verb == 'POST':
            response = requests.post(url, headers=headers, json=json_data, timeout=30)
        elif verb == 'PUT':
            response = requests.put(url, headers=headers, json=json_data, timeout=30)
        elif verb == 'DELETE':
            response = requests.delete(url, headers=headers, timeout=30)
        else:
            raise ValueError(f"Unsupported HTTP method: {method}")

        if response.status_code != 200:
            return {'success': False, 'error': f'API请求失败 (HTTP {response.status_code}): {response.text}'}

        result = response.json()
        if not isinstance(result, dict):
            # A bare list/scalar body has no envelope to unwrap; previously
            # this hit `.get` on a non-dict and was reported as a failure.
            return {'success': True, 'data': result}

        # The server signals an expired session inside a 200 response.
        if result.get('session_expired'):
            return {'success': False, 'error': 'Session已过期请重新登录'}
        return {'success': True, 'data': result.get('data', result.get('message', result))}
    except Exception as e:
        # Keep the traceback in the logs; callers only receive the message.
        logger.exception("Jingrow API call failed: %s %s", method, endpoint)
        return {'success': False, 'error': f'调用API异常: {str(e)}'}
@router.get("/jingrow/local-jobs")
async def get_local_jobs(
    request: Request,
    page: int = 1,
    page_length: int = 20,
    order_by: str = "modified desc",
    filters: Optional[str] = None
):
    """Return one page of Local Job records.

    Args:
        request: Incoming request (unused; system-level credentials are used).
        page: 1-based page number.
        page_length: Number of records per page.
        order_by: Sort expression forwarded to the upstream API.
        filters: Optional filter string forwarded unchanged.

    Returns:
        ``{"success": True, "items": [...], "total": int, "page": int,
        "page_length": int}``.

    Raises:
        HTTPException: 500 when credentials are not configured, the upstream
            call fails, or an unexpected error occurs.
    """
    try:
        # Always authenticate with the system-level API key/secret.
        headers = get_jingrow_api_headers()
        if not headers:
            logger.error("JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置")
            raise HTTPException(status_code=500, detail="系统认证配置错误")
        headers.update({
            "X-Requested-With": "XMLHttpRequest",
        })

        params = {
            'page': page,
            'page_length': page_length,
            'order_by': order_by,
            'start': (page - 1) * page_length,
        }
        if filters:
            params['filters'] = filters

        # call_jingrow_api uses blocking `requests`; run it in the threadpool
        # so a slow upstream does not stall the event loop.
        result = await run_in_threadpool(
            call_jingrow_api,
            'GET',
            '/api/data/Local Job',
            headers=headers,
            params=params,
        )
        if not result.get('success'):
            logger.error(f"Jingrow API failed: {result.get('error')}")
            raise HTTPException(status_code=500, detail=result.get('error', 'Failed to fetch local jobs'))

        data = result.get('data', {})
        if isinstance(data, list):
            # Bare list: no total is supplied, so report this page's size.
            items = data
            total = len(data)
        elif isinstance(data, dict):
            # Envelope object: records under 'data', optional 'total'.
            items = data.get('data', [])
            total = data.get('total', len(items))
        else:
            items = []
            total = 0

        return {
            "success": True,
            "items": items,
            "total": total,
            "page": page,
            "page_length": page_length,
        }
    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        logger.exception("Failed to get local jobs: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/jingrow/local-jobs/{job_id}")
async def get_local_job_detail(request: Request, job_id: str):
    """Return the details of a single Local Job.

    Args:
        request: Incoming request (only forwarded in the ``stats`` case).
        job_id: Identifier of the Local Job to fetch.

    Returns:
        ``{"success": True, "data": <upstream record>}``.

    Raises:
        HTTPException: 404 when the upstream lookup fails; 500 when
            credentials are not configured or an unexpected error occurs.
    """
    if job_id == "stats":
        # This parameterised route is registered before
        # GET /jingrow/local-jobs/stats and routes match in registration
        # order, so the literal "stats" path lands here. Forward it so the
        # statistics endpoint stays reachable.
        return await get_local_job_stats(request)

    try:
        # Always authenticate with the system-level API key/secret.
        headers = get_jingrow_api_headers()
        if not headers:
            logger.error("JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置")
            raise HTTPException(status_code=500, detail="系统认证配置错误")
        headers.update({
            "X-Requested-With": "XMLHttpRequest",
        })

        # Escape the id so characters such as '?', '#' or '/' cannot alter
        # the upstream URL.
        job_path = quote(job_id, safe='')
        # Blocking `requests` call: run it off the event loop.
        result = await run_in_threadpool(
            call_jingrow_api,
            'GET',
            f'/api/data/Local Job/{job_path}',
            headers=headers,
        )
        if not result.get('success'):
            raise HTTPException(status_code=404, detail=result.get('error', 'Local job not found'))

        return {
            "success": True,
            "data": result.get('data'),
        }
    except HTTPException:
        # Without this, the 404 above was caught below and re-raised as 500.
        raise
    except Exception as e:
        logger.exception("Failed to get local job detail %s: %s", job_id, e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/jingrow/local-jobs/{job_id}/stop")
async def stop_local_job(request: Request, job_id: str):
    """Ask the Jingrow server to stop a Local Job.

    Args:
        request: Incoming request (unused; system-level credentials are used).
        job_id: Identifier of the job, sent as ``job_id`` in the JSON body.

    Returns:
        ``{"success": True, "message": "Job stopped successfully"}``.

    Raises:
        HTTPException: 500 when credentials are not configured, the upstream
            call fails, or an unexpected error occurs.
    """
    try:
        # Always authenticate with the system-level API key/secret.
        headers = get_jingrow_api_headers()
        if not headers:
            logger.error("JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置")
            raise HTTPException(status_code=500, detail="系统认证配置错误")
        headers.update({
            "X-Requested-With": "XMLHttpRequest",
        })

        # Blocking `requests` call: run it off the event loop.
        result = await run_in_threadpool(
            call_jingrow_api,
            'POST',
            '/api/action/jingrow.core.pagetype.local_job.local_job.stop_job',
            headers=headers,
            json_data={'job_id': job_id},
        )
        if not result.get('success'):
            raise HTTPException(status_code=500, detail=result.get('error', 'Failed to stop job'))

        return {
            "success": True,
            "message": "Job stopped successfully",
        }
    except HTTPException:
        # Keep the original detail instead of re-wrapping and logging twice.
        raise
    except Exception as e:
        logger.exception("Failed to stop local job %s: %s", job_id, e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.delete("/jingrow/local-jobs/{job_id}")
async def delete_local_job(request: Request, job_id: str):
    """Delete a single Local Job through the Jingrow client delete action.

    Args:
        request: Incoming request (unused; system-level credentials are used).
        job_id: Name of the Local Job record to delete.

    Returns:
        ``{"success": True, "message": "Job deleted successfully"}``.

    Raises:
        HTTPException: 500 when credentials are not configured, the upstream
            call fails, or an unexpected error occurs.
    """
    try:
        # Always authenticate with the system-level API key/secret.
        headers = get_jingrow_api_headers()
        if not headers:
            logger.error("JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置")
            raise HTTPException(status_code=500, detail="系统认证配置错误")
        headers.update({
            "X-Requested-With": "XMLHttpRequest",
        })

        # Blocking `requests` call: run it off the event loop.
        result = await run_in_threadpool(
            call_jingrow_api,
            'POST',
            '/api/action/jingrow.client.delete',
            headers=headers,
            json_data={
                'pagetype': 'Local Job',
                'name': job_id,
            },
        )
        if not result.get('success'):
            raise HTTPException(status_code=500, detail=result.get('error', 'Failed to delete job'))

        return {
            "success": True,
            "message": "Job deleted successfully",
        }
    except HTTPException:
        # Keep the original detail instead of re-wrapping and logging twice.
        raise
    except Exception as e:
        logger.exception("Failed to delete local job %s: %s", job_id, e)
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/jingrow/local-jobs/batch-delete")
async def batch_delete_local_jobs(request: Request, request_data: Dict[str, Any]):
    """Delete several Local Jobs, one upstream call per job.

    Args:
        request: Incoming request (unused; system-level credentials are used).
        request_data: Request body; must contain ``job_ids``, a non-empty
            list of job names.

    Returns:
        ``{"success": True, "message": str, "success_count": int,
        "failed_jobs": [...]}``. ``success`` is ``True`` even when some or
        all deletions failed; inspect ``failed_jobs`` for the per-job result.

    Raises:
        HTTPException: 400 when ``job_ids`` is missing, empty or not a list;
            500 when credentials are not configured or an unexpected error
            occurs.
    """
    try:
        job_ids = request_data.get('job_ids', [])
        if not job_ids:
            raise HTTPException(status_code=400, detail="job_ids is required")
        if not isinstance(job_ids, list):
            # Iterating a string would delete one job per character.
            raise HTTPException(status_code=400, detail="job_ids must be a list")

        # Always authenticate with the system-level API key/secret.
        headers = get_jingrow_api_headers()
        if not headers:
            logger.error("JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置")
            raise HTTPException(status_code=500, detail="系统认证配置错误")
        headers.update({
            "X-Requested-With": "XMLHttpRequest",
        })

        success_count = 0
        failed_jobs = []
        # Delete one at a time so a single failure does not abort the batch.
        for job_id in job_ids:
            try:
                # Blocking `requests` call: run it off the event loop.
                result = await run_in_threadpool(
                    call_jingrow_api,
                    'POST',
                    '/api/action/jingrow.client.delete',
                    headers=headers,
                    json_data={
                        'pagetype': 'Local Job',
                        'name': job_id,
                    },
                )
            except Exception:
                # Best effort: record the failure, keep going, but log it.
                logger.exception("Failed to delete local job %s", job_id)
                failed_jobs.append(job_id)
                continue

            if result.get('success'):
                success_count += 1
            else:
                failed_jobs.append(job_id)

        return {
            "success": True,
            "message": f"Deleted {success_count} jobs successfully",
            "success_count": success_count,
            "failed_jobs": failed_jobs,
        }
    except HTTPException:
        # Without this, the 400 above was caught below and re-raised as 500.
        raise
    except Exception as e:
        logger.exception("Failed to batch delete local jobs: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
# NOTE(review): this route is declared after the parameterised
# GET /jingrow/local-jobs/{job_id} route in this module. Routes are matched
# in declaration order, so a request for .../stats reaches that handler first
# unless it forwards the call or this declaration is moved above it.
@router.get("/jingrow/local-jobs/stats")
async def get_local_job_stats(request: Request):
    """Return Local Job statistics from the Jingrow server.

    Args:
        request: Incoming request (unused; system-level credentials are used).

    Returns:
        ``{"success": True, "stats": <upstream data, {} if absent>}``.

    Raises:
        HTTPException: 500 when credentials are not configured, the upstream
            call fails, or an unexpected error occurs.
    """
    try:
        # Always authenticate with the system-level API key/secret.
        headers = get_jingrow_api_headers()
        if not headers:
            logger.error("JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置")
            raise HTTPException(status_code=500, detail="系统认证配置错误")
        headers.update({
            "X-Requested-With": "XMLHttpRequest",
        })

        # The upstream action is invoked with POST and no body.
        # Blocking `requests` call: run it off the event loop.
        result = await run_in_threadpool(
            call_jingrow_api,
            'POST',
            '/api/action/jingrow.core.pagetype.local_job.local_job.get_stats',
            headers=headers,
        )
        if not result.get('success'):
            raise HTTPException(status_code=500, detail=result.get('error', 'Failed to get stats'))

        return {
            "success": True,
            "stats": result.get('data', {}),
        }
    except HTTPException:
        # Keep the original detail instead of re-wrapping and logging twice.
        raise
    except Exception as e:
        logger.exception("Failed to get local job stats: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e