320 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025, JINGROW and contributors
# For license information, please see license.txt
"""
系统环境配置管理 API
仅限系统管理员用户可以访问
"""
from fastapi import APIRouter, HTTPException, Request
from typing import Dict, Any, Optional
import logging
from jingrow.utils.jingrow_api import get_logged_user
from jingrow.config import Config, get_settings
logger = logging.getLogger(__name__)
router = APIRouter()
def check_admin_permission(request: Request) -> str:
"""
检查用户是否为系统管理员Administrator
返回当前用户名,如果不是管理员则抛出异常
"""
session_cookie = request.cookies.get('sid')
if not session_cookie:
raise HTTPException(status_code=401, detail="未提供认证信息")
user = get_logged_user(session_cookie)
if not user:
raise HTTPException(status_code=401, detail="认证失败")
if user != "Administrator":
raise HTTPException(status_code=403, detail="仅系统管理员可以访问此功能")
return user
@router.get("/jingrow/system/environment-config")
async def get_environment_config(request: Request):
"""
获取环境配置信息
仅限系统管理员访问
"""
try:
# 检查管理员权限
check_admin_permission(request)
# 获取当前配置
settings = get_settings()
# 返回完整配置信息(仅管理员可以访问,因此返回完整信息)
config_dict = settings.model_dump()
return {
"success": True,
"data": config_dict
}
except HTTPException:
raise
except Exception as e:
logger.error(f"获取环境配置失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取环境配置失败: {str(e)}")
@router.post("/jingrow/system/environment-config")
async def update_environment_config(request: Request, config_data: Dict[str, Any]):
"""
更新环境配置信息
仅限系统管理员访问
"""
try:
# 检查管理员权限
check_admin_permission(request)
# 验证配置数据
if not config_data:
raise HTTPException(status_code=400, detail="配置数据不能为空")
# 获取当前配置
settings = get_settings()
# 可更新的配置字段列表(从 config.py 中的 Settings 类定义)
# 按照 config.py 中的注释分组
field_groups = {
'Jingrow API配置': [
'jingrow_server_url',
'jingrow_api_key',
'jingrow_api_secret',
],
'Jingrow Cloud API配置': [
'jingrow_cloud_url',
'jingrow_cloud_api_url',
'jingrow_cloud_api_key',
'jingrow_cloud_api_secret',
],
'本地后端主机配置': [
'backend_host',
'backend_port',
'backend_reload',
],
'Dramatiq 任务队列': [
'worker_processes',
'worker_threads',
'watch',
],
'Qdrant 向量数据库配置': [
'qdrant_host',
'qdrant_port',
],
'数据库配置': [
'jingrow_db_host',
'jingrow_db_port',
'jingrow_db_name',
'jingrow_db_user',
'jingrow_db_password',
'jingrow_db_type',
],
'运行模式': [
'run_mode',
],
'环境development/production控制启动模式、热重载等': [
'environment',
],
'日志级别DEBUG/INFO/WARNING/ERROR/CRITICAL全局默认级别': [
'log_level',
],
}
# 生成所有允许的字段列表
allowed_fields = []
for fields in field_groups.values():
allowed_fields.extend(fields)
# 更新允许的字段
updated_fields = []
env_file_path = settings.model_config.get('env_file')
# 读取现有的 .env 文件内容(如果存在)
env_lines = []
if env_file_path:
try:
from pathlib import Path
env_path = Path(env_file_path)
if env_path.exists():
env_lines = env_path.read_text(encoding='utf-8').split('\n')
except Exception as e:
logger.warning(f"读取 .env 文件失败: {e}")
# 构建更新的配置字典
env_updates = {}
for field, value in config_data.items():
if field in allowed_fields:
env_updates[field.upper()] = str(value) if value is not None else ''
updated_fields.append(field)
# 更新 .env 文件
if env_file_path and env_updates:
try:
from pathlib import Path
env_path = Path(env_file_path)
# 收集所有已存在的键值对(包括未更新的)
existing_config = {}
header_comments = [] # 保存文件开头的注释(分组注释之前的)
# 解析现有文件内容
found_first_config = False
for line in env_lines:
stripped = line.strip()
if not stripped:
if not found_first_config:
header_comments.append('')
continue
if stripped.startswith('#'):
# 检查是否是分组注释
is_group_comment = any(group in stripped for group in field_groups.keys())
if not found_first_config and not is_group_comment:
# 文件开头的非分组注释,保留
header_comments.append(line.rstrip())
continue
if '=' in stripped:
found_first_config = True
key, value = stripped.split('=', 1)
key = key.strip()
existing_config[key.upper()] = (key, value.strip())
# 更新配置字典
for key_upper, new_value in env_updates.items():
# 保持原有的键名格式(大小写)
if key_upper in existing_config:
original_key, _ = existing_config[key_upper]
existing_config[key_upper] = (original_key, new_value)
else:
# 新字段,使用大写键名
existing_config[key_upper] = (key_upper, new_value)
# 按照分组顺序生成新的 .env 文件内容
new_lines = []
# 添加文件开头的注释
if header_comments:
new_lines.extend(header_comments)
# 如果最后一行不是空行,添加一个空行用于分隔
if header_comments and header_comments[-1].strip():
new_lines.append('')
# 按照分组顺序添加配置项
has_previous_group = False
for group_name, fields in field_groups.items():
# 检查该组是否有配置项
group_configs = []
for field in fields:
field_upper = field.upper()
if field_upper in existing_config:
key, value = existing_config[field_upper]
group_configs.append(f'{key}={value}')
# 如果有配置项,添加分组注释和配置
if group_configs:
if has_previous_group:
new_lines.append('') # 组之间添加空行
new_lines.append(f'# {group_name}')
new_lines.extend(group_configs)
has_previous_group = True
# 添加剩余的未分组配置项(如果有)
remaining_keys = set(existing_config.keys())
for fields in field_groups.values():
for field in fields:
remaining_keys.discard(field.upper())
if remaining_keys:
if has_previous_group:
new_lines.append('') # 与其他分组之间添加空行
new_lines.append('# 其他配置')
for key_upper in sorted(remaining_keys):
key, value = existing_config[key_upper]
new_lines.append(f'{key}={value}')
# 写入文件(移除末尾的多个空行)
content = '\n'.join(new_lines).rstrip()
if content:
env_path.write_text(content + '\n', encoding='utf-8')
else:
env_path.write_text('\n', encoding='utf-8')
except Exception as e:
logger.error(f"更新 .env 文件失败: {e}")
raise HTTPException(status_code=500, detail=f"更新配置文件失败: {str(e)}")
logger.info(f"环境配置已更新,字段: {updated_fields}")
return {
"success": True,
"message": "环境配置已更新",
"updated_fields": updated_fields
}
except HTTPException:
raise
except Exception as e:
logger.error(f"更新环境配置失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"更新环境配置失败: {str(e)}")
@router.post("/jingrow/system/restart-environment")
async def restart_environment(request: Request):
"""
重启环境
仅限系统管理员访问
"""
try:
# 检查管理员权限
check_admin_permission(request)
logger.info("收到环境重启请求")
# 使用后台任务在响应返回后执行重启
# 这样可以确保客户端能够收到响应
import asyncio
import os
import signal
async def delayed_restart():
# 等待一小段时间,确保响应已经返回
await asyncio.sleep(2)
logger.info("开始执行环境重启...")
# 获取当前进程ID
current_pid = os.getpid()
# 如果是开发模式,可以通过设置重启标志或发送信号
# 这里使用发送SIGTERM信号来触发重启如果使用reload模式
try:
# 发送SIGTERM信号给当前进程
# 如果使用uvicorn的reload模式这会触发重启
os.kill(current_pid, signal.SIGTERM)
except Exception as e:
logger.error(f"发送重启信号失败: {str(e)}")
# 如果发送信号失败,尝试其他方法
# 可以通过修改配置文件或环境变量来触发重启
logger.warning("无法通过信号重启,请手动重启服务")
# 启动后台任务
asyncio.create_task(delayed_restart())
return {
"success": True,
"message": "环境重启请求已提交,系统将在稍后重启"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"重启环境失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"重启环境失败: {str(e)}")