import asyncio
import json
import os
from typing import Dict, Optional, Tuple, Union

import requests

from settings import settings

# Model configuration.
# `translation_model` selects the provider used by
# `TranslateService.translate_text_sync`: "DeepSeek" or "Doubao"; any other
# value falls back to ChatGPT.
translation_model = "Doubao"
deepseek_api_model = "deepseek-chat"
doubao_api_model = "doubao-1-5-pro-32k-250115"
chatgpt_api_model = "gpt-4o"

# Default (connect, read) timeout in seconds for provider HTTP calls.
# Without a timeout `requests.post` can block forever on a stalled connection.
default_request_timeout = (10, 120)

# Custom prompt configuration (defaults used when the caller supplies none).
default_system_message = """
你是一位专业的中译英翻译专家。请将提供的中文内容翻译成地道、流畅的英文,确保保留原文的风格和语境。
翻译时要注意原文的专业术语和表达方式,使翻译结果符合英语的最佳实践。
只需返回翻译后的英文内容,不要包含任何其他说明或注释。
"""

default_user_content = "请将以下中文内容翻译成英文:\n\n{source_text}"


class TranslateService:
    """Translate text by calling a chat-completion HTTP API.

    Three providers are supported (ChatGPT, DeepSeek, Doubao); the one used by
    `translate_text_sync` is chosen by the module-level `translation_model`.
    """

    def __init__(
        self,
        system_message: Optional[str] = None,
        user_content: Optional[str] = None,
        timeout: Optional[Union[float, Tuple[float, float]]] = None,
    ):
        """Initialise the translation service.

        Args:
            system_message: Custom system prompt. Falls back to
                `default_system_message` when omitted or empty.
            user_content: Custom user prompt template; it is rendered with
                `str.format(source_text=...)`. Falls back to
                `default_user_content` when omitted or empty.
            timeout: Timeout passed to `requests.post`, either a single number
                of seconds or a `(connect, read)` tuple. Falls back to
                `default_request_timeout` when omitted.
        """
        self.system_message = system_message or default_system_message
        self.user_content = user_content or default_user_content
        self.timeout = default_request_timeout if timeout is None else timeout

    def _post_chat_completion(
        self, api_url: str, api_key: str, model: str, source_text: str
    ) -> Optional[Dict]:
        """Send one chat-completion request and return the decoded JSON body.

        Shared by the three provider-specific `send_to_*` methods, which differ
        only in URL, API key and model name.

        Args:
            api_url: Endpoint to POST to.
            api_key: Key sent as a Bearer token in the Authorization header.
            model: Model identifier placed in the request payload.
            source_text: Text substituted into the user prompt template.

        Returns:
            The parsed JSON response, or None when the HTTP status is not 200.

        Raises:
            requests.RequestException: On connection failures or timeouts.
            KeyError / IndexError: If the user prompt template contains
                placeholders other than `{source_text}`.
        """
        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": self.system_message},
                {
                    "role": "user",
                    "content": self.user_content.format(source_text=source_text),
                },
            ],
            "temperature": 0.3,
            "top_p": 0.9,
        }
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        }

        response = requests.post(
            api_url, headers=headers, json=payload, timeout=self.timeout
        )
        if response.status_code != 200:
            print(f"Error: {response.status_code}, {response.text}")
            return None
        return response.json()

    def send_to_chatgpt(self, source_text: str) -> Optional[Dict]:
        """Send a translation request to ChatGPT.

        Returns the parsed JSON response, or None on a non-200 status.
        """
        return self._post_chat_completion(
            settings.chatgpt_api_url,
            settings.chatgpt_api_key,
            chatgpt_api_model,
            source_text,
        )

    def send_to_deepseek(self, source_text: str) -> Optional[Dict]:
        """Send a translation request to DeepSeek.

        Returns the parsed JSON response, or None on a non-200 status.
        """
        return self._post_chat_completion(
            settings.deepseek_api_url,
            settings.deepseek_api_key,
            deepseek_api_model,
            source_text,
        )

    def send_to_doubao(self, source_text: str) -> Optional[Dict]:
        """Send a translation request to Doubao.

        Returns the parsed JSON response, or None on a non-200 status.
        """
        return self._post_chat_completion(
            settings.doubao_api_url,
            settings.doubao_api_key,
            doubao_api_model,
            source_text,
        )

    def translate_text_sync(self, source_text: str) -> Dict:
        """Translate `source_text` synchronously.

        Never raises: every failure is reported through the returned dict.

        Args:
            source_text: The text to translate.

        Returns:
            On success:
                {"status": "success", "data": {"english_translation": <str>}}
            On failure:
                {"status": "error", "message": <str>}
        """
        try:
            if not source_text:
                return {"status": "error", "message": "未提供翻译文本"}

            # Pick the provider; unknown values fall back to ChatGPT.
            if translation_model == "DeepSeek":
                ai_response = self.send_to_deepseek(source_text)
            elif translation_model == "Doubao":
                ai_response = self.send_to_doubao(source_text)
            else:
                ai_response = self.send_to_chatgpt(source_text)

            if ai_response is None:
                return {"status": "error", "message": "AI服务请求失败"}

            choices = ai_response.get("choices", [])
            if not choices:
                return {"status": "error", "message": "AI响应无效"}

            # `message` or `content` may be JSON null; treat that as an
            # invalid response instead of crashing on `.strip()`.
            message = choices[0].get("message") or {}
            content = message.get("content", "")
            if not isinstance(content, str):
                return {"status": "error", "message": "AI响应无效"}

            return {
                "status": "success",
                "data": {"english_translation": content.strip()},
            }
        except Exception as e:
            # Boundary handler: network errors, timeouts, malformed JSON and
            # prompt-template errors are all converted into an error result.
            print(f"翻译任务处理失败: {str(e)}")
            return {
                "status": "error",
                "message": f"处理翻译任务时发生错误: {str(e)}",
            }

    async def translate_text(self, source_text: str) -> Dict:
        """Translate `source_text` without blocking the event loop.

        Runs `translate_text_sync` in the loop's default executor and returns
        its result dict; an unexpected failure is reported as an error dict.
        """
        try:
            # Inside a coroutine the running loop is the one to use.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None, self.translate_text_sync, source_text
            )
        except Exception as e:
            return {"status": "error", "message": f"翻译失败: {str(e)}"}