增加runtime服务层,重构Page和api适配器

This commit is contained in:
jingrow 2025-10-31 18:27:40 +08:00
parent 36b8a02e73
commit 7b992b63ba
4 changed files with 546 additions and 497 deletions

View File

@ -9,7 +9,6 @@ import os
from jingrow.model.page import Page
from jingrow.config import Config
_adapter = None
_local = {}
# 统一 Jingrow 日志记录器(仅为本模块及调用方提供最小可用输出,不修改全局 root logger
@ -66,44 +65,67 @@ def _ensure_logging_configured() -> None:
# 与 root 保持同级别,避免级别不一致导致丢日志
_root_logger.setLevel(root_logger.level)
def _init_adapter(run_mode: str = "api"):
global _adapter
if run_mode == "api":
from .adapters.api_adapter import ApiAdapter
_adapter = ApiAdapter()
elif run_mode == "local":
from .adapters.local_adapter import LocalAdapter
_adapter = LocalAdapter()
else:
raise ValueError(f"Unsupported run_mode: {run_mode}. Supported modes: 'api', 'local'")
# ====== High-level helpers to enforce Page lifecycle (hooks) ======
def get_pg(pagetype: str, name: str = None, **kwargs):
return _adapter.get_pg(pagetype, name, **kwargs)
def get_pg(pagetype: str, name: str):
"""获取单条记录并转为可属性访问的对象,失败返回 None。"""
pg = Page(pagetype)
res = pg.get(name)
if not isinstance(res, dict) or not res.get('success'):
return None
data = res.get('data') or {}
return data
def get_list(pagetype: str, *args, **kwargs):
return _adapter.get_list(pagetype, *args, **kwargs)
def get_all(pagetype: str, *args, **kwargs):
return _adapter.get_list(pagetype, *args, **kwargs)
def create_pg(pagetype: str, data: Dict[str, Any]):
"""创建记录,返回创建后的数据对象或 None。"""
pg = Page(pagetype)
res = pg.create(data)
if not isinstance(res, dict) or not res.get('success'):
return None
created = res.get('data') or {}
return created
def new_pg(pagetype: str, **kwargs):
return _adapter.new_pg(pagetype, **kwargs)
def save_pg(pg):
return _adapter.save_pg(pg)
def update_pg(pagetype: str, name: str, data: Dict[str, Any]):
"""更新记录,成功返回更新后的数据对象或 True失败返回 False。"""
pg = Page(pagetype)
res = pg.update(name, data)
if not isinstance(res, dict) or not res.get('success'):
return False
updated = res.get('data')
if updated is None:
return True
return updated
def delete_pg(pagetype: str, name: str) -> bool:
pg = Page(pagetype)
res = pg.delete(name)
return bool(isinstance(res, dict) and res.get('success'))
def get_list(pagetype: str, filters: List[List[Any]] = None, fields: List[str] = None, limit: int = None):
"""获取记录列表,返回对象列表;失败返回空列表。"""
pg = Page(pagetype)
res = pg.list(filters=filters, fields=fields, limit=limit)
if not isinstance(res, dict) or not res.get('success'):
return []
items = res.get('data') or []
return items
def get_all(pagetype: str, filters: List[List[Any]] = None, fields: List[str] = None):
return get_list(pagetype, filters=filters, fields=fields, limit=None)
def get_single(pagetype: str):
"""获取 single 类型 pagetype 配置,返回 {success, config|error} 结构。"""
return Page(pagetype).get_single()
def delete_pg(pagetype: str, name: str):
return _adapter.delete_pg(pagetype, name)
def db_exists(pagetype: str, filters: Dict[str, Any] = None):
return _adapter.db_exists(pagetype, filters)
def db_get_value(pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None):
return _adapter.db_get_value(pagetype, filters, fieldname)
def db_set_value(pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None, value: Any = None):
return _adapter.db_set_value(pagetype, filters, fieldname, value)
def log_error(title: Optional[str] = None, message: Optional[str] = None, *, exc: Optional[BaseException] = None) -> None:
"""输出错误日志到终端。
@ -121,9 +143,6 @@ def log_error(title: Optional[str] = None, message: Optional[str] = None, *, exc
content = str(message) if title is None else f"{title} - {message}"
_root_logger.error(content, exc_info=exc)
def throw(title: str, message: str):
return _adapter.throw(title, message)
def _dict():
"""创建一个空字典"""
return {}
@ -182,13 +201,3 @@ def is_whitelisted(api_path: str) -> bool:
def get_whitelisted_function(api_path: str):
return _whitelisted_functions.get(api_path)
# 兼容性属性
db = type('db', (), {
'exists': db_exists,
'get_value': db_get_value,
'set_value': db_set_value,
})()
# 初始化默认配置
_init_adapter(Config.run_mode)

View File

@ -7,12 +7,12 @@ Jingrow API 适配器, 通过 API 调用 Jingrow SaaS 版的服务
from typing import Dict, List, Any, Optional, Union
import json
import logging
import os
import datetime
import requests
from jingrow.model.page import Page
import pytz
from jingrow.config import Config
logger = logging.getLogger(__name__)
from jingrow import log_error
class ApiAdapter:
"""API 适配器 - 通过 API 调用 Jingrow SaaS 版"""
@ -22,7 +22,6 @@ class ApiAdapter:
self.api_key = Config.jingrow_api_key
self.api_secret = Config.jingrow_api_secret
self.session_cookie = Config.jingrow_session_cookie
logger.info(f"API adapter initialized with URL: {self.api_url}")
def _get_headers(self):
"""获取请求头"""
@ -42,7 +41,8 @@ class ApiAdapter:
def _make_request(self, method: str, endpoint: str, data: Dict = None):
"""发送 API 请求"""
url = f"{self.api_url}/{endpoint.lstrip('/')}"
# 允许传入完整 URL否则按相对路径拼接
url = endpoint if str(endpoint).startswith("http") else f"{self.api_url}/{str(endpoint).lstrip('/')}"
headers = self._get_headers()
try:
@ -61,316 +61,388 @@ class ApiAdapter:
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"API request failed: {e}")
raise
def _create_page(self, pagetype: str, name: str = None, **data):
"""创建 Page 对象的工厂方法"""
page_data = {"pagetype": pagetype, **data}
if name is not None:
page_data["name"] = name
return Page(page_data)
def _build_endpoint(self, pagetype: str, name: str = None) -> str:
"""构建 API 端点"""
endpoint = f"api/data/{pagetype}"
if name:
endpoint += f"/{name}"
return endpoint
def get_pg(self, *args, **kwargs):
"""获取页面类型文档 - 对齐云端 get_pg 接口
支持多种调用方式
- get_pg("PageType", "name") - 获取指定文档
- get_pg({"pagetype": "PageType", ...}) - 从字典创建文档
- get_pg(pagetype="PageType", field=...) - 关键字参数方式
- get_pg("PageType", "name", for_update=True) - 带更新锁
"""
pagetype = None
name = None
# 处理参数
if args:
if isinstance(args[0], str):
pagetype = args[0]
if len(args) > 1:
name = args[1]
elif isinstance(args[0], dict):
# 从字典创建文档
kwargs = args[0]
def get_logged_user(self, session_cookie: Optional[str] = None) -> Optional[str]:
try:
api_url = f"{self.api_url}/api/action/jingrow.auth.get_logged_user"
cookie = session_cookie or self.session_cookie
if not cookie:
return None
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Cookie": f"sid={cookie}"
}
resp = requests.get(api_url, headers=headers, timeout=8)
if resp.status_code == 200:
data = resp.json()
if isinstance(data, dict) and 'message' in data:
return data['message']
return data
return None
except Exception:
return None
def upload_file_to_jingrow(self, file_data: bytes, filename: str,
attached_to_pagetype: Optional[str] = None,
attached_to_name: Optional[str] = None,
attached_to_field: Optional[str] = None) -> Dict[str, Any]:
try:
api_url = f"{self.api_url}/api/action/upload_file"
headers = self._get_headers().copy()
# 让 requests 自动设置 multipart/form-data
if 'Content-Type' in headers:
del headers['Content-Type']
files = {
'file': (filename, file_data, 'application/octet-stream')
}
data: Dict[str, Any] = {
'file_name': filename,
'is_private': 0
}
if attached_to_pagetype and attached_to_name:
data['pagetype'] = attached_to_pagetype
data['docname'] = attached_to_name
if attached_to_field:
data['fieldname'] = attached_to_field
response = requests.post(api_url, files=files, data=data, headers=headers, timeout=30)
if response.status_code == 200:
result = response.json()
if result.get('session_expired'):
return {'success': False, 'error': 'Session已过期请重新登录'}
if result.get('message'):
return {
'success': True,
'file_url': result['message'].get('file_url'),
'file_name': result['message'].get('file_name')
}
return {'success': False, 'error': 'API响应格式错误'}
else:
raise ValueError("First non keyword argument must be a string or dict")
# 从 kwargs 中提取 pagetype
if pagetype is None and kwargs:
if "pagetype" in kwargs:
pagetype = kwargs["pagetype"]
error_text = response.text
log_error(f"API请求失败: {response.status_code}, 响应: {error_text}")
return {'success': False, 'error': f'API请求失败 (HTTP {response.status_code}): {error_text}'}
except Exception as e:
return {'success': False, 'error': f'调用upload_file API异常: {str(e)}'}
def get_field_mapping_from_jingrow(self, pagetype: str) -> Dict[str, str]:
try:
api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_field_mapping"
headers = self._get_headers()
data = {'pagetype': pagetype}
response = requests.post(api_url, json=data, headers=headers, timeout=10)
if response.status_code == 200:
result = response.json()
if 'message' in result:
result = result['message']
if result.get('success'):
return result.get('field_mapping', {})
else:
log_error(f"获取字段映射失败: {result.get('error', '未知错误')}")
return {}
else:
raise ValueError('"pagetype" is a required key')
logger.info(f"API get_pg: {pagetype}, {name}")
# 如果只有 pagetype 没有 name返回列表
if name is None:
# 从 kwargs 中移除 pagetype避免参数冲突
list_kwargs = kwargs.copy()
list_kwargs.pop('pagetype', None)
return self.get_list(pagetype, **list_kwargs)
# 获取单个文档
endpoint = self._build_endpoint(pagetype, name)
data = self._make_request('GET', endpoint, kwargs)
# 如果 API 返回的数据中有 name先移除它使用我们指定的 name
if isinstance(data, dict) and 'name' in data:
api_name = data.pop('name')
return self._create_page(pagetype, name, **data)
def get_list(self, pagetype: str, fields=None, filters=None, group_by=None,
order_by=None, limit_start=None, limit_page_length=20,
parent=None, debug=False, as_dict=True, or_filters=None, **kwargs):
"""获取页面类型列表 - 对齐云端 get_list 接口
:param pagetype: PageType on which query is to be made.
:param fields: List of fields or `*`.
:param filters: List of filters (see example).
:param order_by: Order By e.g. `modified desc`.
:param limit_start: Start results at record #. Default 0.
:param limit_page_length: No of records in the page. Default 20.
:param group_by: Group by field.
:param or_filters: OR filter conditions.
:param debug: Debug mode.
:param as_dict: Return as dict format.
Example usage:
- get_list("ToDo", fields=["name", "description"], filters={"owner":"test@example.com"})
- get_list("ToDo", fields="*", filters=[["modified", ">", "2014-01-01"]])
"""
logger.info(f"API get_list: {pagetype}")
# 构建查询参数
params = {}
if fields is not None:
# API 期望 JSON 字符串
params['fields'] = json.dumps(fields) if not isinstance(fields, str) else fields
if filters is not None:
# API 期望 JSON 字符串
params['filters'] = json.dumps(filters) if not isinstance(filters, str) else filters
if group_by is not None:
params['group_by'] = group_by
if order_by is not None:
params['order_by'] = order_by
if limit_start is not None:
params['limit_start'] = limit_start
if limit_page_length != 20: # 默认值不需要传递
params['limit_page_length'] = limit_page_length
if parent is not None:
params['parent'] = parent
if debug:
params['debug'] = debug
if not as_dict: # 默认 True只有 False 时才传递
params['as_dict'] = as_dict
if or_filters is not None:
params['or_filters'] = or_filters
# 合并其他参数
params.update(kwargs)
endpoint = self._build_endpoint(pagetype)
data = self._make_request('GET', endpoint, params)
return [self._create_page(pagetype, item.pop('name', None), **item) for item in data]
def new_pg(self, pagetype: str, *, parent_pg=None, parentfield=None, as_dict=False, **kwargs):
"""创建新的页面类型文档 - 对齐云端 new_pg 接口
:param pagetype: PageType of the new document.
:param parent_pg: [optional] add to parent document.
:param parentfield: [optional] add against this `parentfield`.
:param as_dict: [optional] return as dictionary instead of Page.
:param kwargs: [optional] You can specify fields as field=value pairs in function call.
"""
logger.info(f"API new_pg: {pagetype}")
# 构建新文档数据
new_data = {
"pagetype": pagetype,
"__islocal": 1,
"pagestatus": 0,
**kwargs
}
# 如果有父文档,添加父文档信息
if parent_pg:
new_data["parent"] = parent_pg.name if hasattr(parent_pg, 'name') else str(parent_pg)
new_data["parenttype"] = parent_pg.pagetype if hasattr(parent_pg, 'pagetype') else None
if parentfield:
new_data["parentfield"] = parentfield
if as_dict:
return new_data
else:
# 移除 pagetype 避免参数冲突
pagetype_data = new_data.pop('pagetype')
return self._create_page(pagetype_data, None, **new_data)
def save_pg(self, pg, ignore_permissions=False, ignore_validate=False, ignore_mandatory=False):
"""保存页面类型文档 - 对齐云端 save_pg 接口
:param pg: Page object to save
:param ignore_permissions: Ignore permission checks
:param ignore_validate: Ignore validation
:param ignore_mandatory: Ignore mandatory field checks
"""
logger.info(f"API save_pg: {pg.pagetype}, {getattr(pg, 'name', None)}")
endpoint = self._build_endpoint(pg.pagetype, getattr(pg, 'name', None))
method = 'PUT' if getattr(pg, 'name', None) else 'POST'
data = pg.as_dict()
# 添加保存选项
if ignore_permissions:
data['ignore_permissions'] = True
if ignore_validate:
data['ignore_validate'] = True
if ignore_mandatory:
data['ignore_mandatory'] = True
result = self._make_request(method, endpoint, data)
# 更新文档对象
if result:
# 直接更新对象的属性
for key, value in result.items():
setattr(pg, key, value)
return pg
def delete_pg(self, pagetype: str, name: str, ignore_missing=False):
"""删除页面类型文档 - 对齐云端 delete_pg 接口
:param pagetype: PageType name
:param name: Document name
:param ignore_missing: Ignore if document doesn't exist
"""
logger.info(f"API delete_pg: {pagetype}, {name}")
endpoint = self._build_endpoint(pagetype, name)
params = {'ignore_missing': ignore_missing} if ignore_missing else {}
self._make_request('DELETE', endpoint, params)
return True
def db_exists(self, pagetype: str, filters: Dict[str, Any] = None, cache=False):
"""检查文档是否存在 - 对齐云端 db.exists 接口
:param pagetype: PageType name
:param filters: Filter conditions (dict or name string)
:param cache: Enable caching for single document checks
Examples:
- db_exists("User", "jane@example.org", cache=True)
- db_exists("User", {"full_name": "Jane Doe"})
- db_exists({"pagetype": "User", "full_name": "Jane Doe"})
"""
logger.info(f"API db_exists: {pagetype}, {filters}")
# 处理字典格式的 pagetype
if isinstance(pagetype, dict):
filters = pagetype.copy()
pagetype = filters.pop("pagetype")
# 如果 filters 是字符串,转换为字典
if isinstance(filters, str):
filters = {"name": filters}
endpoint = self._build_endpoint(pagetype)
params = {'filters': filters} if filters else {}
if cache:
params['cache'] = cache
data = self._make_request('GET', endpoint, params)
return len(data) > 0
def db_get_value(self, pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None,
ignore=False, cache=False, order_by=None, parent=None):
"""获取数据库值 - 对齐云端 db.get_value 接口
:param pagetype: PageType name
:param filters: Filter conditions (dict or name string)
:param fieldname: Field name to get value for
:param ignore: Ignore if document doesn't exist
:param cache: Enable caching
:param order_by: Order by field
:param parent: Parent document for child table queries
Examples:
- db_get_value("User", "jane@example.org", "full_name")
- db_get_value("User", {"email": "jane@example.org"}, "full_name")
"""
logger.info(f"API db_get_value: {pagetype}, {filters}, {fieldname}")
# 如果 filters 是字符串,转换为字典
if isinstance(filters, str):
filters = {"name": filters}
endpoint = self._build_endpoint(pagetype)
params = {
'filters': filters,
'fieldname': fieldname
}
if ignore:
params['ignore'] = ignore
if cache:
params['cache'] = cache
if order_by:
params['order_by'] = order_by
if parent:
params['parent'] = parent
data = self._make_request('GET', endpoint, params)
return data
def db_set_value(self, pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None,
value: Any = None, modified=None, modified_by=None, update_modified=True, debug=False):
"""设置数据库值 - 对齐云端 db.set_value 接口
:param pagetype: PageType name
:param filters: Filter conditions (dict or name string)
:param fieldname: Field name to set value for
:param value: Value to set
:param modified: Use this as the modified timestamp
:param modified_by: Set this user as modified_by
:param update_modified: Update the modified timestamp
:param debug: Debug mode
Examples:
- db_set_value("User", "jane@example.org", "full_name", "Jane Doe")
- db_set_value("User", {"email": "jane@example.org"}, "full_name", "Jane Doe")
"""
logger.info(f"API db_set_value: {pagetype}, {filters}, {fieldname}, {value}")
# 如果 filters 是字符串,转换为字典
if isinstance(filters, str):
filters = {"name": filters}
endpoint = self._build_endpoint(pagetype)
data = {
'filters': filters,
'fieldname': fieldname,
'value': value
}
if modified:
data['modified'] = modified
if modified_by:
data['modified_by'] = modified_by
if not update_modified:
data['update_modified'] = update_modified
if debug:
data['debug'] = debug
self._make_request('PUT', endpoint, data)
return True
log_error(f"获取字段映射失败: HTTP {response.status_code}")
return {}
except Exception as e:
log_error(f"获取字段映射异常: {str(e)}")
return {}
def get_field_value_from_jingrow(self, pagetype: str, name: str, fieldname: str) -> Optional[Any]:
try:
api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_field_value"
headers = self._get_headers()
data = { 'pagetype': pagetype, 'name': name, 'fieldname': fieldname }
response = requests.post(api_url, json=data, headers=headers, timeout=10)
if response.status_code == 200:
result = response.json()
if 'message' in result:
result = result['message']
if result.get('success'):
return result.get('value')
else:
log_error(f"获取字段值失败: {result.get('error', '未知错误')}")
return None
else:
log_error(f"获取字段值失败: HTTP {response.status_code}")
return None
except Exception as e:
log_error(f"获取字段值异常: {str(e)}")
return None
def get_ai_settings_from_jingrow(self) -> Optional[Dict[str, Any]]:
try:
api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_ai_settings"
headers = self._get_headers()
response = requests.post(api_url, headers=headers, timeout=10)
if response.status_code == 200:
result = response.json()
if result.get('session_expired'):
log_error("Session已过期请重新登录")
return None
if 'message' in result:
result = result['message']
if result.get('success'):
return result.get('config', {})
else:
log_error(f"获取AI Settings失败: {result.get('error', '未知错误')}")
return None
else:
log_error(f"获取AI Settings失败: HTTP {response.status_code}")
return None
except Exception as e:
log_error(f"获取AI Settings配置异常: {str(e)}")
return None
def get_agent_detail(self, name: str, session_cookie: Optional[str] = None) -> Optional[Dict[str, Any]]:
try:
api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_local_ai_agent_detail"
headers = self._get_headers()
if not headers:
log_error("JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置")
return None
payload = {"name": name}
response = requests.post(api_url, headers=headers, json=payload, timeout=15)
if response.status_code == 200:
data = response.json()
if isinstance(data, dict) and 'message' in data:
message = data['message']
if isinstance(message, dict) and message.get('success'):
return message.get('data')
return data
else:
log_error(f"Failed to get agent detail: HTTP {response.status_code}: {response.text}")
return None
except Exception as e:
log_error(f"获取智能体详情异常: {str(e)}")
return None
def get_pg(self, pagetype: str, name: str, session_cookie: Optional[str] = None) -> Dict[str, Any]:
try:
api_url = f"{self.api_url}/api/data/{pagetype}/{name}"
headers = self._get_headers()
if not headers:
return {'success': False, 'error': 'JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置'}
response = requests.get(api_url, headers=headers, timeout=15)
if response.status_code == 200:
data = response.json()
if isinstance(data, dict) and 'data' in data:
return {'success': True, 'data': data['data']}
return {'success': True, 'data': data}
else:
return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
except Exception as e:
return {'success': False, 'error': f"获取记录异常: {str(e)}"}
def create_pg(self, pagetype: str, payload: Dict[str, Any]) -> Dict[str, Any]:
try:
api_url = f"{self.api_url}/api/data/{pagetype}"
headers = self._get_headers()
response = requests.post(api_url, json=payload, headers=headers, timeout=20)
if response.status_code in (200, 201):
data = response.json()
if isinstance(data, dict) and 'data' in data:
return {'success': True, 'data': data['data']}
return {'success': True, 'data': data}
else:
return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
except Exception as e:
return {'success': False, 'error': f"创建记录异常: {str(e)}"}
def update_pg(self, pagetype: str, name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
try:
api_url = f"{self.api_url}/api/data/{pagetype}/{name}"
headers = self._get_headers()
response = requests.put(api_url, json=payload, headers=headers, timeout=20)
if response.status_code in (200, 202):
data = response.json()
if isinstance(data, dict) and 'data' in data:
return {'success': True, 'data': data['data']}
return {'success': True, 'data': data}
else:
return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
except Exception as e:
return {'success': False, 'error': f"更新记录异常: {str(e)}"}
def update_local_job(self, data: Dict[str, Any]) -> Dict[str, Any]:
return self.push_local_job_to_jingrow(data)
def push_local_job_to_jingrow(self, data: Dict[str, Any]) -> Dict[str, Any]:
try:
api_url = f"{self.api_url}/api/action/jingrow.core.pagetype.local_job.local_job.push_local_job"
headers = self._get_headers()
if not headers:
return {'success': False, 'error': 'JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置'}
headers.setdefault("X-Requested-With", "XMLHttpRequest")
site_name = getattr(Config, "JINGROW_SITE", None) or os.getenv("JINGROW_SITE")
if site_name:
headers["X-Frappe-Site-Name"] = site_name
resp = requests.post(api_url, json=data, headers=headers, timeout=10)
if resp.status_code == 200:
body = resp.json()
if isinstance(body, dict) and 'message' in body:
body = body['message']
if isinstance(body, dict) and body.get('success'):
return {'success': True}
log_error(f"[JFLOW->JINGROW] 推送失败(body): {body}")
return {'success': False, 'error': str(body)}
log_error(f"[JFLOW->JINGROW] 推送失败(HTTP): {resp.status_code} {resp.text}")
return {'success': False, 'error': f"HTTP {resp.status_code}: {resp.text}"}
except Exception as e:
log_error(f"[JFLOW->JINGROW] 推送异常: {str(e)}")
return {'success': False, 'error': f"推送 Local Job 失败: {str(e)}"}
def delete_pg(self, pagetype: str, name: str) -> Dict[str, Any]:
try:
api_url = f"{self.api_url}/api/data/{pagetype}/{name}"
headers = self._get_headers()
response = requests.delete(api_url, headers=headers, timeout=15)
if response.status_code in (200, 202):
return {'success': True}
else:
return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
except Exception as e:
return {'success': False, 'error': f"删除记录异常: {str(e)}"}
def map_fields_by_labels(self, field_map: List[Dict[str, Any]], ai_outputs: Dict[str, Any], label_to_fieldname: Dict[str, str]) -> Dict[str, Any]:
record_data: Dict[str, Any] = {}
fieldname_set = set(label_to_fieldname.values())
for mapping in field_map or []:
source_key = mapping.get('from')
to_raw = mapping.get('to')
if not source_key or to_raw is None:
continue
value = ai_outputs.get(source_key)
if value is None:
continue
to_field = label_to_fieldname.get(str(to_raw).strip())
if not to_field and str(to_raw).strip() in fieldname_set:
to_field = str(to_raw).strip()
if not to_field:
continue
record_data[to_field] = value
return record_data
def get_pg_list(self, pagetype: str, filters: Optional[List[List[Any]]] = None,
fields: Optional[List[str]] = None, limit: Optional[int] = None) -> Dict[str, Any]:
try:
base_url = f"{self.api_url}/api/data/{pagetype}"
params: Dict[str, str] = {}
if filters:
params['filters'] = json.dumps(filters, ensure_ascii=False)
if fields:
params['fields'] = json.dumps(fields, ensure_ascii=False)
if isinstance(limit, int) and limit > 0:
params['limit'] = str(limit)
headers = self._get_headers()
response = requests.get(base_url, params=params, headers=headers, timeout=20)
if response.status_code == 200:
data = response.json()
if isinstance(data, dict) and 'data' in data:
return {'success': True, 'data': data['data']}
if isinstance(data, list):
return {'success': True, 'data': data}
return {'success': True, 'data': [data]}
else:
return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
except Exception as e:
return {'success': False, 'error': f"列表查询异常: {str(e)}"}
def get_page_meta(self, pagetype: str) -> Dict[str, Any]:
try:
api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_meta"
headers = self._get_headers()
payload = {"pagetype": pagetype}
response = requests.post(api_url, json=payload, headers=headers, timeout=10)
if response.status_code == 200:
result = response.json()
if isinstance(result, dict) and 'message' in result:
result = result['message']
if isinstance(result, dict) and result.get('success'):
return {'success': True, 'data': result.get('data', {})}
return {'success': False, 'error': result.get('error', '获取Meta失败') if isinstance(result, dict) else '获取Meta失败'}
else:
return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
except Exception as e:
return {'success': False, 'error': f"获取Meta异常: {str(e)}"}
def get_pagetype_module_app(self, pagetype: str) -> Dict[str, Any]:
try:
api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_pagetype_module_app"
headers = self._get_headers()
if not headers:
return {'success': False, 'error': 'JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置'}
payload = {"pagetype": pagetype}
response = requests.post(api_url, json=payload, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
if isinstance(data, dict) and 'message' in data:
data = data['message']
if isinstance(data, dict) and data.get('success'):
module_val = data.get('module') or (data.get('data') or {}).get('module')
app_val = data.get('app') or (data.get('data') or {}).get('app')
return {'success': True, 'module': module_val, 'app': app_val}
return {'success': False, 'error': data.get('error', '获取失败') if isinstance(data, dict) else '获取失败'}
return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
except Exception as e:
return {'success': False, 'error': f"获取模块信息异常: {str(e)}"}
def get_pg_id(self, pagetype: str, filters: Optional[List[List[Any]]] = None,
field: Optional[str] = None, value: Optional[str] = None,
site: Optional[str] = None) -> Dict[str, Any]:
try:
query_filters: List[List[Any]] = []
if isinstance(filters, list) and filters:
query_filters = filters
elif field and (value is not None):
query_filters = [[field, '=', value]]
if site:
query_filters.append(['site', '=', site])
res = self.get_pg_list(pagetype, filters=query_filters, fields=['name'], limit=1)
if not res.get('success'):
return {'success': False, 'error': res.get('error', '查询失败')}
data = res.get('data') or []
if not data:
return {'success': False, 'error': '未找到记录'}
name_val = (data[0] or {}).get('name')
if not name_val:
return {'success': False, 'error': '记录缺少name字段'}
return {'success': True, 'name': name_val}
except Exception as e:
return {'success': False, 'error': f'获取记录name失败: {str(e)}'}
def get_jingrow_system_timezone(self):
try:
headers = self._get_headers()
if not headers:
return self._get_default_timezone()
url = f"{self.api_url}/api/action/jingrow.core.pagetype.system_settings.system_settings.load"
response = requests.post(url, headers=headers, timeout=5)
if response.status_code == 200:
timezone_name = response.json().get('message', {}).get('defaults', {}).get('time_zone')
if timezone_name:
return pytz.timezone(timezone_name)
except (requests.RequestException, pytz.exceptions.UnknownTimeZoneError, KeyError):
pass
return self._get_default_timezone()
def get_single_pagetype(self, pagetype: str) -> Dict[str, Any]:
try:
api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_single_pagetype"
response = requests.post(api_url, json={"pagetype": pagetype}, headers=self._get_headers(), timeout=10)
if response.status_code == 200:
raw = response.json()
result = raw.get('message', raw)
if result.get('success'):
return {'success': True, 'config': result.get('config', {})}
return {'success': False, 'error': result.get('error', '获取失败')}
return {'success': False, 'error': f"HTTP {response.status_code}"}
except Exception as e:
return {'success': False, 'error': str(e)}
def _get_default_timezone(self):
return datetime.datetime.now().astimezone().tzinfo

View File

@ -1,159 +1,103 @@
# Copyright (c) 2025, JINGROW
# License: MIT. See LICENSE
from __future__ import annotations
from typing import Any, Iterable
from typing import Any, Dict, List, Optional
from jingrow.services.runtime import get_adapter
from jingrow.core.hooks import execute_hook, execute_hook_async
import asyncio
import jingrow
class Page():
"""最小可用 Page本地数据容器 + 钩子触发"""
class Page:
"""基于适配器的通用 Page 模型,提供常用 CRUD 与钩子机制。
钩子事件命名与 API 对齐after_insert, on_update, on_trash
"""
def __init__(self, d: dict[str, Any] | None = None):
if d is None:
d = {}
if not isinstance(d, dict):
raise ValueError("Page expects a dict payload")
# 基础字段
self.pagetype = d.get("pagetype")
if "name" in d:
self.name = d["name"]
# flags 简化为普通 dict本地版不依赖 flags 机制)
self.flags = {}
self.update(d)
def __init__(self, pagetype: str):
self.pagetype = pagetype
# ---------------------- 内部工具 ----------------------
@property
def meta(self):
return jingrow.get_meta(self.pagetype)
def adapter(self):
return get_adapter()
def update(self, d: dict[str, Any]):
for key, value in d.items():
self.set(key, value)
return self
def get(self, key: str, default: Any = None):
return self.__dict__.get(key, default)
def set(self, key: str, value: Any):
self.__dict__[key] = value
def insert(self, **kwargs) -> "Page":
"""插入页面,触发相关钩子"""
self.set("__islocal", True)
# 触发 before_insert 钩子
self._execute_hook("before_insert")
# 本地方法(向后兼容)
self.run_method("before_insert")
# 保存到数据库
jingrow.save_pg(self)
# 触发 after_insert 钩子
self._execute_hook("after_insert")
# 本地方法(向后兼容)
self.run_method("after_insert")
return self
def save(self, **kwargs) -> "Page":
"""保存页面,触发相关钩子"""
# 触发 before_save 钩子(如果存在)
if not self.get("name"):
# 新记录,触发 before_insert
self._execute_hook("before_insert")
else:
# 更新记录,触发 on_update
self._execute_hook("on_update", **kwargs)
# 本地方法(向后兼容)
self.run_method("before_save")
# 保存到数据库
jingrow.save_pg(self)
# 如果之前触发了 before_insert现在触发 after_insert
if not self.get("name") or self.get("__islocal"):
self._execute_hook("after_insert")
else:
self._execute_hook("on_update", **kwargs)
# 本地方法(向后兼容)
self.run_method("on_update")
return self
def delete(self, **kwargs):
"""删除页面,触发相关钩子"""
# 触发 on_trash 钩子
self._execute_hook("on_trash", **kwargs)
# 本地方法(向后兼容)
self.run_method("on_trash")
return jingrow.delete_pg(self.pagetype, self.name)
def run_method(self, method: str, *args, **kwargs):
"""运行本地方法(向后兼容)"""
fn = getattr(self, method, None)
if callable(fn):
return fn(*args, **kwargs)
return None
def _execute_hook(self, event_name: str, **kwargs):
"""执行钩子(私有方法)"""
def _execute_hook(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool:
try:
import logging
from jingrow.core.hooks import execute_hook
logger = logging.getLogger(__name__)
# 精确钩子page.{pagetype}.{event_name}
specific_hook = f"page.{self.pagetype}.{event_name}"
results_specific = execute_hook(
specific_hook,
sender=self.pagetype,
page=self,
event=event_name,
**kwargs
)
logger.debug(f"Hook executed: {specific_hook}, handlers_return={len(results_specific)}")
# 通配钩子page.*.{event_name}(用于全局监听)
wildcard_hook = f"page.*.{event_name}"
results_wildcard = execute_hook(
wildcard_hook = f"page.*.{name}"
pg_obj = jingrow.get_pg(self.pagetype, record_name)
execute_hook(
wildcard_hook,
sender=self.pagetype,
page=self,
event=event_name,
**kwargs
page=pg_obj,
event=name,
data=(data or {}),
)
logger.debug(f"Hook executed: {wildcard_hook}, handlers_return={len(results_wildcard)}")
return True
except Exception:
return False
except ImportError:
# 如果钩子系统未安装,静默失败(向后兼容)
pass
except Exception as e:
# 记录错误但继续执行(防止钩子错误影响主流程)
import logging
logger = logging.getLogger(__name__)
logger.warning(f"执行钩子失败 {event_name}: {e}", exc_info=True)
def _execute_hook_async(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool:
try:
wildcard_hook = f"page.*.{name}"
pg_obj = jingrow.get_pg(self.pagetype, record_name)
# 兼容云端常用辅助
def as_dict(self) -> dict[str, Any]:
d = {k: v for k, v in self.__dict__.items() if not k.startswith("_")}
d["pagetype"] = self.pagetype
return d
async def _run():
await execute_hook_async(
wildcard_hook,
sender=self.pagetype,
page=pg_obj,
event=name,
data=(data or {}),
)
asyncio.create_task(_run())
return True
except Exception:
return False
# ---------------------- CRUD ----------------------
def get(self, name: str) -> Dict[str, Any]:
return self.adapter.get_record(self.pagetype, name)
def list(self, filters: Optional[List[List[Any]]] = None,
fields: Optional[List[str]] = None,
limit: Optional[int] = None) -> Dict[str, Any]:
return self.adapter.get_record_list(self.pagetype, filters=filters, fields=fields, limit=limit)
def create(self, data: Dict[str, Any]) -> Dict[str, Any]:
result = self.adapter.create_record(self.pagetype, data)
if result.get('success'):
created = result.get('data', {})
record_name = created.get('name') or data.get('name')
if record_name:
self._execute_hook_async('after_insert', record_name, created)
return result
def update(self, name: str, data: Dict[str, Any]) -> Dict[str, Any]:
result = self.adapter.update_record(self.pagetype, name, data)
if result.get('success'):
self._execute_hook_async('on_update', name, data)
return result
def delete(self, name: str) -> Dict[str, Any]:
# 先发同步 on_trash再删除
self._execute_hook('on_trash', name)
return self.adapter.delete_record(self.pagetype, name)
# ---------------------- 其他便捷能力 ----------------------
def get_meta(self) -> Dict[str, Any]:
return self.adapter.get_page_meta(self.pagetype)
def get_module_app(self) -> Dict[str, Any]:
return self.adapter.get_pagetype_module_app(self.pagetype)
def get_record_id(self, filters: Optional[List[List[Any]]] = None,
field: Optional[str] = None, value: Optional[str] = None,
site: Optional[str] = None) -> Dict[str, Any]:
return self.adapter.get_record_id(self.pagetype, filters=filters, field=field, value=value, site=site)
def get_single(self) -> Dict[str, Any]:
return self.adapter.get_single_pagetype(self.pagetype)
def get_all_children(self) -> list["Page"]:
children = []
for df in self.meta.get_table_fields():
value = getattr(self, df.fieldname, None)
if value:
children.extend(value)
return children

View File

@ -0,0 +1,24 @@
from typing import Optional
from jingrow.config import Config
_adapter = None
def init_adapter(run_mode: str = "api"):
global _adapter
if run_mode == "api":
from jingrow.adapters.api_adapter import ApiAdapter
_adapter = ApiAdapter()
elif run_mode == "local":
from jingrow.adapters.local_adapter import LocalAdapter
_adapter = LocalAdapter()
else:
raise ValueError(f"Unsupported run_mode: {run_mode}. Supported modes: 'api', 'local'")
def get_adapter():
global _adapter
if _adapter is None:
# Lazy init using Config if not explicitly initialized
init_adapter(getattr(Config, "run_mode", "api"))
return _adapter