japi/apps/jembedding/utils.py

127 lines
5.4 KiB
Python

import aiohttp
from functools import wraps
from fastapi import HTTPException
from typing import Callable, Any, Dict
from settings import settings
from fastapi.responses import StreamingResponse
import json
async def verify_api_credentials_and_balance(api_key: str, api_secret: str, api_name: str) -> Dict[str, Any]:
    """Ask the Jingrow account service to verify credentials and balance.

    POSTs the caller's key/secret and the API name to the
    ``jcloud.api.account.verify_api_credentials_and_balance`` action,
    authenticating with the service token read from ``settings``.

    Args:
        api_key: Key part of the caller's token.
        api_secret: Secret part of the caller's token.
        api_name: Name of the API being invoked; forwarded to the service.

    Returns:
        The decoded JSON payload. When the payload holds a dict under
        ``"message"``, that inner dict is returned instead.

    Raises:
        HTTPException: 500 when the service answers with a non-200 status or
            the call fails for any other reason; 401 when the payload does
            not report a truthy ``"success"``.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{settings.jingrow_api_url}/api/action/jcloud.api.account.verify_api_credentials_and_balance",
                headers={"Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"},
                json={"api_key": api_key, "api_secret": api_secret, "api_name": api_name},
            ) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="验证服务暂时不可用")
                result = await response.json()
                # Unwrap the envelope when the real payload is nested under "message".
                if "message" in result and isinstance(result["message"], dict):
                    result = result["message"]
                if not result.get("success"):
                    raise HTTPException(status_code=401, detail=result.get("message", "验证失败"))
                return result
    except HTTPException:
        # Already the response we want the client to see; pass it through.
        raise
    except Exception as e:
        # Chain the original error so the root cause stays in the traceback.
        raise HTTPException(status_code=500, detail=f"验证服务暂时不可用: {str(e)}") from e
async def deduct_jingrow_api_usage_fee(api_key: str, api_secret: str, api_name: str, usage_count: int = 1) -> Dict[str, Any]:
    """Charge the caller's account for ``usage_count`` uses of ``api_name``.

    POSTs to the ``jcloud.api.account.deduct_api_usage_fee`` action,
    authenticating with the service token read from ``settings``.

    Args:
        api_key: Key part of the caller's token.
        api_secret: Secret part of the caller's token.
        api_name: Name of the API being billed.
        usage_count: Number of billable units; defaults to 1.

    Returns:
        The decoded JSON payload (the inner dict when the payload holds a
        dict under ``"message"``). If the call fails for a reason other than
        a non-200 status, a ``{"success": False, "message": ...}`` dict is
        returned instead of raising.

    Raises:
        HTTPException: 500 when the service answers with a non-200 status.
    """
    try:
        endpoint = f"{settings.jingrow_api_url}/api/action/jcloud.api.account.deduct_api_usage_fee"
        auth_headers = {
            "Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"
        }
        body = {
            "api_key": api_key,
            "api_secret": api_secret,
            "api_name": api_name,
            "usage_count": usage_count,
        }
        async with aiohttp.ClientSession() as http:
            async with http.post(endpoint, headers=auth_headers, json=body) as resp:
                if resp.status != 200:
                    raise HTTPException(status_code=500, detail="扣费服务暂时不可用")
                payload = await resp.json()
                # Unwrap the envelope when the real payload is nested under "message".
                if "message" in payload and isinstance(payload["message"], dict):
                    return payload["message"]
                return payload
    except HTTPException:
        raise
    except Exception as exc:
        # Any other failure is reported to the caller as data, not raised.
        return {"success": False, "message": f"扣费服务暂时不可用: {str(exc)}"}
def get_token_from_request(request) -> str:
    """Return the ``key:secret`` credentials carried by the Authorization header.

    The header must have the form ``token <key>:<secret>``; everything after
    the ``"token "`` prefix is returned unchanged.

    Args:
        request: Object exposing a ``headers`` mapping with a ``get`` method.

    Returns:
        The text following the ``"token "`` prefix.

    Raises:
        HTTPException: 401 when the header is missing, does not start with
            ``"token "``, or the remainder contains no ``":"``.
    """
    scheme = "token "
    header_value = request.headers.get("Authorization", "")
    if header_value and header_value.startswith(scheme):
        credentials = header_value[len(scheme):]
        if ":" in credentials:
            return credentials
        raise HTTPException(status_code=401, detail="无效的令牌格式")
    raise HTTPException(status_code=401, detail="无效的Authorization头格式")
def jingrow_api_verify_and_billing(api_name: str):
    """Build a decorator that verifies Jingrow credentials and bills usage.

    The decorated coroutine must receive the incoming request as the keyword
    argument ``request``. For each call the wrapper:

    1. reads the ``key:secret`` token from the Authorization header;
    2. verifies it with ``verify_api_credentials_and_balance``;
    3. awaits the wrapped endpoint;
    4. deducts usage with ``deduct_jingrow_api_usage_fee``:
       * ``StreamingResponse``: chunks are passed through unchanged; each
         chunk that parses as a JSON object with ``"status" == "success"``
         counts as one unit, billed after the stream is exhausted (nothing
         is billed when the count is zero);
       * dict with ``"success" is True``: bills ``"successful_count"`` if
         present, otherwise the request-derived count;
       * anything else: bills the request-derived count.

    The request-derived count is the length of the first list found under
    ``"items"``, ``"texts"`` or ``"sentences"`` in the JSON body, else 1.

    Args:
        api_name: API name forwarded to the verification and billing calls.

    Returns:
        A decorator for async endpoint functions.

    Raises:
        HTTPException: (from the wrapper) 400 when no ``request`` keyword
            argument is supplied; 401 when the header is malformed or
            verification does not succeed; plus anything raised by the
            verification/billing helpers.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            request = kwargs.get('request')
            if not request:
                raise HTTPException(status_code=400, detail="无法获取请求信息")

            token = get_token_from_request(request)
            api_key, api_secret = token.split(":", 1)

            verify_result = await verify_api_credentials_and_balance(api_key, api_secret, api_name)
            if not verify_result.get("success"):
                raise HTTPException(status_code=401, detail=verify_result.get("message", "验证失败"))

            result = await func(*args, **kwargs)

            # Best-effort estimate of billable units from the request body;
            # any failure to read or parse the body falls back to 1.
            usage_count = 1
            try:
                body_data = await request.json()
                if isinstance(body_data, dict):
                    for key in ["items", "texts", "sentences"]:
                        if key in body_data and isinstance(body_data[key], list):
                            usage_count = len(body_data[key])
                            break
            except Exception:
                pass

            if isinstance(result, StreamingResponse):
                original_generator = result.body_iterator
                success_count = 0

                async def wrapped_generator():
                    nonlocal success_count
                    async for chunk in original_generator:
                        try:
                            data = json.loads(chunk)
                        except Exception:
                            # Chunks that are not JSON are simply not counted.
                            # Catch Exception rather than everything so that
                            # cancellation and generator shutdown propagate.
                            pass
                        else:
                            if isinstance(data, dict) and data.get("status") == "success":
                                success_count += 1
                        yield chunk
                    # Bill only after the whole stream has been produced.
                    if success_count > 0:
                        await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, success_count)

                # Rebuild the response around the counting generator, carrying
                # over the endpoint's status code and background task as well
                # as its media type and headers.
                return StreamingResponse(
                    wrapped_generator(),
                    status_code=result.status_code,
                    media_type=result.media_type,
                    headers=result.headers,
                    background=result.background,
                )

            if isinstance(result, dict) and result.get("success") is True:
                actual_usage_count = result.get("successful_count", usage_count)
                await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, actual_usage_count)
                return result

            # NOTE(review): every other result -- including dicts whose
            # "success" is not True -- is billed for the full request-derived
            # count. Confirm that charging unsuccessful results is intended.
            await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, usage_count)
            return result
        return wrapper
    return decorator