2025-10-24 23:10:22 +08:00

160 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025, JINGROW
# License: MIT. See LICENSE
from __future__ import annotations
import importlib
import weakref
from functools import cached_property
from typing import Any, Iterable, TYPE_CHECKING, TypeVar
import jingrow
from jingrow.modules import load_pagetype_module
if TYPE_CHECKING:
from jingrow.model.page import Page
D = TypeVar("D", bound="Page")
def get_controller(pagetype: str):
    """Return the controller class for ``pagetype``, using a per-site cache.

    Resolved classes are stored in ``jingrow.controllers`` under the current
    ``jingrow.local.site`` key, so ``import_controller`` is only invoked the
    first time a pagetype is requested for a site. Per the original note, the
    local edition does not rely on database metadata and resolves the class
    purely through the module loading rules.

    Args:
        pagetype: Name of the PageType whose controller class is wanted.

    Returns:
        The class object produced by ``import_controller``.

    Raises:
        ImportError: If ``pagetype`` is empty, or propagated from
            ``import_controller`` when the class cannot be resolved.
    """
    if not pagetype:
        raise ImportError("Empty pagetype")

    per_site_cache = jingrow.controllers.setdefault(jingrow.local.site, {})

    # Fast path: the class was already resolved for this site.
    if pagetype in per_site_cache:
        return per_site_cache[pagetype]

    per_site_cache[pagetype] = import_controller(pagetype)
    return per_site_cache[pagetype]
def import_controller(pagetype: str):
    """Resolve and load the Python class that implements ``pagetype``.

    Resolution rules (kept aligned with the cloud edition):

    - Default: the module is obtained from
      ``load_pagetype_module(pagetype, "Core")`` and the class name is
      ``pagetype`` with spaces and hyphens removed, e.g.
      ``"Local Ai Agent"`` -> ``"LocalAiAgent"``. The original note describes
      the module location as ``jingrow.<package>.pagetype.<snake>/<snake>.py``;
      that layout is decided by ``load_pagetype_module``, not by this function.
    - Override (optional): when the ``override_pagetype_class`` hook has an
      entry for ``pagetype``, the last value of that entry is treated as a
      dotted ``module.path.ClassName`` import path.

    Args:
        pagetype: Name of the PageType to resolve.

    Returns:
        The class object found in the resolved module. It is not required to
        inherit from ``BasePage``, although that is recommended.

    Raises:
        ImportError: If an override path is not of the form
            ``module.ClassName``, or if the resolved module does not define
            the expected class.
    """
    module_name = "Core"

    # Hook overrides (compatible with the cloud convention). Builds of jingrow
    # without ``get_hooks`` fall back to a stub that reports no hooks.
    get_hooks = getattr(jingrow, "get_hooks", lambda *a, **k: {})
    class_overrides = get_hooks("override_pagetype_class")

    module_path = None
    classname = pagetype.replace(" ", "").replace("-", "")

    if class_overrides and class_overrides.get(pagetype):
        # The last registered override wins.
        import_path = class_overrides[pagetype][-1]
        module_path, _, classname = import_path.rpartition(".")
        if not (module_path and classname):
            # A path without a dot used to fail tuple unpacking with an opaque
            # ValueError; report it as the ImportError callers already expect.
            raise ImportError(
                f"{pagetype}: invalid override path {import_path!r}, "
                "expected 'module.ClassName'"
            )
        get_module = getattr(jingrow, "get_module", importlib.import_module)
        module = get_module(module_path)
    else:
        module = load_pagetype_module(pagetype, module_name)

    class_ = getattr(module, classname, None)
    if class_ is None:
        raise ImportError(
            pagetype
            if module_path is None
            else f"{pagetype}: {classname} does not exist in module {module_path}"
        )

    # To keep the interface uniform, inheriting from BasePage is recommended
    # but deliberately not enforced here.
    return class_
