jflow/backend/utils/jingrow_api.py

202 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import sys
import os
import uuid
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import Config
from utils.auth import get_session_api_headers
def log_info(message):
    """Print *message* to standard output with an ``[INFO]`` prefix."""
    line = f"[INFO] {message}"
    print(line)
def log_error(message):
    """Print *message* to standard output with an ``[ERROR]`` prefix."""
    line = f"[ERROR] {message}"
    print(line)
def upload_file_to_jingrow(file_data, filename, attached_to_pagetype=None, attached_to_name=None, attached_to_field=None):
    """Upload a file to the Jingrow server as multipart/form-data.

    Args:
        file_data (bytes): Binary content of the file.
        filename (str): Name to send for the file.
        attached_to_pagetype (str, optional): Page type of the record the
            file is attached to.
        attached_to_name (str, optional): Name of the record the file is
            attached to.
        attached_to_field (str, optional): Field of that record the file is
            attached to.

    Returns:
        dict: ``{'success': True, 'file_url': ..., 'file_name': ...}`` when
        the server answers HTTP 200 with a ``message`` object, otherwise
        ``{'success': False, 'error': <description>}``. Exceptions raised
        while building or sending the request are caught and reported
        through the ``error`` key instead of propagating.
    """
    try:
        api_url = f"{Config.JINGROW_SERVER_URL}/api/action/upload_file"

        # Build a fresh header dict without Content-Type so that requests can
        # generate the multipart boundary itself. Header names are
        # case-insensitive, and the dict returned by the auth helper is left
        # untouched in case it is shared with other callers.
        headers = {
            key: value
            for key, value in get_session_api_headers().items()
            if key.lower() != 'content-type'
        }

        files = {
            'file': (filename, file_data, 'application/octet-stream'),
        }
        data = {
            'file_name': filename,
            'is_private': 0,
        }
        if attached_to_pagetype and attached_to_name:
            data['pagetype'] = attached_to_pagetype
            data['docname'] = attached_to_name
            # A field reference is only sent together with the record it
            # belongs to.
            if attached_to_field:
                data['fieldname'] = attached_to_field

        response = requests.post(api_url, files=files, data=data, headers=headers, timeout=30)

        if response.status_code != 200:
            error_text = response.text
            log_error(f"API请求失败: {response.status_code}, 响应: {error_text}")
            return {'success': False, 'error': f'API请求失败 (HTTP {response.status_code}): {error_text}'}

        result = response.json()
        if result.get('session_expired'):
            return {'success': False, 'error': 'Session已过期请重新登录'}

        message = result.get('message')
        # A missing, empty or non-object "message" is a malformed response;
        # report it as such rather than failing on the attribute access.
        if not message or not isinstance(message, dict):
            return {'success': False, 'error': 'API响应格式错误'}

        return {
            'success': True,
            'file_url': message.get('file_url'),
            'file_name': message.get('file_name'),
        }
    except Exception as e:
        # Log the failure like the other helpers in this module do, then
        # report it to the caller in the usual result shape.
        log_error(f"调用upload_file API异常: {str(e)}")
        return {'success': False, 'error': f'调用upload_file API异常: {str(e)}'}
def get_field_mapping_from_jingrow(pagetype):
    """Fetch the label-to-fieldname mapping of a page type from Jingrow.

    Args:
        pagetype (str): Name of the page type.

    Returns:
        dict: The ``field_mapping`` value reported by the server (``{}`` if
        that key is absent). An empty dict is also returned, after logging,
        when the HTTP status is not 200, when the server does not report
        success, or when any exception occurs.
    """
    try:
        endpoint = f"{Config.JINGROW_SERVER_URL}/api/action/jingrow.ai.utils.jflow.get_field_mapping"
        auth_headers = get_session_api_headers()
        payload = {'pagetype': pagetype}
        response = requests.post(endpoint, json=payload, headers=auth_headers, timeout=10)

        if response.status_code != 200:
            log_error(f"获取字段映射失败: HTTP {response.status_code}")
            return {}

        result = response.json()
        # The API may wrap the actual payload in a "message" envelope.
        if 'message' in result:
            result = result['message']

        if not result.get('success'):
            log_error(f"获取字段映射失败: {result.get('error', '未知错误')}")
            return {}

        return result.get('field_mapping', {})
    except Exception as e:
        log_error(f"获取字段映射异常: {str(e)}")
        return {}
def get_field_value_from_jingrow(pagetype, name, fieldname):
    """Fetch the current value of one field of a Jingrow record.

    Args:
        pagetype (str): Name of the page type.
        name (str): Name of the record.
        fieldname (str): Name of the field to read.

    Returns:
        The ``value`` reported by the server, or ``None`` when that key is
        absent. ``None`` is also returned, after logging, when the HTTP
        status is not 200, when the server does not report success, or when
        any exception occurs.
    """
    try:
        endpoint = f"{Config.JINGROW_SERVER_URL}/api/action/jingrow.ai.utils.jflow.get_field_value"
        auth_headers = get_session_api_headers()
        payload = {
            'pagetype': pagetype,
            'name': name,
            'fieldname': fieldname,
        }
        response = requests.post(endpoint, json=payload, headers=auth_headers, timeout=10)

        if response.status_code != 200:
            log_error(f"获取字段值失败: HTTP {response.status_code}")
            return None

        result = response.json()
        # The API may wrap the actual payload in a "message" envelope.
        if 'message' in result:
            result = result['message']

        if not result.get('success'):
            log_error(f"获取字段值失败: {result.get('error', '未知错误')}")
            return None

        return result.get('value')
    except Exception as e:
        log_error(f"获取字段值异常: {str(e)}")
        return None
def get_ai_settings_from_jingrow():
    """Fetch the AI settings configuration from Jingrow.

    Returns:
        The ``config`` value reported by the server (``{}`` if that key is
        absent). ``None`` is returned, after logging, when the session has
        expired, when the HTTP status is not 200, when the server does not
        report success, or when any exception occurs.
    """
    try:
        endpoint = f"{Config.JINGROW_SERVER_URL}/api/action/jingrow.ai.utils.jflow.get_ai_settings"
        auth_headers = get_session_api_headers()
        response = requests.post(endpoint, headers=auth_headers, timeout=10)

        if response.status_code != 200:
            log_error(f"获取AI Settings失败: HTTP {response.status_code}")
            return None

        result = response.json()
        # The expiry flag is read from the top-level response, before the
        # "message" envelope is unwrapped.
        if result.get('session_expired'):
            log_error("Session已过期请重新登录")
            return None

        if 'message' in result:
            result = result['message']

        if not result.get('success'):
            log_error(f"获取AI Settings失败: {result.get('error', '未知错误')}")
            return None

        return result.get('config', {})
    except Exception as e:
        log_error(f"获取AI Settings配置异常: {str(e)}")
        return None