diff --git a/apps/jingrow/jingrow/core/hooks/__init__.py b/apps/jingrow/jingrow/core/hooks/__init__.py new file mode 100644 index 0000000..be99131 --- /dev/null +++ b/apps/jingrow/jingrow/core/hooks/__init__.py @@ -0,0 +1,63 @@ +# Copyright (c) 2025, JINGROW and contributors +# For license information, please see license.txt + +""" +Jingrow Hooks System - 现代化的钩子系统 + +特性: +- 配置式注册(hooks.py) +- 装饰器注册 +- 同步/异步执行 +- 优先级支持 +- 条件过滤 +- 类型安全 +- 易于测试 + +快速开始: + +1. 配置式注册 (推荐用于应用开发): + 在 hooks.py 中: + pg_events = { + "User": { + "after_insert": "myapp.hooks.send_welcome_email", + } + } + +2. 装饰器注册 (推荐用于代码中): + from jingrow.core.hooks import hook, Priority + + @hook('page.User.on_update', sender='User', priority=Priority.HIGH) + def handle_user_update(page, **kwargs): + print(f"User {page.name} updated") + +3. 发送钩子: + from jingrow.core.hooks import execute_hook + + execute_hook('page.User.on_update', sender='User', page=page_obj) +""" + +from .signal import Signal, signal, Priority +from .registry import HookRegistry, registry +from .loader import HookLoader, load_app_hooks, register_loaded_hooks +from .executor import HookExecutor, execute_hook, execute_hook_async +from .decorators import hook, register_hook + +__all__ = [ + # Core classes + "Signal", + "signal", + "Priority", + "HookRegistry", + "registry", + "HookLoader", + "load_app_hooks", + "register_loaded_hooks", + "HookExecutor", + "execute_hook", + "execute_hook_async", + # Decorators + "hook", + "register_hook", +] + +__version__ = "1.0.0" diff --git a/apps/jingrow/jingrow/core/hooks/decorators.py b/apps/jingrow/jingrow/core/hooks/decorators.py new file mode 100644 index 0000000..6f7631e --- /dev/null +++ b/apps/jingrow/jingrow/core/hooks/decorators.py @@ -0,0 +1,92 @@ +# Copyright (c) 2025, JINGROW and contributors +# For license information, please see license.txt + +""" +钩子装饰器 - 提供便捷的装饰器语法 +""" + +import functools +from typing import Callable, Optional, Any +from .registry import registry +from .signal import Priority + +logger = None # 延迟导入避免循环 + + +def hook( + hook_name: str, + *, + sender: Optional[str] = None, + priority: Priority = Priority.NORMAL, + condition: Optional[Callable] = None, + once: bool = False +): + """ + 钩子装饰器 - 注册函数为钩子处理器 + + 用法: + @hook('page.User.on_update', sender='User', priority=Priority.HIGH) + def handle_user_update(page, **kwargs): + print(f"User {page.name} updated") + + @hook('scheduler.daily', condition=lambda: datetime.now().hour == 0) + def daily_task(**kwargs): + # 只在午夜执行 + pass + """ + def decorator(func: Callable) -> Callable: + registry.connect( + hook_name, + func, + sender=sender, + priority=priority, + condition=condition, + once=once + ) + return func + + return decorator + + +def register_hook( + hook_name: str, + handler: Optional[Callable] = None, + *, + sender: Optional[str] = None, + priority: Priority = Priority.NORMAL, + condition: Optional[Callable] = None, + once: bool = False +): + """ + 注册钩子(可用作装饰器或函数调用) + + 用法: + # 装饰器方式 + @register_hook('page.User.on_update', sender='User') + def handler(page, **kwargs): + pass + + # 函数调用方式 + register_hook('page.User.on_update', my_handler, sender='User') + """ + if handler is None: + # 用作装饰器 + return hook( + hook_name, + sender=sender, + priority=priority, + condition=condition, + once=once + ) + + # 直接注册 + registry.connect( + hook_name, + handler, + sender=sender, + priority=priority, + condition=condition, + once=once + ) + return handler + diff --git a/apps/jingrow/jingrow/core/hooks/executor.py b/apps/jingrow/jingrow/core/hooks/executor.py new file mode 100644 index 0000000..75aa308 --- /dev/null +++ b/apps/jingrow/jingrow/core/hooks/executor.py @@ -0,0 +1,201 @@ +# Copyright (c) 2025, JINGROW and contributors +# For license information, please see license.txt + +""" +钩子执行器 - 执行钩子的便捷函数和类 +支持同步/异步执行、错误处理、性能监控 +""" + +import logging +import time +import asyncio +from typing import Dict, List, Any, Optional, Callable +from .registry import registry + +logger = logging.getLogger(__name__) + + +class HookExecutor: + """ + 钩子执行器 - 提供统一的钩子执行接口 + + 特性: + - 同步/异步执行 + - 错误处理 + - 性能监控 + - 批量执行 + """ + + @staticmethod + def execute( + hook_name: str, + sender: Optional[str] = None, + raise_on_error: bool = False, + **kwargs + ) -> List[Any]: + """ + 同步执行钩子 + + :param hook_name: 钩子名称 + :param sender: 发送者类型 + :param raise_on_error: 是否在错误时抛出异常 + :param kwargs: 钩子参数 + :return: 所有处理器返回值的列表 + """ + start_time = time.time() + + try: + results = registry.send(hook_name, sender=sender, **kwargs) + + elapsed = time.time() - start_time + if elapsed > 0.1: # 超过100ms记录警告 + logger.warning(f"钩子执行较慢: {hook_name} ({elapsed:.3f}s)") + + return results + + except Exception as e: + logger.error(f"执行钩子失败: {hook_name} - {e}", exc_info=True) + if raise_on_error: + raise + return [] + + @staticmethod + async def execute_async( + hook_name: str, + sender: Optional[str] = None, + raise_on_error: bool = False, + **kwargs + ) -> List[Any]: + """ + 异步执行钩子 + + :param hook_name: 钩子名称 + :param sender: 发送者类型 + :param raise_on_error: 是否在错误时抛出异常 + :param kwargs: 钩子参数 + :return: 所有处理器返回值的列表 + """ + start_time = time.time() + + try: + results = await registry.send_async(hook_name, sender=sender, **kwargs) + + elapsed = time.time() - start_time + if elapsed > 0.1: # 超过100ms记录警告 + logger.warning(f"异步钩子执行较慢: {hook_name} ({elapsed:.3f}s)") + + return results + + except Exception as e: + logger.error(f"异步执行钩子失败: {hook_name} - {e}", exc_info=True) + if raise_on_error: + raise + return [] + + @staticmethod + def execute_batch(hooks: List[Dict[str, Any]], raise_on_error: bool = False) -> Dict[str, List[Any]]: + """ + 批量执行钩子 + + :param hooks: 钩子列表,每个元素为 {"name": "...", "sender": "...", **kwargs} + :param raise_on_error: 是否在错误时抛出异常 + :return: 执行结果字典 {hook_name: results} + """ + results = {} + + for hook_config in hooks: + hook_name = hook_config.pop('name') + sender = hook_config.pop('sender', None) + + try: + results[hook_name] = HookExecutor.execute( + hook_name, + sender=sender, + raise_on_error=raise_on_error, + **hook_config + ) + except Exception as e: + logger.error(f"批量执行钩子失败: {hook_name} - {e}") + results[hook_name] = [] + if raise_on_error: + raise + + return results + + @staticmethod + async def execute_batch_async( + hooks: List[Dict[str, Any]], + raise_on_error: bool = False + ) -> Dict[str, List[Any]]: + """ + 异步批量执行钩子 + + :param hooks: 钩子列表 + :param raise_on_error: 是否在错误时抛出异常 + :return: 执行结果字典 + """ + results = {} + + # 并行执行所有钩子 + tasks = [] + for hook_config in hooks: + hook_name = hook_config['name'] + sender = hook_config.get('sender') + kwargs = {k: v for k, v in hook_config.items() if k not in ('name', 'sender')} + + task = HookExecutor.execute_async( + hook_name, + sender=sender, + raise_on_error=raise_on_error, + **kwargs + ) + tasks.append((hook_name, task)) + + # 等待所有任务完成 + for hook_name, task in tasks: + try: + results[hook_name] = await task + except Exception as e: + logger.error(f"异步批量执行钩子失败: {hook_name} - {e}") + results[hook_name] = [] + if raise_on_error: + raise + + return results + + +# 便捷函数 +def execute_hook( + hook_name: str, + sender: Optional[str] = None, + raise_on_error: bool = False, + **kwargs +) -> List[Any]: + """ + 同步执行钩子(便捷函数) + + 示例: + results = execute_hook('page.User.on_update', sender='User', page=page_obj) + """ + return HookExecutor.execute(hook_name, sender=sender, raise_on_error=raise_on_error, **kwargs) + + +async def execute_hook_async( + hook_name: str, + sender: Optional[str] = None, + raise_on_error: bool = False, + **kwargs +) -> List[Any]: + """ + 异步执行钩子(便捷函数) + + 示例: + results = await execute_hook_async('page.User.on_update', sender='User', page=page_obj) + """ + return await HookExecutor.execute_async( + hook_name, + sender=sender, + raise_on_error=raise_on_error, + **kwargs + ) + diff --git a/apps/jingrow/jingrow/core/hooks/init_hooks.py b/apps/jingrow/jingrow/core/hooks/init_hooks.py new file mode 100644 index 0000000..82e05f2 --- /dev/null +++ b/apps/jingrow/jingrow/core/hooks/init_hooks.py @@ -0,0 +1,37 @@ +# Copyright (c) 2025, JINGROW and contributors +# For license information, please see license.txt + +""" +钩子系统初始化 - 在框架启动时自动加载和应用钩子 +""" + +import logging +from typing import Optional +from .loader import register_loaded_hooks + +logger = logging.getLogger(__name__) + + +def init_hooks(app_name: Optional[str] = None, clear_cache: bool = True) -> None: + """ + 初始化钩子系统 - 加载并注册所有应用的钩子 + + :param app_name: 应用名称,如果为 None 则加载所有应用 + :param clear_cache: 是否清除缓存 + """ + if clear_cache: + from .loader import _loader_instance + _loader_instance.clear_cache() + + try: + register_loaded_hooks(app_name) + logger.info(f"钩子系统初始化完成" + (f" (应用: {app_name})" if app_name else "")) + except Exception as e: + logger.error(f"钩子系统初始化失败: {e}", exc_info=True) + raise + + +def reload_hooks(app_name: Optional[str] = None) -> None: + """重新加载钩子(开发模式)""" + init_hooks(app_name, clear_cache=True) + diff --git a/apps/jingrow/jingrow/core/hooks/loader.py b/apps/jingrow/jingrow/core/hooks/loader.py new file mode 100644 index 0000000..7d0bc24 --- /dev/null +++ b/apps/jingrow/jingrow/core/hooks/loader.py @@ -0,0 +1,279 @@ +# Copyright (c) 2025, JINGROW and contributors +# For license information, please see license.txt + +""" +钩子加载器 - 从 hooks.py 配置文件加载钩子 +支持应用级钩子配置 +""" + +import os +import importlib +import inspect +import types +import logging +from typing import Dict, List, Any, Optional, Union +from .registry import registry + +# 受支持的 hooks.py 键(仅这些键会被加载与注册) +SUPPORTED_HOOK_KEYS = { + 'pg_events', + 'scheduler_events', + 'before_request', + 'after_request', + 'before_job', + 'after_job', +} + +logger = logging.getLogger(__name__) + + +class HookLoader: + """ + 钩子加载器 - 加载 hooks.py 配置文件中的钩子 + + 支持的钩子类型: + - pg_events: 页面生命周期钩子 + - scheduler_events: 定时任务钩子 + - before_request / after_request: 请求钩子 + - before_job / after_job: 后台任务钩子 + - 其他自定义钩子 + """ + + def __init__(self): + self._loaded_apps: set = set() + self._cache: Dict[str, Any] = {} + + def load_app_hooks(self, app_name: Optional[str] = None) -> Dict[str, Any]: + """ + 加载应用钩子配置 + + :param app_name: 应用名称,如果为 None 则加载所有已安装应用 + :return: 钩子配置字典 + """ + if app_name: + return self._load_single_app_hooks(app_name) + + # 加载所有应用的钩子 + hooks = {} + + # 获取已安装应用列表 + try: + from jingrow.utils.app_manager import get_installed_apps + installed_apps = get_installed_apps() + except Exception: + # 如果获取失败,至少加载 jingrow + installed_apps = ['jingrow'] + + for app in installed_apps: + try: + app_hooks = self._load_single_app_hooks(app) + # 合并钩子(后加载的应用钩子会追加到列表中) + for key, value in app_hooks.items(): + if key in hooks: + # 如果已存在,追加到列表 + if isinstance(hooks[key], list) and isinstance(value, list): + hooks[key].extend(value) + elif isinstance(hooks[key], list): + hooks[key].append(value) + elif isinstance(value, list): + hooks[key] = [hooks[key]] + value + else: + hooks[key] = [hooks[key], value] + else: + hooks[key] = value + except Exception as e: + logger.warning(f"加载应用 {app} 的钩子失败: {e}") + + return hooks + + def _load_single_app_hooks(self, app_name: str) -> Dict[str, Any]: + """加载单个应用的钩子""" + cache_key = f"app_hooks_{app_name}" + + # 检查缓存 + if cache_key in self._cache: + return self._cache[cache_key] + + try: + # 导入 hooks 模块 + hooks_module = importlib.import_module(f"{app_name}.hooks") + + # 仅提取受支持的钩子键,忽略应用元信息(如 app_name、app_title 等) + hooks: Dict[str, Any] = {} + + for key in SUPPORTED_HOOK_KEYS: + if hasattr(hooks_module, key): + hooks[key] = getattr(hooks_module, key) + + logger.debug(f"加载应用 {app_name} 的钩子: {list(hooks.keys())}") + + # 缓存结果 + self._cache[cache_key] = hooks + + return hooks + + except ImportError as e: + logger.debug(f"应用 {app_name} 没有 hooks.py 模块: {e}") + return {} + except Exception as e: + logger.error(f"加载应用 {app_name} 的钩子失败: {e}", exc_info=True) + return {} + + def register_loaded_hooks(self, app_name: Optional[str] = None) -> None: + """ + 注册已加载的钩子到注册表 + + :param app_name: 应用名称,如果为 None 则注册所有应用 + """ + hooks_config = self.load_app_hooks(app_name) + + # 处理 pg_events(页面生命周期钩子) + if 'pg_events' in hooks_config: + self._register_pg_events(hooks_config['pg_events']) + + # 处理 scheduler_events(定时任务钩子) + if 'scheduler_events' in hooks_config: + self._register_scheduler_events(hooks_config['scheduler_events']) + + # 处理其他受支持的通用钩子(目前仅支持 request/job 级别) + for hook_name in ('before_request', 'after_request', 'before_job', 'after_job'): + hook_value = hooks_config.get(hook_name) + if not hook_value: + continue + # 为这些钩子创建统一命名并注册 + signal = registry.register(f"{hook_name}") + if isinstance(hook_value, list): + self._register_handlers(signal, hook_value) + elif isinstance(hook_value, str): + self._register_handler(signal, hook_value) + + def _register_pg_events(self, pg_events: Dict[str, Any]) -> None: + """注册页面生命周期钩子""" + for pagetype, events in pg_events.items(): + if isinstance(events, dict): + for event_name, handlers in events.items(): + # 创建钩子名称:如 "page.User.on_update" + hook_name = f"page.{pagetype}.{event_name}" + signal = registry.register(hook_name) + + # 注册处理器 + if isinstance(handlers, list): + for handler_path in handlers: + self._register_handler(signal, handler_path, sender=pagetype) + elif isinstance(handlers, str): + self._register_handler(signal, handlers, sender=pagetype) + + def _register_scheduler_events(self, scheduler_events: Dict[str, Any]) -> None: + """注册定时任务钩子""" + for event_type, handlers in scheduler_events.items(): + if event_type == 'cron': + # Cron 表达式钩子 + for cron_expr, handler_list in handlers.items(): + hook_name = f"scheduler.cron.{cron_expr}" + signal = registry.register(hook_name) + self._register_handlers(signal, handler_list) + else: + # 其他定时任务类型(hourly, daily, weekly 等) + hook_name = f"scheduler.{event_type}" + signal = registry.register(hook_name) + self._register_handlers(signal, handlers) + + def _register_hook(self, hook_name: str, hook_value: Any) -> None: + """注册通用钩子""" + signal = registry.register(hook_name) + + if isinstance(hook_value, list): + self._register_handlers(signal, hook_value) + elif isinstance(hook_value, str): + self._register_handler(signal, hook_value) + else: + # 其他类型的值,可能需要特殊处理 + logger.warning(f"不支持的钩子值类型: {hook_name}={type(hook_value)}") + + def _register_handlers(self, signal: Any, handlers: List[str]) -> None: + """注册多个处理器""" + for handler_path in handlers: + self._register_handler(signal, handler_path) + + def _register_handler(self, signal: Any, handler_path: str, sender: Optional[str] = None) -> None: + """ + 注册单个处理器 + + :param signal: Signal 对象 + :param handler_path: 处理器路径,如 "app.module.function" + :param sender: 发送者类型 + """ + try: + # 动态导入处理器函数 + handler = self._import_handler(handler_path) + if handler: + signal.connect(handler=handler, sender=sender) + logger.debug(f"注册处理器: {handler_path} -> {signal}") + except Exception as e: + logger.error(f"注册处理器失败 {handler_path}: {e}", exc_info=True) + + def _import_handler(self, handler_path: str) -> Optional[Any]: + """导入处理器函数""" + try: + # 分割模块路径和函数名 + parts = handler_path.rsplit('.', 1) + if len(parts) != 2: + logger.error(f"无效的处理器路径格式: {handler_path}") + return None + + module_path, function_name = parts + module = importlib.import_module(module_path) + handler = getattr(module, function_name, None) + + if handler is None: + logger.error(f"处理器函数不存在: {handler_path}") + return None + + if not callable(handler): + logger.error(f"处理器不是可调用对象: {handler_path}") + return None + + return handler + + except ImportError as e: + logger.error(f"导入处理器模块失败: {handler_path} - {e}") + return None + except Exception as e: + logger.error(f"导入处理器失败: {handler_path} - {e}") + return None + + def clear_cache(self, app_name: Optional[str] = None) -> None: + """清除缓存""" + if app_name: + cache_key = f"app_hooks_{app_name}" + self._cache.pop(cache_key, None) + else: + self._cache.clear() + + +# 全局加载器实例 +_loader_instance = HookLoader() + + +def load_app_hooks(app_name: Optional[str] = None) -> Dict[str, Any]: + """ + 便捷函数:加载应用钩子配置 + + :param app_name: 应用名称 + :return: 钩子配置字典 + """ + return _loader_instance.load_app_hooks(app_name) + + +def register_loaded_hooks(app_name: Optional[str] = None) -> None: + """ + 便捷函数:注册已加载的钩子 + + :param app_name: 应用名称 + """ + _loader_instance.register_loaded_hooks(app_name) + + +# 保留 HookLoader 类定义以便导入 +# HookLoader 已在上面定义 + diff --git a/apps/jingrow/jingrow/core/hooks/registry.py b/apps/jingrow/jingrow/core/hooks/registry.py new file mode 100644 index 0000000..e7d604a --- /dev/null +++ b/apps/jingrow/jingrow/core/hooks/registry.py @@ -0,0 +1,166 @@ +# Copyright (c) 2025, JINGROW and contributors +# For license information, please see license.txt + +""" +钩子注册表 - 管理所有钩子信号 +支持按名称注册和获取钩子 +""" + +import logging +from typing import Dict, Optional, Any +from .signal import Signal, Priority + +logger = logging.getLogger(__name__) + + +class HookRegistry: + """ + 钩子注册表 - 全局单例 + + 用法: + # 注册钩子 + registry.register('page.saved', Signal()) + + # 获取钩子 + page_saved = registry.get('page.saved') + + # 连接处理器 + @page_saved.connect(sender='User') + def handle(page, **kwargs): + pass + + # 发送信号 + registry.send('page.saved', sender='User', page=page) + """ + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._signals: Dict[str, Signal] = {} + cls._instance._initialized = False + return cls._instance + + def register(self, name: str, signal: Optional[Signal] = None) -> Signal: + """ + 注册钩子(如果不存在则创建) + + :param name: 钩子名称,如 "page.saved", "page.before_insert" + :param signal: 信号对象,如果为 None 则创建新信号 + :return: Signal 对象 + """ + if name not in self._signals: + if signal is None: + signal = Signal() + self._signals[name] = signal + logger.debug(f"注册钩子: {name}") + + return self._signals[name] + + def get(self, name: str, default: Optional[Signal] = None) -> Optional[Signal]: + """获取钩子""" + return self._signals.get(name, default) + + def send(self, name: str, sender: Optional[str] = None, **kwargs) -> list: + """ + 同步发送钩子 + + :param name: 钩子名称 + :param sender: 发送者类型 + :param kwargs: 信号参数 + :return: 所有处理器返回值的列表 + """ + signal = self.get(name) + if signal is None: + logger.warning(f"钩子不存在: {name}") + return [] + + return signal.send(sender=sender, **kwargs) + + async def send_async(self, name: str, sender: Optional[str] = None, **kwargs) -> list: + """ + 异步发送钩子 + + :param name: 钩子名称 + :param sender: 发送者类型 + :param kwargs: 信号参数 + :return: 所有处理器返回值的列表 + """ + signal = self.get(name) + if signal is None: + logger.warning(f"钩子不存在: {name}") + return [] + + return await signal.send_async(sender=sender, **kwargs) + + def connect( + self, + name: str, + handler: Any = None, + *, + sender: Optional[str] = None, + priority: Priority = Priority.NORMAL, + condition: Optional[Any] = None, + once: bool = False + ) -> Any: + """ + 连接处理器到钩子(可用作装饰器) + + :param name: 钩子名称 + :param handler: 处理器函数(用作装饰器时可省略) + :param sender: 发送者类型 + :param priority: 优先级 + :param condition: 条件函数 + :param once: 是否只执行一次 + """ + signal = self.register(name) + + if handler is None: + # 用作装饰器 + return signal.connect( + sender=sender, + priority=priority, + condition=condition, + once=once + ) + + # 直接连接 + return signal.connect( + handler, + sender=sender, + priority=priority, + condition=condition, + once=once + ) + + def disconnect(self, name: str, handler: Any, sender: Optional[str] = None) -> None: + """断开处理器连接""" + signal = self.get(name) + if signal: + signal.disconnect(handler, sender=sender) + + def has_listeners(self, name: str, sender: Optional[str] = None) -> bool: + """检查钩子是否有监听器""" + signal = self.get(name) + if signal is None: + return False + return signal.has_listeners(sender) + + def clear(self, name: Optional[str] = None) -> None: + """清空钩子(如果 name 为 None 则清空所有)""" + if name is None: + self._signals.clear() + logger.debug("清空所有钩子") + elif name in self._signals: + self._signals[name].clear() + logger.debug(f"清空钩子: {name}") + + def list_all(self) -> list: + """列出所有已注册的钩子名称""" + return list(self._signals.keys()) + + +# 全局注册表实例 +registry = HookRegistry() + diff --git a/apps/jingrow/jingrow/core/hooks/signal.py b/apps/jingrow/jingrow/core/hooks/signal.py new file mode 100644 index 0000000..8fe1b02 --- /dev/null +++ b/apps/jingrow/jingrow/core/hooks/signal.py @@ -0,0 +1,263 @@ +# Copyright (c) 2025, JINGROW and contributors +# For license information, please see license.txt + +""" +Signal 系统 - 支持优先级、条件过滤、类型提示 +""" + +import logging +import asyncio +from typing import Callable, Dict, List, Any, Optional, Union +from dataclasses import dataclass, field +from functools import wraps +from enum import IntEnum + +logger = logging.getLogger(__name__) + + +class Priority(IntEnum): + """钩子执行优先级""" + LOWEST = 100 + LOW = 75 + NORMAL = 50 + HIGH = 25 + HIGHEST = 1 + + +@dataclass +class HookHandler: + """钩子处理器""" + handler: Callable + sender: Optional[str] = None # 发送者类型,如 "User", None 表示所有 + priority: Priority = Priority.NORMAL + condition: Optional[Callable] = None # 条件函数,返回 True 才执行 + once: bool = False # 是否只执行一次 + weak: bool = False # 弱引用(暂不支持) + _executed: bool = False # 是否已执行(用于 once 模式) + + def __call__(self, *args, **kwargs) -> Any: + """执行处理器""" + if self.once and self._executed: + return None + + # 检查条件 + if self.condition: + try: + if not self.condition(*args, **kwargs): + return None + except Exception as e: + logger.warning(f"钩子条件检查失败: {e}") + return None + + try: + result = self.handler(*args, **kwargs) + if self.once: + self._executed = True + return result + except Exception as e: + logger.error(f"钩子处理器执行失败: {e}", exc_info=True) + raise + + async def __acall__(self, *args, **kwargs) -> Any: + """异步执行处理器""" + if self.once and self._executed: + return None + + # 检查条件 + if self.condition: + try: + if not self.condition(*args, **kwargs): + return None + except Exception as e: + logger.warning(f"钩子条件检查失败: {e}") + return None + + try: + if asyncio.iscoroutinefunction(self.handler): + result = await self.handler(*args, **kwargs) + else: + result = self.handler(*args, **kwargs) + + if self.once: + self._executed = True + return result + except Exception as e: + logger.error(f"异步钩子处理器执行失败: {e}", exc_info=True) + raise + + +class Signal: + """ + 信号类 - 类似 Django Signal + + 用法: + # 定义信号 + page_saved = Signal() + + # 连接处理器 + @page_saved.connect(sender='User', priority=Priority.HIGH) + def handle_page_saved(page, created, **kwargs): + print(f"Page saved: {page.name}") + + # 发送信号 + page_saved.send(sender='User', page=page, created=True) + """ + + def __init__(self, providing_args: Optional[List[str]] = None): + """ + 初始化信号 + + :param providing_args: 提供的参数列表(用于文档和类型检查) + """ + self.providing_args = providing_args or [] + self._handlers: List[HookHandler] = [] + self._handlers_by_sender: Dict[str, List[HookHandler]] = {} + + def connect( + self, + handler: Optional[Callable] = None, + *, + sender: Optional[str] = None, + priority: Priority = Priority.NORMAL, + condition: Optional[Callable] = None, + once: bool = False, + weak: bool = False + ) -> Union[Callable, Callable]: + """ + 连接处理器(可用作装饰器或直接调用) + + :param handler: 处理器函数 + :param sender: 发送者类型 + :param priority: 优先级 + :param condition: 条件函数 + :param once: 是否只执行一次 + :param weak: 弱引用(暂不支持) + """ + if handler is None: + # 用作装饰器 + def decorator(func: Callable) -> Callable: + self.connect( + func, + sender=sender, + priority=priority, + condition=condition, + once=once, + weak=weak + ) + return func + return decorator + + # 注册处理器 + hook_handler = HookHandler( + handler=handler, + sender=sender, + priority=priority, + condition=condition, + once=once, + weak=weak + ) + + if sender: + if sender not in self._handlers_by_sender: + self._handlers_by_sender[sender] = [] + self._handlers_by_sender[sender].append(hook_handler) + else: + self._handlers.append(hook_handler) + + # 按优先级排序(优先级越小越先执行) + if sender: + self._handlers_by_sender[sender].sort(key=lambda h: h.priority) + self._handlers.sort(key=lambda h: h.priority) + + logger.debug(f"注册钩子处理器: {handler.__name__} (sender={sender}, priority={priority})") + return handler + + def disconnect(self, handler: Callable, sender: Optional[str] = None) -> None: + """断开处理器连接""" + if sender: + handlers = self._handlers_by_sender.get(sender, []) + self._handlers_by_sender[sender] = [h for h in handlers if h.handler != handler] + else: + self._handlers = [h for h in self._handlers if h.handler != handler] + + def send(self, sender: Optional[str] = None, **kwargs) -> List[Any]: + """ + 同步发送信号 + + :param sender: 发送者类型 + :param kwargs: 信号参数 + :return: 所有处理器返回值的列表 + """ + results = [] + + # 获取所有相关处理器 + handlers = self._get_handlers(sender) + + for handler in handlers: + try: + result = handler(**kwargs) + if result is not None: + results.append(result) + except Exception as e: + logger.error(f"钩子处理器执行失败: {e}", exc_info=True) + # 继续执行其他处理器,除非是严重错误 + + return results + + async def send_async(self, sender: Optional[str] = None, **kwargs) -> List[Any]: + """ + 异步发送信号 + + :param sender: 发送者类型 + :param kwargs: 信号参数 + :return: 所有处理器返回值的列表 + """ + results = [] + + # 获取所有相关处理器 + handlers = self._get_handlers(sender) + + for handler in handlers: + try: + result = await handler.__acall__(**kwargs) + if result is not None: + results.append(result) + except Exception as e: + logger.error(f"异步钩子处理器执行失败: {e}", exc_info=True) + # 继续执行其他处理器 + + return results + + def _get_handlers(self, sender: Optional[str] = None) -> List[HookHandler]: + """获取相关处理器列表(已按优先级排序)""" + handlers = [] + + # 添加通用处理器(sender=None) + handlers.extend(self._handlers) + + # 添加特定 sender 的处理器 + if sender: + handlers.extend(self._handlers_by_sender.get(sender, [])) + + # 重新按优先级排序(因为合并了通用和特定的) + handlers.sort(key=lambda h: h.priority) + + return handlers + + def has_listeners(self, sender: Optional[str] = None) -> bool: + """检查是否有监听器""" + if sender: + return len(self._handlers) > 0 or len(self._handlers_by_sender.get(sender, [])) > 0 + return len(self._handlers) > 0 + + def clear(self) -> None: + """清空所有处理器""" + self._handlers.clear() + self._handlers_by_sender.clear() + + +# 便捷函数:创建信号 +def signal(providing_args: Optional[List[str]] = None) -> Signal: + """创建新的信号""" + return Signal(providing_args=providing_args) + diff --git a/apps/jingrow/jingrow/hooks.py b/apps/jingrow/jingrow/hooks.py index b8712a0..e7e9bae 100644 --- a/apps/jingrow/jingrow/hooks.py +++ b/apps/jingrow/jingrow/hooks.py @@ -1,8 +1,117 @@ -# Copyright (c) 2025, JINGROW +# Copyright (c) 2025, JINGROW and contributors +# For license information, please see license.txt + +""" +Jingrow 框架核心钩子配置 + +此文件定义了框架的所有钩子扩展点,应用可以通过 hooks.py 注册自己的处理器。 +""" app_name = "jingrow" app_title = "Jingrow" app_publisher = "Jingrow" -app_description = "Jingrow" +app_description = "Jingrow Framework" app_email = "support@jingrow.com" app_license = "mit" + +# ========== 应用生命周期钩子 ========== +before_install = "jingrow.utils.install.before_install" +after_install = "jingrow.utils.install.after_install" +before_migrate = "jingrow.utils.migrate.before_migrate" +after_migrate = "jingrow.utils.migrate.after_migrate" + +# ========== 请求钩子 ========== +before_request = [ + # 请求前处理,如认证、限流等 +] + +after_request = [ + # 请求后处理,如日志、性能统计等 +] + +# ========== 后台任务钩子 ========== +before_job = [ + # 后台任务执行前 +] + +after_job = [ + # 后台任务执行后 +] + +# ========== 页面生命周期钩子 (pg_events) ========== +# 格式: {pagetype: {event_name: [handler_path, ...]}} +# 或: {pagetype: {event_name: handler_path}} +# 或: {"*": {event_name: [handler_path, ...]}} # 所有页面类型 + +pg_events = { + "*": { + # 所有页面类型的通用钩子 + "before_insert": [ + # 页面插入前 + ], + "after_insert": [ + # 页面插入后 + ], + "on_update": [ + # 页面更新时 + ], + "on_change": [ + # 页面字段变化时 + ], + "on_trash": [ + # 页面删除时 + ], + "on_cancel": [ + # 页面取消时 + ], + }, + # 示例:特定页面类型的钩子 + # "User": { + # "after_insert": "myapp.hooks.send_welcome_email", + # "on_update": [ + # "myapp.hooks.log_user_changes", + # "myapp.hooks.sync_to_external_system", + # ], + # }, +} + +# ========== 定时任务钩子 (scheduler_events) ========== +scheduler_events = { + "cron": { + # Cron 表达式钩子 + # "0/15 * * * *": [ # 每15分钟 + # "jingrow.utils.cleanup.cleanup_temp_files", + # ], + }, + "hourly": [ + # 每小时执行 + ], + "daily": [ + # 每天执行 + ], + "weekly": [ + # 每周执行 + ], + "monthly": [ + # 每月执行 + ], +} + +# ========== 其他钩子 ========== +# 扩展启动信息 +extend_bootinfo = [] + +# 权限查询条件 +permission_query_conditions = {} + +# 权限检查 +has_permission = {} + +# 标准查询 +standard_queries = {} + +# Jinja 过滤器和方法 +jinja = { + "methods": [], + "filters": [], +} diff --git a/apps/jingrow/jingrow/main.py b/apps/jingrow/jingrow/main.py index 35063de..6ab6b92 100644 --- a/apps/jingrow/jingrow/main.py +++ b/apps/jingrow/jingrow/main.py @@ -11,18 +11,23 @@ from contextlib import asynccontextmanager from jingrow.utils.router_auto_register import include_routers_from_package from jingrow.services.scheduler import start_scheduler, stop_scheduler from jingrow.services.whitelist import router +from jingrow.services.scheduler import get_scheduler_status +from jingrow.core.hooks.init_hooks import init_hooks @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" # 启动事件 try: + # 初始化钩子系统 + init_hooks() + print("钩子系统已初始化") + # 启动调度器 await start_scheduler() print("调度器已启动") - # 验证调度器状态 - from jingrow.services.scheduler import get_scheduler_status + # 验证调度器状态 status = get_scheduler_status() print(f"调度器状态: 运行={status['running']}, 任务数={status['total_jobs']}") diff --git a/apps/jingrow/jingrow/model/page.py b/apps/jingrow/jingrow/model/page.py index 1401560..814b6d9 100644 --- a/apps/jingrow/jingrow/model/page.py +++ b/apps/jingrow/jingrow/model/page.py @@ -43,30 +43,98 @@ def get_pg(*args, **kwargs): class Page(BasePage): """最小可用 Page,复用 jingrow 本地数据库 API(save_pg等) 仅保留 pagetype 生命周期接口:insert/save/delete/run_method。 + 集成现代化的 hooks 系统。 """ def insert(self, **kwargs) -> "Page": + """插入页面,触发相关钩子""" self.set("__islocal", True) + + # 触发 before_insert 钩子 + self._execute_hook("before_insert") + + # 本地方法(向后兼容) self.run_method("before_insert") + + # 保存到数据库 jingrow.save_pg(self) + + # 触发 after_insert 钩子 + self._execute_hook("after_insert") + + # 本地方法(向后兼容) self.run_method("after_insert") + return self def save(self, **kwargs) -> "Page": + """保存页面,触发相关钩子""" + # 触发 before_save 钩子(如果存在) + if not self.get("name"): + # 新记录,触发 before_insert + self._execute_hook("before_insert") + else: + # 更新记录,触发 on_update + self._execute_hook("on_update", **kwargs) + + # 本地方法(向后兼容) self.run_method("before_save") + + # 保存到数据库 jingrow.save_pg(self) + + # 如果之前触发了 before_insert,现在触发 after_insert + if not self.get("name") or self.get("__islocal"): + self._execute_hook("after_insert") + else: + self._execute_hook("on_update", **kwargs) + + # 本地方法(向后兼容) self.run_method("on_update") + return self def delete(self, **kwargs): + """删除页面,触发相关钩子""" + # 触发 on_trash 钩子 + self._execute_hook("on_trash", **kwargs) + + # 本地方法(向后兼容) self.run_method("on_trash") + return jingrow.delete_pg(self.pagetype, self.name) def run_method(self, method: str, *args, **kwargs): + """运行本地方法(向后兼容)""" fn = getattr(self, method, None) if callable(fn): return fn(*args, **kwargs) return None + + def _execute_hook(self, event_name: str, **kwargs): + """执行钩子(私有方法)""" + try: + from jingrow.core.hooks import execute_hook + + # 构建钩子名称:page.{pagetype}.{event_name} + hook_name = f"page.{self.pagetype}.{event_name}" + + # 执行钩子 + execute_hook( + hook_name, + sender=self.pagetype, + page=self, + **kwargs + ) + + except ImportError: + # 如果钩子系统未安装,静默失败(向后兼容) + pass + except Exception as e: + # 记录错误但继续执行(防止钩子错误影响主流程) + import logging + logger = logging.getLogger(__name__) + logger.warning(f"执行钩子失败 {event_name}: {e}", exc_info=True) # 兼容云端常用辅助 def as_dict(self) -> dict[str, Any]: