import aiohttp
from functools import wraps
from fastapi import HTTPException
import os
from typing import Callable, Any, Dict, Optional, Tuple, List
from settings import settings
from fastapi.responses import StreamingResponse
import json
import logging
import requests
import io
import re
from pathlib import Path
from urllib.parse import urlparse
from PIL import Image

logger = logging.getLogger(__name__)


async def verify_api_credentials_and_balance(api_key: str, api_secret: str, api_name: str) -> Dict[str, Any]:
    """Verify the caller's API credentials and balance against the Jingrow platform.

    POSTs the credentials and API name to the platform endpoint, authenticating
    with the service-level key/secret taken from ``settings``.

    Args:
        api_key: API key supplied by the caller.
        api_secret: API secret supplied by the caller.
        api_name: Name of the API being invoked.

    Returns:
        The verification payload. When the response JSON wraps a dict under
        ``"message"``, that inner dict is returned.

    Raises:
        HTTPException: 500 when the platform answers with a non-200 status or
            the call fails for any other reason; 401 when the payload does not
            report ``success``.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{settings.jingrow_api_url}/api/method/jcloud.api.account.verify_api_credentials_and_balance",
                headers={"Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"},
                json={"api_key": api_key, "api_secret": api_secret, "api_name": api_name}
            ) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="验证服务暂时不可用")

                result = await response.json()

                # Unwrap the payload when it is nested under "message".
                if "message" in result and isinstance(result["message"], dict):
                    result = result["message"]

                if not result.get("success"):
                    raise HTTPException(status_code=401, detail=result.get("message", "验证失败"))

                return result
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"验证服务暂时不可用: {str(e)}") from e


async def deduct_jingrow_api_usage_fee(api_key: str, api_secret: str, api_name: str, usage_count: int = 1) -> Dict[str, Any]:
    """Deduct the API usage fee on the Jingrow platform.

    Args:
        api_key: API key supplied by the caller.
        api_secret: API secret supplied by the caller.
        api_name: Name of the API being billed.
        usage_count: Number of usages to bill.

    Returns:
        The platform's response payload (unwrapped from ``"message"`` when that
        key holds a dict). On an unexpected error a dict of the form
        ``{"success": False, "message": ...}`` is returned instead of raising.

    Raises:
        HTTPException: 500 when the platform answers with a non-200 status.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{settings.jingrow_api_url}/api/method/jcloud.api.account.deduct_api_usage_fee",
                headers={"Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"},
                json={
                    "api_key": api_key,
                    "api_secret": api_secret,
                    "api_name": api_name,
                    "usage_count": usage_count
                }
            ) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="扣费服务暂时不可用")

                result = await response.json()

                # Unwrap the payload when it is nested under "message".
                if "message" in result and isinstance(result["message"], dict):
                    result = result["message"]

                return result
    except HTTPException:
        raise
    except Exception as e:
        return {"success": False, "message": f"扣费服务暂时不可用: {str(e)}"}


def get_token_from_request(request) -> str:
    """Extract the ``key:secret`` access token from the request's Authorization header.

    Args:
        request: Request object exposing a ``headers`` mapping.

    Returns:
        The text following the ``"token "`` prefix of the Authorization header.

    Raises:
        HTTPException: 400 when ``request`` is falsy; 401 when the header is
            missing, does not start with ``"token "``, or the token contains
            no ``":"``.
    """
    if not request:
        raise HTTPException(status_code=400, detail="无法获取请求信息")

    prefix = "token "
    auth_header = request.headers.get("Authorization", "")
    if not auth_header or not auth_header.startswith(prefix):
        raise HTTPException(status_code=401, detail="无效的Authorization头格式")

    token = auth_header[len(prefix):]
    if ":" not in token:
        raise HTTPException(status_code=401, detail="无效的令牌格式")

    return token


