基于session cookie重构用户访问权限的实现逻辑,实现基于角色的用户访问权限

This commit is contained in:
jingrow 2026-01-24 23:23:14 +08:00
parent 9e468f9f13
commit 0d94d18f74
3 changed files with 90 additions and 7 deletions

View File

@ -22,18 +22,34 @@ class ApiAdapter:
self.api_secret = Config.jingrow_api_secret
def _get_headers(self, session_cookie: Optional[str] = None):
"""获取请求头"""
"""
获取请求头
认证优先级
1. 优先使用参数传入的 session_cookie
2. 如果没有 ContextVar 获取 SessionMiddleware 设置
3. 如果都没有使用 API Key系统级权限
"""
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
# 使用 Jingrow API 认证格式: token {api_key}:{api_secret}
if self.api_key and self.api_secret:
headers['Authorization'] = f'token {self.api_key}:{self.api_secret}'
# 1. 优先使用参数传入的 session cookie
if not session_cookie:
# 2. 从 ContextVar 获取SessionMiddleware 已设置)
try:
from jingrow.utils.auth import get_request_session_cookie
session_cookie = get_request_session_cookie()
except Exception:
pass
# 优先使用 session cookie用户权限
if session_cookie:
headers['Cookie'] = f'sid={session_cookie}'
# 3. 如果没有 session cookie使用 API Key系统级权限
elif self.api_key and self.api_secret:
headers['Authorization'] = f'token {self.api_key}:{self.api_secret}'
return headers
@ -161,16 +177,38 @@ class ApiAdapter:
def get_pg(self, pagetype: str, name: str, session_cookie: Optional[str] = None) -> Dict[str, Any]:
try:
api_url = f"{self.api_url}/api/data/{pagetype}/{name}"
headers = self._get_headers(session_cookie=session_cookie)
# PageType 元数据是公开的,所有用户都应该能读取
# 强制使用管理员 API Key 获取 PageType 元数据
if pagetype == 'PageType':
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
if self.api_key and self.api_secret:
headers['Authorization'] = f'token {self.api_key}:{self.api_secret}'
print(f"[ADAPTER] get_pg: pagetype=PageType, forcing API Key auth")
else:
headers = self._get_headers(session_cookie=session_cookie)
print(f"[ADAPTER] get_pg: pagetype={pagetype}, name={name}")
print(f"[ADAPTER] headers keys: {list(headers.keys())}")
if 'Cookie' in headers:
print(f"[ADAPTER] Using session cookie auth")
elif 'Authorization' in headers:
print(f"[ADAPTER] Using API Key auth")
response = requests.get(api_url, headers=headers, timeout=15)
print(f"[ADAPTER] Response status: {response.status_code}")
if response.status_code == 200:
data = response.json()
if isinstance(data, dict) and 'data' in data:
return {'success': True, 'data': data['data']}
return {'success': True, 'data': data}
else:
print(f"[ADAPTER] Response error: {response.text[:200]}")
return {'success': False, 'error': f"HTTP {response.status_code}: {response.text}"}
except Exception as e:
print(f"[ADAPTER] Exception: {e}")
return {'success': False, 'error': f"获取记录异常: {str(e)}"}
def create_pg(self, pagetype: str, payload: Dict[str, Any]) -> Dict[str, Any]:
@ -250,7 +288,18 @@ class ApiAdapter:
params['fields'] = json.dumps(fields, ensure_ascii=False)
if isinstance(limit, int) and limit > 0:
params['limit'] = str(limit)
headers = self._get_headers()
# PageType 列表也应该使用 API Key 获取
if pagetype == 'PageType':
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
if self.api_key and self.api_secret:
headers['Authorization'] = f'token {self.api_key}:{self.api_secret}'
else:
headers = self._get_headers()
response = requests.get(base_url, params=params, headers=headers, timeout=20)
if response.status_code == 200:
data = response.json()

View File

@ -6,17 +6,36 @@ Jingrow 应用入口
"""
import logging
from fastapi import FastAPI
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
from contextlib import asynccontextmanager
from jingrow.utils.router_auto_register import include_routers_from_package
from jingrow.services.scheduler import start_scheduler, stop_scheduler
from jingrow.services.whitelist import router
from jingrow.services.scheduler import get_scheduler_status
from jingrow.core.hooks.init_hooks import init_hooks
from jingrow.utils.auth import set_request_session_cookie
logger = logging.getLogger("uvicorn.error")
class SessionMiddleware(BaseHTTPMiddleware):
"""
Session 中间件自动提取请求中的 session cookie 并存储到 ContextVar
这样在整个请求生命周期中都可以访问
"""
async def dispatch(self, request: Request, call_next):
# 提取 session cookie
session_cookie = request.cookies.get('sid')
# 存储到 ContextVar异步安全
set_request_session_cookie(session_cookie)
# 继续处理请求
response = await call_next(request)
return response
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
@ -64,6 +83,9 @@ def create_app():
allow_headers=["*"],
)
# 添加 Session 中间件(必须在 CORS 之后)
app.add_middleware(SessionMiddleware)
# 自动注册 Jingrow 框架的静态路由(无前缀)
include_routers_from_package(app, "jingrow.api")

View File

@ -3,6 +3,7 @@ import sys
import os
import requests
from typing import Optional
from contextvars import ContextVar
# 导入配置
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))))
@ -10,6 +11,17 @@ from jingrow.config import Config
_thread_local = threading.local()
# ContextVar for async-safe session cookie storage
_request_session_cookie: ContextVar[Optional[str]] = ContextVar('request_session_cookie', default=None)
def set_request_session_cookie(cookie: Optional[str]):
"""设置当前请求的 session cookie异步安全"""
_request_session_cookie.set(cookie)
def get_request_session_cookie() -> Optional[str]:
"""获取当前请求的 session cookie异步安全"""
return _request_session_cookie.get()
def set_context(context):
"""设置当前线程的context"""
_thread_local.context = context