实现hooks钩子系统

This commit is contained in:
jingrow 2025-10-30 01:59:45 +08:00
parent b16570fb1c
commit b85e98ed2a
10 changed files with 1287 additions and 4 deletions

View File

@ -0,0 +1,63 @@
# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
Jingrow Hooks System - 现代化的钩子系统
特性
- 配置式注册hooks.py
- 装饰器注册
- 同步/异步执行
- 优先级支持
- 条件过滤
- 类型安全
- 易于测试
快速开始:
1. 配置式注册 (推荐用于应用开发):
hooks.py :
pg_events = {
"User": {
"after_insert": "myapp.hooks.send_welcome_email",
}
}
2. 装饰器注册 (推荐用于代码中):
from jingrow.core.hooks import hook, Priority
@hook('page.User.on_update', sender='User', priority=Priority.HIGH)
def handle_user_update(page, **kwargs):
print(f"User {page.name} updated")
3. 发送钩子:
from jingrow.core.hooks import execute_hook
execute_hook('page.User.on_update', sender='User', page=page_obj)
"""
from .signal import Signal, signal, Priority
from .registry import HookRegistry, registry
from .loader import HookLoader, load_app_hooks, register_loaded_hooks
from .executor import HookExecutor, execute_hook, execute_hook_async
from .decorators import hook, register_hook
__all__ = [
# Core classes
"Signal",
"signal",
"Priority",
"HookRegistry",
"registry",
"HookLoader",
"load_app_hooks",
"register_loaded_hooks",
"HookExecutor",
"execute_hook",
"execute_hook_async",
# Decorators
"hook",
"register_hook",
]
__version__ = "1.0.0"

View File

@ -0,0 +1,92 @@
# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
钩子装饰器 - 提供便捷的装饰器语法
"""
import functools
from typing import Callable, Optional, Any
from .registry import registry
from .signal import Priority
logger = None # 延迟导入避免循环
def hook(
hook_name: str,
*,
sender: Optional[str] = None,
priority: Priority = Priority.NORMAL,
condition: Optional[Callable] = None,
once: bool = False
):
"""
钩子装饰器 - 注册函数为钩子处理器
用法:
@hook('page.User.on_update', sender='User', priority=Priority.HIGH)
def handle_user_update(page, **kwargs):
print(f"User {page.name} updated")
@hook('scheduler.daily', condition=lambda: datetime.now().hour == 0)
def daily_task(**kwargs):
# 只在午夜执行
pass
"""
def decorator(func: Callable) -> Callable:
registry.connect(
hook_name,
func,
sender=sender,
priority=priority,
condition=condition,
once=once
)
return func
return decorator
def register_hook(
hook_name: str,
handler: Optional[Callable] = None,
*,
sender: Optional[str] = None,
priority: Priority = Priority.NORMAL,
condition: Optional[Callable] = None,
once: bool = False
):
"""
注册钩子可用作装饰器或函数调用
用法:
# 装饰器方式
@register_hook('page.User.on_update', sender='User')
def handler(page, **kwargs):
pass
# 函数调用方式
register_hook('page.User.on_update', my_handler, sender='User')
"""
if handler is None:
# 用作装饰器
return hook(
hook_name,
sender=sender,
priority=priority,
condition=condition,
once=once
)
# 直接注册
registry.connect(
hook_name,
handler,
sender=sender,
priority=priority,
condition=condition,
once=once
)
return handler

View File

