117 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025, JINGROW
# License: MIT. See LICENSE
import asyncio
import logging
from typing import Any, Dict, List, Optional

from jingrow.core.hooks import execute_hook, execute_hook_async
from jingrow.services.runtime import get_adapter
class Page:
    """Generic, adapter-backed model for records of a single page type.

    Provides the common CRUD operations plus a hook mechanism. Hook event
    names mirror the API: ``after_insert``, ``on_update``, ``on_trash``, etc.
    Every hook is dispatched under the wildcard name ``page.*.<event>`` with
    the page type passed as ``sender``.
    """

    # Strong references to in-flight async hook tasks. The event loop only
    # keeps weak references to tasks, so a fire-and-forget task that nobody
    # else references may be garbage-collected before it finishes.
    _pending_hook_tasks = set()

    def __init__(self, pagetype: str):
        """Bind this model to the page type named ``pagetype``."""
        self.pagetype = pagetype

    # ---------------------- Internal helpers ----------------------
    @property
    def adapter(self):
        """Return the adapter from ``get_adapter()`` (looked up on every access)."""
        return get_adapter()

    def _load_hook_page(self, record_name: str) -> Optional[Dict[str, Any]]:
        """Fetch the record that is handed to hooks as ``page``.

        Returns the ``'data'`` entry of a successful ``get`` response (``{}``
        when that entry is missing or falsy), or ``None`` when the response is
        not a dict or is not flagged ``'success'``.
        """
        response = self.get(record_name)
        if isinstance(response, dict) and response.get('success'):
            return response.get('data') or {}
        return None

    def _execute_hook(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool:
        """Run the synchronous hook for event ``name`` on ``record_name``.

        Best-effort: any exception raised while loading the record or running
        the hook is logged and reported as ``False`` instead of propagating.

        Returns:
            ``True`` if the hook call completed, ``False`` if it raised.
        """
        try:
            execute_hook(
                f"page.*.{name}",
                sender=self.pagetype,
                page=self._load_hook_page(record_name),
                event=name,
                data=(data or {}),
            )
        except Exception:
            logging.getLogger(__name__).exception(
                "Hook %r failed for %s %r", name, self.pagetype, record_name
            )
            return False
        return True

    def _execute_hook_async(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool:
        """Schedule the asynchronous hook for event ``name`` without awaiting it.

        Must be called while an event loop is running; otherwise nothing is
        scheduled and ``False`` is returned. Failures inside the scheduled
        hook are logged from within the task.

        Returns:
            ``True`` if the hook task was scheduled, ``False`` otherwise.
        """
        try:
            # Resolve the loop BEFORE building the coroutine: with no running
            # loop this raises immediately, so no coroutine object is created
            # and then dropped un-awaited.
            loop = asyncio.get_running_loop()
            wildcard_hook = f"page.*.{name}"
            pg_obj = self._load_hook_page(record_name)

            async def _run():
                try:
                    await execute_hook_async(
                        wildcard_hook,
                        sender=self.pagetype,
                        page=pg_obj,
                        event=name,
                        data=(data or {}),
                    )
                except Exception:
                    # Nobody awaits this task, so report the failure here
                    # rather than leaving an unretrieved task exception.
                    logging.getLogger(__name__).exception(
                        "Async hook %r failed for %s %r", name, self.pagetype, record_name
                    )

            task = loop.create_task(_run())
            # Keep the task alive until it completes, then drop the reference.
            self._pending_hook_tasks.add(task)
            task.add_done_callback(self._pending_hook_tasks.discard)
            return True
        except Exception:
            logging.getLogger(__name__).exception(
                "Could not schedule async hook %r for %s %r", name, self.pagetype, record_name
            )
            return False

    # ---------------------- CRUD ----------------------
    def get(self, name: str) -> Dict[str, Any]:
        """Return the adapter's ``get_pg`` response for record ``name``."""
        return self.adapter.get_pg(self.pagetype, name)

    def list(self, filters: Optional[List[List[Any]]] = None,
             fields: Optional[List[str]] = None,
             limit: Optional[int] = None) -> Dict[str, Any]:
        """Return the adapter's ``get_pg_list`` response for this page type.

        ``filters``, ``fields`` and ``limit`` are forwarded to the adapter
        unchanged.
        """
        return self.adapter.get_pg_list(self.pagetype, filters=filters, fields=fields, limit=limit)

    def create(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a record and, on success, fire the ``after_insert`` hook.

        The hook receives the created record's data. The record name is taken
        from the adapter response, falling back to ``data['name']``; when
        neither is available the hook is skipped.

        Returns:
            The adapter's ``create_pg`` response, unchanged.
        """
        result = self.adapter.create_pg(self.pagetype, data)
        if result.get('success'):
            # ``or {}`` also covers an explicit ``'data': None`` in the
            # response, which ``.get('data', {})`` would pass through as None.
            created = result.get('data') or {}
            record_name = created.get('name') or data.get('name')
            if record_name:
                self._execute_hook('after_insert', record_name, created)
        return result

    def update(self, name: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Update record ``name`` and, on success, fire the ``on_update`` hook.

        Returns:
            The adapter's ``update_pg`` response, unchanged.
        """
        result = self.adapter.update_pg(self.pagetype, name, data)
        if result.get('success'):
            self._execute_hook('on_update', name, data)
        return result

    def delete(self, name: str) -> Dict[str, Any]:
        """Fire the synchronous ``on_trash`` hook, then delete record ``name``.

        The hook's result is ignored, so the deletion proceeds even when the
        hook fails.

        Returns:
            The adapter's ``delete_pg`` response, unchanged.
        """
        # Fire the synchronous on_trash hook first, then delete.
        self._execute_hook('on_trash', name)
        return self.adapter.delete_pg(self.pagetype, name)

    # ---------------------- Other conveniences ----------------------
    def get_meta(self) -> Dict[str, Any]:
        """Return the adapter's ``get_page_meta`` result for this page type."""
        return self.adapter.get_page_meta(self.pagetype)

    def get_module_app(self) -> Dict[str, Any]:
        """Return the adapter's ``get_pagetype_module_app`` result for this page type."""
        return self.adapter.get_pagetype_module_app(self.pagetype)

    def get_pg_id(self, filters: Optional[List[List[Any]]] = None,
                  field: Optional[str] = None, value: Optional[str] = None,
                  site: Optional[str] = None) -> Dict[str, Any]:
        """Return the adapter's ``get_pg_id`` result for this page type.

        ``filters``, ``field``, ``value`` and ``site`` are forwarded to the
        adapter unchanged.
        """
        return self.adapter.get_pg_id(self.pagetype, filters=filters, field=field, value=value, site=site)

    def get_single(self) -> Dict[str, Any]:
        """Return the adapter's ``get_single_pagetype`` result for this page type."""
        return self.adapter.get_single_pagetype(self.pagetype)

    def get_field_mapping_from_jingrow(self) -> Dict[str, str]:
        """Return the adapter's ``get_field_mapping_from_jingrow`` result for this page type."""
        return self.adapter.get_field_mapping_from_jingrow(self.pagetype)

    def get_field_value_from_jingrow(self, name: str, fieldname: str) -> Optional[Any]:
        """Return the adapter's ``get_field_value_from_jingrow`` result for ``fieldname`` of record ``name``."""
        return self.adapter.get_field_value_from_jingrow(self.pagetype, name, fieldname)