增加runtime服务层,重构Page和api适配器

This commit is contained in:
jingrow 2025-10-31 18:27:40 +08:00
parent 36b8a02e73
commit 358cc34767
4 changed files with 541 additions and 497 deletions

View File

@ -9,7 +9,6 @@ import os
from jingrow.model.page import Page from jingrow.model.page import Page
from jingrow.config import Config from jingrow.config import Config
_adapter = None
_local = {} _local = {}
# 统一 Jingrow 日志记录器(仅为本模块及调用方提供最小可用输出,不修改全局 root logger # 统一 Jingrow 日志记录器(仅为本模块及调用方提供最小可用输出,不修改全局 root logger
@ -66,44 +65,62 @@ def _ensure_logging_configured() -> None:
# 与 root 保持同级别,避免级别不一致导致丢日志 # 与 root 保持同级别,避免级别不一致导致丢日志
_root_logger.setLevel(root_logger.level) _root_logger.setLevel(root_logger.level)
def _init_adapter(run_mode: str = "api"): # ====== High-level helpers to enforce Page lifecycle (hooks) ======
global _adapter
if run_mode == "api":
from .adapters.api_adapter import ApiAdapter
_adapter = ApiAdapter()
elif run_mode == "local":
from .adapters.local_adapter import LocalAdapter
_adapter = LocalAdapter()
else:
raise ValueError(f"Unsupported run_mode: {run_mode}. Supported modes: 'api', 'local'")
def get_pg(pagetype: str, name: str = None, **kwargs): def get_pg(pagetype: str, name: str):
return _adapter.get_pg(pagetype, name, **kwargs) """获取单条记录并转为可属性访问的对象,失败返回 None。"""
pg = Page(pagetype)
res = pg.get(name)
if not isinstance(res, dict) or not res.get('success'):
return None
data = res.get('data') or {}
return data
def get_list(pagetype: str, *args, **kwargs):
return _adapter.get_list(pagetype, *args, **kwargs)
def get_all(pagetype: str, *args, **kwargs): def create_pg(pagetype: str, data: Dict[str, Any]):
return _adapter.get_list(pagetype, *args, **kwargs) """创建记录,返回创建后的数据对象或 None。"""
pg = Page(pagetype)
res = pg.create(data)
if not isinstance(res, dict) or not res.get('success'):
return None
created = res.get('data') or {}
return created
def new_pg(pagetype: str, **kwargs):
return _adapter.new_pg(pagetype, **kwargs)
def save_pg(pg): def update_pg(pagetype: str, name: str, data: Dict[str, Any]):
return _adapter.save_pg(pg) """更新记录,成功返回更新后的数据对象或 True失败返回 False。"""
pg = Page(pagetype)
res = pg.update(name, data)
if not isinstance(res, dict) or not res.get('success'):
return False
updated = res.get('data')
if updated is None:
return True
return updated
def delete_pg(pagetype: str, name: str) -> bool:
pg = Page(pagetype)
res = pg.delete(name)
return bool(isinstance(res, dict) and res.get('success'))
def get_list(pagetype: str, filters: List[List[Any]] = None, fields: List[str] = None, limit: int = None):
"""获取记录列表,返回对象列表;失败返回空列表。"""
pg = Page(pagetype)
res = pg.list(filters=filters, fields=fields, limit=limit)
if not isinstance(res, dict) or not res.get('success'):
return []
items = res.get('data') or []
return items
def get_all(pagetype: str, filters: List[List[Any]] = None, fields: List[str] = None):
return get_list(pagetype, filters=filters, fields=fields, limit=None)
def delete_pg(pagetype: str, name: str):
return _adapter.delete_pg(pagetype, name)
def db_exists(pagetype: str, filters: Dict[str, Any] = None):
return _adapter.db_exists(pagetype, filters)
def db_get_value(pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None):
return _adapter.db_get_value(pagetype, filters, fieldname)
def db_set_value(pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None, value: Any = None):
return _adapter.db_set_value(pagetype, filters, fieldname, value)
def log_error(title: Optional[str] = None, message: Optional[str] = None, *, exc: Optional[BaseException] = None) -> None: def log_error(title: Optional[str] = None, message: Optional[str] = None, *, exc: Optional[BaseException] = None) -> None:
"""输出错误日志到终端。 """输出错误日志到终端。
@ -121,9 +138,6 @@ def log_error(title: Optional[str] = None, message: Optional[str] = None, *, exc
content = str(message) if title is None else f"{title} - {message}" content = str(message) if title is None else f"{title} - {message}"
_root_logger.error(content, exc_info=exc) _root_logger.error(content, exc_info=exc)
def throw(title: str, message: str):
return _adapter.throw(title, message)
def _dict(): def _dict():
"""创建一个空字典""" """创建一个空字典"""
return {} return {}
@ -182,13 +196,3 @@ def is_whitelisted(api_path: str) -> bool:
def get_whitelisted_function(api_path: str): def get_whitelisted_function(api_path: str):
return _whitelisted_functions.get(api_path) return _whitelisted_functions.get(api_path)
# 兼容性属性
db = type('db', (), {
'exists': db_exists,
'get_value': db_get_value,
'set_value': db_set_value,
})()
# 初始化默认配置
_init_adapter(Config.run_mode)

View File

@ -7,12 +7,12 @@ Jingrow API 适配器, 通过 API 调用 Jingrow SaaS 版的服务
from typing import Dict, List, Any, Optional, Union from typing import Dict, List, Any, Optional, Union
import json import json
import logging import os
import datetime
import requests import requests
from jingrow.model.page import Page import pytz
from jingrow.config import Config from jingrow.config import Config
from jingrow import log_error
logger = logging.getLogger(__name__)
class ApiAdapter: class ApiAdapter:
"""API 适配器 - 通过 API 调用 Jingrow SaaS 版""" """API 适配器 - 通过 API 调用 Jingrow SaaS 版"""
@ -22,7 +22,6 @@ class ApiAdapter:
self.api_key = Config.jingrow_api_key self.api_key = Config.jingrow_api_key
self.api_secret = Config.jingrow_api_secret self.api_secret = Config.jingrow_api_secret
self.session_cookie = Config.jingrow_session_cookie self.session_cookie = Config.jingrow_session_cookie
logger.info(f"API adapter initialized with URL: {self.api_url}")
def _get_headers(self): def _get_headers(self):
"""获取请求头""" """获取请求头"""
@ -42,7 +41,8 @@ class ApiAdapter:
def _make_request(self, method: str, endpoint: str, data: Dict = None): def _make_request(self, method: str, endpoint: str, data: Dict = None):
"""发送 API 请求""" """发送 API 请求"""
url = f"{self.api_url}/{endpoint.lstrip('/')}" # 允许传入完整 URL否则按相对路径拼接
url = endpoint if str(endpoint).startswith("http") else f"{self.api_url}/{str(endpoint).lstrip('/')}"
headers = self._get_headers() headers = self._get_headers()
try: try:
@ -61,316 +61,388 @@ class ApiAdapter:
return response.json() return response.json()
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API request failed: {e}")
raise raise
def _create_page(self, pagetype: str, name: str = None, **data): def get_logged_user(self, session_cookie: Optional[str] = None) -> Optional[str]:
"""创建 Page 对象的工厂方法""" try:
page_data = {"pagetype": pagetype, **data} api_url = f"{self.api_url}/api/action/jingrow.auth.get_logged_user"
if name is not None: cookie = session_cookie or self.session_cookie
page_data["name"] = name if not cookie:
return Page(page_data) return None
headers = {
def _build_endpoint(self, pagetype: str, name: str = None) -> str: "Content-Type": "application/json",
"""构建 API 端点""" "Accept": "application/json",
endpoint = f"api/data/{pagetype}" "Cookie": f"sid={cookie}"
if name: }
endpoint += f"/{name}" resp = requests.get(api_url, headers=headers, timeout=8)
return endpoint if resp.status_code == 200:
data = resp.json()
def get_pg(self, *args, **kwargs): if isinstance(data, dict) and 'message' in data:
"""获取页面类型文档 - 对齐云端 get_pg 接口 return data['message']
return data
支持多种调用方式 return None
- get_pg("PageType", "name") - 获取指定文档 except Exception:
- get_pg({"pagetype": "PageType", ...}) - 从字典创建文档 return None
- get_pg(pagetype="PageType", field=...) - 关键字参数方式
- get_pg("PageType", "name", for_update=True) - 带更新锁 def upload_file_to_jingrow(self, file_data: bytes, filename: str,
""" attached_to_pagetype: Optional[str] = None,
pagetype = None attached_to_name: Optional[str] = None,
name = None attached_to_field: Optional[str] = None) -> Dict[str, Any]:
try:
# 处理参数 api_url = f"{self.api_url}/api/action/upload_file"
if args: headers = self._get_headers().copy()
if isinstance(args[0], str): # 让 requests 自动设置 multipart/form-data
pagetype = args[0] if 'Content-Type' in headers:
if len(args) > 1: del headers['Content-Type']
name = args[1]
elif isinstance(args[0], dict): files = {
# 从字典创建文档 'file': (filename, file_data, 'application/octet-stream')
kwargs = args[0] }
data: Dict[str, Any] = {
'file_name': filename,
'is_private': 0
}
if attached_to_pagetype and attached_to_name:
data['pagetype'] = attached_to_pagetype
data['docname'] = attached_to_name
if attached_to_field:
data['fieldname'] = attached_to_field
response = requests.post(api_url, files=files, data=data, headers=headers, timeout=30)
if response.status_code == 200:
result = response.json()
if result.get('session_expired'):
return {'success': False, 'error': 'Session已过期请重新登录'}
if result.get('message'):
return {
'success': True,
'file_url': result['message'].get('file_url'),
'file_name': result['message'].get('file_name')
}
return {'success': False, 'error': 'API响应格式错误'}
else: else:
raise ValueError("First non keyword argument must be a string or dict") error_text = response.text
log_error(f"API请求失败: {response.status_code}, 响应: {error_text}")
# 从 kwargs 中提取 pagetype return {'success': False, 'error': f'API请求失败 (HTTP {response.status_code}): {error_text}'}
if pagetype is None and kwargs: except Exception as e:
if "pagetype" in kwargs: return {'success': False, 'error': f'调用upload_file API异常: {str(e)}'}
pagetype = kwargs["pagetype"]
def get_field_mapping_from_jingrow(self, pagetype: str) -> Dict[str, str]:
try:
api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_field_mapping"
headers = self._get_headers()
data = {'pagetype': pagetype}
response = requests.post(api_url, json=data, headers=headers, timeout=10)
if response.status_code == 200:
result = response.json()
if 'message' in result:
result = result['message']
if result.get('success'):
return result.get('field_mapping', {})
else:
log_error(f"获取字段映射失败: {result.get('error', '未知错误')}")
return {}
else: else:
raise ValueError('"pagetype" is a required key') log_error(f"获取字段映射失败: HTTP {response.status_code}")
return {}
logger.info(f"API get_pg: {pagetype}, {name}") except Exception as e:
log_error(f"获取字段映射异常: {str(e)}")
# 如果只有 pagetype 没有 name返回列表 return {}
if name is None:
# 从 kwargs 中移除 pagetype避免参数冲突 def get_field_value_from_jingrow(self, pagetype: str, name: str, fieldname: str) -> Optional[Any]:
list_kwargs = kwargs.copy() try:
list_kwargs.pop('pagetype', None) api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_field_value"
return self.get_list(pagetype, **list_kwargs) headers = self._get_headers()
data = { 'pagetype': pagetype, 'name': name, 'fieldname': fieldname }
# 获取单个文档 response = requests.post(api_url, json=data, headers=headers, timeout=10)
endpoint = self._build_endpoint(pagetype, name) if response.status_code == 200:
data = self._make_request('GET', endpoint, kwargs) result = response.json()
if 'message' in result:
# 如果 API 返回的数据中有 name先移除它使用我们指定的 name result = result['message']
if isinstance(data, dict) and 'name' in data: if result.get('success'):
api_name = data.pop('name') return result.get('value')
else:
return self._create_page(pagetype, name, **data) log_error(f"获取字段值失败: {result.get('error', '未知错误')}")
return None
def get_list(self, pagetype: str, fields=None, filters=None, group_by=None, else:
order_by=None, limit_start=None, limit_page_length=20, log_error(f"获取字段值失败: HTTP {response.status_code}")
parent=None, debug=False, as_dict=True, or_filters=None, **kwargs): return None
"""获取页面类型列表 - 对齐云端 get_list 接口 except Exception as e:
log_error(f"获取字段值异常: {str(e)}")
:param pagetype: PageType on which query is to be made. return None
:param fields: List of fields or `*`.
:param filters: List of filters (see example). def get_ai_settings_from_jingrow(self) -> Optional[Dict[str, Any]]:
:param order_by: Order By e.g. `modified desc`. try:
:param limit_start: Start results at record #. Default 0. api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_ai_settings"
:param limit_page_length: No of records in the page. Default 20. headers = self._get_headers()
:param group_by: Group by field. response = requests.post(api_url, headers=headers, timeout=10)
:param or_filters: OR filter conditions. if response.status_code == 200:
:param debug: Debug mode. result = response.json()
:param as_dict: Return as dict format. if result.get('session_expired'):
log_error("Session已过期请重新登录")
Example usage: return None
- get_list("ToDo", fields=["name", "description"], filters={"owner":"test@example.com"}) if 'message' in result:
- get_list("ToDo", fields="*", filters=[["modified", ">", "2014-01-01"]]) result = result['message']
""" if result.get('success'):
logger.info(f"API get_list: {pagetype}") return result.get('config', {})
else:
# 构建查询参数 log_error(f"获取AI Settings失败: {result.get('error', '未知错误')}")
params = {} return None
if fields is not None: else:
# API 期望 JSON 字符串 log_error(f"获取AI Settings失败: HTTP {response.status_code}")
params['fields'] = json.dumps(fields) if not isinstance(fields, str) else fields return None
if filters is not None: except Exception as e:
# API 期望 JSON 字符串 log_error(f"获取AI Settings配置异常: {str(e)}")
params['filters'] = json.dumps(filters) if not isinstance(filters, str) else filters return None
if group_by is not None:
params['group_by'] = group_by def get_agent_detail(self, name: str, session_cookie: Optional[str] = None) -> Optional[Dict[str, Any]]:
if order_by is not None: try:
params['order_by'] = order_by api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_local_ai_agent_detail"
if limit_start is not None: headers = self._get_headers()
params['limit_start'] = limit_start if not headers:
if limit_page_length != 20: # 默认值不需要传递 log_error("JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置")
params['limit_page_length'] = limit_page_length return None
if parent is not None: payload = {"name": name}
params['parent'] = parent response = requests.post(api_url, headers=headers, json=payload, timeout=15)
if debug: if response.status_code == 200:
params['debug'] = debug data = response.json()
if not as_dict: # 默认 True只有 False 时才传递 if isinstance(data, dict) and 'message' in data:
params['as_dict'] = as_dict message = data['message']
if or_filters is not None: if isinstance(message, dict) and message.get('success'):
params['or_filters'] = or_filters return message.get('data')
return data
# 合并其他参数 else:
params.update(kwargs) log_error(f"Failed to get agent detail: HTTP {response.status_code}: {response.text}")
return None
endpoint = self._build_endpoint(pagetype) except Exception as e:
data = self._make_request('GET', endpoint, params) log_error(f"获取智能体详情异常: {str(e)}")
return None
return [self._create_page(pagetype, item.pop('name', None), **item) for item in data]
def get_pg(self, pagetype: str, name: str, session_cookie: Optional[str] = None) -> Dict[str, Any]:
def new_pg(self, pagetype: str, *, parent_pg=None, parentfield=None, as_dict=False, **kwargs): try:
"""创建新的页面类型文档 - 对齐云端 new_pg 接口 api_url = f"{self.api_url}/api/data/{pagetype}/{name}"
headers = self._get_headers()
:param pagetype: PageType of the new document. if not headers:
:param parent_pg: [optional] add to parent document. return {'success': False, 'error': 'JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置'}
:param parentfield: [optional] add against this `parentfield`. response = requests.get(api_url, headers=headers, timeout=15)
:param as_dict: [optional] return as dictionary instead of Page. if response.status_code == 200:
:param kwargs: [optional] You can specify fields as field=value pairs in function call. data = response.json()
""" if isinstance(data, dict) and 'data' in data:
logger.info(f"API new_pg: {pagetype}") return {'success': True, 'data': data['data']}
return {'success': True, 'data': data}
# 构建新文档数据 else:
new_data = { return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
"pagetype": pagetype, except Exception as e:
"__islocal": 1, return {'success': False, 'error': f"获取记录异常: {str(e)}"}
"pagestatus": 0,
**kwargs def create_pg(self, pagetype: str, payload: Dict[str, Any]) -> Dict[str, Any]:
} try:
api_url = f"{self.api_url}/api/data/{pagetype}"
# 如果有父文档,添加父文档信息 headers = self._get_headers()
if parent_pg: response = requests.post(api_url, json=payload, headers=headers, timeout=20)
new_data["parent"] = parent_pg.name if hasattr(parent_pg, 'name') else str(parent_pg) if response.status_code in (200, 201):
new_data["parenttype"] = parent_pg.pagetype if hasattr(parent_pg, 'pagetype') else None data = response.json()
if parentfield: if isinstance(data, dict) and 'data' in data:
new_data["parentfield"] = parentfield return {'success': True, 'data': data['data']}
return {'success': True, 'data': data}
if as_dict: else:
return new_data return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
else: except Exception as e:
# 移除 pagetype 避免参数冲突 return {'success': False, 'error': f"创建记录异常: {str(e)}"}
pagetype_data = new_data.pop('pagetype')
return self._create_page(pagetype_data, None, **new_data) def update_pg(self, pagetype: str, name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
try:
def save_pg(self, pg, ignore_permissions=False, ignore_validate=False, ignore_mandatory=False): api_url = f"{self.api_url}/api/data/{pagetype}/{name}"
"""保存页面类型文档 - 对齐云端 save_pg 接口 headers = self._get_headers()
response = requests.put(api_url, json=payload, headers=headers, timeout=20)
:param pg: Page object to save if response.status_code in (200, 202):
:param ignore_permissions: Ignore permission checks data = response.json()
:param ignore_validate: Ignore validation if isinstance(data, dict) and 'data' in data:
:param ignore_mandatory: Ignore mandatory field checks return {'success': True, 'data': data['data']}
""" return {'success': True, 'data': data}
logger.info(f"API save_pg: {pg.pagetype}, {getattr(pg, 'name', None)}") else:
return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
endpoint = self._build_endpoint(pg.pagetype, getattr(pg, 'name', None)) except Exception as e:
method = 'PUT' if getattr(pg, 'name', None) else 'POST' return {'success': False, 'error': f"更新记录异常: {str(e)}"}
data = pg.as_dict()
def update_local_job(self, data: Dict[str, Any]) -> Dict[str, Any]:
# 添加保存选项 return self.push_local_job_to_jingrow(data)
if ignore_permissions:
data['ignore_permissions'] = True def push_local_job_to_jingrow(self, data: Dict[str, Any]) -> Dict[str, Any]:
if ignore_validate: try:
data['ignore_validate'] = True api_url = f"{self.api_url}/api/action/jingrow.core.pagetype.local_job.local_job.push_local_job"
if ignore_mandatory: headers = self._get_headers()
data['ignore_mandatory'] = True if not headers:
return {'success': False, 'error': 'JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置'}
result = self._make_request(method, endpoint, data) headers.setdefault("X-Requested-With", "XMLHttpRequest")
site_name = getattr(Config, "JINGROW_SITE", None) or os.getenv("JINGROW_SITE")
# 更新文档对象 if site_name:
if result: headers["X-Frappe-Site-Name"] = site_name
# 直接更新对象的属性 resp = requests.post(api_url, json=data, headers=headers, timeout=10)
for key, value in result.items(): if resp.status_code == 200:
setattr(pg, key, value) body = resp.json()
if isinstance(body, dict) and 'message' in body:
return pg body = body['message']
if isinstance(body, dict) and body.get('success'):
def delete_pg(self, pagetype: str, name: str, ignore_missing=False): return {'success': True}
"""删除页面类型文档 - 对齐云端 delete_pg 接口 log_error(f"[JFLOW->JINGROW] 推送失败(body): {body}")
return {'success': False, 'error': str(body)}
:param pagetype: PageType name log_error(f"[JFLOW->JINGROW] 推送失败(HTTP): {resp.status_code} {resp.text}")
:param name: Document name return {'success': False, 'error': f"HTTP {resp.status_code}: {resp.text}"}
:param ignore_missing: Ignore if document doesn't exist except Exception as e:
""" log_error(f"[JFLOW->JINGROW] 推送异常: {str(e)}")
logger.info(f"API delete_pg: {pagetype}, {name}") return {'success': False, 'error': f"推送 Local Job 失败: {str(e)}"}
endpoint = self._build_endpoint(pagetype, name) def delete_pg(self, pagetype: str, name: str) -> Dict[str, Any]:
params = {'ignore_missing': ignore_missing} if ignore_missing else {} try:
api_url = f"{self.api_url}/api/data/{pagetype}/{name}"
self._make_request('DELETE', endpoint, params) headers = self._get_headers()
return True response = requests.delete(api_url, headers=headers, timeout=15)
if response.status_code in (200, 202):
def db_exists(self, pagetype: str, filters: Dict[str, Any] = None, cache=False): return {'success': True}
"""检查文档是否存在 - 对齐云端 db.exists 接口 else:
return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
:param pagetype: PageType name except Exception as e:
:param filters: Filter conditions (dict or name string) return {'success': False, 'error': f"删除记录异常: {str(e)}"}
:param cache: Enable caching for single document checks
def map_fields_by_labels(self, field_map: List[Dict[str, Any]], ai_outputs: Dict[str, Any], label_to_fieldname: Dict[str, str]) -> Dict[str, Any]:
Examples: record_data: Dict[str, Any] = {}
- db_exists("User", "jane@example.org", cache=True) fieldname_set = set(label_to_fieldname.values())
- db_exists("User", {"full_name": "Jane Doe"}) for mapping in field_map or []:
- db_exists({"pagetype": "User", "full_name": "Jane Doe"}) source_key = mapping.get('from')
""" to_raw = mapping.get('to')
logger.info(f"API db_exists: {pagetype}, {filters}") if not source_key or to_raw is None:
continue
# 处理字典格式的 pagetype value = ai_outputs.get(source_key)
if isinstance(pagetype, dict): if value is None:
filters = pagetype.copy() continue
pagetype = filters.pop("pagetype") to_field = label_to_fieldname.get(str(to_raw).strip())
if not to_field and str(to_raw).strip() in fieldname_set:
# 如果 filters 是字符串,转换为字典 to_field = str(to_raw).strip()
if isinstance(filters, str): if not to_field:
filters = {"name": filters} continue
record_data[to_field] = value
endpoint = self._build_endpoint(pagetype) return record_data
params = {'filters': filters} if filters else {}
if cache: def get_pg_list(self, pagetype: str, filters: Optional[List[List[Any]]] = None,
params['cache'] = cache fields: Optional[List[str]] = None, limit: Optional[int] = None) -> Dict[str, Any]:
try:
data = self._make_request('GET', endpoint, params) base_url = f"{self.api_url}/api/data/{pagetype}"
return len(data) > 0 params: Dict[str, str] = {}
if filters:
def db_get_value(self, pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None, params['filters'] = json.dumps(filters, ensure_ascii=False)
ignore=False, cache=False, order_by=None, parent=None): if fields:
"""获取数据库值 - 对齐云端 db.get_value 接口 params['fields'] = json.dumps(fields, ensure_ascii=False)
if isinstance(limit, int) and limit > 0:
:param pagetype: PageType name params['limit'] = str(limit)
:param filters: Filter conditions (dict or name string) headers = self._get_headers()
:param fieldname: Field name to get value for response = requests.get(base_url, params=params, headers=headers, timeout=20)
:param ignore: Ignore if document doesn't exist if response.status_code == 200:
:param cache: Enable caching data = response.json()
:param order_by: Order by field if isinstance(data, dict) and 'data' in data:
:param parent: Parent document for child table queries return {'success': True, 'data': data['data']}
if isinstance(data, list):
Examples: return {'success': True, 'data': data}
- db_get_value("User", "jane@example.org", "full_name") return {'success': True, 'data': [data]}
- db_get_value("User", {"email": "jane@example.org"}, "full_name") else:
""" return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
logger.info(f"API db_get_value: {pagetype}, {filters}, {fieldname}") except Exception as e:
return {'success': False, 'error': f"列表查询异常: {str(e)}"}
# 如果 filters 是字符串,转换为字典
if isinstance(filters, str): def get_page_meta(self, pagetype: str) -> Dict[str, Any]:
filters = {"name": filters} try:
api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_meta"
endpoint = self._build_endpoint(pagetype) headers = self._get_headers()
params = { payload = {"pagetype": pagetype}
'filters': filters, response = requests.post(api_url, json=payload, headers=headers, timeout=10)
'fieldname': fieldname if response.status_code == 200:
} result = response.json()
if ignore: if isinstance(result, dict) and 'message' in result:
params['ignore'] = ignore result = result['message']
if cache: if isinstance(result, dict) and result.get('success'):
params['cache'] = cache return {'success': True, 'data': result.get('data', {})}
if order_by: return {'success': False, 'error': result.get('error', '获取Meta失败') if isinstance(result, dict) else '获取Meta失败'}
params['order_by'] = order_by else:
if parent: return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
params['parent'] = parent except Exception as e:
return {'success': False, 'error': f"获取Meta异常: {str(e)}"}
data = self._make_request('GET', endpoint, params)
return data def get_pagetype_module_app(self, pagetype: str) -> Dict[str, Any]:
try:
def db_set_value(self, pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None, api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_pagetype_module_app"
value: Any = None, modified=None, modified_by=None, update_modified=True, debug=False): headers = self._get_headers()
"""设置数据库值 - 对齐云端 db.set_value 接口 if not headers:
return {'success': False, 'error': 'JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置'}
:param pagetype: PageType name payload = {"pagetype": pagetype}
:param filters: Filter conditions (dict or name string) response = requests.post(api_url, json=payload, headers=headers, timeout=10)
:param fieldname: Field name to set value for if response.status_code == 200:
:param value: Value to set data = response.json()
:param modified: Use this as the modified timestamp if isinstance(data, dict) and 'message' in data:
:param modified_by: Set this user as modified_by data = data['message']
:param update_modified: Update the modified timestamp if isinstance(data, dict) and data.get('success'):
:param debug: Debug mode module_val = data.get('module') or (data.get('data') or {}).get('module')
app_val = data.get('app') or (data.get('data') or {}).get('app')
Examples: return {'success': True, 'module': module_val, 'app': app_val}
- db_set_value("User", "jane@example.org", "full_name", "Jane Doe") return {'success': False, 'error': data.get('error', '获取失败') if isinstance(data, dict) else '获取失败'}
- db_set_value("User", {"email": "jane@example.org"}, "full_name", "Jane Doe") return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
""" except Exception as e:
logger.info(f"API db_set_value: {pagetype}, {filters}, {fieldname}, {value}") return {'success': False, 'error': f"获取模块信息异常: {str(e)}"}
# 如果 filters 是字符串,转换为字典 def get_pg_id(self, pagetype: str, filters: Optional[List[List[Any]]] = None,
if isinstance(filters, str): field: Optional[str] = None, value: Optional[str] = None,
filters = {"name": filters} site: Optional[str] = None) -> Dict[str, Any]:
try:
endpoint = self._build_endpoint(pagetype) query_filters: List[List[Any]] = []
data = { if isinstance(filters, list) and filters:
'filters': filters, query_filters = filters
'fieldname': fieldname, elif field and (value is not None):
'value': value query_filters = [[field, '=', value]]
} if site:
if modified: query_filters.append(['site', '=', site])
data['modified'] = modified
if modified_by: res = self.get_pg_list(pagetype, filters=query_filters, fields=['name'], limit=1)
data['modified_by'] = modified_by if not res.get('success'):
if not update_modified: return {'success': False, 'error': res.get('error', '查询失败')}
data['update_modified'] = update_modified data = res.get('data') or []
if debug: if not data:
data['debug'] = debug return {'success': False, 'error': '未找到记录'}
name_val = (data[0] or {}).get('name')
self._make_request('PUT', endpoint, data) if not name_val:
return True return {'success': False, 'error': '记录缺少name字段'}
return {'success': True, 'name': name_val}
except Exception as e:
return {'success': False, 'error': f'获取记录name失败: {str(e)}'}
def get_jingrow_system_timezone(self):
try:
headers = self._get_headers()
if not headers:
return self._get_default_timezone()
url = f"{self.api_url}/api/action/jingrow.core.pagetype.system_settings.system_settings.load"
response = requests.post(url, headers=headers, timeout=5)
if response.status_code == 200:
timezone_name = response.json().get('message', {}).get('defaults', {}).get('time_zone')
if timezone_name:
return pytz.timezone(timezone_name)
except (requests.RequestException, pytz.exceptions.UnknownTimeZoneError, KeyError):
pass
return self._get_default_timezone()
def get_single_pagetype(self, pagetype: str) -> Dict[str, Any]:
try:
api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_single_pagetype"
response = requests.post(api_url, json={"pagetype": pagetype}, headers=self._get_headers(), timeout=10)
if response.status_code == 200:
raw = response.json()
result = raw.get('message', raw)
if result.get('success'):
return {'success': True, 'config': result.get('config', {})}
return {'success': False, 'error': result.get('error', '获取失败')}
return {'success': False, 'error': f"HTTP {response.status_code}"}
except Exception as e:
return {'success': False, 'error': str(e)}
def _get_default_timezone(self):
return datetime.datetime.now().astimezone().tzinfo

