69 lines
2.0 KiB
Python

"""
调度器管理 API
支持动态增删改查定时任务
"""
from fastapi import APIRouter, HTTPException
from typing import Dict, Any, List, Optional
from jingrow.services.scheduler import (
start_scheduler,
stop_scheduler,
get_scheduler_status,
get_scheduler
)
router = APIRouter()
@router.post("/start")
async def start_scheduler_endpoint():
"""启动定时任务调度器"""
try:
await start_scheduler()
return {"success": True, "message": "定时任务调度器已启动"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"启动失败: {str(e)}")
@router.post("/stop")
async def stop_scheduler_endpoint():
"""停止定时任务调度器"""
try:
await stop_scheduler()
return {"success": True, "message": "定时任务调度器已停止"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"停止失败: {str(e)}")
@router.post("/reload")
async def reload_scheduler():
"""重新加载定时任务"""
try:
scheduler = get_scheduler()
await scheduler.reload()
return {"success": True, "message": "定时任务已重新加载"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"重新加载失败: {str(e)}")
@router.get("/status")
async def get_status():
"""获取调度器状态"""
try:
status = get_scheduler_status()
return {"success": True, "data": status}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取状态失败: {str(e)}")
# 保持向后兼容
@router.post("/scheduler/reload")
async def scheduler_reload(jobs: List[Dict[str, Any]]):
"""重新加载所有定时任务(向后兼容)"""
try:
scheduler = get_scheduler()
await scheduler.reload()
return {"success": True, "message": "所有定时任务已重新加载"}
except Exception as e:
return {"success": False, "error": str(e)}