@ -0,0 +1,201 @@
# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
钩子执行器 - 执行钩子的便捷函数和类
支持同步/异步执行错误处理性能监控
"""
import logging
import time
import asyncio
from typing import Dict, List, Any, Optional, Callable
from .registry import registry
logger = logging.getLogger(__name__)
class HookExecutor:
"""
钩子执行器 - 提供统一的钩子执行接口
特性
- 同步/异步执行
- 错误处理
- 性能监控
- 批量执行
"""
@staticmethod
def execute(
hook_name: str,
sender: Optional[str] = None,
raise_on_error: bool = False,
**kwargs
) -> List[Any]:
"""
同步执行钩子
:param hook_name: 钩子名称
:param sender: 发送者类型
:param raise_on_error: 是否在错误时抛出异常
:param kwargs: 钩子参数
:return: 所有处理器返回值的列表
"""
start_time = time.time()
try:
results = registry.send(hook_name, sender=sender, **kwargs)
elapsed = time.time() - start_time
if elapsed > 0.1: # 超过100ms记录警告
logger.warning(f"钩子执行较慢: {hook_name} ({elapsed:.3f}s)")
return results
except Exception as e:
logger.error(f"执行钩子失败: {hook_name} - {e}", exc_info=True)
if raise_on_error:
raise
return []
@staticmethod
async def execute_async(
hook_name: str,
sender: Optional[str] = None,
raise_on_error: bool = False,
**kwargs
) -> List[Any]:
"""
异步执行钩子
:param hook_name: 钩子名称
:param sender: 发送者类型
:param raise_on_error: 是否在错误时抛出异常
:param kwargs: 钩子参数
:return: 所有处理器返回值的列表
"""
start_time = time.time()
try:
results = await registry.send_async(hook_name, sender=sender, **kwargs)
elapsed = time.time() - start_time
if elapsed > 0.1: # 超过100ms记录警告
logger.warning(f"异步钩子执行较慢: {hook_name} ({elapsed:.3f}s)")
return results
except Exception as e:
logger.error(f"异步执行钩子失败: {hook_name} - {e}", exc_info=True)
if raise_on_error:
raise
return []
@staticmethod
def execute_batch(hooks: List[Dict[str, Any]], raise_on_error: bool = False) -> Dict[str, List[Any]]:
"""
批量执行钩子
:param hooks: 钩子列表每个元素为 {"name": "...", "sender": "...", **kwargs}
:param raise_on_error: 是否在错误时抛出异常
:return: 执行结果字典 {hook_name: results}
"""
results = {}
for hook_config in hooks:
hook_name = hook_config.pop('name')
sender = hook_config.pop('sender', None)
try:
results[hook_name] = HookExecutor.execute(
hook_name,
sender=sender,
raise_on_error=raise_on_error,
**hook_config
)
except Exception as e:
logger.error(f"批量执行钩子失败: {hook_name} - {e}")
results[hook_name] = []
if raise_on_error:
raise
return results
@staticmethod
async def execute_batch_async(
hooks: List[Dict[str, Any]],
raise_on_error: bool = False
) -> Dict[str, List[Any]]:
"""
异步批量执行钩子
:param hooks: 钩子列表
:param raise_on_error: 是否在错误时抛出异常
:return: 执行结果字典
"""
results = {}
# 并行执行所有钩子
tasks = []
for hook_config in hooks:
hook_name = hook_config['name']
sender = hook_config.get('sender')
kwargs = {k: v for k, v in hook_config.items() if k not in ('name', 'sender')}
task = HookExecutor.execute_async(
hook_name,
sender=sender,
raise_on_error=raise_on_error,
**kwargs
)
tasks.append((hook_name, task))
# 等待所有任务完成
for hook_name, task in tasks:
try:
results[hook_name] = await task
except Exception as e:
logger.error(f"异步批量执行钩子失败: {hook_name} - {e}")
results[hook_name] = []
if raise_on_error:
raise
return results
# 便捷函数
def execute_hook(
hook_name: str,
sender: Optional[str] = None,
raise_on_error: bool = False,
**kwargs
) -> List[Any]:
"""
同步执行钩子便捷函数
示例:
results = execute_hook('page.User.on_update', sender='User', page=page_obj)
"""
return HookExecutor.execute(hook_name, sender=sender, raise_on_error=raise_on_error, **kwargs)
async def execute_hook_async(
hook_name: str,
sender: Optional[str] = None,
raise_on_error: bool = False,
**kwargs
) -> List[Any]:
"""
异步执行钩子便捷函数
示例:
results = await execute_hook_async('page.User.on_update', sender='User', page=page_obj)
"""
return await HookExecutor.execute_async(
hook_name,
sender=sender,
raise_on_error=raise_on_error,
**kwargs
)

View File

@ -0,0 +1,37 @@
# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
钩子系统初始化 - 在框架启动时自动加载和应用钩子
"""
import logging
from typing import Optional
from .loader import register_loaded_hooks
logger = logging.getLogger(__name__)
def init_hooks(app_name: Optional[str] = None, clear_cache: bool = True) -> None:
"""
初始化钩子系统 - 加载并注册所有应用的钩子
:param app_name: 应用名称如果为 None 则加载所有应用
:param clear_cache: 是否清除缓存
"""
if clear_cache:
from .loader import _loader_instance
_loader_instance.clear_cache()
try:
register_loaded_hooks(app_name)
logger.info(f"钩子系统初始化完成" + (f" (应用: {app_name})" if app_name else ""))
except Exception as e:
logger.error(f"钩子系统初始化失败: {e}", exc_info=True)
raise
def reload_hooks(app_name: Optional[str] = None) -> None:
"""重新加载钩子(开发模式)"""
init_hooks(app_name, clear_cache=True)

