删除冗余的文件

This commit is contained in:
jingrow 2025-10-30 02:42:38 +08:00
parent b85e98ed2a
commit 3059c81cc9
4 changed files with 46 additions and 347 deletions

View File

@ -16,7 +16,7 @@ from jingrow.config import Config
from jingrow.utils.auth import get_jingrow_api_headers
from jingrow.services.queue import init_queue
from jingrow.services.local_job_manager import local_job_manager
from jingrow.services.hook_executor import create_hook_task, execute_hook_sync
from jingrow.core.hooks import execute_hook, execute_hook_async
from jingrow.utils.jingrow_api import get_record_list, get_record, create_record, update_record, delete_record, get_single_pagetype
import uuid
@ -29,12 +29,28 @@ init_queue()
def execute_hooks(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None):
"""执行钩子函数"""
return execute_hook_sync(pagetype, name, hook_name, data)
"""执行钩子函数(同步)"""
try:
# 统一命名page.{pagetype}.{hook_name}
hook_full_name = f"page.{pagetype}.{hook_name}"
execute_hook(hook_full_name, sender=pagetype, page=jingrow.get_pg(pagetype, name), **(data or {}))
return True
except Exception as e:
logger.error(f"同步执行钩子失败: {e}")
return False
def execute_hooks_async(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None):
"""异步执行钩子函数"""
return create_hook_task(pagetype, name, hook_name, data)
"""异步执行钩子函数fire-and-forget"""
try:
hook_full_name = f"page.{pagetype}.{hook_name}"
pg = jingrow.get_pg(pagetype, name)
# 异步丢到事件循环
import asyncio
asyncio.create_task(execute_hook_async(hook_full_name, sender=pagetype, page=pg, **(data or {})))
return True
except Exception as e:
logger.error(f"异步执行钩子失败: {e}")
return False
@router.get("/api/data/{pagetype}")

View File

@ -1,159 +0,0 @@
# Copyright (c) 2025, JINGROW
# License: MIT. See LICENSE
from __future__ import annotations
import importlib
import weakref
from functools import cached_property
from typing import Any, Iterable, TYPE_CHECKING, TypeVar
import jingrow
from jingrow.modules import load_pagetype_module
if TYPE_CHECKING:
from jingrow.model.page import Page
D = TypeVar("D", bound="Page")
def get_controller(pagetype: str):
"""
返回给定 PageType 的类对象带本地缓存
本地版不依赖数据库元数据直接走模块加载规则
"""
if not pagetype:
raise ImportError("Empty pagetype")
site_controllers = jingrow.controllers.setdefault(jingrow.local.site, {})
if pagetype not in site_controllers:
site_controllers[pagetype] = import_controller(pagetype)
return site_controllers[pagetype]
def import_controller(pagetype: str):
"""
解析并加载 pagetype 对应的 Python 模块与类
规则对齐云端版
- 模块路径jingrow.<package>.pagetype.<snake>/<snake>.py
- 类名去空格/连字符后的驼峰 "Local Ai Agent" "LocalAIAgent"
- 支持 hooksoverride_pagetype_class可选
"""
module_name = "Core"
# hooks 覆盖(兼容云端约定)
get_hooks = getattr(jingrow, "get_hooks", lambda *a, **k: {})
class_overrides = get_hooks("override_pagetype_class")
module_path = None
classname = pagetype.replace(" ", "").replace("-", "")
if class_overrides and class_overrides.get(pagetype):
import_path = class_overrides[pagetype][-1]
module_path, classname = import_path.rsplit(".", 1)
get_module = getattr(jingrow, "get_module", importlib.import_module)
module = get_module(module_path)
else:
module = load_pagetype_module(pagetype, module_name)
class_ = getattr(module, classname, None)
if class_ is None:
raise ImportError(
pagetype
if module_path is None
else f"{pagetype}: {classname} does not exist in module {module_path}"
)
# 为了保持接口一致,这里不强制继承 BasePage但推荐继承。
return class_
class BasePage:
"""本地版最小可用 BasePage保持与云端关键接口一致。
仅实现 pagetype 机制需要的通用数据容器与子表能力复杂校验与数据库操作
交由现有本地框架jingrow.save_pg / get_pg 处理
"""
_reserved_keywords = frozenset(
(
"pagetype",
"meta",
"flags",
"parent_pg",
"_table_fieldnames",
"_reserved_keywords",
"permitted_fieldnames",
)
)
def __init__(self, d: dict[str, Any]):
if d.get("pagetype"):
self.pagetype = d["pagetype"]
self.flags = jingrow._dict()
self._table_fieldnames = set()
self.update(d)
if hasattr(self, "__setup__"):
self.__setup__()
@cached_property
def meta(self):
# 需要元数据时,走本地 jingrow.get_meta
return jingrow.get_meta(self.pagetype)
def update(self, d: dict[str, Any]):
if "name" in d:
self.name = d["name"]
for key, value in d.items():
self.set(key, value)
return self
def get(self, key: str, default: Any = None):
return self.__dict__.get(key, default)
def set(self, key: str, value: Any):
if key in self._reserved_keywords:
return
self.__dict__[key] = value
def append(self, key: str, value: D | dict | None = None) -> D:
if value is None:
value = {}
if (table := self.__dict__.get(key)) is None:
self.__dict__[key] = table = []
d = self._init_child(value, key)
table.append(d)
d.parent_pg = weakref.ref(self)
return d
def extend(self, key: str, value: Iterable[dict | D]):
for v in value:
self.append(key, v)
def _init_child(self, value: dict | D, key: str):
if not isinstance(value, BasePage):
if not (pagetype := self.get_table_field_pagetype(key)):
raise AttributeError(key)
value["pagetype"] = pagetype
child_cls = get_controller(pagetype)
value = child_cls(value)
value.parent = getattr(self, "name", None)
value.parenttype = self.pagetype
value.parentfield = key
if not getattr(value, "idx", None):
table = getattr(self, key, None) or []
value.idx = len(table) + 1
if not getattr(value, "name", None):
value.__dict__["__islocal"] = 1
return value
def get_table_field_pagetype(self, fieldname: str) -> str | None:
try:
return self.meta.get_field(fieldname).options
except Exception:
return None

