diff --git a/apps/jingrow/jingrow/services/queue.py b/apps/jingrow/jingrow/services/queue.py index 7a62dfb..c319828 100644 --- a/apps/jingrow/jingrow/services/queue.py +++ b/apps/jingrow/jingrow/services/queue.py @@ -1,8 +1,11 @@ import os import json import logging -from typing import Any, Dict, List +import time +import asyncio import uuid +from datetime import datetime +from typing import Any, Dict, List import dramatiq from dramatiq.brokers.redis import RedisBroker @@ -92,9 +95,8 @@ async def _execute_node_job(payload: Dict[str, Any]) -> Dict[str, Any]: job_id = payload.get("job_id") started_iso = None try: - import time started = time.time() - started_iso = __import__('datetime').datetime.now().isoformat() + started_iso = datetime.now().isoformat() modifier = get_logged_user(session_cookie) or 'system' await _push_status_to_jingrow(job_id, { 'status': 'started', @@ -103,7 +105,7 @@ async def _execute_node_job(payload: Dict[str, Any]) -> Dict[str, Any]: }) result = await executor.execute_node(node_type, flow_id, context, inputs, config, session_cookie) ended = time.time() - ended_iso = __import__('datetime').datetime.now().isoformat() + ended_iso = datetime.now().isoformat() await _push_status_to_jingrow(job_id, { 'status': 'finished' if result.get('success') else 'failed', 'exc_info': None if result.get('success') else result.get('error'), @@ -114,7 +116,7 @@ async def _execute_node_job(payload: Dict[str, Any]) -> Dict[str, Any]: }) return result except Exception as e: - ended_iso = __import__('datetime').datetime.now().isoformat() + ended_iso = datetime.now().isoformat() modifier = get_logged_user(session_cookie) or 'system' await _push_status_to_jingrow(job_id, { 'status': 'failed', @@ -300,10 +302,11 @@ async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]: return {"success": False, "error": f"Failed to get agent detail: {str(e)}"} -@dramatiq.actor(max_retries=3, time_limit=60_000) +@dramatiq.actor(max_retries=0, time_limit=60_000) def execute_local_scheduled_job(job_json: str) -> None: """Worker 执行入口:job_json 为 JSON 字符串,包含 target_type 及其参数。 target_type: agent + 超时不重试(max_retries=0) """ try: init_hooks(clear_cache=False) @@ -313,38 +316,79 @@ def execute_local_scheduled_job(job_json: str) -> None: try: payload = json.loads(job_json) except Exception as e: + logger.error(f"解析任务 JSON 失败: {e}") return - import asyncio - if not payload.get("job_id"): payload["job_id"] = str(uuid.uuid4()) target_type = payload.get("target_type") + job_id = payload["job_id"] + sc = payload.get("session_cookie") or (payload.get("context") or {}).get("session_cookie") + modifier = get_logged_user(sc) or 'system' + started = None + started_iso = None - if target_type == "agent": - import time - started = time.time() - started_iso = __import__('datetime').datetime.now().isoformat() - sc = payload.get("session_cookie") or (payload.get("context") or {}).get("session_cookie") - modifier = get_logged_user(sc) or 'system' - asyncio.run(_push_status_to_jingrow(payload["job_id"], { - "status": "started", - "started_at": started_iso, - "session_cookie": sc, - "modified_by": modifier - })) - result = asyncio.run(_execute_agent_job(payload)) - ended = time.time() - ended_iso = __import__('datetime').datetime.now().isoformat() - asyncio.run(_push_status_to_jingrow(payload["job_id"], { - "status": "finished" if result.get("success") else "failed", - "ended_at": ended_iso, - "time_taken": round(max(0.0, ended - started), 3), - "session_cookie": sc, - "modified_by": modifier, - })) - else: - logger.warning(f"Unsupported target_type: {target_type}") + try: + if target_type == "agent": + started = time.time() + started_iso = datetime.now().isoformat() + asyncio.run(_push_status_to_jingrow(job_id, { + "status": "started", + "started_at": started_iso, + "session_cookie": sc, + "modified_by": modifier + })) + result = asyncio.run(_execute_agent_job(payload)) + ended = time.time() + ended_iso = datetime.now().isoformat() + asyncio.run(_push_status_to_jingrow(job_id, { + "status": "finished" if result.get("success") else "failed", + "ended_at": ended_iso, + "time_taken": round(max(0.0, ended - started), 3), + "session_cookie": sc, + "modified_by": modifier, + })) + else: + logger.warning(f"Unsupported target_type: {target_type}") + except BaseException as e: + # 捕获所有异常(包括 TimeLimit 等系统异常),确保任务状态正确更新 + ended_iso = datetime.now().isoformat() + time_taken = round(max(0.0, (time.time() - started) if started else 0.0), 3) + error_msg = str(e) + # 判断是否为超时异常 + is_timeout = "timeout" in error_msg.lower() or "time limit" in error_msg.lower() + if is_timeout: + error_msg = f"任务执行超时: {error_msg}" + # 状态更新失败不应影响异常传播,使用新的事件循环避免与已中断的循环冲突 + status_updated = False + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(_push_status_to_jingrow(job_id, { + "status": "failed", + "ended_at": ended_iso, + "time_taken": time_taken, + "exc_info": error_msg, + "session_cookie": sc, + "modified_by": modifier, + })) + status_updated = True + finally: + loop.close() + except Exception as update_error: + logger.error(f"更新任务 {job_id} 状态失败: {update_error}") + + # 状态更新成功则只记录警告,不抛出异常;更新失败才抛出异常 + if status_updated: + if is_timeout: + logger.warning(f"任务 {job_id} 执行超时,已更新状态为失败") + else: + logger.warning(f"任务 {job_id} 执行失败: {error_msg}") + return + else: + logger.error(f"任务 {job_id} 执行失败且状态更新失败: {e}") + raise