def jingrow_api_verify_and_billing(api_name: str):
    """Build a decorator that verifies credentials/balance and bills an endpoint.

    The wrapped coroutine must receive the request as the ``request`` keyword
    argument. Credentials are verified before the endpoint runs; the usage fee
    is deducted afterwards.

    Args:
        api_name: Name of the API, forwarded to the verification and billing calls.

    Returns:
        A decorator for async endpoint functions.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                request = kwargs.get('request')
                if not request:
                    raise HTTPException(status_code=400, detail="无法获取请求信息")

                token = get_token_from_request(request)
                api_key, api_secret = token.split(":", 1)

                verify_result = await verify_api_credentials_and_balance(api_key, api_secret, api_name)
                if not verify_result.get("success"):
                    raise HTTPException(status_code=401, detail=verify_result.get("message", "验证失败"))

                result = await func(*args, **kwargs)

                # Default to one usage; a batch request is billed by the length
                # of the first list found under one of the known keys.
                usage_count = 1
                try:
                    body_data = await request.json()
                    if isinstance(body_data, dict):
                        for key in ["items", "urls", "images", "files"]:
                            if key in body_data and isinstance(body_data[key], list):
                                usage_count = len(body_data[key])
                                break
                except Exception:
                    # Best effort: a missing or non-JSON body keeps the default count.
                    pass

                if isinstance(result, StreamingResponse):
                    original_generator = result.body_iterator

                    async def wrapped_generator():
                        # Count chunks that parse as a JSON object with
                        # status == "success", then bill once after the stream ends.
                        success_count = 0
                        async for chunk in original_generator:
                            try:
                                data = json.loads(chunk)
                            except Exception:
                                # Chunks that are not JSON are forwarded uncounted.
                                data = None
                            if isinstance(data, dict) and data.get("status") == "success":
                                success_count += 1
                            yield chunk

                        if success_count > 0:
                            try:
                                await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, success_count)
                            except HTTPException as exc:
                                # Every chunk has already been sent, so raising here
                                # would only abort a complete response. Log instead.
                                logger.error(
                                    "Usage fee deduction failed after streaming response "
                                    "(api_name=%s, count=%s): %s",
                                    api_name, success_count, exc.detail,
                                )

                    return StreamingResponse(
                        wrapped_generator(),
                        media_type=result.media_type,
                        headers=result.headers
                    )

                if isinstance(result, dict) and result.get("success") is True:
                    actual_usage_count = result.get("successful_count", usage_count)
                    await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, actual_usage_count)
                    return result

                # NOTE(review): results that are not a dict with success=True
                # (including dicts reporting failure) are still billed with
                # usage_count here - confirm this is intended.
                await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, usage_count)
                return result

            except HTTPException:
                raise
            except Exception as e:
                raise HTTPException(status_code=500, detail=f"API验证过程发生异常: {str(e)}") from e

        return wrapper
    return decorator


def is_valid_image_url(url: str) -> bool:
    """Check whether a URL looks like a link to an image.

    Args:
        url: URL to validate.

    Returns:
        bool: True when the URL has a scheme and host and its path ends with
        .jpg, .jpeg, .png, .webp or .gif (case-insensitive).
    """
    if not url or not isinstance(url, str):
        return False

    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse rejects malformed URLs (e.g. a broken IPv6 host).
        return False

    if not parsed.scheme or not parsed.netloc:
        return False

    # Check the file extension of the path (the query string is not part of it).
    valid_extensions = ('.jpg', '.jpeg', '.png', '.webp', '.gif')
    return parsed.path.lower().endswith(valid_extensions)


def validate_image_file(file_path: str) -> bool:
    """Check whether an image file is valid.

    Args:
        file_path: Path of the image file.

    Returns:
        bool: True when Pillow can open the file and ``verify()`` succeeds.
    """
    try:
        with Image.open(file_path) as img:
            img.verify()
        return True
    except Exception:
        return False


def get_image_size(image_url: str) -> Optional[Tuple[int, int]]:
    """Download an image and return its dimensions.

    Args:
        image_url: URL of the image.

    Returns:
        Optional[Tuple[int, int]]: (width, height), or None when the download
        or decoding fails.
    """
    try:
        # NOTE(review): verify=False disables TLS certificate verification -
        # confirm this is intentional.
        response = requests.get(image_url, verify=False, timeout=10)
        if response.status_code != 200:
            return None

        with Image.open(io.BytesIO(response.content)) as img:
            return img.size
    except Exception:
        return None


def is_valid_image_size(image_url: str, min_size: int = 512) -> bool:
    """Check whether an image meets the minimum size requirement.

    Args:
        image_url: URL of the image.
        min_size: Minimum required width and height.

    Returns:
        bool: True when both width and height are at least ``min_size``;
        False when the size cannot be determined.
    """
    size = get_image_size(image_url)
    if not size:
        return False

    width, height = size
    return width >= min_size and height >= min_size


def extract_image_urls_from_text(text: str) -> List[str]:
    """Extract image URLs from a piece of text.

    Args:
        text: Text that may contain image URLs.

    Returns:
        List[str]: The http(s) image URLs found, filtered through
        ``is_valid_image_url``. Empty when ``text`` is empty or None.
    """
    if not text:
        return []

    # Match common image URL patterns, with an optional query string.
    url_pattern = r'https?://[^\s<>"]+?\.(?:jpg|jpeg|png|webp|gif)(?:\?[^\s<>"]*)?'
    urls = re.findall(url_pattern, text, re.IGNORECASE)
    return [url for url in urls if is_valid_image_url(url)]


def sanitize_filename(filename: str) -> str:
    """Clean a file name by removing illegal characters and capping its length.

    Args:
        filename: Original file name.

    Returns:
        str: The cleaned file name, at most 255 characters long.
    """
    max_length = 255

    # Remove illegal characters.
    filename = re.sub(r'[<>:"/\\|?*]', '', filename)

    # Limit the length, keeping the extension when it fits.
    if len(filename) > max_length:
        name, ext = os.path.splitext(filename)
        if len(ext) >= max_length:
            # The extension alone exceeds the limit; truncate the whole name.
            filename = filename[:max_length]
        else:
            filename = name[:max_length - len(ext)] + ext

    return filename


def get_new_image_url(image_url: str) -> str:
    """Re-host an image: download it and upload it to the configured storage.

    WebP images are converted to PNG before upload.

    Args:
        image_url: Original image URL.

    Returns:
        str: The ``url`` field of the upload service's JSON response.

    Raises:
        HTTPException: 500 when no upload URL is configured, the upload fails,
            the response carries no URL, or an unexpected error occurs; 400
            when the image cannot be downloaded.
    """
    try:
        # The upload endpoint comes from settings.
        upload_url = settings.upload_url
        if not upload_url:
            raise HTTPException(status_code=500, detail="未配置上传URL")

        # Download the image.
        # NOTE(review): verify=False disables TLS certificate verification -
        # confirm this is intentional.
        response = requests.get(image_url, verify=False, timeout=30)
        if response.status_code != 200:
            raise HTTPException(status_code=400, detail=f"无法下载图片: HTTP {response.status_code}")

        image_data = response.content

        # Derive the file name and extension from the URL path.
        parsed_url = urlparse(image_url)
        file_name = Path(parsed_url.path).name
        file_name = sanitize_filename(file_name)
        file_ext = Path(file_name).suffix.lower()

        # Convert WebP images to PNG.
        if file_ext == '.webp':
            with Image.open(io.BytesIO(image_data)) as image:
                png_buffer = io.BytesIO()
                image.save(png_buffer, format='PNG')
            image_data = png_buffer.getvalue()
            # Replace only the real suffix; this also handles ".WEBP" and names
            # containing ".webp" elsewhere.
            file_name = file_name[:-len(file_ext)] + '.png'

        # Prepare the multipart upload.
        files = {"file": (file_name, image_data)}

        # Upload the image.
        upload_response = requests.post(upload_url, files=files, verify=False, timeout=30)

        if upload_response.status_code != 200:
            error_message = f"图片URL转换失败: 状态码 {upload_response.status_code}, 响应: {upload_response.text[:200]}"
            raise HTTPException(status_code=500, detail=error_message)

        result = upload_response.json()
        new_url = result.get("url")
        if not new_url:
            raise HTTPException(status_code=500, detail="上传成功但未返回URL")

        return new_url

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"图片URL转换异常: {str(e)}") from e