import aiohttp from functools import wraps from fastapi import HTTPException from typing import Callable, Any, Dict from settings import settings from fastapi.responses import StreamingResponse import json async def verify_api_credentials_and_balance(api_key: str, api_secret: str, api_name: str) -> Dict[str, Any]: try: async with aiohttp.ClientSession() as session: async with session.post( f"{settings.jingrow_api_url}/api/action/jcloud.api.account.verify_api_credentials_and_balance", headers={"Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"}, json={"api_key": api_key, "api_secret": api_secret, "api_name": api_name} ) as response: if response.status != 200: raise HTTPException(status_code=500, detail="验证服务暂时不可用") result = await response.json() if "message" in result and isinstance(result["message"], dict): result = result["message"] if not result.get("success"): raise HTTPException(status_code=401, detail=result.get("message", "验证失败")) return result except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"验证服务暂时不可用: {str(e)}") async def deduct_jingrow_api_usage_fee(api_key: str, api_secret: str, api_name: str, usage_count: int = 1) -> Dict[str, Any]: try: async with aiohttp.ClientSession() as session: async with session.post( f"{settings.jingrow_api_url}/api/action/jcloud.api.account.deduct_api_usage_fee", headers={"Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"}, json={ "api_key": api_key, "api_secret": api_secret, "api_name": api_name, "usage_count": usage_count } ) as response: if response.status != 200: raise HTTPException(status_code=500, detail="扣费服务暂时不可用") result = await response.json() if "message" in result and isinstance(result["message"], dict): result = result["message"] return result except HTTPException: raise except Exception as e: return {"success": False, "message": f"扣费服务暂时不可用: {str(e)}"} def get_token_from_request(request) -> str: auth_header = request.headers.get("Authorization", "") if not auth_header or not auth_header.startswith("token "): raise HTTPException(status_code=401, detail="无效的Authorization头格式") token = auth_header[6:] if ":" not in token: raise HTTPException(status_code=401, detail="无效的令牌格式") return token def jingrow_api_verify_and_billing(api_name: str): def decorator(func: Callable) -> Callable: @wraps(func) async def wrapper(*args, **kwargs): request = kwargs.get('request') if not request: raise HTTPException(status_code=400, detail="无法获取请求信息") token = get_token_from_request(request) api_key, api_secret = token.split(":", 1) verify_result = await verify_api_credentials_and_balance(api_key, api_secret, api_name) if not verify_result.get("success"): raise HTTPException(status_code=401, detail=verify_result.get("message", "验证失败")) result = await func(*args, **kwargs) usage_count = 1 try: body_data = await request.json() if isinstance(body_data, dict): for key in ["items", "texts", "sentences"]: if key in body_data and isinstance(body_data[key], list): usage_count = len(body_data[key]) break except Exception: pass if isinstance(result, StreamingResponse): original_generator = result.body_iterator success_count = 0 async def wrapped_generator(): nonlocal success_count async for chunk in original_generator: try: data = json.loads(chunk) if isinstance(data, dict) and data.get("status") == "success": success_count += 1 except: pass yield chunk if success_count > 0: await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, success_count) return StreamingResponse( wrapped_generator(), media_type=result.media_type, headers=result.headers ) if isinstance(result, dict) and result.get("success") is True: actual_usage_count = result.get("successful_count", usage_count) await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, actual_usage_count) return result await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, usage_count) return result return wrapper return decorator