增加jembedding微服务
This commit is contained in:
parent
f7363c8ba8
commit
fea1df9990
2
apps/jembedding/__init__.py
Normal file
2
apps/jembedding/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
|
||||
|
||||
39
apps/jembedding/api.py
Normal file
39
apps/jembedding/api.py
Normal file
@ -0,0 +1,39 @@
|
||||
from fastapi import APIRouter, HTTPException, Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
from service import JEmbeddingService
|
||||
from utils import jingrow_api_verify_and_billing
|
||||
from settings import settings
|
||||
import json
|
||||
|
||||
|
||||
# Router carrying every endpoint of this module; the URL prefix comes from settings.
router = APIRouter(prefix=settings.router_prefix)
|
||||
# One module-level service instance, created at import time and used by the handlers below.
service = JEmbeddingService()
|
||||
|
||||
|
||||
@router.post(settings.generate_route)
@jingrow_api_verify_and_billing(api_name=settings.api_name)
async def generate_embeddings(data: dict, request: Request):
    """Embed a list of texts and return all vectors in a single JSON response.

    Args:
        data: Parsed JSON request body; must contain a ``texts`` key holding a list.
        request: The incoming request. It is not used in this body; it is
            declared so that it is available as the ``request`` keyword argument.

    Returns:
        ``{"success": True, "embeddings": [...], "successful_count": N}`` where
        ``N`` is the number of vectors produced.

    Raises:
        HTTPException: 400 when ``texts`` is missing or not a list, or when the
            service rejects the input with ``ValueError``; 500 for any other
            failure while embedding.
    """
    texts = data.get("texts")
    if not isinstance(texts, list):
        raise HTTPException(status_code=400, detail="缺少texts参数或类型错误")

    try:
        vectors = await service.embed(texts)
    except HTTPException:
        raise
    except ValueError as e:
        # Input validation failure inside the service: the client's fault.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Anything else is a server-side fault and must not be reported as 400.
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {"success": True, "embeddings": vectors, "successful_count": len(vectors)}
|
||||
|
||||
|
||||
@router.post(settings.batch_route)
@jingrow_api_verify_and_billing(api_name=settings.api_name)
async def generate_embeddings_stream(data: dict, request: Request):
    """Embed a batch of items and stream one JSON record per line (NDJSON).

    Args:
        data: Parsed JSON request body; must contain a non-empty ``items`` list.
        request: The incoming request. It is not used in this body; it is
            declared so that it is available as the ``request`` keyword argument.

    Returns:
        A ``StreamingResponse`` with media type ``application/x-ndjson`` whose
        lines are the records yielded by ``service.process_batch``.

    Raises:
        HTTPException: 400 when ``items`` is missing, not a list, or empty.
    """
    items = data.get("items")
    if not isinstance(items, list) or not items:
        raise HTTPException(status_code=400, detail="items不能为空")

    async def ndjson_lines():
        # Serialize each record as it is produced so the client can read incrementally.
        async for record in service.process_batch(items):
            yield json.dumps(record) + "\n"

    return StreamingResponse(ndjson_lines(), media_type="application/x-ndjson")
|
||||
|
||||
|
||||
24
apps/jembedding/app.py
Normal file
24
apps/jembedding/app.py
Normal file
@ -0,0 +1,24 @@
|
||||
from fastapi import FastAPI
|
||||
from api import router
|
||||
from settings import settings
|
||||
|
||||
|
||||
# ASGI application object; "app:app" below refers to this name in this module.
app = FastAPI(title="JEmbedding", description="文本向量化服务", version="1.0.0")
app.include_router(router)


def _serve() -> None:
    """Start uvicorn with host, port and reload flag taken from settings."""
    # Imported lazily so uvicorn is only needed when run as a script.
    import uvicorn

    uvicorn.run(
        "app:app",
        host=settings.host,
        port=settings.port,
        reload=settings.debug,
    )


if __name__ == "__main__":
    _serve()
