Optimize queue.py: remove the mock-data logic

This commit is contained in:
jingrow 2025-09-19 00:00:02 +08:00
parent 22f2d5f3cf
commit 674b66f208
2 changed files with 53 additions and 81 deletions

queue.py

@@ -4,14 +4,8 @@ import logging
from typing import Any, Dict, List
import uuid
# Try to import dramatiq; fall back to the mock implementation if it fails
try:
import dramatiq
from dramatiq.brokers.redis import RedisBroker
DRAMATIQ_AVAILABLE = True
except ImportError:
DRAMATIQ_AVAILABLE = False
print("Warning: dramatiq not available, using mock implementation")
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from app.services.executor import NodeExecutor
@@ -29,11 +23,6 @@ def init_queue() -> None:
global _broker_initialized
if _broker_initialized:
return
if not DRAMATIQ_AVAILABLE:
logger.warning("Dramatiq not available, skipping queue initialization")
_broker_initialized = True
return
try:
broker = RedisBroker(url=_get_redis_url())
@@ -218,7 +207,6 @@ async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]:
if not agent_id:
return {"success": False, "error": "agent_id required"}
# Try to fetch the agent detail from Jingrow (prefer the session cookie, fall back to the API key)
try:
from utils.jingrow_api import get_agent_detail
agent = get_agent_detail(agent_id, session_cookie)
@@ -247,75 +235,59 @@ async def _execute_agent_job(payload: Dict[str, Any]) -> Dict[str, Any]:
return {"success": False, "error": f"Failed to get agent detail: {str(e)}"}
# Mock execution function (used when dramatiq is unavailable)
def _mock_execute_job(job_json: str) -> None:
"""模拟执行任务"""
@dramatiq.actor(max_retries=3, time_limit=60_000)
def execute_local_scheduled_job(job_json: str) -> None:
"""Worker 执行入口job_json 为 JSON 字符串,包含 target_type 及其参数。
target_type: node | flow | agent | function
当前默认实现 node/flowagent/function 可按需扩展
"""
try:
payload = json.loads(job_json)
logger.info(f"Mock executing job: {payload}")
# Actual execution logic could be added here
except Exception as e:
logger.error(f"Mock execution failed: {e}")
if DRAMATIQ_AVAILABLE:
@dramatiq.actor(max_retries=3, time_limit=60_000)
def execute_local_scheduled_job(job_json: str) -> None:
"""Worker 执行入口job_json 为 JSON 字符串,包含 target_type 及其参数。
target_type: node | flow | agent | function
当前默认实现 node/flowagent/function 可按需扩展
"""
try:
payload = json.loads(job_json)
except Exception as e:
return
import asyncio
if not payload.get("job_id"):
payload["job_id"] = str(uuid.uuid4())
target_type = payload.get("target_type")
if target_type == "node":
asyncio.run(_execute_node_job(payload))
elif target_type == "flow":
import time
started = time.time()
started_iso = __import__('datetime').datetime.utcnow().isoformat()
sc = payload.get("session_cookie") or (payload.get("context") or {}).get("session_cookie")
update_local_job({ "job_id": payload["job_id"], "status": "started", "started_at": started_iso, "session_cookie": sc })
result = asyncio.run(_execute_flow_job(payload))
ended = time.time()
ended_iso = __import__('datetime').datetime.utcnow().isoformat()
update_local_job({
"job_id": payload["job_id"],
"status": "finished" if result.get("success") else "failed",
"ended_at": ended_iso,
"time_taken": round(max(0.0, ended - started), 3),
"session_cookie": sc,
})
elif target_type == "agent":
import time
started = time.time()
started_iso = __import__('datetime').datetime.utcnow().isoformat()
sc = payload.get("session_cookie") or (payload.get("context") or {}).get("session_cookie")
update_local_job({ "job_id": payload["job_id"], "status": "started", "started_at": started_iso, "session_cookie": sc })
result = asyncio.run(_execute_agent_job(payload))
ended = time.time()
ended_iso = __import__('datetime').datetime.utcnow().isoformat()
update_local_job({
"job_id": payload["job_id"],
"status": "finished" if result.get("success") else "failed",
"ended_at": ended_iso,
"time_taken": round(max(0.0, ended - started), 3),
"session_cookie": sc,
})
else:
logger.warning(f"Unsupported target_type: {target_type}")
else:
# Use the mock implementation when dramatiq is unavailable
def execute_local_scheduled_job(job_json: str) -> None:
_mock_execute_job(job_json)
return
import asyncio
if not payload.get("job_id"):
payload["job_id"] = str(uuid.uuid4())
target_type = payload.get("target_type")
if target_type == "node":
asyncio.run(_execute_node_job(payload))
elif target_type == "flow":
import time
started = time.time()
started_iso = __import__('datetime').datetime.utcnow().isoformat()
sc = payload.get("session_cookie") or (payload.get("context") or {}).get("session_cookie")
update_local_job({ "job_id": payload["job_id"], "status": "started", "started_at": started_iso, "session_cookie": sc })
result = asyncio.run(_execute_flow_job(payload))
ended = time.time()
ended_iso = __import__('datetime').datetime.utcnow().isoformat()
update_local_job({
"job_id": payload["job_id"],
"status": "finished" if result.get("success") else "failed",
"ended_at": ended_iso,
"time_taken": round(max(0.0, ended - started), 3),
"session_cookie": sc,
})
elif target_type == "agent":
import time
started = time.time()
started_iso = __import__('datetime').datetime.utcnow().isoformat()
sc = payload.get("session_cookie") or (payload.get("context") or {}).get("session_cookie")
update_local_job({ "job_id": payload["job_id"], "status": "started", "started_at": started_iso, "session_cookie": sc })
result = asyncio.run(_execute_agent_job(payload))
ended = time.time()
ended_iso = __import__('datetime').datetime.utcnow().isoformat()
update_local_job({
"job_id": payload["job_id"],
"status": "finished" if result.get("success") else "failed",
"ended_at": ended_iso,
"time_taken": round(max(0.0, ended - started), 3),
"session_cookie": sc,
})
else:
logger.warning(f"Unsupported target_type: {target_type}")

dump.rdb (binary file not shown)