From 7b992b63baef3aa92cbd49603f5adfc576c33cff Mon Sep 17 00:00:00 2001 From: jingrow Date: Fri, 31 Oct 2025 18:27:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0runtime=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=B1=82=EF=BC=8C=E9=87=8D=E6=9E=84Page=E5=92=8Capi=E9=80=82?= =?UTF-8?q?=E9=85=8D=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/jingrow/jingrow/__init__.py | 95 +-- apps/jingrow/jingrow/adapters/api_adapter.py | 700 ++++++++++--------- apps/jingrow/jingrow/model/page.py | 224 +++--- apps/jingrow/jingrow/services/runtime.py | 24 + 4 files changed, 546 insertions(+), 497 deletions(-) create mode 100644 apps/jingrow/jingrow/services/runtime.py diff --git a/apps/jingrow/jingrow/__init__.py b/apps/jingrow/jingrow/__init__.py index 4666be7..c47c305 100644 --- a/apps/jingrow/jingrow/__init__.py +++ b/apps/jingrow/jingrow/__init__.py @@ -9,7 +9,6 @@ import os from jingrow.model.page import Page from jingrow.config import Config -_adapter = None _local = {} # 统一 Jingrow 日志记录器(仅为本模块及调用方提供最小可用输出,不修改全局 root logger) @@ -66,44 +65,67 @@ def _ensure_logging_configured() -> None: # 与 root 保持同级别,避免级别不一致导致丢日志 _root_logger.setLevel(root_logger.level) -def _init_adapter(run_mode: str = "api"): - global _adapter - - if run_mode == "api": - from .adapters.api_adapter import ApiAdapter - _adapter = ApiAdapter() - elif run_mode == "local": - from .adapters.local_adapter import LocalAdapter - _adapter = LocalAdapter() - else: - raise ValueError(f"Unsupported run_mode: {run_mode}. Supported modes: 'api', 'local'") +# ====== High-level helpers to enforce Page lifecycle (hooks) ====== -def get_pg(pagetype: str, name: str = None, **kwargs): - return _adapter.get_pg(pagetype, name, **kwargs) +def get_pg(pagetype: str, name: str): + """获取单条记录并转为可属性访问的对象,失败返回 None。""" + pg = Page(pagetype) + res = pg.get(name) + if not isinstance(res, dict) or not res.get('success'): + return None + data = res.get('data') or {} + return data -def get_list(pagetype: str, *args, **kwargs): - return _adapter.get_list(pagetype, *args, **kwargs) -def get_all(pagetype: str, *args, **kwargs): - return _adapter.get_list(pagetype, *args, **kwargs) +def create_pg(pagetype: str, data: Dict[str, Any]): + """创建记录,返回创建后的数据对象或 None。""" + pg = Page(pagetype) + res = pg.create(data) + if not isinstance(res, dict) or not res.get('success'): + return None + created = res.get('data') or {} + return created -def new_pg(pagetype: str, **kwargs): - return _adapter.new_pg(pagetype, **kwargs) -def save_pg(pg): - return _adapter.save_pg(pg) +def update_pg(pagetype: str, name: str, data: Dict[str, Any]): + """更新记录,成功返回更新后的数据对象或 True,失败返回 False。""" + pg = Page(pagetype) + res = pg.update(name, data) + if not isinstance(res, dict) or not res.get('success'): + return False + updated = res.get('data') + if updated is None: + return True + return updated + + +def delete_pg(pagetype: str, name: str) -> bool: + pg = Page(pagetype) + res = pg.delete(name) + return bool(isinstance(res, dict) and res.get('success')) + + +def get_list(pagetype: str, filters: List[List[Any]] = None, fields: List[str] = None, limit: int = None): + """获取记录列表,返回对象列表;失败返回空列表。""" + pg = Page(pagetype) + res = pg.list(filters=filters, fields=fields, limit=limit) + if not isinstance(res, dict) or not res.get('success'): + return [] + items = res.get('data') or [] + return items + + +def get_all(pagetype: str, filters: List[List[Any]] = None, fields: List[str] = None): + return get_list(pagetype, filters=filters, fields=fields, limit=None) + + +def get_single(pagetype: str): + """获取 single 类型 pagetype 配置,返回 {success, config|error} 结构。""" + return Page(pagetype).get_single() -def delete_pg(pagetype: str, name: str): - return _adapter.delete_pg(pagetype, name) -def db_exists(pagetype: str, filters: Dict[str, Any] = None): - return _adapter.db_exists(pagetype, filters) -def db_get_value(pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None): - return _adapter.db_get_value(pagetype, filters, fieldname) -def db_set_value(pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None, value: Any = None): - return _adapter.db_set_value(pagetype, filters, fieldname, value) def log_error(title: Optional[str] = None, message: Optional[str] = None, *, exc: Optional[BaseException] = None) -> None: """输出错误日志到终端。 @@ -121,9 +143,6 @@ def log_error(title: Optional[str] = None, message: Optional[str] = None, *, exc content = str(message) if title is None else f"{title} - {message}" _root_logger.error(content, exc_info=exc) -def throw(title: str, message: str): - return _adapter.throw(title, message) - def _dict(): """创建一个空字典""" return {} @@ -182,13 +201,3 @@ def is_whitelisted(api_path: str) -> bool: def get_whitelisted_function(api_path: str): return _whitelisted_functions.get(api_path) - -# 兼容性属性 -db = type('db', (), { - 'exists': db_exists, - 'get_value': db_get_value, - 'set_value': db_set_value, -})() - -# 初始化默认配置 -_init_adapter(Config.run_mode) diff --git a/apps/jingrow/jingrow/adapters/api_adapter.py b/apps/jingrow/jingrow/adapters/api_adapter.py index 6ac970f..423c61e 100644 --- a/apps/jingrow/jingrow/adapters/api_adapter.py +++ b/apps/jingrow/jingrow/adapters/api_adapter.py @@ -7,12 +7,12 @@ Jingrow API 适配器, 通过 API 调用 Jingrow SaaS 版的服务 from typing import Dict, List, Any, Optional, Union import json -import logging +import os +import datetime import requests -from jingrow.model.page import Page +import pytz from jingrow.config import Config - -logger = logging.getLogger(__name__) +from jingrow import log_error class ApiAdapter: """API 适配器 - 通过 API 调用 Jingrow SaaS 版""" @@ -22,7 +22,6 @@ class ApiAdapter: self.api_key = Config.jingrow_api_key self.api_secret = Config.jingrow_api_secret self.session_cookie = Config.jingrow_session_cookie - logger.info(f"API adapter initialized with URL: {self.api_url}") def _get_headers(self): """获取请求头""" @@ -42,7 +41,8 @@ class ApiAdapter: def _make_request(self, method: str, endpoint: str, data: Dict = None): """发送 API 请求""" - url = f"{self.api_url}/{endpoint.lstrip('/')}" + # 允许传入完整 URL;否则按相对路径拼接 + url = endpoint if str(endpoint).startswith("http") else f"{self.api_url}/{str(endpoint).lstrip('/')}" headers = self._get_headers() try: @@ -61,316 +61,388 @@ class ApiAdapter: return response.json() except requests.exceptions.RequestException as e: - logger.error(f"API request failed: {e}") raise - def _create_page(self, pagetype: str, name: str = None, **data): - """创建 Page 对象的工厂方法""" - page_data = {"pagetype": pagetype, **data} - if name is not None: - page_data["name"] = name - return Page(page_data) - - def _build_endpoint(self, pagetype: str, name: str = None) -> str: - """构建 API 端点""" - endpoint = f"api/data/{pagetype}" - if name: - endpoint += f"/{name}" - return endpoint - - def get_pg(self, *args, **kwargs): - """获取页面类型文档 - 对齐云端 get_pg 接口 - - 支持多种调用方式: - - get_pg("PageType", "name") - 获取指定文档 - - get_pg({"pagetype": "PageType", ...}) - 从字典创建文档 - - get_pg(pagetype="PageType", field=...) - 关键字参数方式 - - get_pg("PageType", "name", for_update=True) - 带更新锁 - """ - pagetype = None - name = None - - # 处理参数 - if args: - if isinstance(args[0], str): - pagetype = args[0] - if len(args) > 1: - name = args[1] - elif isinstance(args[0], dict): - # 从字典创建文档 - kwargs = args[0] + def get_logged_user(self, session_cookie: Optional[str] = None) -> Optional[str]: + try: + api_url = f"{self.api_url}/api/action/jingrow.auth.get_logged_user" + cookie = session_cookie or self.session_cookie + if not cookie: + return None + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Cookie": f"sid={cookie}" + } + resp = requests.get(api_url, headers=headers, timeout=8) + if resp.status_code == 200: + data = resp.json() + if isinstance(data, dict) and 'message' in data: + return data['message'] + return data + return None + except Exception: + return None + + def upload_file_to_jingrow(self, file_data: bytes, filename: str, + attached_to_pagetype: Optional[str] = None, + attached_to_name: Optional[str] = None, + attached_to_field: Optional[str] = None) -> Dict[str, Any]: + try: + api_url = f"{self.api_url}/api/action/upload_file" + headers = self._get_headers().copy() + # 让 requests 自动设置 multipart/form-data + if 'Content-Type' in headers: + del headers['Content-Type'] + + files = { + 'file': (filename, file_data, 'application/octet-stream') + } + data: Dict[str, Any] = { + 'file_name': filename, + 'is_private': 0 + } + if attached_to_pagetype and attached_to_name: + data['pagetype'] = attached_to_pagetype + data['docname'] = attached_to_name + if attached_to_field: + data['fieldname'] = attached_to_field + + response = requests.post(api_url, files=files, data=data, headers=headers, timeout=30) + if response.status_code == 200: + result = response.json() + if result.get('session_expired'): + return {'success': False, 'error': 'Session已过期,请重新登录'} + if result.get('message'): + return { + 'success': True, + 'file_url': result['message'].get('file_url'), + 'file_name': result['message'].get('file_name') + } + return {'success': False, 'error': 'API响应格式错误'} else: - raise ValueError("First non keyword argument must be a string or dict") - - # 从 kwargs 中提取 pagetype - if pagetype is None and kwargs: - if "pagetype" in kwargs: - pagetype = kwargs["pagetype"] + error_text = response.text + log_error(f"API请求失败: {response.status_code}, 响应: {error_text}") + return {'success': False, 'error': f'API请求失败 (HTTP {response.status_code}): {error_text}'} + except Exception as e: + return {'success': False, 'error': f'调用upload_file API异常: {str(e)}'} + + def get_field_mapping_from_jingrow(self, pagetype: str) -> Dict[str, str]: + try: + api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_field_mapping" + headers = self._get_headers() + data = {'pagetype': pagetype} + response = requests.post(api_url, json=data, headers=headers, timeout=10) + if response.status_code == 200: + result = response.json() + if 'message' in result: + result = result['message'] + if result.get('success'): + return result.get('field_mapping', {}) + else: + log_error(f"获取字段映射失败: {result.get('error', '未知错误')}") + return {} else: - raise ValueError('"pagetype" is a required key') - - logger.info(f"API get_pg: {pagetype}, {name}") - - # 如果只有 pagetype 没有 name,返回列表 - if name is None: - # 从 kwargs 中移除 pagetype,避免参数冲突 - list_kwargs = kwargs.copy() - list_kwargs.pop('pagetype', None) - return self.get_list(pagetype, **list_kwargs) - - # 获取单个文档 - endpoint = self._build_endpoint(pagetype, name) - data = self._make_request('GET', endpoint, kwargs) - - # 如果 API 返回的数据中有 name,先移除它,使用我们指定的 name - if isinstance(data, dict) and 'name' in data: - api_name = data.pop('name') - - return self._create_page(pagetype, name, **data) - - def get_list(self, pagetype: str, fields=None, filters=None, group_by=None, - order_by=None, limit_start=None, limit_page_length=20, - parent=None, debug=False, as_dict=True, or_filters=None, **kwargs): - """获取页面类型列表 - 对齐云端 get_list 接口 - - :param pagetype: PageType on which query is to be made. - :param fields: List of fields or `*`. - :param filters: List of filters (see example). - :param order_by: Order By e.g. `modified desc`. - :param limit_start: Start results at record #. Default 0. - :param limit_page_length: No of records in the page. Default 20. - :param group_by: Group by field. - :param or_filters: OR filter conditions. - :param debug: Debug mode. - :param as_dict: Return as dict format. - - Example usage: - - get_list("ToDo", fields=["name", "description"], filters={"owner":"test@example.com"}) - - get_list("ToDo", fields="*", filters=[["modified", ">", "2014-01-01"]]) - """ - logger.info(f"API get_list: {pagetype}") - - # 构建查询参数 - params = {} - if fields is not None: - # API 期望 JSON 字符串 - params['fields'] = json.dumps(fields) if not isinstance(fields, str) else fields - if filters is not None: - # API 期望 JSON 字符串 - params['filters'] = json.dumps(filters) if not isinstance(filters, str) else filters - if group_by is not None: - params['group_by'] = group_by - if order_by is not None: - params['order_by'] = order_by - if limit_start is not None: - params['limit_start'] = limit_start - if limit_page_length != 20: # 默认值不需要传递 - params['limit_page_length'] = limit_page_length - if parent is not None: - params['parent'] = parent - if debug: - params['debug'] = debug - if not as_dict: # 默认 True,只有 False 时才传递 - params['as_dict'] = as_dict - if or_filters is not None: - params['or_filters'] = or_filters - - # 合并其他参数 - params.update(kwargs) - - endpoint = self._build_endpoint(pagetype) - data = self._make_request('GET', endpoint, params) - - return [self._create_page(pagetype, item.pop('name', None), **item) for item in data] - - def new_pg(self, pagetype: str, *, parent_pg=None, parentfield=None, as_dict=False, **kwargs): - """创建新的页面类型文档 - 对齐云端 new_pg 接口 - - :param pagetype: PageType of the new document. - :param parent_pg: [optional] add to parent document. - :param parentfield: [optional] add against this `parentfield`. - :param as_dict: [optional] return as dictionary instead of Page. - :param kwargs: [optional] You can specify fields as field=value pairs in function call. - """ - logger.info(f"API new_pg: {pagetype}") - - # 构建新文档数据 - new_data = { - "pagetype": pagetype, - "__islocal": 1, - "pagestatus": 0, - **kwargs - } - - # 如果有父文档,添加父文档信息 - if parent_pg: - new_data["parent"] = parent_pg.name if hasattr(parent_pg, 'name') else str(parent_pg) - new_data["parenttype"] = parent_pg.pagetype if hasattr(parent_pg, 'pagetype') else None - if parentfield: - new_data["parentfield"] = parentfield - - if as_dict: - return new_data - else: - # 移除 pagetype 避免参数冲突 - pagetype_data = new_data.pop('pagetype') - return self._create_page(pagetype_data, None, **new_data) - - def save_pg(self, pg, ignore_permissions=False, ignore_validate=False, ignore_mandatory=False): - """保存页面类型文档 - 对齐云端 save_pg 接口 - - :param pg: Page object to save - :param ignore_permissions: Ignore permission checks - :param ignore_validate: Ignore validation - :param ignore_mandatory: Ignore mandatory field checks - """ - logger.info(f"API save_pg: {pg.pagetype}, {getattr(pg, 'name', None)}") - - endpoint = self._build_endpoint(pg.pagetype, getattr(pg, 'name', None)) - method = 'PUT' if getattr(pg, 'name', None) else 'POST' - data = pg.as_dict() - - # 添加保存选项 - if ignore_permissions: - data['ignore_permissions'] = True - if ignore_validate: - data['ignore_validate'] = True - if ignore_mandatory: - data['ignore_mandatory'] = True - - result = self._make_request(method, endpoint, data) - - # 更新文档对象 - if result: - # 直接更新对象的属性 - for key, value in result.items(): - setattr(pg, key, value) - - return pg - - def delete_pg(self, pagetype: str, name: str, ignore_missing=False): - """删除页面类型文档 - 对齐云端 delete_pg 接口 - - :param pagetype: PageType name - :param name: Document name - :param ignore_missing: Ignore if document doesn't exist - """ - logger.info(f"API delete_pg: {pagetype}, {name}") - - endpoint = self._build_endpoint(pagetype, name) - params = {'ignore_missing': ignore_missing} if ignore_missing else {} - - self._make_request('DELETE', endpoint, params) - return True - - def db_exists(self, pagetype: str, filters: Dict[str, Any] = None, cache=False): - """检查文档是否存在 - 对齐云端 db.exists 接口 - - :param pagetype: PageType name - :param filters: Filter conditions (dict or name string) - :param cache: Enable caching for single document checks - - Examples: - - db_exists("User", "jane@example.org", cache=True) - - db_exists("User", {"full_name": "Jane Doe"}) - - db_exists({"pagetype": "User", "full_name": "Jane Doe"}) - """ - logger.info(f"API db_exists: {pagetype}, {filters}") - - # 处理字典格式的 pagetype - if isinstance(pagetype, dict): - filters = pagetype.copy() - pagetype = filters.pop("pagetype") - - # 如果 filters 是字符串,转换为字典 - if isinstance(filters, str): - filters = {"name": filters} - - endpoint = self._build_endpoint(pagetype) - params = {'filters': filters} if filters else {} - if cache: - params['cache'] = cache - - data = self._make_request('GET', endpoint, params) - return len(data) > 0 - - def db_get_value(self, pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None, - ignore=False, cache=False, order_by=None, parent=None): - """获取数据库值 - 对齐云端 db.get_value 接口 - - :param pagetype: PageType name - :param filters: Filter conditions (dict or name string) - :param fieldname: Field name to get value for - :param ignore: Ignore if document doesn't exist - :param cache: Enable caching - :param order_by: Order by field - :param parent: Parent document for child table queries - - Examples: - - db_get_value("User", "jane@example.org", "full_name") - - db_get_value("User", {"email": "jane@example.org"}, "full_name") - """ - logger.info(f"API db_get_value: {pagetype}, {filters}, {fieldname}") - - # 如果 filters 是字符串,转换为字典 - if isinstance(filters, str): - filters = {"name": filters} - - endpoint = self._build_endpoint(pagetype) - params = { - 'filters': filters, - 'fieldname': fieldname - } - if ignore: - params['ignore'] = ignore - if cache: - params['cache'] = cache - if order_by: - params['order_by'] = order_by - if parent: - params['parent'] = parent - - data = self._make_request('GET', endpoint, params) - return data - - def db_set_value(self, pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None, - value: Any = None, modified=None, modified_by=None, update_modified=True, debug=False): - """设置数据库值 - 对齐云端 db.set_value 接口 - - :param pagetype: PageType name - :param filters: Filter conditions (dict or name string) - :param fieldname: Field name to set value for - :param value: Value to set - :param modified: Use this as the modified timestamp - :param modified_by: Set this user as modified_by - :param update_modified: Update the modified timestamp - :param debug: Debug mode - - Examples: - - db_set_value("User", "jane@example.org", "full_name", "Jane Doe") - - db_set_value("User", {"email": "jane@example.org"}, "full_name", "Jane Doe") - """ - logger.info(f"API db_set_value: {pagetype}, {filters}, {fieldname}, {value}") - - # 如果 filters 是字符串,转换为字典 - if isinstance(filters, str): - filters = {"name": filters} - - endpoint = self._build_endpoint(pagetype) - data = { - 'filters': filters, - 'fieldname': fieldname, - 'value': value - } - if modified: - data['modified'] = modified - if modified_by: - data['modified_by'] = modified_by - if not update_modified: - data['update_modified'] = update_modified - if debug: - data['debug'] = debug - - self._make_request('PUT', endpoint, data) - return True + log_error(f"获取字段映射失败: HTTP {response.status_code}") + return {} + except Exception as e: + log_error(f"获取字段映射异常: {str(e)}") + return {} + + def get_field_value_from_jingrow(self, pagetype: str, name: str, fieldname: str) -> Optional[Any]: + try: + api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_field_value" + headers = self._get_headers() + data = { 'pagetype': pagetype, 'name': name, 'fieldname': fieldname } + response = requests.post(api_url, json=data, headers=headers, timeout=10) + if response.status_code == 200: + result = response.json() + if 'message' in result: + result = result['message'] + if result.get('success'): + return result.get('value') + else: + log_error(f"获取字段值失败: {result.get('error', '未知错误')}") + return None + else: + log_error(f"获取字段值失败: HTTP {response.status_code}") + return None + except Exception as e: + log_error(f"获取字段值异常: {str(e)}") + return None + + def get_ai_settings_from_jingrow(self) -> Optional[Dict[str, Any]]: + try: + api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_ai_settings" + headers = self._get_headers() + response = requests.post(api_url, headers=headers, timeout=10) + if response.status_code == 200: + result = response.json() + if result.get('session_expired'): + log_error("Session已过期,请重新登录") + return None + if 'message' in result: + result = result['message'] + if result.get('success'): + return result.get('config', {}) + else: + log_error(f"获取AI Settings失败: {result.get('error', '未知错误')}") + return None + else: + log_error(f"获取AI Settings失败: HTTP {response.status_code}") + return None + except Exception as e: + log_error(f"获取AI Settings配置异常: {str(e)}") + return None + + def get_agent_detail(self, name: str, session_cookie: Optional[str] = None) -> Optional[Dict[str, Any]]: + try: + api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_local_ai_agent_detail" + headers = self._get_headers() + if not headers: + log_error("JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置") + return None + payload = {"name": name} + response = requests.post(api_url, headers=headers, json=payload, timeout=15) + if response.status_code == 200: + data = response.json() + if isinstance(data, dict) and 'message' in data: + message = data['message'] + if isinstance(message, dict) and message.get('success'): + return message.get('data') + return data + else: + log_error(f"Failed to get agent detail: HTTP {response.status_code}: {response.text}") + return None + except Exception as e: + log_error(f"获取智能体详情异常: {str(e)}") + return None + + def get_pg(self, pagetype: str, name: str, session_cookie: Optional[str] = None) -> Dict[str, Any]: + try: + api_url = f"{self.api_url}/api/data/{pagetype}/{name}" + headers = self._get_headers() + if not headers: + return {'success': False, 'error': 'JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置'} + response = requests.get(api_url, headers=headers, timeout=15) + if response.status_code == 200: + data = response.json() + if isinstance(data, dict) and 'data' in data: + return {'success': True, 'data': data['data']} + return {'success': True, 'data': data} + else: + return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"} + except Exception as e: + return {'success': False, 'error': f"获取记录异常: {str(e)}"} + + def create_pg(self, pagetype: str, payload: Dict[str, Any]) -> Dict[str, Any]: + try: + api_url = f"{self.api_url}/api/data/{pagetype}" + headers = self._get_headers() + response = requests.post(api_url, json=payload, headers=headers, timeout=20) + if response.status_code in (200, 201): + data = response.json() + if isinstance(data, dict) and 'data' in data: + return {'success': True, 'data': data['data']} + return {'success': True, 'data': data} + else: + return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"} + except Exception as e: + return {'success': False, 'error': f"创建记录异常: {str(e)}"} + + def update_pg(self, pagetype: str, name: str, payload: Dict[str, Any]) -> Dict[str, Any]: + try: + api_url = f"{self.api_url}/api/data/{pagetype}/{name}" + headers = self._get_headers() + response = requests.put(api_url, json=payload, headers=headers, timeout=20) + if response.status_code in (200, 202): + data = response.json() + if isinstance(data, dict) and 'data' in data: + return {'success': True, 'data': data['data']} + return {'success': True, 'data': data} + else: + return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"} + except Exception as e: + return {'success': False, 'error': f"更新记录异常: {str(e)}"} + + def update_local_job(self, data: Dict[str, Any]) -> Dict[str, Any]: + return self.push_local_job_to_jingrow(data) + + def push_local_job_to_jingrow(self, data: Dict[str, Any]) -> Dict[str, Any]: + try: + api_url = f"{self.api_url}/api/action/jingrow.core.pagetype.local_job.local_job.push_local_job" + headers = self._get_headers() + if not headers: + return {'success': False, 'error': 'JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置'} + headers.setdefault("X-Requested-With", "XMLHttpRequest") + site_name = getattr(Config, "JINGROW_SITE", None) or os.getenv("JINGROW_SITE") + if site_name: + headers["X-Frappe-Site-Name"] = site_name + resp = requests.post(api_url, json=data, headers=headers, timeout=10) + if resp.status_code == 200: + body = resp.json() + if isinstance(body, dict) and 'message' in body: + body = body['message'] + if isinstance(body, dict) and body.get('success'): + return {'success': True} + log_error(f"[JFLOW->JINGROW] 推送失败(body): {body}") + return {'success': False, 'error': str(body)} + log_error(f"[JFLOW->JINGROW] 推送失败(HTTP): {resp.status_code} {resp.text}") + return {'success': False, 'error': f"HTTP {resp.status_code}: {resp.text}"} + except Exception as e: + log_error(f"[JFLOW->JINGROW] 推送异常: {str(e)}") + return {'success': False, 'error': f"推送 Local Job 失败: {str(e)}"} + + def delete_pg(self, pagetype: str, name: str) -> Dict[str, Any]: + try: + api_url = f"{self.api_url}/api/data/{pagetype}/{name}" + headers = self._get_headers() + response = requests.delete(api_url, headers=headers, timeout=15) + if response.status_code in (200, 202): + return {'success': True} + else: + return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"} + except Exception as e: + return {'success': False, 'error': f"删除记录异常: {str(e)}"} + + def map_fields_by_labels(self, field_map: List[Dict[str, Any]], ai_outputs: Dict[str, Any], label_to_fieldname: Dict[str, str]) -> Dict[str, Any]: + record_data: Dict[str, Any] = {} + fieldname_set = set(label_to_fieldname.values()) + for mapping in field_map or []: + source_key = mapping.get('from') + to_raw = mapping.get('to') + if not source_key or to_raw is None: + continue + value = ai_outputs.get(source_key) + if value is None: + continue + to_field = label_to_fieldname.get(str(to_raw).strip()) + if not to_field and str(to_raw).strip() in fieldname_set: + to_field = str(to_raw).strip() + if not to_field: + continue + record_data[to_field] = value + return record_data + + def get_pg_list(self, pagetype: str, filters: Optional[List[List[Any]]] = None, + fields: Optional[List[str]] = None, limit: Optional[int] = None) -> Dict[str, Any]: + try: + base_url = f"{self.api_url}/api/data/{pagetype}" + params: Dict[str, str] = {} + if filters: + params['filters'] = json.dumps(filters, ensure_ascii=False) + if fields: + params['fields'] = json.dumps(fields, ensure_ascii=False) + if isinstance(limit, int) and limit > 0: + params['limit'] = str(limit) + headers = self._get_headers() + response = requests.get(base_url, params=params, headers=headers, timeout=20) + if response.status_code == 200: + data = response.json() + if isinstance(data, dict) and 'data' in data: + return {'success': True, 'data': data['data']} + if isinstance(data, list): + return {'success': True, 'data': data} + return {'success': True, 'data': [data]} + else: + return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"} + except Exception as e: + return {'success': False, 'error': f"列表查询异常: {str(e)}"} + + def get_page_meta(self, pagetype: str) -> Dict[str, Any]: + try: + api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_meta" + headers = self._get_headers() + payload = {"pagetype": pagetype} + response = requests.post(api_url, json=payload, headers=headers, timeout=10) + if response.status_code == 200: + result = response.json() + if isinstance(result, dict) and 'message' in result: + result = result['message'] + if isinstance(result, dict) and result.get('success'): + return {'success': True, 'data': result.get('data', {})} + return {'success': False, 'error': result.get('error', '获取Meta失败') if isinstance(result, dict) else '获取Meta失败'} + else: + return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"} + except Exception as e: + return {'success': False, 'error': f"获取Meta异常: {str(e)}"} + + def get_pagetype_module_app(self, pagetype: str) -> Dict[str, Any]: + try: + api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_pagetype_module_app" + headers = self._get_headers() + if not headers: + return {'success': False, 'error': 'JINGROW_API_KEY 或 JINGROW_API_SECRET 未配置'} + payload = {"pagetype": pagetype} + response = requests.post(api_url, json=payload, headers=headers, timeout=10) + if response.status_code == 200: + data = response.json() + if isinstance(data, dict) and 'message' in data: + data = data['message'] + if isinstance(data, dict) and data.get('success'): + module_val = data.get('module') or (data.get('data') or {}).get('module') + app_val = data.get('app') or (data.get('data') or {}).get('app') + return {'success': True, 'module': module_val, 'app': app_val} + return {'success': False, 'error': data.get('error', '获取失败') if isinstance(data, dict) else '获取失败'} + return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"} + except Exception as e: + return {'success': False, 'error': f"获取模块信息异常: {str(e)}"} + + def get_pg_id(self, pagetype: str, filters: Optional[List[List[Any]]] = None, + field: Optional[str] = None, value: Optional[str] = None, + site: Optional[str] = None) -> Dict[str, Any]: + try: + query_filters: List[List[Any]] = [] + if isinstance(filters, list) and filters: + query_filters = filters + elif field and (value is not None): + query_filters = [[field, '=', value]] + if site: + query_filters.append(['site', '=', site]) + + res = self.get_pg_list(pagetype, filters=query_filters, fields=['name'], limit=1) + if not res.get('success'): + return {'success': False, 'error': res.get('error', '查询失败')} + data = res.get('data') or [] + if not data: + return {'success': False, 'error': '未找到记录'} + name_val = (data[0] or {}).get('name') + if not name_val: + return {'success': False, 'error': '记录缺少name字段'} + return {'success': True, 'name': name_val} + except Exception as e: + return {'success': False, 'error': f'获取记录name失败: {str(e)}'} + + def get_jingrow_system_timezone(self): + try: + headers = self._get_headers() + if not headers: + return self._get_default_timezone() + url = f"{self.api_url}/api/action/jingrow.core.pagetype.system_settings.system_settings.load" + response = requests.post(url, headers=headers, timeout=5) + if response.status_code == 200: + timezone_name = response.json().get('message', {}).get('defaults', {}).get('time_zone') + if timezone_name: + return pytz.timezone(timezone_name) + except (requests.RequestException, pytz.exceptions.UnknownTimeZoneError, KeyError): + pass + return self._get_default_timezone() + + def get_single_pagetype(self, pagetype: str) -> Dict[str, Any]: + try: + api_url = f"{self.api_url}/api/action/jingrow.ai.utils.jlocal.get_single_pagetype" + response = requests.post(api_url, json={"pagetype": pagetype}, headers=self._get_headers(), timeout=10) + if response.status_code == 200: + raw = response.json() + result = raw.get('message', raw) + if result.get('success'): + return {'success': True, 'config': result.get('config', {})} + return {'success': False, 'error': result.get('error', '获取失败')} + return {'success': False, 'error': f"HTTP {response.status_code}"} + except Exception as e: + return {'success': False, 'error': str(e)} + + def _get_default_timezone(self): + return datetime.datetime.now().astimezone().tzinfo + diff --git a/apps/jingrow/jingrow/model/page.py b/apps/jingrow/jingrow/model/page.py index 680765d..25aa071 100644 --- a/apps/jingrow/jingrow/model/page.py +++ b/apps/jingrow/jingrow/model/page.py @@ -1,159 +1,103 @@ # Copyright (c) 2025, JINGROW # License: MIT. See LICENSE -from __future__ import annotations - -from typing import Any, Iterable - +from typing import Any, Dict, List, Optional +from jingrow.services.runtime import get_adapter +from jingrow.core.hooks import execute_hook, execute_hook_async +import asyncio import jingrow -class Page(): - """最小可用 Page:本地数据容器 + 钩子触发""" +class Page: + """基于适配器的通用 Page 模型,提供常用 CRUD 与钩子机制。 + 钩子事件命名与 API 对齐:after_insert, on_update, on_trash 等。 + """ - def __init__(self, d: dict[str, Any] | None = None): - if d is None: - d = {} - if not isinstance(d, dict): - raise ValueError("Page expects a dict payload") - # 基础字段 - self.pagetype = d.get("pagetype") - if "name" in d: - self.name = d["name"] - # flags 简化为普通 dict(本地版不依赖 flags 机制) - self.flags = {} - self.update(d) + def __init__(self, pagetype: str): + self.pagetype = pagetype + # ---------------------- 内部工具 ---------------------- @property - def meta(self): - return jingrow.get_meta(self.pagetype) + def adapter(self): + return get_adapter() - def update(self, d: dict[str, Any]): - for key, value in d.items(): - self.set(key, value) - return self - - def get(self, key: str, default: Any = None): - return self.__dict__.get(key, default) - - def set(self, key: str, value: Any): - self.__dict__[key] = value - - def insert(self, **kwargs) -> "Page": - """插入页面,触发相关钩子""" - self.set("__islocal", True) - - # 触发 before_insert 钩子 - self._execute_hook("before_insert") - - # 本地方法(向后兼容) - self.run_method("before_insert") - - # 保存到数据库 - jingrow.save_pg(self) - - # 触发 after_insert 钩子 - self._execute_hook("after_insert") - - # 本地方法(向后兼容) - self.run_method("after_insert") - - return self - - def save(self, **kwargs) -> "Page": - """保存页面,触发相关钩子""" - # 触发 before_save 钩子(如果存在) - if not self.get("name"): - # 新记录,触发 before_insert - self._execute_hook("before_insert") - else: - # 更新记录,触发 on_update - self._execute_hook("on_update", **kwargs) - - # 本地方法(向后兼容) - self.run_method("before_save") - - # 保存到数据库 - jingrow.save_pg(self) - - # 如果之前触发了 before_insert,现在触发 after_insert - if not self.get("name") or self.get("__islocal"): - self._execute_hook("after_insert") - else: - self._execute_hook("on_update", **kwargs) - - # 本地方法(向后兼容) - self.run_method("on_update") - - return self - - def delete(self, **kwargs): - """删除页面,触发相关钩子""" - # 触发 on_trash 钩子 - self._execute_hook("on_trash", **kwargs) - - # 本地方法(向后兼容) - self.run_method("on_trash") - - return jingrow.delete_pg(self.pagetype, self.name) - - def run_method(self, method: str, *args, **kwargs): - """运行本地方法(向后兼容)""" - fn = getattr(self, method, None) - if callable(fn): - return fn(*args, **kwargs) - return None - - def _execute_hook(self, event_name: str, **kwargs): - """执行钩子(私有方法)""" + def _execute_hook(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool: try: - import logging - from jingrow.core.hooks import execute_hook - logger = logging.getLogger(__name__) - - # 精确钩子:page.{pagetype}.{event_name} - specific_hook = f"page.{self.pagetype}.{event_name}" - results_specific = execute_hook( - specific_hook, - sender=self.pagetype, - page=self, - event=event_name, - **kwargs - ) - logger.debug(f"Hook executed: {specific_hook}, handlers_return={len(results_specific)}") - - # 通配钩子:page.*.{event_name}(用于全局监听) - wildcard_hook = f"page.*.{event_name}" - results_wildcard = execute_hook( + wildcard_hook = f"page.*.{name}" + pg_obj = jingrow.get_pg(self.pagetype, record_name) + execute_hook( wildcard_hook, sender=self.pagetype, - page=self, - event=event_name, - **kwargs + page=pg_obj, + event=name, + data=(data or {}), ) - logger.debug(f"Hook executed: {wildcard_hook}, handlers_return={len(results_wildcard)}") + return True + except Exception: + return False - except ImportError: - # 如果钩子系统未安装,静默失败(向后兼容) - pass - except Exception as e: - # 记录错误但继续执行(防止钩子错误影响主流程) - import logging - logger = logging.getLogger(__name__) - logger.warning(f"执行钩子失败 {event_name}: {e}", exc_info=True) + def _execute_hook_async(self, name: str, record_name: str, data: Optional[Dict[str, Any]] = None) -> bool: + try: + wildcard_hook = f"page.*.{name}" + pg_obj = jingrow.get_pg(self.pagetype, record_name) - # 兼容云端常用辅助 - def as_dict(self) -> dict[str, Any]: - d = {k: v for k, v in self.__dict__.items() if not k.startswith("_")} - d["pagetype"] = self.pagetype - return d + async def _run(): + await execute_hook_async( + wildcard_hook, + sender=self.pagetype, + page=pg_obj, + event=name, + data=(data or {}), + ) + + asyncio.create_task(_run()) + return True + except Exception: + return False + + # ---------------------- CRUD ---------------------- + def get(self, name: str) -> Dict[str, Any]: + return self.adapter.get_record(self.pagetype, name) + + def list(self, filters: Optional[List[List[Any]]] = None, + fields: Optional[List[str]] = None, + limit: Optional[int] = None) -> Dict[str, Any]: + return self.adapter.get_record_list(self.pagetype, filters=filters, fields=fields, limit=limit) + + def create(self, data: Dict[str, Any]) -> Dict[str, Any]: + result = self.adapter.create_record(self.pagetype, data) + if result.get('success'): + created = result.get('data', {}) + record_name = created.get('name') or data.get('name') + if record_name: + self._execute_hook_async('after_insert', record_name, created) + return result + + def update(self, name: str, data: Dict[str, Any]) -> Dict[str, Any]: + result = self.adapter.update_record(self.pagetype, name, data) + if result.get('success'): + self._execute_hook_async('on_update', name, data) + return result + + def delete(self, name: str) -> Dict[str, Any]: + # 先发同步 on_trash,再删除 + self._execute_hook('on_trash', name) + return self.adapter.delete_record(self.pagetype, name) + + # ---------------------- 其他便捷能力 ---------------------- + def get_meta(self) -> Dict[str, Any]: + return self.adapter.get_page_meta(self.pagetype) + + def get_module_app(self) -> Dict[str, Any]: + return self.adapter.get_pagetype_module_app(self.pagetype) + + def get_record_id(self, filters: Optional[List[List[Any]]] = None, + field: Optional[str] = None, value: Optional[str] = None, + site: Optional[str] = None) -> Dict[str, Any]: + return self.adapter.get_record_id(self.pagetype, filters=filters, field=field, value=value, site=site) + + def get_single(self) -> Dict[str, Any]: + return self.adapter.get_single_pagetype(self.pagetype) - def get_all_children(self) -> list["Page"]: - children = [] - for df in self.meta.get_table_fields(): - value = getattr(self, df.fieldname, None) - if value: - children.extend(value) - return children diff --git a/apps/jingrow/jingrow/services/runtime.py b/apps/jingrow/jingrow/services/runtime.py new file mode 100644 index 0000000..ec7a24f --- /dev/null +++ b/apps/jingrow/jingrow/services/runtime.py @@ -0,0 +1,24 @@ +from typing import Optional +from jingrow.config import Config + +_adapter = None + +def init_adapter(run_mode: str = "api"): + global _adapter + if run_mode == "api": + from jingrow.adapters.api_adapter import ApiAdapter + _adapter = ApiAdapter() + elif run_mode == "local": + from jingrow.adapters.local_adapter import LocalAdapter + _adapter = LocalAdapter() + else: + raise ValueError(f"Unsupported run_mode: {run_mode}. Supported modes: 'api', 'local'") + +def get_adapter(): + global _adapter + if _adapter is None: + # Lazy init using Config if not explicitly initialized + init_adapter(getattr(Config, "run_mode", "api")) + return _adapter + +