任务队列执行超时自动更为状态为失败,默认超时60秒

This commit is contained in:
jingrow 2025-11-10 00:26:26 +08:00
parent 3c2d21981d
commit 14cb5d68db

View File

@ -1,8 +1,11 @@
import os
import json
import logging
from typing import Any, Dict, List
import time
import asyncio
import uuid
from datetime import datetime
from typing import Any, Dict, List
import dramatiq
from dramatiq.brokers.redis import RedisBroker
@ -92,9 +95,8 @@ async def _execute_node_job(payload: Dict[str, Any]) -> Dict[str, Any]:
job_id = payload.get("job_id")
started_iso = None
try:
import time
started = time.time()
started_iso = __import__('datetime').datetime.now().isoformat()
started_iso = datetime.now().isoformat()
modifier = get_logged_user(session_cookie) or 'system'
await _push_status_to_jingrow(job_id, {
'status': 'started',
@ -103,7 +105,7 @@ async def _execute_node_job(payload: Dict[str, Any]) -> Dict[str, Any]:
})
result = await executor.execute_node(node_type, flow_id, context, inputs, config, session_cookie)
ended = time.time()
ended_iso = __import__('datetime').datetime.now().isoformat()
ended_iso = datetime.now().isoformat()
await _push_status_to_jingrow(job_id, {
'status': 'finished' if result.get('success') else 'failed',
'exc_info': None if result.get('success') else result.get('error'),
@ -114,7 +116,7 @@ async def _execute_node_job(payload: Dict[str, Any]) -> Dict[str, Any]:
})
return result
except Exception as e:
ended_iso = __import__('datetime').datetime.now().isoformat()
ended_iso = datetime.now().isoformat()
modifier = get_logged_user(session_cookie) or 'system'
await _push_status_to_jingrow(job_id, {
'status': 'failed',
@ -300,10 +302,11 @@ async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]:
return {"success": False, "error": f"Failed to get agent detail: {str(e)}"}
@dramatiq.actor(max_retries=3, time_limit=60_000)
@dramatiq.actor(max_retries=0, time_limit=60_000)
def execute_local_scheduled_job(job_json: str) -> None:
"""Worker 执行入口job_json 为 JSON 字符串,包含 target_type 及其参数。
target_type: agent
超时不重试max_retries=0
"""
try:
init_hooks(clear_cache=False)
@ -313,38 +316,79 @@ def execute_local_scheduled_job(job_json: str) -> None:
try:
payload = json.loads(job_json)
except Exception as e:
logger.error(f"解析任务 JSON 失败: {e}")
return
import asyncio
if not payload.get("job_id"):
payload["job_id"] = str(uuid.uuid4())
target_type = payload.get("target_type")
job_id = payload["job_id"]
sc = payload.get("session_cookie") or (payload.get("context") or {}).get("session_cookie")
modifier = get_logged_user(sc) or 'system'
started = None
started_iso = None
if target_type == "agent":
import time
started = time.time()
started_iso = __import__('datetime').datetime.now().isoformat()
sc = payload.get("session_cookie") or (payload.get("context") or {}).get("session_cookie")
modifier = get_logged_user(sc) or 'system'
asyncio.run(_push_status_to_jingrow(payload["job_id"], {
"status": "started",
"started_at": started_iso,
"session_cookie": sc,
"modified_by": modifier
}))
result = asyncio.run(_execute_agent_job(payload))
ended = time.time()
ended_iso = __import__('datetime').datetime.now().isoformat()
asyncio.run(_push_status_to_jingrow(payload["job_id"], {
"status": "finished" if result.get("success") else "failed",
"ended_at": ended_iso,
"time_taken": round(max(0.0, ended - started), 3),
"session_cookie": sc,
"modified_by": modifier,
}))
else:
logger.warning(f"Unsupported target_type: {target_type}")
try:
if target_type == "agent":
started = time.time()
started_iso = datetime.now().isoformat()
asyncio.run(_push_status_to_jingrow(job_id, {
"status": "started",
"started_at": started_iso,
"session_cookie": sc,
"modified_by": modifier
}))
result = asyncio.run(_execute_agent_job(payload))
ended = time.time()
ended_iso = datetime.now().isoformat()
asyncio.run(_push_status_to_jingrow(job_id, {
"status": "finished" if result.get("success") else "failed",
"ended_at": ended_iso,
"time_taken": round(max(0.0, ended - started), 3),
"session_cookie": sc,
"modified_by": modifier,
}))
else:
logger.warning(f"Unsupported target_type: {target_type}")
except BaseException as e:
# 捕获所有异常(包括 TimeLimit 等系统异常),确保任务状态正确更新
ended_iso = datetime.now().isoformat()
time_taken = round(max(0.0, (time.time() - started) if started else 0.0), 3)
error_msg = str(e)
# 判断是否为超时异常
is_timeout = "timeout" in error_msg.lower() or "time limit" in error_msg.lower()
if is_timeout:
error_msg = f"任务执行超时: {error_msg}"
# 状态更新失败不应影响异常传播,使用新的事件循环避免与已中断的循环冲突
status_updated = False
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(_push_status_to_jingrow(job_id, {
"status": "failed",
"ended_at": ended_iso,
"time_taken": time_taken,
"exc_info": error_msg,
"session_cookie": sc,
"modified_by": modifier,
}))
status_updated = True
finally:
loop.close()
except Exception as update_error:
logger.error(f"更新任务 {job_id} 状态失败: {update_error}")
# 状态更新成功则只记录警告,不抛出异常;更新失败才抛出异常
if status_updated:
if is_timeout:
logger.warning(f"任务 {job_id} 执行超时,已更新状态为失败")
else:
logger.warning(f"任务 {job_id} 执行失败: {error_msg}")
return
else:
logger.error(f"任务 {job_id} 执行失败且状态更新失败: {e}")
raise