diff --git a/apps/jembedding/__init__.py b/apps/jembedding/__init__.py
new file mode 100644
index 0000000..139597f
--- /dev/null
+++ b/apps/jembedding/__init__.py
@@ -0,0 +1,2 @@
+
+
diff --git a/apps/jembedding/api.py b/apps/jembedding/api.py
new file mode 100644
--- /dev/null
+++ b/apps/jembedding/api.py
@@ -0,0 +1,56 @@
+"""HTTP routes for the JEmbedding text-embedding service."""
+import json
+
+from fastapi import APIRouter, HTTPException, Request
+from fastapi.responses import StreamingResponse
+
+from service import JEmbeddingService
+from settings import settings
+from utils import jingrow_api_verify_and_billing
+
+
+router = APIRouter(prefix=settings.router_prefix)
+# Constructing the service loads the embedding model, so this runs once at import.
+service = JEmbeddingService()
+
+
+@router.post(settings.generate_route)
+@jingrow_api_verify_and_billing(api_name=settings.api_name)
+async def generate_embeddings(data: dict, request: Request):
+    """Embed every string in ``data["texts"]`` and return all vectors at once.
+
+    ``request`` is unused here; the billing decorator reads it from the
+    keyword arguments to authenticate the caller.
+
+    Raises:
+        HTTPException: 400 when ``texts`` is missing or not a list, or when
+            the embedding call fails.
+    """
+    if "texts" not in data or not isinstance(data["texts"], list):
+        raise HTTPException(status_code=400, detail="缺少texts参数或类型错误")
+    try:
+        vectors = await service.embed(data["texts"])
+    except HTTPException:
+        raise
+    except Exception as e:
+        raise HTTPException(status_code=400, detail=str(e)) from e
+    return {"success": True, "embeddings": vectors, "successful_count": len(vectors)}
+
+
+@router.post(settings.batch_route)
+@jingrow_api_verify_and_billing(api_name=settings.api_name)
+async def generate_embeddings_stream(data: dict, request: Request):
+    """Embed ``data["items"]`` and stream one NDJSON record per result.
+
+    Raises:
+        HTTPException: 400 when ``items`` is missing, not a list, or empty.
+    """
+    if "items" not in data or not isinstance(data["items"], list) or not data["items"]:
+        raise HTTPException(status_code=400, detail="items不能为空")
+
+    async def streamer():
+        # Newline-terminated JSON documents, matching the NDJSON media type.
+        async for result in service.process_batch(data["items"]):
+            yield json.dumps(result) + "\n"
+
+    return StreamingResponse(streamer(), media_type="application/x-ndjson")
diff --git a/apps/jembedding/app.py b/apps/jembedding/app.py
new file mode 100644
--- /dev/null
+++ b/apps/jembedding/app.py
@@ -0,0 +1,26 @@
+"""ASGI application entry point for the JEmbedding service."""
+from fastapi import FastAPI
+
+from api import router
+from settings import settings
+
+
+app = FastAPI(
+    title="JEmbedding",
+    description="文本向量化服务",
+    version="1.0.0",
+)
+
+app.include_router(router)
+
+
+if __name__ == "__main__":
+    # Imported here because uvicorn is only needed when run as a script.
+    import uvicorn
+
+    uvicorn.run(
+        "app:app",
+        host=settings.host,
+        port=settings.port,
+        reload=settings.debug,
+    )
diff --git a/apps/jembedding/service.py b/apps/jembedding/service.py
new file mode 100644
--- /dev/null
+++ b/apps/jembedding/service.py
@@ -0,0 +1,74 @@
+"""Sentence-embedding service backed by a SentenceTransformer model."""
+import asyncio
+from typing import AsyncGenerator, Iterable, List, Optional
+
+from sentence_transformers import SentenceTransformer
+
+
+class JEmbeddingService:
+    """Run a SentenceTransformer model without blocking the event loop."""
+
+    def __init__(self, model_name: str = "Qwen/Qwen3-Embedding-0.6B"):
+        """Remember ``model_name`` and load that model immediately."""
+        self.model_name = model_name
+        self.model: Optional[SentenceTransformer] = None
+        self._load_model()
+
+    def _load_model(self) -> None:
+        """Instantiate the model named by ``self.model_name``."""
+        self.model = SentenceTransformer(self.model_name)
+
+    async def embed(self, texts: List[str]) -> List[List[float]]:
+        """Encode ``texts`` in the default executor and return plain lists.
+
+        Raises:
+            ValueError: if ``texts`` is not a list made up only of strings.
+        """
+        if not isinstance(texts, list) or any(not isinstance(t, str) for t in texts):
+            raise ValueError("texts必须是字符串列表")
+        loop = asyncio.get_running_loop()
+        embeddings = await loop.run_in_executor(None, self.model.encode, texts)
+        # Rows exposing ``tolist`` are converted; anything else passes through.
+        return [vec.tolist() if hasattr(vec, "tolist") else vec for vec in embeddings]
+
+    async def similarity(self, embeddings_a: List[List[float]], embeddings_b: List[List[float]]):
+        """Return ``self.model.similarity`` of the two inputs, run in the default executor."""
+        loop = asyncio.get_running_loop()
+        return await loop.run_in_executor(None, self.model.similarity, embeddings_a, embeddings_b)
+
+    async def process_batch(self, items: Iterable[str]) -> AsyncGenerator[dict, None]:
+        """Embed ``items`` and yield one status record per item.
+
+        Every record carries the item's 1-based position under ``index``.
+        Non-string items yield an ``error`` record first; the remaining
+        strings are embedded in a single call and each yields either a
+        ``success`` record with its ``embedding`` or, if that call fails, an
+        ``error`` record with the failure message.
+        """
+        texts: List[str] = []
+        positions: List[int] = []
+        # start=1 keeps rejected items on the same 1-based numbering as the
+        # success records; mixing bases made two items share an index.
+        for position, text in enumerate(items, start=1):
+            if isinstance(text, str):
+                texts.append(text)
+                positions.append(position)
+            else:
+                yield {"index": position, "status": "error", "message": "每个元素必须是字符串"}
+            # Hand control back to the event loop between items.
+            await asyncio.sleep(0)
+
+        if not texts:
+            return
+
+        # Only the embedding call is guarded, so a failure can never add
+        # error records for items already reported as successful.
+        try:
+            vectors = await self.embed(texts)
+        except Exception as e:
+            for position in positions:
+                yield {"index": position, "status": "error", "message": str(e)}
+            return
+
+        for position, vec in zip(positions, vectors):
+            yield {"index": position, "status": "success", "embedding": vec}
diff --git a/apps/jembedding/settings.py b/apps/jembedding/settings.py
new file mode 100644
--- /dev/null
+++ b/apps/jembedding/settings.py
@@ -0,0 +1,38 @@
+"""Runtime configuration for the JEmbedding service."""
+from functools import lru_cache
+from typing import Optional
+
+from pydantic_settings import BaseSettings
+
+
+class Settings(BaseSettings):
+    """Service settings with their defaults; ``Config.env_file`` points at ``.env``."""
+
+    # Japi Server configuration
+    host: str = "0.0.0.0"
+    port: int = 8115
+    debug: bool = False
+
+    # API route configuration
+    router_prefix: str = "/jembedding"
+    generate_route: str = "/generate"
+    batch_route: str = "/batch"
+    api_name: str = "jembedding"
+
+    # Jingrow Jcloud API configuration
+    jingrow_api_url: str = "https://cloud.jingrow.com"
+    jingrow_api_key: Optional[str] = None
+    jingrow_api_secret: Optional[str] = None
+
+    class Config:
+        env_file = ".env"
+
+
+@lru_cache()
+def get_settings() -> Settings:
+    """Return the cached ``Settings`` instance, creating it on the first call."""
+    return Settings()
+
+
+# Global settings instance
+settings = get_settings()
diff --git a/apps/jembedding/utils.py b/apps/jembedding/utils.py
new file mode 100644
--- /dev/null
+++ b/apps/jembedding/utils.py
@@ -0,0 +1,171 @@
+"""Credential verification and usage billing via the Jingrow cloud API."""
+import json
+from functools import wraps
+from typing import Any, Callable, Dict
+
+import aiohttp
+from fastapi import HTTPException
+from fastapi.responses import StreamingResponse
+
+from settings import settings
+
+
+async def verify_api_credentials_and_balance(api_key: str, api_secret: str, api_name: str) -> Dict[str, Any]:
+    """Check the caller's credentials for ``api_name`` with the Jingrow API.
+
+    Returns:
+        The response payload, unwrapped from ``message`` when that is a dict.
+
+    Raises:
+        HTTPException: 401 when the payload does not report success; 500 when
+            the endpoint answers with a non-200 status or the call fails.
+    """
+    try:
+        async with aiohttp.ClientSession() as session:
+            async with session.post(
+                f"{settings.jingrow_api_url}/api/action/jcloud.api.account.verify_api_credentials_and_balance",
+                headers={"Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"},
+                json={"api_key": api_key, "api_secret": api_secret, "api_name": api_name},
+            ) as response:
+                if response.status != 200:
+                    raise HTTPException(status_code=500, detail="验证服务暂时不可用")
+                result = await response.json()
+                if "message" in result and isinstance(result["message"], dict):
+                    result = result["message"]
+                if not result.get("success"):
+                    raise HTTPException(status_code=401, detail=result.get("message", "验证失败"))
+                return result
+    except HTTPException:
+        raise
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"验证服务暂时不可用: {str(e)}") from e
+
+
+async def deduct_jingrow_api_usage_fee(api_key: str, api_secret: str, api_name: str, usage_count: int = 1) -> Dict[str, Any]:
+    """Charge ``usage_count`` uses of ``api_name`` to the caller's account.
+
+    Returns:
+        The response payload, unwrapped from ``message`` when that is a dict.
+        If the call itself fails, a ``{"success": False, "message": ...}``
+        dict is returned instead of raising.
+
+    Raises:
+        HTTPException: 500 when the endpoint answers with a non-200 status.
+    """
+    try:
+        async with aiohttp.ClientSession() as session:
+            async with session.post(
+                f"{settings.jingrow_api_url}/api/action/jcloud.api.account.deduct_api_usage_fee",
+                headers={"Authorization": f"token {settings.jingrow_api_key}:{settings.jingrow_api_secret}"},
+                json={
+                    "api_key": api_key,
+                    "api_secret": api_secret,
+                    "api_name": api_name,
+                    "usage_count": usage_count,
+                },
+            ) as response:
+                if response.status != 200:
+                    raise HTTPException(status_code=500, detail="扣费服务暂时不可用")
+                result = await response.json()
+                if "message" in result and isinstance(result["message"], dict):
+                    result = result["message"]
+                return result
+    except HTTPException:
+        raise
+    except Exception as e:
+        return {"success": False, "message": f"扣费服务暂时不可用: {str(e)}"}
+
+
+def get_token_from_request(request) -> str:
+    """Extract the ``key:secret`` token from an ``Authorization: token ...`` header.
+
+    Raises:
+        HTTPException: 401 when the header is missing, lacks the ``token ``
+            prefix, or the token contains no ``:`` separator.
+    """
+    auth_header = request.headers.get("Authorization", "")
+    if not auth_header or not auth_header.startswith("token "):
+        raise HTTPException(status_code=401, detail="无效的Authorization头格式")
+    token = auth_header[len("token "):]
+    if ":" not in token:
+        raise HTTPException(status_code=401, detail="无效的令牌格式")
+    return token
+
+
+async def _requested_item_count(request) -> int:
+    """Return the length of the first list under items/texts/sentences, else 1."""
+    try:
+        body_data = await request.json()
+    except Exception:
+        # Best effort: an unreadable or non-JSON body bills as a single use.
+        return 1
+    if isinstance(body_data, dict):
+        for key in ("items", "texts", "sentences"):
+            if isinstance(body_data.get(key), list):
+                return len(body_data[key])
+    return 1
+
+
+def jingrow_api_verify_and_billing(api_name: str):
+    """Decorate an async endpoint with credential checks and usage billing.
+
+    The endpoint must receive the request as the ``request`` keyword
+    argument. Billing happens after the endpoint returns:
+
+    * ``StreamingResponse``: the body is re-streamed unchanged and, once it
+      is exhausted, one use is billed per chunk that parses as a JSON object
+      with ``"status": "success"`` (nothing is billed when there are none).
+    * dict with ``"success": True``: ``successful_count`` is billed, falling
+      back to the request's item count when that key is absent.
+    * anything else: the request's item count is billed.
+
+    The returned wrapper raises ``HTTPException``: 400 when no ``request``
+    keyword argument is supplied, 401 when the token or verification fails.
+    """
+    def decorator(func: Callable) -> Callable:
+        @wraps(func)
+        async def wrapper(*args, **kwargs):
+            request = kwargs.get("request")
+            # Identity check: an object defining __len__/__bool__ could be
+            # falsy even though it was passed.
+            if request is None:
+                raise HTTPException(status_code=400, detail="无法获取请求信息")
+            token = get_token_from_request(request)
+            api_key, api_secret = token.split(":", 1)
+            verify_result = await verify_api_credentials_and_balance(api_key, api_secret, api_name)
+            if not verify_result.get("success"):
+                raise HTTPException(status_code=401, detail=verify_result.get("message", "验证失败"))
+
+            result = await func(*args, **kwargs)
+
+            if isinstance(result, StreamingResponse):
+                original_generator = result.body_iterator
+
+                async def wrapped_generator():
+                    success_count = 0
+                    async for chunk in original_generator:
+                        try:
+                            data = json.loads(chunk)
+                        except (TypeError, ValueError):
+                            # Chunks that are not JSON are forwarded but not billed.
+                            data = None
+                        if isinstance(data, dict) and data.get("status") == "success":
+                            success_count += 1
+                        yield chunk
+                    if success_count > 0:
+                        await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, success_count)
+
+                return StreamingResponse(
+                    wrapped_generator(),
+                    media_type=result.media_type,
+                    headers=result.headers,
+                )
+
+            if isinstance(result, dict) and result.get("success") is True and "successful_count" in result:
+                usage_count = result["successful_count"]
+            else:
+                usage_count = await _requested_item_count(request)
+            await deduct_jingrow_api_usage_fee(api_key, api_secret, api_name, usage_count)
+            return result
+        return wrapper
+    return decorator