增加后端登录登出路由函数

This commit is contained in:
jingrow 2025-11-05 05:45:57 +08:00
parent 0371b7a8a4
commit 091e89b382
3 changed files with 350 additions and 25 deletions

View File

@ -28,37 +28,36 @@ export function getSessionCookie(): string | null {
} }
export const loginApi = async (username: string, password: string): Promise<LoginResponse> => { export const loginApi = async (username: string, password: string): Promise<LoginResponse> => {
const response = await fetch(`/api/action/login`, { const response = await fetch(`/jingrow/login`, {
method: 'POST', method: 'POST',
headers: { headers: {
'Content-Type': 'application/x-www-form-urlencoded', 'Content-Type': 'application/json',
'Accept': 'application/json' 'Accept': 'application/json'
}, },
credentials: 'include', credentials: 'include',
body: new URLSearchParams({ body: JSON.stringify({
cmd: 'login', username: username,
usr: username, password: password
pwd: password
}) })
}) })
if (!response.ok) { if (!response.ok) {
throw new Error('登录请求失败') const errorData = await response.json().catch(() => ({}))
throw new Error(errorData.detail || errorData.message || '登录请求失败')
} }
const data = await response.json() const data = await response.json()
if (data.message === 'Logged In') { if (data.success && data.user) {
const userInfo = await getUserInfoApi() return { message: data.message || 'Logged In', user: data.user }
return { message: data.message, user: userInfo }
} else { } else {
throw new Error(data.message || '登录失败') throw new Error(data.message || data.detail || '登录失败')
} }
} }
// 获取用户信息 // 获取用户信息
export const getUserInfoApi = async (): Promise<UserInfo> => { export const getUserInfoApi = async (): Promise<UserInfo> => {
const response = await fetch(`/api/action/jingrow.realtime.get_user_info`, { const response = await fetch(`/jingrow/user-info`, {
method: 'GET', method: 'GET',
headers: { headers: {
'Accept': 'application/json', 'Accept': 'application/json',
@ -68,30 +67,34 @@ export const getUserInfoApi = async (): Promise<UserInfo> => {
}) })
if (!response.ok) { if (!response.ok) {
throw new Error('获取用户信息失败') const errorData = await response.json().catch(() => ({}))
throw new Error(errorData.detail || errorData.message || '获取用户信息失败')
} }
const data = await response.json() const data = await response.json()
const userInfo = data.message || data
return { if (data.success && data.user_info) {
id: userInfo.user || userInfo.name || userInfo.username, return data.user_info
username: userInfo.user || userInfo.name || userInfo.username, } else {
email: userInfo.email || '', throw new Error(data.message || data.detail || '获取用户信息失败')
avatar: userInfo.user_image || '',
first_name: userInfo.first_name || '',
last_name: userInfo.last_name || '',
user_type: userInfo.user_type || 'System User'
} }
} }
// 登出 // 登出
export const logoutApi = async (): Promise<void> => { export const logoutApi = async (): Promise<void> => {
await fetch(`/api/action/logout`, { const response = await fetch(`/jingrow/logout`, {
method: 'GET', method: 'POST',
headers: { 'Accept': 'application/json' }, headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
credentials: 'include' credentials: 'include'
}) })
if (!response.ok) {
const errorData = await response.json().catch(() => ({}))
throw new Error(errorData.detail || errorData.message || '登出失败')
}
} }
// 仅使用会话Cookie的最小鉴权头部不影响现有API Key逻辑 // 仅使用会话Cookie的最小鉴权头部不影响现有API Key逻辑

View File

@ -0,0 +1,150 @@
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional
import logging
from jingrow.utils.auth import login, logout, get_user_info, set_context
logger = logging.getLogger(__name__)
router = APIRouter()
class LoginRequest(BaseModel):
username: str
password: str
class LoginResponse(BaseModel):
success: bool
message: str
user: Optional[dict] = None
session_cookie: Optional[str] = None
class LogoutResponse(BaseModel):
success: bool
message: str
class UserInfoResponse(BaseModel):
success: bool
user_info: Optional[dict] = None
message: Optional[str] = None
@router.post("/jingrow/login", response_model=LoginResponse)
async def login_route(request: Request, login_data: LoginRequest):
"""
登录路由
"""
try:
# 调用登录函数
result = login(login_data.username, login_data.password)
if result.get("success"):
session_cookie = result.get("session_cookie")
# 获取用户信息
user_info_result = get_user_info(session_cookie)
user_info = None
if user_info_result.get("success"):
user_info = user_info_result.get("user_info")
# 创建响应
response_data = {
"success": True,
"message": result.get("message", "Logged In"),
"user": user_info
}
# 创建响应并设置cookie
response = JSONResponse(content=response_data)
if session_cookie:
response.set_cookie(
key="sid",
value=session_cookie,
httponly=True,
samesite="lax",
secure=False, # 开发环境可以设为False生产环境建议设为True
path="/"
)
return response
else:
raise HTTPException(
status_code=401,
detail=result.get("message", "登录失败")
)
except HTTPException:
raise
except Exception as e:
logger.error(f"登录异常: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"登录异常: {str(e)}")
@router.post("/jingrow/logout", response_model=LogoutResponse)
async def logout_route(request: Request):
"""
登出路由
"""
try:
# 从cookie获取session
session_cookie = request.cookies.get('sid')
# 设置context以便logout函数可以使用
if session_cookie:
set_context({"session_cookie": session_cookie})
# 调用登出函数
result = logout(session_cookie)
if result.get("success"):
# 创建响应并清除cookie
response = JSONResponse(content={
"success": True,
"message": result.get("message", "登出成功")
})
# 清除所有相关的cookie需要设置path="/"才能正确删除)
response.delete_cookie(key="sid", path="/", httponly=True, samesite="lax")
response.delete_cookie(key="user_id", path="/")
response.delete_cookie(key="user_image", path="/")
response.delete_cookie(key="full_name", path="/")
response.delete_cookie(key="system_user", path="/")
return response
else:
raise HTTPException(
status_code=400,
detail=result.get("message", "登出失败")
)
except HTTPException:
raise
except Exception as e:
logger.error(f"登出异常: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"登出异常: {str(e)}")
@router.get("/jingrow/user-info", response_model=UserInfoResponse)
async def get_user_info_route(request: Request):
"""
获取用户信息路由
"""
try:
# 从cookie获取session
session_cookie = request.cookies.get('sid')
# 设置context以便get_user_info函数可以使用
if session_cookie:
set_context({"session_cookie": session_cookie})
# 调用获取用户信息函数
result = get_user_info(session_cookie)
if result.get("success"):
return {
"success": True,
"user_info": result.get("user_info")
}
else:
raise HTTPException(
status_code=401,
detail=result.get("message", "获取用户信息失败")
)
except HTTPException:
raise
except Exception as e:
logger.error(f"获取用户信息异常: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"获取用户信息异常: {str(e)}")

View File

@ -1,6 +1,7 @@
import threading import threading
import sys import sys
import os import os
import requests
# 导入配置 # 导入配置
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))) sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))))
@ -62,3 +63,174 @@ def get_jingrow_cloud_api_headers():
headers["Authorization"] = f"token {api_key}:{api_secret}" headers["Authorization"] = f"token {api_key}:{api_secret}"
return headers return headers
def login(username: str, password: str) -> dict:
"""
登录函数调用jingrow框架的登录API
Args:
username: 用户名
password: 密码
Returns:
dict: 登录结果包含success和message字段成功时还包含session_cookie
"""
try:
url = f"{Config.jingrow_server_url}/api/action/login"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json"
}
data = {
"cmd": "login",
"usr": username,
"pwd": password
}
# 使用session保持cookie
session = requests.Session()
response = session.post(url, headers=headers, data=data, timeout=30)
if response.status_code == 200:
result = response.json()
if result.get("message") == "Logged In":
# 获取session cookie
session_cookie = None
if 'Set-Cookie' in response.headers:
cookies = response.headers['Set-Cookie']
if 'sid=' in cookies:
session_cookie = cookies.split('sid=')[1].split(';')[0]
# 如果没有从Set-Cookie获取尝试从session的cookies获取
if not session_cookie:
session_cookie = session.cookies.get('sid')
return {
"success": True,
"message": result.get("message", "Logged In"),
"session_cookie": session_cookie
}
else:
return {
"success": False,
"message": result.get("message", "登录失败")
}
else:
error_msg = f"登录请求失败 (HTTP {response.status_code})"
try:
error_data = response.json()
error_msg = error_data.get("message", error_data.get("exc", error_msg))
except:
error_msg = response.text or error_msg
return {
"success": False,
"message": error_msg
}
except Exception as e:
return {
"success": False,
"message": f"登录异常: {str(e)}"
}
def logout(session_cookie: str = None) -> dict:
"""
登出函数调用jingrow框架的登出API
Args:
session_cookie: 会话cookie如果为None则从context获取
Returns:
dict: 登出结果包含success和message字段
"""
try:
url = f"{Config.jingrow_server_url}/api/action/logout"
headers = {
"Accept": "application/json"
}
# 如果没有提供session_cookie尝试从context获取
if not session_cookie:
context = getattr(_thread_local, 'context', None)
if context:
session_cookie = context.get('session_cookie')
if session_cookie:
headers["Cookie"] = f"sid={session_cookie}"
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 200:
return {
"success": True,
"message": "登出成功"
}
else:
return {
"success": False,
"message": f"登出请求失败 (HTTP {response.status_code})"
}
except Exception as e:
return {
"success": False,
"message": f"登出异常: {str(e)}"
}
def get_user_info(session_cookie: str = None) -> dict:
"""
获取用户信息函数调用jingrow框架的get_user_info API
Args:
session_cookie: 会话cookie如果为None则从context获取
Returns:
dict: 用户信息包含success和user_info字段
"""
try:
url = f"{Config.jingrow_server_url}/api/action/jingrow.realtime.get_user_info"
headers = {
"Accept": "application/json",
"Content-Type": "application/json"
}
# 如果没有提供session_cookie尝试从context获取
if not session_cookie:
context = getattr(_thread_local, 'context', None)
if context:
session_cookie = context.get('session_cookie')
if session_cookie:
headers["Cookie"] = f"sid={session_cookie}"
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 200:
data = response.json()
user_info = data.get('message', data)
# 格式化用户信息
formatted_user_info = {
"id": user_info.get("user") or user_info.get("name") or user_info.get("username", ""),
"username": user_info.get("user") or user_info.get("name") or user_info.get("username", ""),
"email": user_info.get("email", ""),
"avatar": user_info.get("user_image", ""),
"first_name": user_info.get("first_name", ""),
"last_name": user_info.get("last_name", ""),
"user_type": user_info.get("user_type", "System User")
}
return {
"success": True,
"user_info": formatted_user_info
}
else:
return {
"success": False,
"message": f"获取用户信息失败 (HTTP {response.status_code})"
}
except Exception as e:
return {
"success": False,
"message": f"获取用户信息异常: {str(e)}"
}