View File

@ -0,0 +1,279 @@
# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
钩子加载器 - hooks.py 配置文件加载钩子
支持应用级钩子配置
"""
import os
import importlib
import inspect
import types
import logging
from typing import Dict, List, Any, Optional, Union
from .registry import registry
# 受支持的 hooks.py 键(仅这些键会被加载与注册)
SUPPORTED_HOOK_KEYS = {
'pg_events',
'scheduler_events',
'before_request',
'after_request',
'before_job',
'after_job',
}
logger = logging.getLogger(__name__)
class HookLoader:
"""
钩子加载器 - 加载 hooks.py 配置文件中的钩子
支持的钩子类型
- pg_events: 页面生命周期钩子
- scheduler_events: 定时任务钩子
- before_request / after_request: 请求钩子
- before_job / after_job: 后台任务钩子
- 其他自定义钩子
"""
def __init__(self):
self._loaded_apps: set = set()
self._cache: Dict[str, Any] = {}
def load_app_hooks(self, app_name: Optional[str] = None) -> Dict[str, Any]:
"""
加载应用钩子配置
:param app_name: 应用名称如果为 None 则加载所有已安装应用
:return: 钩子配置字典
"""
if app_name:
return self._load_single_app_hooks(app_name)
# 加载所有应用的钩子
hooks = {}
# 获取已安装应用列表
try:
from jingrow.utils.app_manager import get_installed_apps
installed_apps = get_installed_apps()
except Exception:
# 如果获取失败,至少加载 jingrow
installed_apps = ['jingrow']
for app in installed_apps:
try:
app_hooks = self._load_single_app_hooks(app)
# 合并钩子(后加载的应用钩子会追加到列表中)
for key, value in app_hooks.items():
if key in hooks:
# 如果已存在,追加到列表
if isinstance(hooks[key], list) and isinstance(value, list):
hooks[key].extend(value)
elif isinstance(hooks[key], list):
hooks[key].append(value)
elif isinstance(value, list):
hooks[key] = [hooks[key]] + value
else:
hooks[key] = [hooks[key], value]
else:
hooks[key] = value
except Exception as e:
logger.warning(f"加载应用 {app} 的钩子失败: {e}")
return hooks
def _load_single_app_hooks(self, app_name: str) -> Dict[str, Any]:
"""加载单个应用的钩子"""
cache_key = f"app_hooks_{app_name}"
# 检查缓存
if cache_key in self._cache:
return self._cache[cache_key]
try:
# 导入 hooks 模块
hooks_module = importlib.import_module(f"{app_name}.hooks")
# 仅提取受支持的钩子键,忽略应用元信息(如 app_name、app_title 等)
hooks: Dict[str, Any] = {}
for key in SUPPORTED_HOOK_KEYS:
if hasattr(hooks_module, key):
hooks[key] = getattr(hooks_module, key)
logger.debug(f"加载应用 {app_name} 的钩子: {list(hooks.keys())}")
# 缓存结果
self._cache[cache_key] = hooks
return hooks
except ImportError as e:
logger.debug(f"应用 {app_name} 没有 hooks.py 模块: {e}")
return {}
except Exception as e:
logger.error(f"加载应用 {app_name} 的钩子失败: {e}", exc_info=True)
return {}
def register_loaded_hooks(self, app_name: Optional[str] = None) -> None:
"""
注册已加载的钩子到注册表
:param app_name: 应用名称如果为 None 则注册所有应用
"""
hooks_config = self.load_app_hooks(app_name)
# 处理 pg_events页面生命周期钩子
if 'pg_events' in hooks_config:
self._register_pg_events(hooks_config['pg_events'])
# 处理 scheduler_events定时任务钩子
if 'scheduler_events' in hooks_config:
self._register_scheduler_events(hooks_config['scheduler_events'])
# 处理其他受支持的通用钩子(目前仅支持 request/job 级别)
for hook_name in ('before_request', 'after_request', 'before_job', 'after_job'):
hook_value = hooks_config.get(hook_name)
if not hook_value:
continue
# 为这些钩子创建统一命名并注册
signal = registry.register(f"{hook_name}")
if isinstance(hook_value, list):
self._register_handlers(signal, hook_value)
elif isinstance(hook_value, str):
self._register_handler(signal, hook_value)
def _register_pg_events(self, pg_events: Dict[str, Any]) -> None:
"""注册页面生命周期钩子"""
for pagetype, events in pg_events.items():
if isinstance(events, dict):
for event_name, handlers in events.items():
# 创建钩子名称:如 "page.User.on_update"
hook_name = f"page.{pagetype}.{event_name}"
signal = registry.register(hook_name)
# 注册处理器
if isinstance(handlers, list):
for handler_path in handlers:
self._register_handler(signal, handler_path, sender=pagetype)
elif isinstance(handlers, str):
self._register_handler(signal, handlers, sender=pagetype)
def _register_scheduler_events(self, scheduler_events: Dict[str, Any]) -> None:
"""注册定时任务钩子"""
for event_type, handlers in scheduler_events.items():
if event_type == 'cron':
# Cron 表达式钩子
for cron_expr, handler_list in handlers.items():
hook_name = f"scheduler.cron.{cron_expr}"
signal = registry.register(hook_name)
self._register_handlers(signal, handler_list)
else:
# 其他定时任务类型hourly, daily, weekly 等)
hook_name = f"scheduler.{event_type}"
signal = registry.register(hook_name)
self._register_handlers(signal, handlers)
def _register_hook(self, hook_name: str, hook_value: Any) -> None:
"""注册通用钩子"""
signal = registry.register(hook_name)
if isinstance(hook_value, list):
self._register_handlers(signal, hook_value)
elif isinstance(hook_value, str):
self._register_handler(signal, hook_value)
else:
# 其他类型的值,可能需要特殊处理
logger.warning(f"不支持的钩子值类型: {hook_name}={type(hook_value)}")
def _register_handlers(self, signal: Any, handlers: List[str]) -> None:
"""注册多个处理器"""
for handler_path in handlers:
self._register_handler(signal, handler_path)
def _register_handler(self, signal: Any, handler_path: str, sender: Optional[str] = None) -> None:
"""
注册单个处理器
:param signal: Signal 对象
:param handler_path: 处理器路径 "app.module.function"
:param sender: 发送者类型
"""
try:
# 动态导入处理器函数
handler = self._import_handler(handler_path)
if handler:
signal.connect(handler=handler, sender=sender)
logger.debug(f"注册处理器: {handler_path} -> {signal}")
except Exception as e:
logger.error(f"注册处理器失败 {handler_path}: {e}", exc_info=True)
def _import_handler(self, handler_path: str) -> Optional[Any]:
"""导入处理器函数"""
try:
# 分割模块路径和函数名
parts = handler_path.rsplit('.', 1)
if len(parts) != 2:
logger.error(f"无效的处理器路径格式: {handler_path}")
return None
module_path, function_name = parts
module = importlib.import_module(module_path)
handler = getattr(module, function_name, None)
if handler is None:
logger.error(f"处理器函数不存在: {handler_path}")
return None
if not callable(handler):
logger.error(f"处理器不是可调用对象: {handler_path}")
return None
return handler
except ImportError as e:
logger.error(f"导入处理器模块失败: {handler_path} - {e}")
return None
except Exception as e:
logger.error(f"导入处理器失败: {handler_path} - {e}")
return None
def clear_cache(self, app_name: Optional[str] = None) -> None:
"""清除缓存"""
if app_name:
cache_key = f"app_hooks_{app_name}"
self._cache.pop(cache_key, None)
else:
self._cache.clear()
# 全局加载器实例
_loader_instance = HookLoader()
def load_app_hooks(app_name: Optional[str] = None) -> Dict[str, Any]:
"""
便捷函数加载应用钩子配置
:param app_name: 应用名称
:return: 钩子配置字典
"""
return _loader_instance.load_app_hooks(app_name)
def register_loaded_hooks(app_name: Optional[str] = None) -> None:
"""
便捷函数注册已加载的钩子
:param app_name: 应用名称
"""
_loader_instance.register_loaded_hooks(app_name)
# 保留 HookLoader 类定义以便导入
# HookLoader 已在上面定义

View File

@ -0,0 +1,166 @@
# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
钩子注册表 - 管理所有钩子信号
支持按名称注册和获取钩子
"""
import logging
from typing import Dict, Optional, Any
from .signal import Signal, Priority
logger = logging.getLogger(__name__)
class HookRegistry:
"""
钩子注册表 - 全局单例
用法
# 注册钩子
registry.register('page.saved', Signal())
# 获取钩子
page_saved = registry.get('page.saved')
# 连接处理器
@page_saved.connect(sender='User')
def handle(page, **kwargs):
pass
# 发送信号
registry.send('page.saved', sender='User', page=page)
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._signals: Dict[str, Signal] = {}
cls._instance._initialized = False
return cls._instance
def register(self, name: str, signal: Optional[Signal] = None) -> Signal:
"""
注册钩子如果不存在则创建
:param name: 钩子名称 "page.saved", "page.before_insert"
:param signal: 信号对象如果为 None 则创建新信号
:return: Signal 对象
"""
if name not in self._signals:
if signal is None:
signal = Signal()
self._signals[name] = signal
logger.debug(f"注册钩子: {name}")
return self._signals[name]
def get(self, name: str, default: Optional[Signal] = None) -> Optional[Signal]:
"""获取钩子"""
return self._signals.get(name, default)
def send(self, name: str, sender: Optional[str] = None, **kwargs) -> list:
"""
同步发送钩子
:param name: 钩子名称
:param sender: 发送者类型
:param kwargs: 信号参数
:return: 所有处理器返回值的列表
"""
signal = self.get(name)
if signal is None:
logger.warning(f"钩子不存在: {name}")
return []
return signal.send(sender=sender, **kwargs)
async def send_async(self, name: str, sender: Optional[str] = None, **kwargs) -> list:
"""
异步发送钩子
:param name: 钩子名称
:param sender: 发送者类型
:param kwargs: 信号参数
:return: 所有处理器返回值的列表
"""
signal = self.get(name)
if signal is None:
logger.warning(f"钩子不存在: {name}")
return []
return await signal.send_async(sender=sender, **kwargs)
def connect(
self,
name: str,
handler: Any = None,
*,
sender: Optional[str] = None,
priority: Priority = Priority.NORMAL,
condition: Optional[Any] = None,
once: bool = False
) -> Any:
"""
连接处理器到钩子可用作装饰器
:param name: 钩子名称
:param handler: 处理器函数用作装饰器时可省略
:param sender: 发送者类型
:param priority: 优先级
:param condition: 条件函数
:param once: 是否只执行一次
"""
signal = self.register(name)
if handler is None:
# 用作装饰器
return signal.connect(
sender=sender,
priority=priority,
condition=condition,
once=once
)
# 直接连接
return signal.connect(
handler,
sender=sender,
priority=priority,
condition=condition,
once=once
)
def disconnect(self, name: str, handler: Any, sender: Optional[str] = None) -> None:
"""断开处理器连接"""
signal = self.get(name)
if signal:
signal.disconnect(handler, sender=sender)
def has_listeners(self, name: str, sender: Optional[str] = None) -> bool:
"""检查钩子是否有监听器"""
signal = self.get(name)
if signal is None:
return False
return signal.has_listeners(sender)
def clear(self, name: Optional[str] = None) -> None:
"""清空钩子(如果 name 为 None 则清空所有)"""
if name is None:
self._signals.clear()
logger.debug("清空所有钩子")
elif name in self._signals:
self._signals[name].clear()
logger.debug(f"清空钩子: {name}")
def list_all(self) -> list:
"""列出所有已注册的钩子名称"""
return list(self._signals.keys())
# 全局注册表实例
registry = HookRegistry()

View File

@ -0,0 +1,263 @@
# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
Signal 系统 - 支持优先级条件过滤类型提示
"""
import logging
import asyncio
from typing import Callable, Dict, List, Any, Optional, Union
from dataclasses import dataclass, field
from functools import wraps
from enum import IntEnum
logger = logging.getLogger(__name__)
class Priority(IntEnum):
"""钩子执行优先级"""
LOWEST = 100
LOW = 75
NORMAL = 50
HIGH = 25
HIGHEST = 1
@dataclass
class HookHandler:
"""钩子处理器"""
handler: Callable
sender: Optional[str] = None # 发送者类型,如 "User", None 表示所有
priority: Priority = Priority.NORMAL
condition: Optional[Callable] = None # 条件函数,返回 True 才执行
once: bool = False # 是否只执行一次
weak: bool = False # 弱引用(暂不支持)
_executed: bool = False # 是否已执行(用于 once 模式)
def __call__(self, *args, **kwargs) -> Any:
"""执行处理器"""
if self.once and self._executed:
return None
# 检查条件
if self.condition:
try:
if not self.condition(*args, **kwargs):
return None
except Exception as e:
logger.warning(f"钩子条件检查失败: {e}")
return None
try:
result = self.handler(*args, **kwargs)
if self.once:
self._executed = True
return result
except Exception as e:
logger.error(f"钩子处理器执行失败: {e}", exc_info=True)
raise
async def __acall__(self, *args, **kwargs) -> Any:
"""异步执行处理器"""
if self.once and self._executed:
return None
# 检查条件
if self.condition:
try:
if not self.condition(*args, **kwargs):
return None
except Exception as e:
logger.warning(f"钩子条件检查失败: {e}")
return None
try:
if asyncio.iscoroutinefunction(self.handler):
result = await self.handler(*args, **kwargs)
else:
result = self.handler(*args, **kwargs)
if self.once:
self._executed = True
return result
except Exception as e:
logger.error(f"异步钩子处理器执行失败: {e}", exc_info=True)
raise
class Signal:
"""
信号类 - 类似 Django Signal
用法
# 定义信号
page_saved = Signal()
# 连接处理器
@page_saved.connect(sender='User', priority=Priority.HIGH)
def handle_page_saved(page, created, **kwargs):
print(f"Page saved: {page.name}")
# 发送信号
page_saved.send(sender='User', page=page, created=True)
"""
def __init__(self, providing_args: Optional[List[str]] = None):
"""
初始化信号
:param providing_args: 提供的参数列表用于文档和类型检查
"""
self.providing_args = providing_args or []
self._handlers: List[HookHandler] = []
self._handlers_by_sender: Dict[str, List[HookHandler]] = {}
def connect(
self,
handler: Optional[Callable] = None,
*,
sender: Optional[str] = None,
priority: Priority = Priority.NORMAL,
condition: Optional[Callable] = None,
once: bool = False,
weak: bool = False
) -> Union[Callable, Callable]:
"""
连接处理器可用作装饰器或直接调用
:param handler: 处理器函数
:param sender: 发送者类型
:param priority: 优先级
:param condition: 条件函数
:param once: 是否只执行一次
:param weak: 弱引用暂不支持
"""
if handler is None:
# 用作装饰器
def decorator(func: Callable) -> Callable:
self.connect(
func,
sender=sender,
priority=priority,
condition=condition,
once=once,
weak=weak
)
return func
return decorator
# 注册处理器
hook_handler = HookHandler(
handler=handler,
sender=sender,
priority=priority,
condition=condition,
once=once,
weak=weak
)
if sender:
if sender not in self._handlers_by_sender:
self._handlers_by_sender[sender] = []
self._handlers_by_sender[sender].append(hook_handler)
else:
self._handlers.append(hook_handler)
# 按优先级排序(优先级越小越先执行)
if sender:
self._handlers_by_sender[sender].sort(key=lambda h: h.priority)
self._handlers.sort(key=lambda h: h.priority)
logger.debug(f"注册钩子处理器: {handler.__name__} (sender={sender}, priority={priority})")
return handler
def disconnect(self, handler: Callable, sender: Optional[str] = None) -> None:
"""断开处理器连接"""
if sender:
handlers = self._handlers_by_sender.get(sender, [])
self._handlers_by_sender[sender] = [h for h in handlers if h.handler != handler]
else:
self._handlers = [h for h in self._handlers if h.handler != handler]
def send(self, sender: Optional[str] = None, **kwargs) -> List[Any]:
"""
同步发送信号
:param sender: 发送者类型
:param kwargs: 信号参数
:return: 所有处理器返回值的列表
"""
results = []
# 获取所有相关处理器
handlers = self._get_handlers(sender)
for handler in handlers:
try:
result = handler(**kwargs)
if result is not None:
results.append(result)
except Exception as e:
logger.error(f"钩子处理器执行失败: {e}", exc_info=True)
# 继续执行其他处理器,除非是严重错误
return results
async def send_async(self, sender: Optional[str] = None, **kwargs) -> List[Any]:
"""
异步发送信号
:param sender: 发送者类型
:param kwargs: 信号参数
:return: 所有处理器返回值的列表
"""
results = []
# 获取所有相关处理器
handlers = self._get_handlers(sender)
for handler in handlers:
try:
result = await handler.__acall__(**kwargs)
if result is not None:
results.append(result)
except Exception as e:
logger.error(f"异步钩子处理器执行失败: {e}", exc_info=True)
# 继续执行其他处理器
return results
def _get_handlers(self, sender: Optional[str] = None) -> List[HookHandler]:
"""获取相关处理器列表(已按优先级排序)"""
handlers = []
# 添加通用处理器sender=None
handlers.extend(self._handlers)
# 添加特定 sender 的处理器
if sender:
handlers.extend(self._handlers_by_sender.get(sender, []))
# 重新按优先级排序(因为合并了通用和特定的)
handlers.sort(key=lambda h: h.priority)
return handlers
def has_listeners(self, sender: Optional[str] = None) -> bool:
"""检查是否有监听器"""
if sender:
return len(self._handlers) > 0 or len(self._handlers_by_sender.get(sender, [])) > 0
return len(self._handlers) > 0
def clear(self) -> None:
"""清空所有处理器"""
self._handlers.clear()
self._handlers_by_sender.clear()
# 便捷函数:创建信号
def signal(providing_args: Optional[List[str]] = None) -> Signal:
"""创建新的信号"""
return Signal(providing_args=providing_args)