|
||||
|
||||
|
||||
50
apps/jembedding/service.py
Normal file
50
apps/jembedding/service.py
Normal file
@ -0,0 +1,50 @@
|
||||
import asyncio
|
||||
from typing import List, Iterable, AsyncGenerator, Optional
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
|
||||
class JEmbeddingService:
    """Async helpers around a ``SentenceTransformer`` embedding model.

    The model is loaded eagerly in ``__init__``. The model calls are dispatched
    through ``loop.run_in_executor(None, ...)`` so they do not run on the
    event loop itself.
    """

    def __init__(self, model_name: str = "Qwen/Qwen3-Embedding-0.6B"):
        """Remember ``model_name`` and load the model immediately."""
        self.model_name = model_name
        self.model: Optional[SentenceTransformer] = None
        self._load_model()

    def _load_model(self) -> None:
        """Instantiate the model named by ``self.model_name``."""
        self.model = SentenceTransformer(self.model_name)

    async def embed(self, texts: List[str]) -> List[List[float]]:
        """Encode ``texts`` and return one vector per input text.

        Args:
            texts: A list whose elements are all strings.

        Returns:
            The encoded vectors; any vector exposing ``tolist()`` is converted
            with it, other values are passed through unchanged.

        Raises:
            ValueError: If ``texts`` is not a list or contains a non-string.
        """
        if not isinstance(texts, list) or any(not isinstance(t, str) for t in texts):
            raise ValueError("texts必须是字符串列表")
        loop = asyncio.get_running_loop()
        embeddings = await loop.run_in_executor(None, self.model.encode, texts)
        return [vec.tolist() if hasattr(vec, "tolist") else vec for vec in embeddings]

    async def similarity(self, embeddings_a: List[List[float]], embeddings_b: List[List[float]]):
        """Return ``self.model.similarity(embeddings_a, embeddings_b)``, run in the default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.model.similarity, embeddings_a, embeddings_b)

    async def process_batch(self, items: Iterable[str]) -> AsyncGenerator[dict, None]:
        """Embed ``items`` and yield one result record per item.

        Non-string items are reported first, as they are encountered, as
        ``{"index", "status": "error", "message"}`` records. The remaining
        texts are embedded in a single call and reported as
        ``{"index", "status": "success", "embedding"}`` records, or as error
        records carrying the exception text if the embedding call fails.

        ``index`` is the 1-based position of the item in ``items`` for every
        record kind, so each item maps to exactly one distinct index.
        """
        texts: List[str] = []
        positions: List[int] = []
        for position, text in enumerate(items, start=1):
            if isinstance(text, str):
                texts.append(text)
                positions.append(position)
            else:
                yield {"index": position, "status": "error", "message": "每个元素必须是字符串"}
            # Give other tasks a chance to run while walking a long batch.
            await asyncio.sleep(0)

        if not texts:
            return

        try:
            vectors = await self.embed(texts)
        except Exception as e:
            # One failed embedding call fails every text that was part of it.
            message = str(e)
            for position in positions:
                yield {"index": position, "status": "error", "message": message}
            return

        for position, vec in zip(positions, vectors):
            yield {"index": position, "status": "success", "embedding": vec}
|
||||
|
||||
|
||||
35
apps/jembedding/settings.py
Normal file
35
apps/jembedding/settings.py
Normal file
@ -0,0 +1,35 @@
|
||||
from pydantic_settings import BaseSettings
|
||||
from typing import Optional
|
||||
from functools import lru_cache
|
||||
|
||||
|
||||
class Settings(BaseSettings):
    """Configuration values for the JEmbedding service.

    Every field has a default declared here; ``Config.env_file`` names ``.env``
    as the settings file.
    """

    # Japi server configuration
    host: str = "0.0.0.0"
    port: int = 8115
    debug: bool = False

    # API route configuration
    router_prefix: str = "/jembedding"
    generate_route: str = "/generate"
    batch_route: str = "/batch"
    api_name: str = "jembedding"

    # Jingrow Jcloud API configuration
    jingrow_api_url: str = "https://cloud.jingrow.com"
    jingrow_api_key: Optional[str] = None
    jingrow_api_secret: Optional[str] = None

    class Config:
        env_file = ".env"
|
||||
|
||||
|
||||
@lru_cache()
def get_settings() -> Settings:
    """Return the ``Settings`` object, building it on the first call only.

    ``lru_cache`` memoizes the zero-argument call, so later calls return the
    same instance.
    """
    instance = Settings()
    return instance
|
||||
|
||||
|
||||
# Global settings instance
|
||||
# Built at import time; get_settings() is lru_cached, so later calls return this same object.
settings = get_settings()
|
||||
|
||||
|
||||
126
apps/jembedding/utils.py
Normal file
126
apps/jembedding/utils.py
Normal file
@ -0,0 +1,126 @@
|
||||
import aiohttp
|
||||
from functools import wraps
|
||||
from fastapi import HTTPException
|
||||
from typing import Callable, Any, Dict
|
||||
from settings import settings
|
||||
from fastapi.responses import StreamingResponse
|
||||
import json
|
||||
|
||||
|
||||
async def verify_api_credentials_and_balance(api_key: str, api_secret: str, api_name: str) -> Dict[str, Any]:
    """Ask the Jingrow API to verify a caller's credentials and balance.

    POSTs ``api_key``, ``api_secret`` and ``api_name`` to the
    ``jcloud.api.account.verify_api_credentials_and_balance`` action,
    authenticating with the service's own key and secret from settings.

    Returns:
        The decoded JSON response; when its ``message`` entry is a dict, that
        inner dict is returned instead.

    Raises:
        HTTPException: 500 when the response status is not 200 or any
            unexpected error occurs; 401 when the result has no truthy
            ``success`` entry.
    """
    try:
        endpoint = f"{settings.jingrow_api_url}/api/action/jcloud.api.account.verify_api_credentials_and_balance"
        auth_headers = {"Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"}
        payload = {"api_key": api_key, "api_secret": api_secret, "api_name": api_name}

        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, headers=auth_headers, json=payload) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="验证服务暂时不可用")
                result = await response.json()

        # Unwrap the payload when it is nested under "message".
        if "message" in result and isinstance(result["message"], dict):
            result = result["message"]

        if result.get("success"):
            return result
        raise HTTPException(status_code=401, detail=result.get("message", "验证失败"))
    except HTTPException:
        # Already carries the intended status code; do not rewrap it as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"验证服务暂时不可用: {str(e)}")
|
||||
|
||||
|
||||
async def deduct_jingrow_api_usage_fee(api_key: str, api_secret: str, api_name: str, usage_count: int = 1) -> Dict[str, Any]:
    """Ask the Jingrow API to deduct the usage fee for ``usage_count`` units.

    POSTs the caller's credentials, ``api_name`` and ``usage_count`` to the
    ``jcloud.api.account.deduct_api_usage_fee`` action, authenticating with the
    service's own key and secret from settings.

    Returns:
        The decoded JSON response; when its ``message`` entry is a dict, that
        inner dict is returned instead. On any error other than
        ``HTTPException`` a ``{"success": False, "message": ...}`` dict is
        returned rather than raising.

    Raises:
        HTTPException: 500 when the response status is not 200.
    """
    try:
        endpoint = f"{settings.jingrow_api_url}/api/action/jcloud.api.account.deduct_api_usage_fee"
        auth_headers = {"Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"}
        charge = {
            "api_key": api_key,
            "api_secret": api_secret,
            "api_name": api_name,
            "usage_count": usage_count,
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, headers=auth_headers, json=charge) as response:
                if response.status != 200:
                    raise HTTPException(status_code=500, detail="扣费服务暂时不可用")
                result = await response.json()

        # Unwrap the payload when it is nested under "message".
        if "message" in result and isinstance(result["message"], dict):
            return result["message"]
        return result
    except HTTPException:
        raise
    except Exception as e:
        # Other failures are reported in the return value instead of raising.
        return {"success": False, "message": f"扣费服务暂时不可用: {str(e)}"}
|
||||
|
||||
|
||||
def get_token_from_request(request) -> str:
    """Extract the ``key:secret`` token from the request's Authorization header.

    The header must have the form ``token <api_key>:<api_secret>``.

    Args:
        request: Any object exposing a ``headers`` mapping with ``get``.

    Returns:
        The text after the ``"token "`` prefix, i.e. ``"<api_key>:<api_secret>"``.
        Only the first ``:`` separates key from secret, so the secret may
        itself contain colons.

    Raises:
        HTTPException: 401 when the header is missing or lacks the ``token ``
            prefix, or when the token has no ``:`` or an empty key or secret.
    """
    prefix = "token "
    auth_header = request.headers.get("Authorization", "")
    if not auth_header or not auth_header.startswith(prefix):
        raise HTTPException(status_code=401, detail="无效的Authorization头格式")

    token = auth_header[len(prefix):]
    api_key, separator, api_secret = token.partition(":")
    # Reject empty halves here instead of spending a remote verification call on them.
    if not separator or not api_key or not api_secret:
        raise HTTPException(status_code=401, detail="无效的令牌格式")
    return token
|
||||
|
||||
|
||||
def jingrow_api_verify_and_billing(api_name: str):
    """Build a decorator that authenticates the caller and bills API usage.

    The wrapped endpoint must receive the request as the keyword argument
    ``request``. Before the endpoint runs, the token from the Authorization
    header is verified with ``verify_api_credentials_and_balance``. After it
    returns, usage is deducted by one of three rules:

    * ``StreamingResponse``: the body iterator is wrapped; each chunk that
      parses as a JSON dict with ``status == "success"`` is counted, and the
      count is deducted once the stream is exhausted (only if above zero).
    * dict with ``success is True``: ``successful_count`` is deducted, falling
      back to the request-derived usage count when that key is absent.
    * anything else: the request-derived usage count is deducted.

    The request-derived usage count is the length of the first list found
    under ``items``, ``texts`` or ``sentences`` in the JSON body, else 1.

    Args:
        api_name: Name passed to the verification and deduction calls.

    Raises (from the wrapper):
        HTTPException: 400 when no ``request`` keyword argument is present;
            401 when verification does not report success.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            request = kwargs.get('request')
            if not request:
                raise HTTPException(status_code=400, detail="无法获取请求信息")

            token = get_token_from_request(request)
            api_key, api_secret = token.split(":", 1)
            verify_result = await verify_api_credentials_and_balance(api_key, api_secret, api_name)
            if not verify_result.get("success"):
                raise HTTPException(status_code=401, detail=verify_result.get("message", "验证失败"))

            result = await func(*args, **kwargs)

            usage_count = 1
            try:
                body_data = await request.json()
                if isinstance(body_data, dict):
                    for key in ["items", "texts", "sentences"]:
                        if key in body_data and isinstance(body_data[key], list):
                            usage_count = len(body_data[key])
                            break
            except Exception:
                # Deliberate best-effort: an unreadable body keeps the default count of 1.
                pass

            if isinstance(result, StreamingResponse):
                original_generator = result.body_iterator
                success_count = 0

                async def wrapped_generator():
                    nonlocal success_count
                    async for chunk in original_generator:
                        try:
                            data = json.loads(chunk)
                            if isinstance(data, dict) and data.get("status") == "success":
                                success_count += 1
                        except (ValueError, TypeError):
                            # Chunk is not decodable JSON: forward it uncounted.
                            pass
                        yield chunk
                    # Bill only after the whole stream has been produced.
                    if success_count > 0:
                        await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, success_count)

                return StreamingResponse(
                    wrapped_generator(),
                    media_type=result.media_type,
                    headers=result.headers
                )

            if isinstance(result, dict) and result.get("success") is True:
                actual_usage_count = result.get("successful_count", usage_count)
                await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, actual_usage_count)
                return result

            await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, usage_count)
            return result

        return wrapper

    return decorator
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user