定时任务调度器实现响应式调度,测试成功

This commit is contained in:
jingrow 2025-09-21 01:16:05 +08:00
parent 7cb1ff7f72
commit d5cb089470
2 changed files with 115 additions and 16 deletions

View File

@ -1,24 +1,68 @@
from fastapi import APIRouter
from typing import List, Dict, Any
"""
调度器管理 API
支持动态增删改查定时任务
"""
from app.services.scheduler import get_scheduler
from fastapi import APIRouter, HTTPException
from typing import Dict, Any, List, Optional
from app.services.scheduler import (
start_scheduler,
stop_scheduler,
get_scheduler_status,
get_scheduler
)
router = APIRouter()
@router.post("/scheduler/reload")
async def scheduler_reload(jobs: List[Dict[str, Any]]):
"""重新加载所有定时任务。
Local Scheduled Job 读取所有定时任务包括智能体自动创建的定时任务
智能体定时任务会自动在 Local Scheduled Job 中创建对应记录无需单独处理
"""
@router.post("/start")
async def start_scheduler_endpoint():
"""启动定时任务调度器"""
try:
await start_scheduler()
return {"success": True, "message": "定时任务调度器已启动"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"启动失败: {str(e)}")
@router.post("/stop")
async def stop_scheduler_endpoint():
"""停止定时任务调度器"""
try:
await stop_scheduler()
return {"success": True, "message": "定时任务调度器已停止"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"停止失败: {str(e)}")
@router.post("/reload")
async def reload_scheduler():
"""重新加载定时任务"""
try:
scheduler = get_scheduler()
await scheduler.reload()
return {"success": True, "message": "定时任务已重新加载"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"重新加载失败: {str(e)}")
@router.get("/status")
async def get_status():
"""获取调度器状态"""
try:
status = get_scheduler_status()
return {"success": True, "data": status}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取状态失败: {str(e)}")
# 保持向后兼容
@router.post("/scheduler/reload")
async def scheduler_reload(jobs: List[Dict[str, Any]]):
"""重新加载所有定时任务(向后兼容)"""
try:
scheduler = get_scheduler()
# 重新加载所有任务(从 Local Scheduled Job 统一加载)
await scheduler.reload()
return {"success": True, "message": "所有定时任务已重新加载"}
except Exception as e:
return {"success": False, "error": str(e)}

View File

@ -7,7 +7,7 @@ import asyncio
import logging
import json
from datetime import datetime
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from contextlib import asynccontextmanager
@ -84,7 +84,8 @@ class Scheduler:
self._scheduler.start()
await self._load_jobs()
self._running = True
logger.info(f"调度器已启动,加载了 {len(self._jobs)} 个任务")
await self.start_database_watcher() # 启动数据库监听器
logger.info(f"调度器已启动,加载了 {len(self._jobs)} 个任务(包含数据库监听器)")
async def stop(self):
"""停止调度器"""
@ -107,9 +108,11 @@ class Scheduler:
async def _load_scheduled_jobs(self):
"""加载定时任务"""
try:
from utils.jingrow_api import list_records_in_jingrow
# 获取所有必要的字段
fields = ['name', 'method', 'cron_format', 'frequency', 'stopped', 'server_script']
result = list_records_with_system_auth('Local Scheduled Job', fields=fields)
result = list_records_in_jingrow('Local Scheduled Job', fields=fields)
if isinstance(result, dict):
if result.get('success'):
jobs = result.get('data', [])
@ -254,7 +257,59 @@ class Scheduler:
async def reload(self):
"""重新加载任务"""
await self._load_jobs()
async def start_database_watcher(self):
"""启动数据库监听器 - 监听 Local Scheduled Job 表的变化"""
try:
# 启动后台任务监听数据库变化
asyncio.create_task(self._watch_database_changes())
logger.info("数据库监听器已启动")
except Exception as e:
logger.error(f"启动数据库监听器失败: {e}")
async def _watch_database_changes(self):
"""监听数据库变化"""
last_check_time = None
while self._running:
try:
# 获取最新的记录修改时间
current_time = await self._get_latest_modification_time()
# 如果有变化,重新加载任务
if last_check_time is None or current_time != last_check_time:
logger.info("检测到数据库变化,重新加载任务")
await self._load_jobs()
last_check_time = current_time
# 每5秒检查一次
await asyncio.sleep(5)
except Exception as e:
logger.error(f"数据库监听异常: {e}")
await asyncio.sleep(10) # 出错时等待更长时间
async def _get_latest_modification_time(self):
"""获取最新的记录修改时间"""
try:
from utils.jingrow_api import list_records_in_jingrow
# 只获取修改时间字段
fields = ['modified']
result = list_records_in_jingrow('Local Scheduled Job', fields=fields)
if isinstance(result, dict) and result.get('success'):
jobs = result.get('data', [])
if jobs:
# 获取最新的修改时间
latest_time = max(job.get('modified', '') for job in jobs)
return latest_time
return None
except Exception as e:
logger.error(f"获取最新修改时间失败: {e}")
return None
# 全局调度器实例
_scheduler: Optional[Scheduler] = None