View File

@ -1,8 +1,117 @@
# Copyright (c) 2025, JINGROW # Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
Jingrow 框架核心钩子配置
此文件定义了框架的所有钩子扩展点应用可以通过 hooks.py 注册自己的处理器
"""
app_name = "jingrow" app_name = "jingrow"
app_title = "Jingrow" app_title = "Jingrow"
app_publisher = "Jingrow" app_publisher = "Jingrow"
app_description = "Jingrow" app_description = "Jingrow Framework"
app_email = "support@jingrow.com" app_email = "support@jingrow.com"
app_license = "mit" app_license = "mit"
# ========== 应用生命周期钩子 ==========
before_install = "jingrow.utils.install.before_install"
after_install = "jingrow.utils.install.after_install"
before_migrate = "jingrow.utils.migrate.before_migrate"
after_migrate = "jingrow.utils.migrate.after_migrate"
# ========== 请求钩子 ==========
before_request = [
# 请求前处理,如认证、限流等
]
after_request = [
# 请求后处理,如日志、性能统计等
]
# ========== 后台任务钩子 ==========
before_job = [
# 后台任务执行前
]
after_job = [
# 后台任务执行后
]
# ========== 页面生命周期钩子 (pg_events) ==========
# 格式: {pagetype: {event_name: [handler_path, ...]}}
# 或: {pagetype: {event_name: handler_path}}
# 或: {"*": {event_name: [handler_path, ...]}} # 所有页面类型
pg_events = {
"*": {
# 所有页面类型的通用钩子
"before_insert": [
# 页面插入前
],
"after_insert": [
# 页面插入后
],
"on_update": [
# 页面更新时
],
"on_change": [
# 页面字段变化时
],
"on_trash": [
# 页面删除时
],
"on_cancel": [
# 页面取消时
],
},
# 示例:特定页面类型的钩子
# "User": {
# "after_insert": "myapp.hooks.send_welcome_email",
# "on_update": [
# "myapp.hooks.log_user_changes",
# "myapp.hooks.sync_to_external_system",
# ],
# },
}
# ========== 定时任务钩子 (scheduler_events) ==========
scheduler_events = {
"cron": {
# Cron 表达式钩子
# "0/15 * * * *": [ # 每15分钟
# "jingrow.utils.cleanup.cleanup_temp_files",
# ],
},
"hourly": [
# 每小时执行
],
"daily": [
# 每天执行
],
"weekly": [
# 每周执行
],
"monthly": [
# 每月执行
],
}
# ========== 其他钩子 ==========
# 扩展启动信息
extend_bootinfo = []
# 权限查询条件
permission_query_conditions = {}
# 权限检查
has_permission = {}
# 标准查询
standard_queries = {}
# Jinja 过滤器和方法
jinja = {
"methods": [],
"filters": [],
}

View File

@ -11,18 +11,23 @@ from contextlib import asynccontextmanager
from jingrow.utils.router_auto_register import include_routers_from_package from jingrow.utils.router_auto_register import include_routers_from_package
from jingrow.services.scheduler import start_scheduler, stop_scheduler from jingrow.services.scheduler import start_scheduler, stop_scheduler
from jingrow.services.whitelist import router from jingrow.services.whitelist import router
from jingrow.services.scheduler import get_scheduler_status
from jingrow.core.hooks.init_hooks import init_hooks
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
"""应用生命周期管理""" """应用生命周期管理"""
# 启动事件 # 启动事件
try: try:
# 初始化钩子系统
init_hooks()
print("钩子系统已初始化")
# 启动调度器 # 启动调度器
await start_scheduler() await start_scheduler()
print("调度器已启动") print("调度器已启动")
# 验证调度器状态 # 验证调度器状态
from jingrow.services.scheduler import get_scheduler_status
status = get_scheduler_status() status = get_scheduler_status()
print(f"调度器状态: 运行={status['running']}, 任务数={status['total_jobs']}") print(f"调度器状态: 运行={status['running']}, 任务数={status['total_jobs']}")

View File

@ -43,30 +43,98 @@ def get_pg(*args, **kwargs):
class Page(BasePage): class Page(BasePage):
"""最小可用 Page复用 jingrow 本地数据库 APIsave_pg等 """最小可用 Page复用 jingrow 本地数据库 APIsave_pg等
仅保留 pagetype 生命周期接口insert/save/delete/run_method 仅保留 pagetype 生命周期接口insert/save/delete/run_method
集成现代化的 hooks 系统
""" """
def insert(self, **kwargs) -> "Page": def insert(self, **kwargs) -> "Page":
"""插入页面,触发相关钩子"""
self.set("__islocal", True) self.set("__islocal", True)
# 触发 before_insert 钩子
self._execute_hook("before_insert")
# 本地方法(向后兼容)
self.run_method("before_insert") self.run_method("before_insert")
# 保存到数据库
jingrow.save_pg(self) jingrow.save_pg(self)
# 触发 after_insert 钩子
self._execute_hook("after_insert")
# 本地方法(向后兼容)
self.run_method("after_insert") self.run_method("after_insert")
return self return self
def save(self, **kwargs) -> "Page": def save(self, **kwargs) -> "Page":
"""保存页面,触发相关钩子"""
# 触发 before_save 钩子(如果存在)
if not self.get("name"):
# 新记录,触发 before_insert
self._execute_hook("before_insert")
else:
# 更新记录,触发 on_update
self._execute_hook("on_update", **kwargs)
# 本地方法(向后兼容)
self.run_method("before_save") self.run_method("before_save")
# 保存到数据库
jingrow.save_pg(self) jingrow.save_pg(self)
# 如果之前触发了 before_insert现在触发 after_insert
if not self.get("name") or self.get("__islocal"):
self._execute_hook("after_insert")
else:
self._execute_hook("on_update", **kwargs)
# 本地方法(向后兼容)
self.run_method("on_update") self.run_method("on_update")
return self return self
def delete(self, **kwargs): def delete(self, **kwargs):
"""删除页面,触发相关钩子"""
# 触发 on_trash 钩子
self._execute_hook("on_trash", **kwargs)
# 本地方法(向后兼容)
self.run_method("on_trash") self.run_method("on_trash")
return jingrow.delete_pg(self.pagetype, self.name) return jingrow.delete_pg(self.pagetype, self.name)
def run_method(self, method: str, *args, **kwargs): def run_method(self, method: str, *args, **kwargs):
"""运行本地方法(向后兼容)"""
fn = getattr(self, method, None) fn = getattr(self, method, None)
if callable(fn): if callable(fn):
return fn(*args, **kwargs) return fn(*args, **kwargs)
return None return None
def _execute_hook(self, event_name: str, **kwargs):
"""执行钩子(私有方法)"""
try:
from jingrow.core.hooks import execute_hook
# 构建钩子名称page.{pagetype}.{event_name}
hook_name = f"page.{self.pagetype}.{event_name}"
# 执行钩子
execute_hook(
hook_name,
sender=self.pagetype,
page=self,
**kwargs
)
except ImportError:
# 如果钩子系统未安装,静默失败(向后兼容)
pass
except Exception as e:
# 记录错误但继续执行(防止钩子错误影响主流程)
import logging
logger = logging.getLogger(__name__)
logger.warning(f"执行钩子失败 {event_name}: {e}", exc_info=True)
# 兼容云端常用辅助 # 兼容云端常用辅助
def as_dict(self) -> dict[str, Any]: def as_dict(self) -> dict[str, Any]: