From 8b3f7bcaba493af530d26547c097aaafaf3df673 Mon Sep 17 00:00:00 2001 From: jingrow Date: Mon, 10 Nov 2025 00:36:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E9=98=9F=E5=88=97=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E8=B6=85=E6=97=B6=E6=94=B9=E4=B8=BA600=E7=A7=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/jingrow/jingrow/services/queue.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/apps/jingrow/jingrow/services/queue.py b/apps/jingrow/jingrow/services/queue.py index c319828..3a4df88 100644 --- a/apps/jingrow/jingrow/services/queue.py +++ b/apps/jingrow/jingrow/services/queue.py @@ -302,7 +302,7 @@ async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]: return {"success": False, "error": f"Failed to get agent detail: {str(e)}"} -@dramatiq.actor(max_retries=0, time_limit=60_000) +@dramatiq.actor(max_retries=0, time_limit=600_000) def execute_local_scheduled_job(job_json: str) -> None: """Worker 执行入口:job_json 为 JSON 字符串,包含 target_type 及其参数。 target_type: agent @@ -352,15 +352,21 @@ def execute_local_scheduled_job(job_json: str) -> None: else: logger.warning(f"Unsupported target_type: {target_type}") except BaseException as e: - # 捕获所有异常(包括 TimeLimit 等系统异常),确保任务状态正确更新 ended_iso = datetime.now().isoformat() time_taken = round(max(0.0, (time.time() - started) if started else 0.0), 3) - error_msg = str(e) - # 判断是否为超时异常 - is_timeout = "timeout" in error_msg.lower() or "time limit" in error_msg.lower() + + exception_type = type(e).__name__ + error_msg = str(e) if str(e) else exception_type + is_timeout = ( + "TimeLimitExceeded" in exception_type or + "timeout" in error_msg.lower() or + "time limit" in error_msg.lower() + ) + if is_timeout: - error_msg = f"任务执行超时: {error_msg}" - # 状态更新失败不应影响异常传播,使用新的事件循环避免与已中断的循环冲突 + error_msg = f"任务执行超时\n异常类型: {exception_type}\n执行时长: {time_taken} 秒" + if str(e) and str(e) != exception_type: + error_msg += f"\n详细信息: {str(e)}" status_updated = False try: loop = asyncio.new_event_loop() @@ -380,7 +386,6 @@ def execute_local_scheduled_job(job_json: str) -> None: except Exception as update_error: logger.error(f"更新任务 {job_id} 状态失败: {update_error}") - # 状态更新成功则只记录警告,不抛出异常;更新失败才抛出异常 if status_updated: if is_timeout: logger.warning(f"任务 {job_id} 执行超时,已更新状态为失败")