class BasePage:
    """Minimal local ``BasePage`` that mirrors the key cloud-edition interface.

    It only provides what the pagetype mechanism needs: a generic attribute
    container plus child-table helpers. Complex validation and database
    operations are left to the existing local framework
    (``jingrow.save_pg`` / ``get_pg`` and friends).
    """

    # Keys that ``set`` (and therefore ``update``) silently ignores so that
    # incoming data cannot overwrite internal attributes.
    _reserved_keywords = frozenset(
        (
            "pagetype",
            "meta",
            "flags",
            "parent_pg",
            "_table_fieldnames",
            "_reserved_keywords",
            "permitted_fieldnames",
        )
    )

    def __init__(self, d: dict[str, Any]):
        """Initialise the page from the mapping ``d``.

        ``pagetype`` is copied explicitly because ``set`` treats it as a
        reserved key. If the instance defines ``__setup__`` it is called once
        all values have been loaded.
        """
        if d.get("pagetype"):
            self.pagetype = d["pagetype"]

        self.flags = jingrow._dict()
        self._table_fieldnames = set()
        self.update(d)

        if hasattr(self, "__setup__"):
            self.__setup__()

    @cached_property
    def meta(self):
        """Metadata for ``self.pagetype``, fetched once via ``jingrow.get_meta``."""
        # When metadata is needed, go through the local jingrow.get_meta.
        return jingrow.get_meta(self.pagetype)

    def update(self, d: dict[str, Any]):
        """Copy every item of ``d`` onto the page and return ``self``.

        ``name`` is assigned first so it is available before the remaining
        keys are processed. Reserved keys are skipped by ``set``.
        """
        if "name" in d:
            self.name = d["name"]

        for key, value in d.items():
            self.set(key, value)

        return self

    def get(self, key: str, default: Any = None):
        """Return the instance attribute ``key``, or ``default`` if it is unset.

        Only the instance ``__dict__`` is consulted; class attributes and
        properties are not visible through this method.
        """
        return self.__dict__.get(key, default)

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key`` unless ``key`` is a reserved keyword."""
        if key in self._reserved_keywords:
            return

        self.__dict__[key] = value

    def append(self, key: str, value: D | dict | None = None) -> D:
        """Append a child row to the table ``key`` and return the child object.

        Args:
            key: Name of the child-table field.
            value: A child instance, or a dict of child values (defaults to an
                empty dict). Dicts are converted by ``_init_child``.

        Raises:
            AttributeError: If ``value`` is a dict and no child pagetype can be
                determined for ``key``.
        """
        if value is None:
            value = {}

        # Create the table lazily on first use.
        if (table := self.__dict__.get(key)) is None:
            self.__dict__[key] = table = []

        d = self._init_child(value, key)
        table.append(d)

        # The child points back at its parent through a weak reference:
        # ``d.parent_pg`` is a ``weakref.ref`` and must be called to obtain
        # the parent object.
        d.parent_pg = weakref.ref(self)

        return d

    def extend(self, key: str, value: Iterable[dict | D]):
        """Append every item of ``value`` to the table ``key`` in order."""
        for v in value:
            self.append(key, v)

    def _init_child(self, value: dict | D, key: str):
        """Turn ``value`` into a child object linked to this page.

        A ``BasePage`` instance is used as is. Anything else is treated as a
        mapping of values and instantiated through the controller class of the
        pagetype configured for the table field ``key``.

        Raises:
            AttributeError: If no child pagetype is configured for ``key``.
        """
        if not isinstance(value, BasePage):
            if not (pagetype := self.get_table_field_pagetype(key)):
                raise AttributeError(key)

            # Build the child from a copy so the caller's dict is not mutated
            # as a side effect of append()/extend().
            child_cls = get_controller(pagetype)
            value = child_cls({**value, "pagetype": pagetype})

        value.parent = getattr(self, "name", None)
        value.parenttype = self.pagetype
        value.parentfield = key

        # Rows without an index are numbered after the rows already present.
        if not getattr(value, "idx", None):
            table = getattr(self, key, None) or []
            value.idx = len(table) + 1

        # Rows without a name are flagged as local.
        if not getattr(value, "name", None):
            value.__dict__["__islocal"] = 1

        return value

    def get_table_field_pagetype(self, fieldname: str) -> str | None:
        """Return the ``options`` of the meta field ``fieldname``, or ``None``.

        Best-effort lookup: any failure while reading the metadata, including
        an unknown field, yields ``None`` instead of raising.
        """
        try:
            return self.meta.get_field(fieldname).options
        except Exception:
            return None