View File

@ -1,159 +1,103 @@
# Copyright (c) 2025, JINGROW # Copyright (c) 2025, JINGROW
# License: MIT. See LICENSE # License: MIT. See LICENSE
from __future__ import annotations from typing import Any, Dict, List, Optional
from jingrow.services.runtime import get_adapter
from typing import Any, Iterable from jingrow.core.hooks import execute_hook, execute_hook_async
import asyncio
import jingrow import jingrow
class Page(): class Page:
"""最小可用 Page本地数据容器 + 钩子触发""" """基于适配器的通用 Page 模型,提供常用 CRUD 与钩子机制。
钩子事件命名与 API 对齐after_insert, on_update, on_trash
"""
def __init__(self, d: dict[str, Any] | None = None): def __init__(self, pagetype: str):
if d is None: self.pagetype = pagetype
d = {}
if not isinstance(d, dict):
raise ValueError("Page expects a dict payload")
# 基础字段
self.pagetype = d.get("pagetype")
if "name" in d:
self.name = d["name"]
# flags 简化为普通 dict本地版不依赖 flags 机制)
self.flags = {}
self.update(d)
# ---------------------- 内部工具 ----------------------
@property @property
def meta(self): def adapter(self):
return jingrow.get_meta(self.pagetype) return get_adapter()
def update(self, d: dict[str, Any]): def _execute_hook(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool:
for key, value in d.items():
self.set(key, value)
return self
def get(self, key: str, default: Any = None):
return self.__dict__.get(key, default)
def set(self, key: str, value: Any):
self.__dict__[key] = value
def insert(self, **kwargs) -> "Page":
"""插入页面,触发相关钩子"""
self.set("__islocal", True)
# 触发 before_insert 钩子
self._execute_hook("before_insert")
# 本地方法(向后兼容)
self.run_method("before_insert")
# 保存到数据库
jingrow.save_pg(self)
# 触发 after_insert 钩子
self._execute_hook("after_insert")
# 本地方法(向后兼容)
self.run_method("after_insert")
return self
def save(self, **kwargs) -> "Page":
"""保存页面,触发相关钩子"""
# 触发 before_save 钩子(如果存在)
if not self.get("name"):
# 新记录,触发 before_insert
self._execute_hook("before_insert")
else:
# 更新记录,触发 on_update
self._execute_hook("on_update", **kwargs)
# 本地方法(向后兼容)
self.run_method("before_save")
# 保存到数据库
jingrow.save_pg(self)
# 如果之前触发了 before_insert现在触发 after_insert
if not self.get("name") or self.get("__islocal"):
self._execute_hook("after_insert")
else:
self._execute_hook("on_update", **kwargs)
# 本地方法(向后兼容)
self.run_method("on_update")
return self
def delete(self, **kwargs):
"""删除页面,触发相关钩子"""
# 触发 on_trash 钩子
self._execute_hook("on_trash", **kwargs)
# 本地方法(向后兼容)
self.run_method("on_trash")
return jingrow.delete_pg(self.pagetype, self.name)
def run_method(self, method: str, *args, **kwargs):
"""运行本地方法(向后兼容)"""
fn = getattr(self, method, None)
if callable(fn):
return fn(*args, **kwargs)
return None
def _execute_hook(self, event_name: str, **kwargs):
"""执行钩子(私有方法)"""
try: try:
import logging wildcard_hook = f"page.*.{name}"
from jingrow.core.hooks import execute_hook pg_obj = jingrow.get_pg(self.pagetype, record_name)
logger = logging.getLogger(__name__) execute_hook(
# 精确钩子page.{pagetype}.{event_name}
specific_hook = f"page.{self.pagetype}.{event_name}"
results_specific = execute_hook(
specific_hook,
sender=self.pagetype,
page=self,
event=event_name,
**kwargs
)
logger.debug(f"Hook executed: {specific_hook}, handlers_return={len(results_specific)}")
# 通配钩子page.*.{event_name}(用于全局监听)
wildcard_hook = f"page.*.{event_name}"
results_wildcard = execute_hook(
wildcard_hook, wildcard_hook,
sender=self.pagetype, sender=self.pagetype,
page=self, page=pg_obj,
event=event_name, event=name,
**kwargs data=(data or {}),
) )
logger.debug(f"Hook executed: {wildcard_hook}, handlers_return={len(results_wildcard)}") return True
except Exception:
return False
except ImportError: def _execute_hook_async(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool:
# 如果钩子系统未安装,静默失败(向后兼容) try:
pass wildcard_hook = f"page.*.{name}"
except Exception as e: pg_obj = jingrow.get_pg(self.pagetype, record_name)
# 记录错误但继续执行(防止钩子错误影响主流程)
import logging
logger = logging.getLogger(__name__)
logger.warning(f"执行钩子失败 {event_name}: {e}", exc_info=True)
# 兼容云端常用辅助 async def _run():
def as_dict(self) -> dict[str, Any]: await execute_hook_async(
d = {k: v for k, v in self.__dict__.items() if not k.startswith("_")} wildcard_hook,
d["pagetype"] = self.pagetype sender=self.pagetype,
return d page=pg_obj,
event=name,
data=(data or {}),
)
asyncio.create_task(_run())
return True
except Exception:
return False
# ---------------------- CRUD ----------------------
def get(self, name: str) -> Dict[str, Any]:
return self.adapter.get_record(self.pagetype, name)
def list(self, filters: Optional[List[List[Any]]] = None,
fields: Optional[List[str]] = None,
limit: Optional[int] = None) -> Dict[str, Any]:
return self.adapter.get_record_list(self.pagetype, filters=filters, fields=fields, limit=limit)
def create(self, data: Dict[str, Any]) -> Dict[str, Any]:
result = self.adapter.create_record(self.pagetype, data)
if result.get('success'):
created = result.get('data', {})
record_name = created.get('name') or data.get('name')
if record_name:
self._execute_hook_async('after_insert', record_name, created)
return result
def update(self, name: str, data: Dict[str, Any]) -> Dict[str, Any]:
result = self.adapter.update_record(self.pagetype, name, data)
if result.get('success'):
self._execute_hook_async('on_update', name, data)
return result
def delete(self, name: str) -> Dict[str, Any]:
# 先发同步 on_trash再删除
self._execute_hook('on_trash', name)
return self.adapter.delete_record(self.pagetype, name)
# ---------------------- 其他便捷能力 ----------------------
def get_meta(self) -> Dict[str, Any]:
return self.adapter.get_page_meta(self.pagetype)
def get_module_app(self) -> Dict[str, Any]:
return self.adapter.get_pagetype_module_app(self.pagetype)
def get_record_id(self, filters: Optional[List[List[Any]]] = None,
field: Optional[str] = None, value: Optional[str] = None,
site: Optional[str] = None) -> Dict[str, Any]:
return self.adapter.get_record_id(self.pagetype, filters=filters, field=field, value=value, site=site)
def get_single(self) -> Dict[str, Any]:
return self.adapter.get_single_pagetype(self.pagetype)
def get_all_children(self) -> list["Page"]:
children = []
for df in self.meta.get_table_fields():
value = getattr(self, df.fieldname, None)
if value:
children.extend(value)
return children

View File

@ -0,0 +1,24 @@
from typing import Optional
from jingrow.config import Config
_adapter = None
def init_adapter(run_mode: str = "api"):
global _adapter
if run_mode == "api":
from jingrow.adapters.api_adapter import ApiAdapter
_adapter = ApiAdapter()
elif run_mode == "local":
from jingrow.adapters.local_adapter import LocalAdapter
_adapter = LocalAdapter()
else:
raise ValueError(f"Unsupported run_mode: {run_mode}. Supported modes: 'api', 'local'")
def get_adapter():
global _adapter
if _adapter is None:
# Lazy init using Config if not explicitly initialized
init_adapter(getattr(Config, "run_mode", "api"))
return _adapter