View File

@ -6,45 +6,38 @@ from __future__ import annotations
from typing import Any, Iterable
import jingrow
from jingrow.model.base_page import BasePage, get_controller
def get_pg(*args, **kwargs):
"""本地版 get_pg兼容云端签名。
class Page():
"""最小可用 Page本地数据容器 + 钩子触发"""
支持
- get_pg("PageType", name)
- get_pg({"pagetype": "PageType", ...})
- get_pg(pagetype="PageType", field=...)
"""
if args:
if isinstance(args[0], BasePage):
return args[0]
elif isinstance(args[0], str):
pagetype = args[0]
elif isinstance(args[0], dict):
kwargs = args[0]
else:
raise ValueError("First non keyword argument must be a string or dict")
def __init__(self, d: dict[str, Any] | None = None):
if d is None:
d = {}
if not isinstance(d, dict):
raise ValueError("Page expects a dict payload")
# 基础字段
self.pagetype = d.get("pagetype")
if "name" in d:
self.name = d["name"]
# flags 与数据
self.flags = jingrow._dict()
self.update(d)
if len(args) < 2 and kwargs:
if "pagetype" in kwargs:
pagetype = kwargs["pagetype"]
else:
raise ValueError('"pagetype" is a required key')
@property
def meta(self):
return jingrow.get_meta(self.pagetype)
controller = get_controller(pagetype)
if controller:
return controller(*args, **kwargs)
def update(self, d: dict[str, Any]):
for key, value in d.items():
self.set(key, value)
return self
raise ImportError(pagetype)
def get(self, key: str, default: Any = None):
return self.__dict__.get(key, default)
class Page(BasePage):
"""最小可用 Page复用 jingrow 本地数据库 APIsave_pg等
仅保留 pagetype 生命周期接口insert/save/delete/run_method
集成现代化的 hooks 系统
"""
def set(self, key: str, value: Any):
self.__dict__[key] = value
def insert(self, **kwargs) -> "Page":
"""插入页面,触发相关钩子"""

View File

