# NOTE: the following lines are residue from the web source viewer this file
# was copied from; they are not part of the module itself.
# 2025-10-25 03:46:42 +08:00

# 374 lines
# 14 KiB
# Python
# Raw Blame History

# This file contains ambiguous Unicode characters

# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
Jingrow API 适配器, 通过 API 调用 Jingrow SaaS 版的服务
"""
import json
import logging
from typing import Dict, List, Any, Optional, Union
from urllib.parse import quote

import requests

from jingrow.model.page import Page
from jingrow.config import Config
logger = logging.getLogger(__name__)


class ApiAdapter:
    """Adapter that performs page (document) operations over the Jingrow HTTP API.

    Connection settings are read from ``Config`` when the adapter is created.
    Each public method turns its arguments into a request against an
    ``api/data/<pagetype>[/<name>]`` endpoint and, where applicable, wraps the
    decoded JSON reply in ``Page`` objects.
    """

    # Seconds handed to ``requests`` as the timeout so that a stalled server
    # cannot block the caller forever. Override on an instance if needed.
    request_timeout = 60

    # HTTP verbs accepted by ``_make_request``.
    _SUPPORTED_METHODS = ("GET", "POST", "PUT", "DELETE")
    # Verbs whose payload is sent in the query string instead of a JSON body.
    _QUERY_METHODS = ("GET", "DELETE")

    def __init__(self, **config):
        """Load the connection settings from ``Config``.

        :param config: Accepted for call compatibility; the values are not
            read by this adapter.
            NOTE(review): confirm whether per-instance overrides were intended.
        """
        self.api_url = Config.jingrow_server_url
        self.api_key = Config.jingrow_api_key
        self.api_secret = Config.jingrow_api_secret
        self.session_cookie = Config.jingrow_session_cookie
        logger.info("API adapter initialized with URL: %s", self.api_url)

    def _get_headers(self):
        """Return the HTTP headers sent with every request.

        JSON content negotiation headers are always present. An
        ``Authorization: token <key>:<secret>`` header is added when both the
        API key and secret are set, and a ``Cookie: sid=<value>`` header when
        a session cookie is set.
        """
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        if self.api_key and self.api_secret:
            headers['Authorization'] = f'token {self.api_key}:{self.api_secret}'
        if self.session_cookie:
            headers['Cookie'] = f'sid={self.session_cookie}'
        return headers

    def _build_url(self, endpoint: str) -> str:
        """Join the configured base URL and *endpoint* with exactly one slash."""
        base = str(self.api_url).rstrip('/')
        return f"{base}/{endpoint.lstrip('/')}"

    @staticmethod
    def _encode_params(params):
        """Prepare query-string parameters for ``requests``.

        ``requests`` cannot represent nested values in a query string: a dict
        value is reduced to its keys and nested lists are flattened. Dict,
        list and tuple values are therefore serialised to JSON text first.
        ``None`` values are dropped (``requests`` would skip them anyway) and
        every other value is passed through unchanged.

        :param params: Mapping of query parameters, or ``None``.
        :return: A new dict with encoded values; *params* itself when it is
            empty or not a dict.
        """
        if not params or not isinstance(params, dict):
            return params
        encoded = {}
        for key, value in params.items():
            if value is None:
                continue
            if isinstance(value, (dict, list, tuple)):
                # default=str keeps values such as dates from raising here.
                value = json.dumps(value, default=str)
            encoded[key] = value
        return encoded

    def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None):
        """Send one API request and return the decoded JSON reply.

        :param method: ``GET``, ``POST``, ``PUT`` or ``DELETE`` (any case).
        :param endpoint: Path relative to the configured server URL.
        :param data: Query parameters for GET/DELETE, JSON body for POST/PUT.
        :return: The decoded JSON body, or ``None`` when the reply is empty.
        :raises ValueError: If *method* is not supported.
        :raises requests.exceptions.RequestException: On transport errors,
            timeouts or non-2xx status codes (logged, then re-raised).
        """
        verb = method.upper()
        if verb not in self._SUPPORTED_METHODS:
            raise ValueError(f"Unsupported HTTP method: {method}")

        request_kwargs = {
            'headers': self._get_headers(),
            'timeout': self.request_timeout,
        }
        if verb in self._QUERY_METHODS:
            # DELETE also carries its options (e.g. ignore_missing) as
            # query parameters so they are no longer silently dropped.
            request_kwargs['params'] = self._encode_params(data)
        else:
            request_kwargs['json'] = data

        try:
            response = requests.request(verb, self._build_url(endpoint), **request_kwargs)
            response.raise_for_status()
            if not response.content:
                # e.g. "204 No Content": there is nothing to decode.
                return None
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error("API request failed: %s", e)
            raise

    def _create_page(self, pagetype: str, name: str = None, **data):
        """Build a ``Page`` from *pagetype*, an optional *name* and field values.

        ``name`` is only stored on the page data when it is not ``None``.
        """
        page_data = {"pagetype": pagetype, **data}
        if name is not None:
            page_data["name"] = name
        return Page(page_data)

    def _page_from_payload(self, pagetype: str, payload: Dict[str, Any], name: str = None):
        """Build a ``Page`` from a decoded API record.

        ``pagetype`` and ``name`` entries inside *payload* are removed before
        the remaining fields are forwarded, because they would otherwise
        collide with the explicit arguments of ``_create_page``. When *name*
        is ``None`` the payload's own ``name`` (if any) is used instead.
        """
        fields = {k: v for k, v in payload.items() if k not in ("pagetype", "name")}
        if name is None:
            name = payload.get("name")
        return self._create_page(pagetype, name, **fields)

    def _build_endpoint(self, pagetype: str, name: str = None) -> str:
        """Return ``api/data/<pagetype>`` or ``api/data/<pagetype>/<name>``.

        Both segments are percent-encoded so that characters such as ``/``,
        ``?`` or ``#`` inside a name cannot change which resource is addressed.
        """
        endpoint = f"api/data/{quote(str(pagetype), safe='')}"
        if name:
            endpoint += f"/{quote(str(name), safe='')}"
        return endpoint

    def get_pg(self, *args, **kwargs):
        """Fetch a single page, or a list of pages when no name is given.

        Supported call shapes:

        - ``get_pg("PageType", "name")`` - fetch that document; extra keyword
          arguments (e.g. ``for_update=True``) are sent as query parameters.
        - ``get_pg("PageType", **options)`` - no name: delegates to
          ``get_list("PageType", **options)``.
        - ``get_pg({"pagetype": "PageType", ...})`` - the dict replaces the
          keyword arguments; its remaining entries are forwarded to
          ``get_list``.
        - ``get_pg(pagetype="PageType", ...)`` - same as above using keywords.

        :return: A ``Page`` when a name is given, otherwise the list returned
            by ``get_list``.
        :raises ValueError: If the first positional argument is neither a
            string nor a dict, or if no pagetype was supplied.
        :raises TypeError: If the single-document reply is not a JSON object.
        """
        pagetype = None
        name = None

        if args:
            first = args[0]
            if isinstance(first, str):
                pagetype = first
                if len(args) > 1:
                    name = args[1]
            elif isinstance(first, dict):
                # Copy so the caller's dict is never touched.
                kwargs = dict(first)
            else:
                raise ValueError("First non keyword argument must be a string or dict")

        if pagetype is None:
            # Also covers a call without any arguments, which previously
            # went on to query the literal endpoint "api/data/None".
            if "pagetype" not in kwargs:
                raise ValueError('"pagetype" is a required key')
            pagetype = kwargs["pagetype"]

        logger.info("API get_pg: %s, %s", pagetype, name)

        if name is None:
            # pagetype is passed positionally, so drop it from the keywords.
            list_kwargs = {k: v for k, v in kwargs.items() if k != 'pagetype'}
            return self.get_list(pagetype, **list_kwargs)

        endpoint = self._build_endpoint(pagetype, name)
        data = self._make_request('GET', endpoint, kwargs)
        if not isinstance(data, dict):
            raise TypeError(
                f"Unexpected response for {pagetype} {name}: "
                f"expected a JSON object, got {type(data).__name__}"
            )
        # The requested name wins over whatever name the reply carries.
        return self._page_from_payload(pagetype, data, name)

    def get_list(self, pagetype: str, fields=None, filters=None, group_by=None,
                 order_by=None, limit_start=None, limit_page_length=20,
                 parent=None, debug=False, as_dict=True, or_filters=None, **kwargs):
        """Query pages of *pagetype* and return them as ``Page`` objects.

        Only arguments that differ from their defaults are sent to the server.

        :param pagetype: PageType on which the query is made.
        :param fields: List of fields or ``*``.
        :param filters: Filter conditions (dict or list of conditions).
        :param group_by: Group by field.
        :param order_by: Order by clause, e.g. ``modified desc``.
        :param limit_start: Start results at this record number.
        :param limit_page_length: Number of records per page (default 20).
        :param parent: Parent value forwarded as the ``parent`` parameter.
        :param debug: Sent as ``debug`` when truthy.
        :param as_dict: Sent as ``as_dict`` only when falsy.
        :param or_filters: OR filter conditions.
        :param kwargs: Extra query parameters, merged in last.
        :return: List of ``Page`` objects, one per returned record.

        Example usage::

            get_list("ToDo", fields=["name", "description"], filters={"owner": "test@example.com"})
            get_list("ToDo", fields="*", filters=[["modified", ">", "2014-01-01"]])
        """
        logger.info("API get_list: %s", pagetype)

        params = {}
        if fields is not None:
            params['fields'] = fields
        if filters is not None:
            params['filters'] = filters
        if group_by is not None:
            params['group_by'] = group_by
        if order_by is not None:
            params['order_by'] = order_by
        if limit_start is not None:
            params['limit_start'] = limit_start
        if limit_page_length != 20:  # the default does not need to be sent
            params['limit_page_length'] = limit_page_length
        if parent is not None:
            params['parent'] = parent
        if debug:
            params['debug'] = debug
        if not as_dict:  # default is True; only send an explicit False
            params['as_dict'] = as_dict
        if or_filters is not None:
            params['or_filters'] = or_filters
        params.update(kwargs)

        endpoint = self._build_endpoint(pagetype)
        records = self._make_request('GET', endpoint, params)
        # An empty reply is treated as "no records".
        return [self._page_from_payload(pagetype, record) for record in (records or [])]

    def new_pg(self, pagetype: str, *, parent_pg=None, parentfield=None, as_dict=False, **kwargs):
        """Create a new, unsaved page locally (no request is sent).

        :param pagetype: PageType of the new document.
        :param parent_pg: [optional] parent document (or its name) to attach to.
        :param parentfield: [optional] field on the parent this page belongs to.
        :param as_dict: [optional] return the raw dict instead of a ``Page``.
        :param kwargs: [optional] field values given as ``field=value`` pairs.
        :return: A dict when *as_dict* is true, otherwise a ``Page``.
        """
        logger.info("API new_pg: %s", pagetype)

        new_data = {
            "pagetype": pagetype,
            "__islocal": 1,
            "pagestatus": 0,
            **kwargs,
        }

        if parent_pg:
            new_data["parent"] = parent_pg.name if hasattr(parent_pg, 'name') else str(parent_pg)
            new_data["parenttype"] = parent_pg.pagetype if hasattr(parent_pg, 'pagetype') else None
        if parentfield:
            new_data["parentfield"] = parentfield

        if as_dict:
            return new_data

        # pagetype and name are explicit arguments of _create_page; leaving
        # them in the field dict would raise a duplicate-keyword TypeError.
        page_type = new_data.pop('pagetype')
        page_name = new_data.pop('name', None)
        return self._create_page(page_type, page_name, **new_data)

    def save_pg(self, pg, ignore_permissions=False, ignore_validate=False, ignore_mandatory=False):
        """Persist *pg* and copy the server's reply back onto it.

        A page with a truthy ``name`` is sent with PUT to its own endpoint;
        otherwise it is sent with POST to the pagetype endpoint.

        :param pg: Page object to save (must provide ``pagetype`` and ``as_dict()``).
        :param ignore_permissions: Adds ``ignore_permissions`` to the payload.
        :param ignore_validate: Adds ``ignore_validate`` to the payload.
        :param ignore_mandatory: Adds ``ignore_mandatory`` to the payload.
        :return: The same *pg* object, updated in place.
        """
        name = getattr(pg, 'name', None)
        logger.info("API save_pg: %s, %s", pg.pagetype, name)

        endpoint = self._build_endpoint(pg.pagetype, name)
        method = 'PUT' if name else 'POST'

        data = pg.as_dict()
        if ignore_permissions:
            data['ignore_permissions'] = True
        if ignore_validate:
            data['ignore_validate'] = True
        if ignore_mandatory:
            data['ignore_mandatory'] = True

        result = self._make_request(method, endpoint, data)

        # Only a JSON object can be mapped onto attributes.
        if isinstance(result, dict):
            for key, value in result.items():
                setattr(pg, key, value)
        return pg

    def delete_pg(self, pagetype: str, name: str, ignore_missing=False):
        """Delete the page *name* of *pagetype*.

        :param pagetype: PageType name.
        :param name: Document name.
        :param ignore_missing: When truthy, sent to the server as the
            ``ignore_missing`` query parameter.
        :return: ``True`` when the request completed without raising.
        """
        logger.info("API delete_pg: %s, %s", pagetype, name)
        endpoint = self._build_endpoint(pagetype, name)
        params = {'ignore_missing': ignore_missing} if ignore_missing else {}
        self._make_request('DELETE', endpoint, params)
        return True

    def db_exists(self, pagetype: Union[str, Dict[str, Any]],
                  filters: Optional[Union[str, Dict[str, Any]]] = None, cache=False):
        """Return whether at least one matching page exists.

        :param pagetype: PageType name, or a dict holding ``pagetype`` plus
            the filter fields.
        :param filters: Filter dict, or a document name given as a string.
        :param cache: Sent as the ``cache`` query parameter when truthy.
        :return: ``True`` if the query reply is non-empty, else ``False``.

        Examples::

            db_exists("User", "jane@example.org", cache=True)
            db_exists("User", {"full_name": "Jane Doe"})
            db_exists({"pagetype": "User", "full_name": "Jane Doe"})
        """
        logger.info("API db_exists: %s, %s", pagetype, filters)

        if isinstance(pagetype, dict):
            # Copy first so the caller's dict keeps its "pagetype" entry.
            filters = pagetype.copy()
            pagetype = filters.pop("pagetype")
        if isinstance(filters, str):
            filters = {"name": filters}

        endpoint = self._build_endpoint(pagetype)
        params = {'filters': filters} if filters else {}
        if cache:
            params['cache'] = cache

        data = self._make_request('GET', endpoint, params)
        # bool() also copes with an empty (None) reply.
        return bool(data)

    def db_get_value(self, pagetype: str, filters: Optional[Union[str, Dict[str, Any]]] = None,
                     fieldname: str = None, ignore=False, cache=False, order_by=None, parent=None):
        """Query a field value and return the server's reply unchanged.

        :param pagetype: PageType name.
        :param filters: Filter dict, or a document name given as a string.
        :param fieldname: Field name to read.
        :param ignore: Sent as ``ignore`` when truthy.
        :param cache: Sent as ``cache`` when truthy.
        :param order_by: Sent as ``order_by`` when truthy.
        :param parent: Sent as ``parent`` when truthy.
        :return: The decoded JSON reply of the request.

        Examples::

            db_get_value("User", "jane@example.org", "full_name")
            db_get_value("User", {"email": "jane@example.org"}, "full_name")
        """
        logger.info("API db_get_value: %s, %s, %s", pagetype, filters, fieldname)

        if isinstance(filters, str):
            filters = {"name": filters}

        endpoint = self._build_endpoint(pagetype)
        params = {
            'filters': filters,
            'fieldname': fieldname,
        }
        if ignore:
            params['ignore'] = ignore
        if cache:
            params['cache'] = cache
        if order_by:
            params['order_by'] = order_by
        if parent:
            params['parent'] = parent

        return self._make_request('GET', endpoint, params)

    def db_set_value(self, pagetype: str, filters: Optional[Union[str, Dict[str, Any]]] = None,
                     fieldname: str = None, value: Any = None, modified=None, modified_by=None,
                     update_modified=True, debug=False):
        """Set a field value through a PUT request on the pagetype endpoint.

        :param pagetype: PageType name.
        :param filters: Filter dict, or a document name given as a string.
        :param fieldname: Field name to set.
        :param value: Value to set.
        :param modified: Sent as ``modified`` when truthy.
        :param modified_by: Sent as ``modified_by`` when truthy.
        :param update_modified: Sent as ``update_modified`` only when falsy.
        :param debug: Sent as ``debug`` when truthy.
        :return: ``True`` when the request completed without raising.

        Examples::

            db_set_value("User", "jane@example.org", "full_name", "Jane Doe")
            db_set_value("User", {"email": "jane@example.org"}, "full_name", "Jane Doe")
        """
        logger.info("API db_set_value: %s, %s, %s, %s", pagetype, filters, fieldname, value)

        if isinstance(filters, str):
            filters = {"name": filters}

        endpoint = self._build_endpoint(pagetype)
        data = {
            'filters': filters,
            'fieldname': fieldname,
            'value': value,
        }
        if modified:
            data['modified'] = modified
        if modified_by:
            data['modified_by'] = modified_by
        if not update_modified:
            data['update_modified'] = update_modified
        if debug:
            data['debug'] = debug

        self._make_request('PUT', endpoint, data)
        return True