2025-10-24 23:10:22 +08:00

133 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import redis
import json
import uuid
import asyncio
from datetime import datetime
from typing import Dict, List, Optional, Any
import logging
logger = logging.getLogger(__name__)
class LocalJobManager:
    """Local job manager backed by a single Redis hash — storage only.

    Every job is serialized to a JSON string and stored as one field of the
    hash at ``self.key``, keyed by ``job_id``. This class never executes
    jobs; it only records and queries their state.
    """

    def __init__(self):
        # NOTE(review): connects to localhost:6379/db 0 with library defaults —
        # confirm this matches the deployment configuration.
        self.redis = redis.Redis()
        # Single Redis hash holding every job; field name == job_id.
        self.key = "jingrow:local_jobs"

    @staticmethod
    def _serialize_arguments(raw: Any) -> str:
        """Render job ``arguments`` as a string for storage/display.

        Strings pass through untouched; ``None`` becomes ``{}``; everything
        else is dumped as pretty multi-line JSON so the Jingrow ``code``
        field renders it nicely. (Unifies the previously inconsistent
        handling between create and update.)
        """
        if isinstance(raw, str):
            return raw
        return json.dumps({} if raw is None else raw, ensure_ascii=False, indent=2)

    def create_job(self, job_data: Dict[str, Any]) -> str:
        """Create a local job record and return its id.

        A ``job_id`` supplied in ``job_data`` is reused; otherwise a fresh
        UUID4 string is generated.

        :param job_data: loose dict of job attributes; missing keys get
            the defaults below.
        :return: the job_id under which the record was stored.
        """
        job_id = job_data.get('job_id') or str(uuid.uuid4())
        # One timestamp so creation == modified for a brand-new job
        # (the original called datetime.now() twice).
        now = datetime.now().isoformat()
        full_job_data = {
            'job_id': job_id,
            'name': job_id,
            'queue': job_data.get('queue', 'default'),
            # Fall back to target_type when no explicit job_name is given.
            'job_name': job_data.get('job_name', job_data.get('target_type', 'unknown')),
            'status': job_data.get('status', 'queued'),
            'started_at': job_data.get('started_at', ''),
            'ended_at': job_data.get('ended_at', ''),
            'time_taken': job_data.get('time_taken', ''),
            'exc_info': job_data.get('exc_info', ''),
            'arguments': self._serialize_arguments(job_data.get('arguments', {})),
            'timeout': job_data.get('timeout', ''),
            'creation': now,
            'modified': now,
            '_comment_count': 0,
            'owner': job_data.get('owner', 'system'),
            'modified_by': job_data.get('modified_by', 'system'),
            # Extra routing/result fields — default to None when absent.
            'target_type': job_data.get('target_type'),
            'flow_id': job_data.get('flow_id'),
            'node_type': job_data.get('node_type'),
            'session_cookie': job_data.get('session_cookie'),
            'result': job_data.get('result'),
            'error': job_data.get('error'),
        }
        # Persist to local Redis as a JSON hash field.
        self.redis.hset(self.key, job_id, json.dumps(full_job_data))
        logger.info(f"Created local job: {job_id}")
        return job_id

    def update_job(self, job_id: str, updates: Dict[str, Any]):
        """Merge ``updates`` into an existing job and bump its modified time.

        The caller's dict is NOT mutated (the original clobbered
        ``updates['arguments']`` in place). An unknown job_id only logs a
        warning — no exception is raised.
        """
        existing = self.get_job(job_id)
        if not existing:
            logger.warning(f"Job not found for update: {job_id}")
            return
        # Copy before normalizing so the caller's dict stays untouched.
        merged = dict(updates)
        if 'arguments' in merged:
            # Normalize once here; get_job round-trips it as a plain string,
            # so re-serializing an already-string value must be a no-op.
            merged['arguments'] = self._serialize_arguments(merged['arguments'])
        existing.update(merged)
        existing['modified'] = datetime.now().isoformat()
        self.redis.hset(self.key, job_id, json.dumps(existing))
        logger.info(f"Updated local job: {job_id} with status: {updates.get('status')}")

    def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored job dict for ``job_id``, or None if absent."""
        data = self.redis.hget(self.key, job_id)
        return json.loads(data) if data else None

    def list_jobs(self, page: int = 1, page_size: int = 20,
                  order_by: str = "modified desc") -> Dict[str, Any]:
        """Return one page of jobs sorted by last-modified timestamp.

        ``order_by`` is honored only for direction: any value containing
        "desc" (case-insensitive) sorts newest first. The sort key is always
        the ISO 'modified' timestamp, falling back to 'creation'. Hash
        fields holding corrupt JSON are skipped with a warning.

        :param page: 1-based page number; values < 1 are clamped to 1
            (previously they produced a negative slice start).
        """
        jobs = []
        for field, payload in self.redis.hgetall(self.key).items():
            try:
                jobs.append(json.loads(payload))
            except json.JSONDecodeError:
                logger.warning(f"Invalid JSON data for job: {field}")
        newest_first = "desc" in order_by.lower()
        # ISO-8601 strings sort correctly as plain strings.
        jobs.sort(key=lambda j: j.get('modified', j.get('creation', '')),
                  reverse=newest_first)
        page = max(page, 1)
        start = (page - 1) * page_size
        return {
            'success': True,
            'items': jobs[start:start + page_size],
            'total': len(jobs),
            'page': page,
            'page_length': page_size,
        }

    def delete_job(self, job_id: str) -> bool:
        """Delete a job; return True if a record was actually removed."""
        if self.redis.hdel(self.key, job_id):
            logger.info(f"Deleted local job: {job_id}")
            return True
        return False

    def get_count(self) -> int:
        """Return the total number of stored jobs."""
        return self.redis.hlen(self.key)
# Module-level singleton: importers share this one manager (and therefore
# one default Redis connection pool).
local_job_manager = LocalJobManager()