diff --git a/apps/jingrow/jingrow/api/auth_api.py b/apps/jingrow/jingrow/api/auth_api.py index bf04860..08cba39 100644 --- a/apps/jingrow/jingrow/api/auth_api.py +++ b/apps/jingrow/jingrow/api/auth_api.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, HTTPException, Request +from fastapi import APIRouter, HTTPException, Request, Depends from fastapi.responses import JSONResponse from pydantic import BaseModel from typing import Optional @@ -28,49 +28,68 @@ class UserInfoResponse(BaseModel): user_info: Optional[dict] = None message: Optional[str] = None +# Cookie配置常量 +COOKIE_CONFIG = { + "httponly": True, + "samesite": "lax", + "secure": False, # 开发环境可以设为False,生产环境建议设为True + "path": "/" +} + +# 需要清除的cookie列表 +COOKIES_TO_CLEAR = ["sid", "user_id", "user_image", "full_name", "system_user"] + +def get_session_cookie(request: Request) -> Optional[str]: + """从请求中获取session cookie并设置context(FastAPI依赖注入)""" + session_cookie = request.cookies.get('sid') + if session_cookie: + set_context({"session_cookie": session_cookie}) + return session_cookie + +def handle_auth_result(result: dict, error_status_code: int = 400): + """统一处理认证结果,成功返回数据,失败抛出异常""" + if not result.get("success"): + raise HTTPException( + status_code=error_status_code, + detail=result.get("message", "操作失败") + ) + return result + +def create_response_with_cookie(data: dict, session_cookie: Optional[str] = None) -> JSONResponse: + """创建响应并设置cookie""" + response = JSONResponse(content=data) + if session_cookie: + response.set_cookie(key="sid", value=session_cookie, **COOKIE_CONFIG) + return response + +def create_response_clear_cookies(data: dict) -> JSONResponse: + """创建响应并清除所有相关cookie""" + response = JSONResponse(content=data) + for cookie_name in COOKIES_TO_CLEAR: + cookie_kwargs = COOKIE_CONFIG.copy() + if cookie_name == "sid": + cookie_kwargs.pop("secure", None) # delete_cookie不需要secure参数 + response.delete_cookie(key=cookie_name, **cookie_kwargs) + return response + @router.post("/jingrow/login", response_model=LoginResponse) -async def login_route(request: Request, login_data: LoginRequest): - """ - 登录路由 - """ +async def login_route(login_data: LoginRequest): + """登录路由""" try: - # 调用登录函数 result = login(login_data.username, login_data.password) + handle_auth_result(result, error_status_code=401) - if result.get("success"): - session_cookie = result.get("session_cookie") - - # 获取用户信息 - user_info_result = get_user_info(session_cookie) - user_info = None - if user_info_result.get("success"): - user_info = user_info_result.get("user_info") - - # 创建响应 - response_data = { - "success": True, - "message": result.get("message", "Logged In"), - "user": user_info - } - - # 创建响应并设置cookie - response = JSONResponse(content=response_data) - if session_cookie: - response.set_cookie( - key="sid", - value=session_cookie, - httponly=True, - samesite="lax", - secure=False, # 开发环境可以设为False,生产环境建议设为True - path="/" - ) - - return response - else: - raise HTTPException( - status_code=401, - detail=result.get("message", "登录失败") - ) + session_cookie = result.get("session_cookie") + user_info_result = get_user_info(session_cookie) + user_info = user_info_result.get("user_info") if user_info_result.get("success") else None + + response_data = { + "success": True, + "message": result.get("message", "Logged In"), + "user": user_info + } + + return create_response_with_cookie(response_data, session_cookie) except HTTPException: raise except Exception as e: @@ -78,39 +97,18 @@ async def login_route(request: Request, login_data: LoginRequest): raise HTTPException(status_code=500, detail=f"登录异常: {str(e)}") @router.post("/jingrow/logout", response_model=LogoutResponse) -async def logout_route(request: Request): - """ - 登出路由 - """ +async def logout_route(session_cookie: Optional[str] = Depends(get_session_cookie)): + """登出路由""" try: - # 从cookie获取session - session_cookie = request.cookies.get('sid') - - # 设置context以便logout函数可以使用 - if session_cookie: - set_context({"session_cookie": session_cookie}) - - # 调用登出函数 result = logout(session_cookie) + handle_auth_result(result, error_status_code=400) - if result.get("success"): - # 创建响应并清除cookie - response = JSONResponse(content={ - "success": True, - "message": result.get("message", "登出成功") - }) - # 清除所有相关的cookie(需要设置path="/"才能正确删除) - response.delete_cookie(key="sid", path="/", httponly=True, samesite="lax") - response.delete_cookie(key="user_id", path="/") - response.delete_cookie(key="user_image", path="/") - response.delete_cookie(key="full_name", path="/") - response.delete_cookie(key="system_user", path="/") - return response - else: - raise HTTPException( - status_code=400, - detail=result.get("message", "登出失败") - ) + response_data = { + "success": True, + "message": result.get("message", "登出成功") + } + + return create_response_clear_cookies(response_data) except HTTPException: raise except Exception as e: @@ -118,31 +116,16 @@ async def logout_route(request: Request): raise HTTPException(status_code=500, detail=f"登出异常: {str(e)}") @router.get("/jingrow/user-info", response_model=UserInfoResponse) -async def get_user_info_route(request: Request): - """ - 获取用户信息路由 - """ +async def get_user_info_route(session_cookie: Optional[str] = Depends(get_session_cookie)): + """获取用户信息路由""" try: - # 从cookie获取session - session_cookie = request.cookies.get('sid') - - # 设置context以便get_user_info函数可以使用 - if session_cookie: - set_context({"session_cookie": session_cookie}) - - # 调用获取用户信息函数 result = get_user_info(session_cookie) + handle_auth_result(result, error_status_code=401) - if result.get("success"): - return { - "success": True, - "user_info": result.get("user_info") - } - else: - raise HTTPException( - status_code=401, - detail=result.get("message", "获取用户信息失败") - ) + return { + "success": True, + "user_info": result.get("user_info") + } except HTTPException: raise except Exception as e: diff --git a/apps/jingrow/jingrow/utils/auth.py b/apps/jingrow/jingrow/utils/auth.py index aad9ee5..3e60e9d 100644 --- a/apps/jingrow/jingrow/utils/auth.py +++ b/apps/jingrow/jingrow/utils/auth.py @@ -2,6 +2,7 @@ import threading import sys import os import requests +from typing import Optional # 导入配置 sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))) @@ -64,6 +65,36 @@ def get_jingrow_cloud_api_headers(): return headers +def _extract_session_cookie_from_response(response: requests.Response, session: requests.Session) -> Optional[str]: + """从响应中提取session cookie""" + session_cookie = None + if 'Set-Cookie' in response.headers: + cookies = response.headers['Set-Cookie'] + if 'sid=' in cookies: + session_cookie = cookies.split('sid=')[1].split(';')[0] + + # 如果没有从Set-Cookie获取,尝试从session的cookies获取 + if not session_cookie: + session_cookie = session.cookies.get('sid') + + return session_cookie + +def _call_jingrow_api(method: str, endpoint: str, session_cookie: Optional[str] = None, **kwargs) -> requests.Response: + """调用Jingrow API的通用函数""" + url = f"{Config.jingrow_server_url}{endpoint}" + headers = kwargs.pop('headers', {}) + + # 如果没有提供session_cookie,尝试从context获取 + if not session_cookie: + context = getattr(_thread_local, 'context', None) + if context: + session_cookie = context.get('session_cookie') + + if session_cookie: + headers["Cookie"] = f"sid={session_cookie}" + + return requests.request(method.upper(), url, headers=headers, timeout=30, **kwargs) + def login(username: str, password: str) -> dict: """ 登录函数,调用jingrow框架的登录API @@ -95,17 +126,7 @@ def login(username: str, password: str) -> dict: if response.status_code == 200: result = response.json() if result.get("message") == "Logged In": - # 获取session cookie - session_cookie = None - if 'Set-Cookie' in response.headers: - cookies = response.headers['Set-Cookie'] - if 'sid=' in cookies: - session_cookie = cookies.split('sid=')[1].split(';')[0] - - # 如果没有从Set-Cookie获取,尝试从session的cookies获取 - if not session_cookie: - session_cookie = session.cookies.get('sid') - + session_cookie = _extract_session_cookie_from_response(response, session) return { "success": True, "message": result.get("message", "Logged In"), @@ -134,7 +155,7 @@ def login(username: str, password: str) -> dict: "message": f"登录异常: {str(e)}" } -def logout(session_cookie: str = None) -> dict: +def logout(session_cookie: Optional[str] = None) -> dict: """ 登出函数,调用jingrow框架的登出API @@ -145,21 +166,7 @@ def logout(session_cookie: str = None) -> dict: dict: 登出结果,包含success和message字段 """ try: - url = f"{Config.jingrow_server_url}/api/action/logout" - headers = { - "Accept": "application/json" - } - - # 如果没有提供session_cookie,尝试从context获取 - if not session_cookie: - context = getattr(_thread_local, 'context', None) - if context: - session_cookie = context.get('session_cookie') - - if session_cookie: - headers["Cookie"] = f"sid={session_cookie}" - - response = requests.get(url, headers=headers, timeout=30) + response = _call_jingrow_api('GET', '/api/action/logout', session_cookie, headers={"Accept": "application/json"}) if response.status_code == 200: return { @@ -177,7 +184,7 @@ def logout(session_cookie: str = None) -> dict: "message": f"登出异常: {str(e)}" } -def get_user_info(session_cookie: str = None) -> dict: +def get_user_info(session_cookie: Optional[str] = None) -> dict: """ 获取用户信息函数,调用jingrow框架的get_user_info API @@ -188,22 +195,15 @@ def get_user_info(session_cookie: str = None) -> dict: dict: 用户信息,包含success和user_info字段 """ try: - url = f"{Config.jingrow_server_url}/api/action/jingrow.realtime.get_user_info" - headers = { - "Accept": "application/json", - "Content-Type": "application/json" - } - - # 如果没有提供session_cookie,尝试从context获取 - if not session_cookie: - context = getattr(_thread_local, 'context', None) - if context: - session_cookie = context.get('session_cookie') - - if session_cookie: - headers["Cookie"] = f"sid={session_cookie}" - - response = requests.get(url, headers=headers, timeout=30) + response = _call_jingrow_api( + 'GET', + '/api/action/jingrow.realtime.get_user_info', + session_cookie, + headers={ + "Accept": "application/json", + "Content-Type": "application/json" + } + ) if response.status_code == 200: data = response.json()