@ -1,151 +0,0 @@
# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
钩子执行器 - 基于现有消息队列的钩子执行系统
"""
import json
import logging
import uuid
import dramatiq
from typing import Dict, Any
from jingrow.services.local_job_manager import local_job_manager
from jingrow.utils.jingrow_api import get_logged_user
logger = logging.getLogger(__name__)
@dramatiq.actor(max_retries=3, time_limit=30_000)
def execute_hook_task(hook_data: str) -> None:
"""执行钩子任务"""
try:
data = json.loads(hook_data)
pagetype = data.get('pagetype')
name = data.get('name')
hook_name = data.get('hook_name')
hook_args = data.get('data', {})
if not all([pagetype, name, hook_name]):
logger.error(f"钩子任务参数不完整: {data}")
return
logger.info(f"开始执行钩子任务: {pagetype}.{name}.{hook_name}")
# 更新任务状态为执行中
job_id = data.get('job_id', str(uuid.uuid4()))
local_job_manager.update_job(job_id, {
'status': 'started',
'started_at': __import__('datetime').datetime.now().isoformat()
})
# 获取Page对象并执行钩子
import jingrow
pg = jingrow.get_pg(pagetype, name)
if pg and hasattr(pg, hook_name):
hook_func = getattr(pg, hook_name)
if callable(hook_func):
# 执行钩子函数
result = hook_func(**hook_args)
# 更新任务状态为完成
local_job_manager.update_job(job_id, {
'status': 'finished',
'ended_at': __import__('datetime').datetime.now().isoformat(),
'result': str(result) if result is not None else 'None'
})
logger.info(f"钩子执行成功: {pagetype}.{name}.{hook_name}")
else:
logger.warning(f"钩子函数不可调用: {pagetype}.{name}.{hook_name}")
local_job_manager.update_job(job_id, {
'status': 'failed',
'ended_at': __import__('datetime').datetime.now().isoformat(),
'exc_info': f"钩子函数不可调用: {hook_name}"
})
else:
logger.warning(f"钩子函数不存在: {pagetype}.{name}.{hook_name}")
local_job_manager.update_job(job_id, {
'status': 'failed',
'ended_at': __import__('datetime').datetime.now().isoformat(),
'exc_info': f"钩子函数不存在: {hook_name}"
})
except Exception as e:
logger.error(f"执行钩子任务失败: {e}")
job_id = data.get('job_id', 'unknown')
local_job_manager.update_job(job_id, {
'status': 'failed',
'ended_at': __import__('datetime').datetime.now().isoformat(),
'exc_info': str(e)
})
def create_hook_task(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None, session_cookie: str = None) -> str:
"""创建钩子任务"""
try:
# 生成任务ID
job_id = str(uuid.uuid4())
# 构建任务数据
task_data = {
'job_id': job_id,
'pagetype': pagetype,
'name': name,
'hook_name': hook_name,
'data': data or {},
'session_cookie': session_cookie
}
# 获取当前用户
creator = get_logged_user(session_cookie) or 'system'
# 创建本地任务记录
local_job_manager.create_job({
'job_id': job_id,
'target_type': 'hook',
'pagetype': pagetype,
'name': name,
'hook_name': hook_name,
'status': 'queued',
'queue': 'default',
'job_name': f"Hook: {pagetype}.{name}.{hook_name}",
'arguments': json.dumps(task_data, indent=2, ensure_ascii=False),
'owner': creator,
'modified_by': creator
})
# 发送到消息队列
execute_hook_task.send(json.dumps(task_data))
logger.info(f"钩子任务已创建: {job_id} - {pagetype}.{name}.{hook_name}")
return job_id
except Exception as e:
logger.error(f"创建钩子任务失败: {e}")
return None
def execute_hook_sync(pagetype: str, name: str, hook_name: str, data: Dict[str, Any] = None) -> bool:
"""同步执行钩子函数"""
try:
import jingrow
# 对于新记录创建,记录可能还不存在,需要特殊处理
if hook_name == "before_insert":
logger.warning(f"跳过 before_insert 钩子,因为记录 {pagetype}.{name} 还不存在")
return False
pg = jingrow.get_pg(pagetype, name)
if pg and hasattr(pg, hook_name):
hook_func = getattr(pg, hook_name)
if callable(hook_func):
logger.info(f"同步执行钩子: {pagetype}.{name}.{hook_name}")
hook_func(**(data or {}))
return True
logger.warning(f"钩子函数不存在或不可调用: {pagetype}.{name}.{hook_name}")
return False
except Exception as e:
logger.error(f"同步执行钩子失败: {e}")
return False