任务队列执行超时改为600秒
This commit is contained in:
parent
14cb5d68db
commit
8b3f7bcaba
@ -302,7 +302,7 @@ async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
return {"success": False, "error": f"Failed to get agent detail: {str(e)}"}
|
return {"success": False, "error": f"Failed to get agent detail: {str(e)}"}
|
||||||
|
|
||||||
|
|
||||||
@dramatiq.actor(max_retries=0, time_limit=60_000)
|
@dramatiq.actor(max_retries=0, time_limit=600_000)
|
||||||
def execute_local_scheduled_job(job_json: str) -> None:
|
def execute_local_scheduled_job(job_json: str) -> None:
|
||||||
"""Worker 执行入口:job_json 为 JSON 字符串,包含 target_type 及其参数。
|
"""Worker 执行入口:job_json 为 JSON 字符串,包含 target_type 及其参数。
|
||||||
target_type: agent
|
target_type: agent
|
||||||
@ -352,15 +352,21 @@ def execute_local_scheduled_job(job_json: str) -> None:
|
|||||||
else:
|
else:
|
||||||
logger.warning(f"Unsupported target_type: {target_type}")
|
logger.warning(f"Unsupported target_type: {target_type}")
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
# 捕获所有异常(包括 TimeLimit 等系统异常),确保任务状态正确更新
|
|
||||||
ended_iso = datetime.now().isoformat()
|
ended_iso = datetime.now().isoformat()
|
||||||
time_taken = round(max(0.0, (time.time() - started) if started else 0.0), 3)
|
time_taken = round(max(0.0, (time.time() - started) if started else 0.0), 3)
|
||||||
error_msg = str(e)
|
|
||||||
# 判断是否为超时异常
|
exception_type = type(e).__name__
|
||||||
is_timeout = "timeout" in error_msg.lower() or "time limit" in error_msg.lower()
|
error_msg = str(e) if str(e) else exception_type
|
||||||
|
is_timeout = (
|
||||||
|
"TimeLimitExceeded" in exception_type or
|
||||||
|
"timeout" in error_msg.lower() or
|
||||||
|
"time limit" in error_msg.lower()
|
||||||
|
)
|
||||||
|
|
||||||
if is_timeout:
|
if is_timeout:
|
||||||
error_msg = f"任务执行超时: {error_msg}"
|
error_msg = f"任务执行超时\n异常类型: {exception_type}\n执行时长: {time_taken} 秒"
|
||||||
# 状态更新失败不应影响异常传播,使用新的事件循环避免与已中断的循环冲突
|
if str(e) and str(e) != exception_type:
|
||||||
|
error_msg += f"\n详细信息: {str(e)}"
|
||||||
status_updated = False
|
status_updated = False
|
||||||
try:
|
try:
|
||||||
loop = asyncio.new_event_loop()
|
loop = asyncio.new_event_loop()
|
||||||
@ -380,7 +386,6 @@ def execute_local_scheduled_job(job_json: str) -> None:
|
|||||||
except Exception as update_error:
|
except Exception as update_error:
|
||||||
logger.error(f"更新任务 {job_id} 状态失败: {update_error}")
|
logger.error(f"更新任务 {job_id} 状态失败: {update_error}")
|
||||||
|
|
||||||
# 状态更新成功则只记录警告,不抛出异常;更新失败才抛出异常
|
|
||||||
if status_updated:
|
if status_updated:
|
||||||
if is_timeout:
|
if is_timeout:
|
||||||
logger.warning(f"任务 {job_id} 执行超时,已更新状态为失败")
|
logger.warning(f"任务 {job_id} 执行超时,已更新状态为失败")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user