147 lines
6.5 KiB
Python
147 lines
6.5 KiB
Python
import aiohttp
|
|
from functools import wraps
|
|
from fastapi import HTTPException
|
|
import os
|
|
from typing import Callable, Any, Dict, Optional, Tuple
|
|
from settings import settings
|
|
from fastapi.responses import StreamingResponse
|
|
import json
|
|
|
|
async def verify_api_credentials_and_balance(api_key: str, api_secret: str, api_name: str) -> Dict[str, Any]:
|
|
"""验证API密钥和团队余额"""
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(
|
|
f"{settings.jingrow_api_url}/api/action/jcloud.api.account.verify_api_credentials_and_balance",
|
|
headers={"Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"},
|
|
json={"api_key": api_key, "api_secret": api_secret, "api_name": api_name}
|
|
) as response:
|
|
if response.status != 200:
|
|
raise HTTPException(status_code=500, detail="验证服务暂时不可用")
|
|
|
|
result = await response.json()
|
|
if "message" in result and isinstance(result["message"], dict):
|
|
result = result["message"]
|
|
|
|
if not result.get("success"):
|
|
raise HTTPException(status_code=401, detail=result.get("message", "验证失败"))
|
|
|
|
return result
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"验证服务暂时不可用: {str(e)}")
|
|
|
|
async def deduct_jingrow_api_usage_fee(api_key: str, api_secret: str, api_name: str, usage_count: int = 1) -> Dict[str, Any]:
|
|
"""从Jingrow平台扣除API使用费"""
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(
|
|
f"{settings.jingrow_api_url}/api/action/jcloud.api.account.deduct_api_usage_fee",
|
|
headers={"Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"},
|
|
json={
|
|
"api_key": api_key,
|
|
"api_secret": api_secret,
|
|
"api_name": api_name,
|
|
"usage_count": usage_count
|
|
}
|
|
) as response:
|
|
if response.status != 200:
|
|
raise HTTPException(status_code=500, detail="扣费服务暂时不可用")
|
|
|
|
result = await response.json()
|
|
if "message" in result and isinstance(result["message"], dict):
|
|
result = result["message"]
|
|
|
|
return result
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
return {"success": False, "message": f"扣费服务暂时不可用: {str(e)}"}
|
|
|
|
def get_token_from_request(request) -> str:
|
|
"""从请求中获取访问令牌"""
|
|
if not request:
|
|
raise HTTPException(status_code=400, detail="无法获取请求信息")
|
|
|
|
auth_header = request.headers.get("Authorization", "")
|
|
if not auth_header or not auth_header.startswith("token "):
|
|
raise HTTPException(status_code=401, detail="无效的Authorization头格式")
|
|
|
|
token = auth_header[6:]
|
|
if ":" not in token:
|
|
raise HTTPException(status_code=401, detail="无效的令牌格式")
|
|
|
|
return token
|
|
|
|
def jingrow_api_verify_and_billing(api_name: str):
|
|
"""Jingrow API 验证装饰器(带余额检查和扣费)"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
async def wrapper(*args, **kwargs):
|
|
try:
|
|
request = kwargs.get('request')
|
|
if not request:
|
|
raise HTTPException(status_code=400, detail="无法获取请求信息")
|
|
|
|
token = get_token_from_request(request)
|
|
api_key, api_secret = token.split(":", 1)
|
|
|
|
verify_result = await verify_api_credentials_and_balance(api_key, api_secret, api_name)
|
|
if not verify_result.get("success"):
|
|
raise HTTPException(status_code=401, detail=verify_result.get("message", "验证失败"))
|
|
|
|
result = await func(*args, **kwargs)
|
|
|
|
usage_count = 1
|
|
try:
|
|
body_data = await request.json()
|
|
if isinstance(body_data, dict):
|
|
for key in ["items", "urls", "images", "files"]:
|
|
if key in body_data and isinstance(body_data[key], list):
|
|
usage_count = len(body_data[key])
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
if isinstance(result, StreamingResponse):
|
|
original_generator = result.body_iterator
|
|
success_count = 0
|
|
|
|
async def wrapped_generator():
|
|
nonlocal success_count
|
|
async for chunk in original_generator:
|
|
try:
|
|
data = json.loads(chunk)
|
|
if isinstance(data, dict) and data.get("status") == "success":
|
|
success_count += 1
|
|
except:
|
|
pass
|
|
yield chunk
|
|
|
|
if success_count > 0:
|
|
await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, success_count)
|
|
|
|
return StreamingResponse(
|
|
wrapped_generator(),
|
|
media_type=result.media_type,
|
|
headers=result.headers
|
|
)
|
|
|
|
if isinstance(result, dict) and result.get("success") is True:
|
|
actual_usage_count = result.get("successful_count", usage_count)
|
|
await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, actual_usage_count)
|
|
return result
|
|
|
|
await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, usage_count)
|
|
return result
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"API验证过程发生异常: {str(e)}")
|
|
return wrapper
|
|
return decorator
|