diff --git a/apps/jingrow/jingrow/services/queue.py b/apps/jingrow/jingrow/services/queue.py index c319828..3a4df88 100644 --- a/apps/jingrow/jingrow/services/queue.py +++ b/apps/jingrow/jingrow/services/queue.py @@ -302,7 +302,7 @@ async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]: return {"success": False, "error": f"Failed to get agent detail: {str(e)}"} -@dramatiq.actor(max_retries=0, time_limit=60_000) +@dramatiq.actor(max_retries=0, time_limit=600_000) def execute_local_scheduled_job(job_json: str) -> None: """Worker 执行入口:job_json 为 JSON 字符串,包含 target_type 及其参数。 target_type: agent @@ -352,15 +352,21 @@ def execute_local_scheduled_job(job_json: str) -> None: else: logger.warning(f"Unsupported target_type: {target_type}") except BaseException as e: - # 捕获所有异常(包括 TimeLimit 等系统异常),确保任务状态正确更新 ended_iso = datetime.now().isoformat() time_taken = round(max(0.0, (time.time() - started) if started else 0.0), 3) - error_msg = str(e) - # 判断是否为超时异常 - is_timeout = "timeout" in error_msg.lower() or "time limit" in error_msg.lower() + + exception_type = type(e).__name__ + error_msg = str(e) if str(e) else exception_type + is_timeout = ( + "TimeLimitExceeded" in exception_type or + "timeout" in error_msg.lower() or + "time limit" in error_msg.lower() + ) + if is_timeout: - error_msg = f"任务执行超时: {error_msg}" - # 状态更新失败不应影响异常传播,使用新的事件循环避免与已中断的循环冲突 + error_msg = f"任务执行超时\n异常类型: {exception_type}\n执行时长: {time_taken} 秒" + if str(e) and str(e) != exception_type: + error_msg += f"\n详细信息: {str(e)}" status_updated = False try: loop = asyncio.new_event_loop() @@ -380,7 +386,6 @@ def execute_local_scheduled_job(job_json: str) -> None: except Exception as update_error: logger.error(f"更新任务 {job_id} 状态失败: {update_error}") - # 状态更新成功则只记录警告,不抛出异常;更新失败才抛出异常 if status_updated: if is_timeout: logger.warning(f"任务 {job_id} 执行超时,已更新状态为失败")