jcloud/jcloud/api/aliyun_server_light.py

974 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import jingrow
import os
import sys
import json
import random
import time
import uuid
from datetime import datetime
from typing import Dict, Any
from alibabacloud_swas_open20200601.client import Client as SWAS_OPEN20200601Client
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_swas_open20200601 import models as swas__open20200601_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
from jcloud.utils import get_current_team
class AliyunLightServerManager:
"""阿里云轻量应用服务器管理器"""
_instance = None
_clients = {} # 缓存不同地域的客户端
@classmethod
def get_instance(cls):
"""获取单例实例"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self):
"""初始化管理器"""
if AliyunLightServerManager._instance is not None:
raise Exception("请使用 AliyunLightServerManager.get_instance() 获取实例")
# 初始化配置
self._initialize_credentials()
def _initialize_credentials(self):
"""初始化阿里云凭据"""
try:
settings = jingrow.get_single("Jcloud Settings")
if not settings:
jingrow.log_error("阿里云凭据初始化失败", "无法获取Jcloud Settings配置")
raise Exception("Jcloud Settings配置不存在")
self.access_key_id = settings.get("aliyun_access_key_id")
self.access_secret = settings.get_password("aliyun_access_secret") if settings.get("aliyun_access_secret") else None
if not self.access_key_id or not self.access_secret:
jingrow.log_error("阿里云凭据初始化失败", "阿里云凭据未配置请在Jcloud Settings中配置aliyun_access_key_id和aliyun_access_secret")
raise Exception("阿里云凭据未配置")
except Exception as e:
jingrow.log_error(f"阿里云凭据初始化失败: {str(e)}")
self.access_key_id = None
self.access_secret = None
raise
def _get_client(self, region_id: str = 'cn-shanghai') -> SWAS_OPEN20200601Client:
"""获取指定地域的客户端,使用缓存机制"""
if region_id not in self._clients:
try:
# 直接使用凭据创建配置
config = open_api_models.Config(
access_key_id=self.access_key_id,
access_key_secret=self.access_secret
)
config.endpoint = f'swas.{region_id}.aliyuncs.com'
self._clients[region_id] = SWAS_OPEN20200601Client(config)
except Exception as e:
jingrow.log_error("阿里云客户端创建失败", f"创建客户端时发生错误: {str(e)}")
raise
return self._clients[region_id]
def _convert_response_to_dict(self, response_body):
"""将阿里云SDK响应对象转换为可序列化的字典"""
if response_body is None:
return None
result = {}
# 获取所有公共属性
for attr_name in dir(response_body):
if not attr_name.startswith('_') and not callable(getattr(response_body, attr_name)):
attr_value = getattr(response_body, attr_name)
if attr_value is not None:
if hasattr(attr_value, '__dict__') and not isinstance(attr_value, (str, int, float, bool)):
# 如果是复杂对象,递归转换
result[attr_name] = self._convert_response_to_dict(attr_value)
elif isinstance(attr_value, list):
# 如果是列表,转换列表中的每个元素
converted_list = []
for item in attr_value:
if hasattr(item, '__dict__') and not isinstance(item, (str, int, float, bool)):
converted_list.append(self._convert_response_to_dict(item))
else:
converted_list.append(item)
result[attr_name] = converted_list
else:
# 基本类型直接赋值
result[attr_name] = attr_value
return result
def create_instance(self, plan_id, image_id, period=1, region_id='cn-shanghai'):
"""创建轻量应用服务器实例"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.CreateInstancesRequest(
region_id=region_id,
plan_id=plan_id,
image_id=image_id,
period=period
)
runtime = util_models.RuntimeOptions()
response = client.create_instances_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '实例创建成功'}
except Exception as e:
jingrow.log_error("创建实例失败", f"创建实例时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '实例创建失败'}
def start_instance(self, instance_id, region_id='cn-shanghai'):
"""启动实例"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.StartInstanceRequest(
region_id=region_id,
instance_id=instance_id
)
runtime = util_models.RuntimeOptions()
response = client.start_instance_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '实例启动成功'}
except Exception as e:
jingrow.log_error("启动实例失败", f"启动实例 {instance_id} 时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '实例启动失败'}
def stop_instance(self, instance_id, region_id='cn-shanghai'):
"""停止实例"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.StopInstanceRequest(
region_id=region_id,
instance_id=instance_id
)
runtime = util_models.RuntimeOptions()
response = client.stop_instance_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '实例停止成功'}
except Exception as e:
jingrow.log_error("停止实例失败", f"停止实例 {instance_id} 时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '实例停止失败'}
def reboot_instance(self, instance_id, region_id='cn-shanghai'):
"""重启实例"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.RebootInstanceRequest(
region_id=region_id,
instance_id=instance_id
)
runtime = util_models.RuntimeOptions()
response = client.reboot_instance_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '实例重启成功'}
except Exception as e:
jingrow.log_error("重启实例失败", f"重启实例 {instance_id} 时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '实例重启失败'}
def upgrade_instance(self, instance_id, plan_id, region_id='cn-shanghai'):
"""升级实例配置"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.UpgradeInstanceRequest(
region_id=region_id,
instance_id=instance_id,
plan_id=plan_id
)
runtime = util_models.RuntimeOptions()
response = client.upgrade_instance_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '实例升级成功'}
except Exception as e:
jingrow.log_error("升级实例失败", f"升级实例 {instance_id} 时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '实例升级失败'}
def renew_instance(self, instance_id, period=1, period_unit='Month', region_id='cn-shanghai'):
"""续费实例"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.RenewInstanceRequest(
region_id=region_id,
instance_id=instance_id,
period=period,
period_unit=period_unit
)
runtime = util_models.RuntimeOptions()
response = client.renew_instance_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '实例续费成功'}
except Exception as e:
jingrow.log_error("续费实例失败", f"续费实例 {instance_id} 时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '实例续费失败'}
def reset_system(self, instance_id, image_id=None, password=None, region_id='cn-shanghai'):
"""重置系统"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.ResetSystemRequest(
region_id=region_id,
instance_id=instance_id
)
if image_id:
request.image_id = image_id
if password:
request.password = password
runtime = util_models.RuntimeOptions()
response = client.reset_system_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '系统重置成功'}
except Exception as e:
jingrow.log_error("重置系统失败", f"重置实例 {instance_id} 系统时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '系统重置失败'}
def update_instance_password(self, instance_id, password, region_id='cn-shanghai'):
"""更新实例密码"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.UpdateInstanceAttributeRequest(
region_id=region_id,
instance_id=instance_id,
password=password
)
runtime = util_models.RuntimeOptions()
response = client.update_instance_attribute_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '密码更新成功'}
except Exception as e:
jingrow.log_error("更新实例密码失败", f"更新实例 {instance_id} 密码时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '密码更新失败'}
def list_instances(self, page_number=1, page_size=20, region_id='cn-shanghai'):
"""获取实例列表"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.ListInstancesRequest(
region_id=region_id,
page_number=page_number,
page_size=page_size
)
runtime = util_models.RuntimeOptions()
response = client.list_instances_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '获取实例列表成功'}
except Exception as e:
jingrow.log_error("获取实例列表失败", f"获取实例列表时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '获取实例列表失败'}
def delete_instance(self, instance_id, region_id='cn-shanghai'):
"""删除实例"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.DeleteInstanceRequest(
region_id=region_id,
instance_id=instance_id
)
runtime = util_models.RuntimeOptions()
response = client.delete_instance_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '实例删除成功'}
except Exception as e:
jingrow.log_error("删除实例失败", f"删除实例 {instance_id} 时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '实例删除失败'}
def get_plans(self, region_id='cn-shanghai'):
"""获取可用套餐列表"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.ListPlansRequest(
region_id=region_id
)
runtime = util_models.RuntimeOptions()
response = client.list_plans_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '获取套餐列表成功'}
except Exception as e:
jingrow.log_error("获取套餐列表失败", f"获取套餐列表时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '获取套餐列表失败'}
def get_images(self, image_type='system', region_id='cn-shanghai'):
"""获取可用镜像列表"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.ListImagesRequest(
region_id=region_id,
image_type=image_type
)
runtime = util_models.RuntimeOptions()
response = client.list_images_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '获取镜像列表成功'}
except Exception as e:
jingrow.log_error("获取镜像列表失败", f"获取镜像列表时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '获取镜像列表失败'}
def get_regions(self):
"""获取可用地域列表"""
try:
# 直接使用凭据创建配置
config = open_api_models.Config(
access_key_id=self.access_key_id,
access_key_secret=self.access_secret
)
# 默认用上海endpoint获取所有地域
config.endpoint = 'swas.cn-shanghai.aliyuncs.com'
client = SWAS_OPEN20200601Client(config)
request = swas__open20200601_models.ListRegionsRequest(accept_language='zh-CN')
runtime = util_models.RuntimeOptions()
response = client.list_regions_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '获取地域列表成功'}
except Exception as e:
jingrow.log_error("获取地域列表失败", f"获取地域列表时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '获取地域列表失败'}
def create_instance_key_pair(self, instance_id, key_pair_name, region_id='cn-shanghai'):
"""为实例创建密钥对"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.CreateInstanceKeyPairRequest(
region_id=region_id,
instance_id=instance_id,
key_pair_name=key_pair_name
)
runtime = util_models.RuntimeOptions()
response = client.create_instance_key_pair_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '密钥对创建成功'}
except Exception as e:
jingrow.log_error("创建密钥对失败", f"为实例 {instance_id} 创建密钥对 {key_pair_name} 时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '密钥对创建失败'}
def delete_instance_key_pair(self, instance_id, region_id='cn-shanghai'):
"""解绑实例的密钥对"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.DeleteInstanceKeyPairRequest(
region_id=region_id,
instance_id=instance_id
)
runtime = util_models.RuntimeOptions()
response = client.delete_instance_key_pair_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '密钥对解绑成功'}
except Exception as e:
jingrow.log_error("解绑密钥对失败", f"解绑实例 {instance_id} 密钥对时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '密钥对解绑失败'}
def delete_key_pairs(self, key_pair_names, region_id='cn-shanghai'):
"""删除密钥对"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.DeleteKeyPairsRequest(
region_id=region_id,
key_pair_names=key_pair_names
)
runtime = util_models.RuntimeOptions()
response = client.delete_key_pairs_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '密钥对删除成功'}
except Exception as e:
jingrow.log_error("删除密钥对失败", f"删除密钥对 {key_pair_names} 时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '密钥对删除失败'}
def get_instance_key_pair(self, instance_id, region_id='cn-shanghai'):
"""获取实例的密钥对详细信息"""
client = self._get_client(region_id)
try:
request = swas__open20200601_models.DescribeInstanceKeyPairRequest(
region_id=region_id,
instance_id=instance_id
)
runtime = util_models.RuntimeOptions()
response = client.describe_instance_key_pair_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '获取密钥对详细信息成功'}
except Exception as e:
jingrow.log_error("获取密钥对详细信息失败", f"获取实例 {instance_id} 密钥对详细信息时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '获取密钥对详细信息失败'}
def get_instance_details(self, instance_ids, region_id='cn-shanghai'):
"""获取实例详细信息(支持批量查询)"""
client = self._get_client(region_id)
try:
# 如果instance_ids是列表转换为JSON字符串
if isinstance(instance_ids, list):
instance_ids_str = json.dumps(instance_ids)
else:
instance_ids_str = instance_ids
request = swas__open20200601_models.ListInstancesRequest(
region_id=region_id,
instance_ids=instance_ids_str
)
runtime = util_models.RuntimeOptions()
response = client.list_instances_with_options(request, runtime)
return {'success': True, 'data': self._convert_response_to_dict(response.body), 'message': '获取实例详细信息成功'}
except Exception as e:
jingrow.log_error("获取实例详细信息失败", f"获取实例详细信息时发生错误: {str(e)}")
return {'success': False, 'error': str(e), 'message': '获取实例详细信息失败'}
# 全局管理器实例
_aliyun_manager = None
def _get_manager():
"""获取阿里云管理器实例"""
global _aliyun_manager
if _aliyun_manager is None:
_aliyun_manager = AliyunLightServerManager.get_instance()
return _aliyun_manager
# API函数 - 使用@jingrow.whitelist()装饰器
@jingrow.whitelist()
def create_aliyun_instance(plan_id, image_id, period=1, region_id='cn-shanghai'):
"""创建轻量应用服务器实例"""
manager = _get_manager()
return manager.create_instance(plan_id, image_id, period, region_id)
@jingrow.whitelist()
def start_aliyun_instance(instance_id, region_id='cn-shanghai'):
"""启动实例"""
manager = _get_manager()
return manager.start_instance(instance_id, region_id)
@jingrow.whitelist()
def stop_aliyun_instance(instance_id, region_id='cn-shanghai'):
"""停止实例"""
manager = _get_manager()
return manager.stop_instance(instance_id, region_id)
@jingrow.whitelist()
def reboot_aliyun_instance(instance_id, region_id='cn-shanghai'):
"""重启实例并更新状态"""
try:
# 1. 查找对应的Jsite Server记录
server = jingrow.get_pg("Jsite Server", {"instance_id": instance_id})
if not server:
return {"success": False, "message": "找不到对应的服务器记录"}
# 2. 立即更新状态为"重启中"
server.status = "Starting"
server.save(ignore_permissions=True)
jingrow.db.commit()
# 3. 调用阿里云API重启实例
manager = _get_manager()
result = manager.reboot_instance(instance_id, region_id)
# 4. 启动后台任务监控重启状态延迟10秒开始
time.sleep(10)
jingrow.enqueue(
"jcloud.api.aliyun_server_light.monitor_server_status",
instance_id=instance_id,
region_id=region_id,
timeout=600
)
return {
"success": True,
"message": "重启命令已发送,正在监控重启状态...",
"server_name": server.name
}
except Exception as e:
jingrow.log_error("重启实例失败", f"重启实例 {instance_id} 时发生错误: {str(e)}")
return {"success": False, "error": str(e), "message": "重启实例失败"}
def monitor_server_status(instance_id, region_id):
"""通用的服务器状态监控 - 监控服务器状态直到Running"""
try:
# 通过instance_id查找服务器记录
server = jingrow.get_pg("Jsite Server", {"instance_id": instance_id})
if not server:
jingrow.log_error("找不到服务器记录", f"无法找到实例ID为 {instance_id} 的服务器记录")
return
manager = _get_manager()
max_retries = 60 # 最多查询60次10分钟
# 阿里云轻量应用服务器可能的状态值
waiting_states = ['Starting', 'Stopping', 'Resetting', 'Stopped', 'Pending']
for retry in range(max_retries):
# 查询实例状态
instance_info = manager.get_instance_details([instance_id], region_id)
if not instance_info or not instance_info.get('success'):
jingrow.log_error("获取实例状态失败", f"无法获取实例 {instance_id} 的状态信息")
time.sleep(10)
continue
# 解析实例状态
instances = instance_info.get('data', {}).get('instances', [])
if not instances:
jingrow.log_error("获取实例状态失败", f"实例 {instance_id} 的状态信息为空")
time.sleep(10)
continue
instance = instances[0] # 取第一个实例
status = instance.get('status', '')
# 更新服务器状态
if status == 'Running':
server.status = 'Running'
server.save(ignore_permissions=True)
jingrow.db.commit()
return
elif status in waiting_states:
# 继续等待
time.sleep(10)
else:
server.status = status
server.save(ignore_permissions=True)
jingrow.db.commit()
return
except Exception as e:
jingrow.log_error("监控状态失败", f"监控实例 {instance_id} 状态时发生错误: {str(e)}")
@jingrow.whitelist()
def upgrade_aliyun_instance(instance_id, plan_id, region_id='cn-shanghai'):
"""升级实例配置"""
manager = _get_manager()
return manager.upgrade_instance(instance_id, plan_id, region_id)
@jingrow.whitelist()
def renew_aliyun_instance(instance_id, period=1, period_unit='Month', region_id='cn-shanghai'):
"""续费实例"""
manager = _get_manager()
return manager.renew_instance(instance_id, period, period_unit, region_id)
@jingrow.whitelist()
def reset_aliyun_instance_system(instance_id, image_id=None, password=None, region_id='cn-shanghai'):
"""重置系统并更新状态"""
try:
# 1. 查找对应的Jsite Server记录
server = jingrow.get_pg("Jsite Server", {"instance_id": instance_id})
if not server:
return {"success": False, "message": "找不到对应的服务器记录"}
# 2. 立即更新状态为"重置中"
server.status = "Resetting"
server.save(ignore_permissions=True)
jingrow.db.commit()
# 3. 调用阿里云API重置系统
manager = _get_manager()
result = manager.reset_system(instance_id, image_id, password, region_id)
# 4. 启动后台任务监控重置状态延迟10秒开始
time.sleep(10)
jingrow.enqueue(
"jcloud.api.aliyun_server_light.monitor_server_status",
instance_id=instance_id,
region_id=region_id,
timeout=600
)
return {
"success": True,
"message": "重置命令已发送,正在监控重置状态...",
"server_name": server.name
}
except Exception as e:
jingrow.log_error("重置系统失败", f"重置实例 {instance_id} 系统时发生错误: {str(e)}")
return {"success": False, "error": str(e), "message": "重置系统失败"}
@jingrow.whitelist()
def update_aliyun_instance_password(instance_id, password, region_id='cn-shanghai'):
"""更新实例密码"""
manager = _get_manager()
return manager.update_instance_password(instance_id, password, region_id)
@jingrow.whitelist()
def reset_aliyun_instance_password(instance_id, password, region_id='cn-shanghai'):
"""重置实例密码并更新Jsite Server记录"""
try:
# 1. 查找对应的Jsite Server记录
server = jingrow.get_pg("Jsite Server", {"instance_id": instance_id})
if not server:
return {"success": False, "message": "找不到对应的服务器记录"}
# 2. 调用阿里云API更新密码
manager = _get_manager()
result = manager.update_instance_password(instance_id, password, region_id)
if not result or not result.get('success'):
error_msg = f"阿里云密码更新失败: {result.get('message', '未知错误')}"
jingrow.log_error("重置密码失败", error_msg)
return {"success": False, "message": error_msg}
# 3. 更新Jsite Server记录中的password字段
server.password = password
server.save(ignore_permissions=True)
jingrow.db.commit()
# 4. 自动重启实例
reboot_aliyun_instance(instance_id, region_id)
return {
"success": True,
"message": "密码重置成功",
"server_name": server.name,
"instance_id": instance_id
}
except Exception as e:
jingrow.log_error("重置密码失败", f"重置实例 {instance_id} 密码时发生错误: {str(e)}")
return {"success": False, "error": str(e), "message": "密码重置失败"}
@jingrow.whitelist()
def list_aliyun_instances(page_number=1, page_size=20, region_id='cn-shanghai'):
"""获取实例列表"""
manager = _get_manager()
return manager.list_instances(page_number, page_size, region_id)
@jingrow.whitelist()
def delete_aliyun_instance(instance_id, region_id='cn-shanghai'):
"""删除实例"""
manager = _get_manager()
return manager.delete_instance(instance_id, region_id)
@jingrow.whitelist()
def get_aliyun_plans(region_id='cn-shanghai'):
"""获取可用套餐列表"""
manager = _get_manager()
return manager.get_plans(region_id)
@jingrow.whitelist()
def get_aliyun_images(image_type='system', region_id='cn-shanghai'):
"""获取可用镜像列表"""
manager = _get_manager()
return manager.get_images(image_type, region_id)
@jingrow.whitelist()
def get_aliyun_regions():
"""通过阿里云SDK实时获取可用地域列表"""
manager = _get_manager()
return manager.get_regions()
@jingrow.whitelist()
def create_aliyun_instance_key_pair(instance_id, key_pair_name, region_id='cn-shanghai'):
"""为实例创建密钥对"""
manager = _get_manager()
return manager.create_instance_key_pair(instance_id, key_pair_name, region_id)
@jingrow.whitelist()
def delete_aliyun_instance_key_pair(instance_id):
"""解绑并删除实例的密钥对"""
try:
# 1. 查找对应的Jsite Server记录获取key_pair_name和region_id
server = jingrow.get_pg("Jsite Server", {"instance_id": instance_id})
if not server:
return {"success": False, "message": "找不到对应的服务器记录"}
key_pair_name = server.key_pair_name
region_id = server.region
if not key_pair_name:
return {"success": True, "message": "实例没有关联的密钥对"}
manager = _get_manager()
# 2. 解绑实例的密钥对
unbind_result = manager.delete_instance_key_pair(instance_id, region_id)
if not unbind_result or not unbind_result.get('success'):
return {"success": False, "message": f"解绑密钥对失败: {unbind_result.get('message', '未知错误')}"}
# 3. 删除密钥对
delete_result = manager.delete_key_pairs([key_pair_name], region_id)
if not delete_result or not delete_result.get('success'):
return {"success": False, "message": f"删除密钥对失败: {delete_result.get('message', '未知错误')}"}
# 4. 清除服务器记录中的密钥对信息
server.key_pair_name = None
server.private_key = None
server.save(ignore_permissions=True)
jingrow.db.commit()
return {"success": True, "message": "密钥对解绑并删除成功"}
except Exception as e:
jingrow.log_error("删除实例密钥对失败", f"删除实例 {instance_id} 密钥对时发生错误: {str(e)}")
return {"success": False, "message": str(e)}
@jingrow.whitelist()
def get_aliyun_instance_key_pair(instance_id, region_id='cn-shanghai'):
"""获取实例的密钥对详细信息"""
manager = _get_manager()
return manager.get_instance_key_pair(instance_id, region_id)
@jingrow.whitelist()
def get_aliyun_instance_details(instance_ids, region_id='cn-shanghai'):
"""获取实例详细信息(支持批量查询)"""
manager = _get_manager()
return manager.get_instance_details(instance_ids, region_id)
# 服务器订单和创建相关
@jingrow.whitelist()
def create_server_order(**kwargs):
"""创建服务器订单"""
try:
plan_id = kwargs.get('plan_id')
image_id = kwargs.get('image_id')
period = kwargs.get('period', 1)
region_id = kwargs.get('region_id', 'cn-shanghai')
if not plan_id or not image_id:
jingrow.throw("缺少必要参数")
team = get_current_team(True)
# 获取套餐价格
plans_response = get_aliyun_plans(region_id)
selected_plan = None
if plans_response and plans_response.get('success'):
plans = plans_response['data'].get('plans', [])
selected_plan = next((p for p in plans if p.get('plan_id') == plan_id), None)
if not selected_plan:
jingrow.throw("套餐不存在")
# 计算价格
monthly_price = float(selected_plan.get('origin_price', 0))
total_amount = monthly_price * period
# 生成订单号
order_id = datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + ''.join(random.choices('0123456789', k=6))
# 创建订单记录
order = jingrow.get_pg({
"pagetype": "Order",
"order_id": order_id,
"order_type": "新建服务器",
"team": team.name,
"status": "待支付",
"total_amount": total_amount,
"title": f"{region_id}",
"description": f"{selected_plan.get('core')}核/{selected_plan.get('memory')}GB/{selected_plan.get('disk_size')}GB, {period}个月"
})
order.insert(ignore_permissions=True)
# 创建服务器记录
server = jingrow.get_pg({
"pagetype": "Jsite Server",
"team": team.name,
"order_id": order_id,
"status": "Pending",
"region": region_id,
"image_id": image_id,
"end_date": jingrow.utils.add_months(jingrow.utils.nowdate(), period),
"title": f"{region_id} - {selected_plan.get('core')}核/{selected_plan.get('memory')}GB",
"planid": plan_id,
"period": period,
"cpu": selected_plan.get('core'),
"memory": selected_plan.get('memory'),
"disk_size": selected_plan.get('disk_size'),
"bandwidth": selected_plan.get('bandwidth')
})
server.insert(ignore_permissions=True)
jingrow.db.commit()
return {"success": True, "order": order.as_dict(), "server": server.as_dict()}
except Exception as e:
jingrow.log_error("创建服务器订单失败", str(e))
return {"success": False, "message": str(e)}
def create_aliyun_server(order_name):
"""异步创建服务器"""
try:
order = jingrow.get_pg("Order", order_name)
if not order:
raise Exception("订单不存在")
# 查找对应的服务器记录通过订单ID
server = jingrow.get_pg("Jsite Server", {"order_id": order.order_id})
if not server:
raise Exception("找不到对应的服务器记录")
# 从服务器记录中获取配置信息
region_id = server.region
image_id = server.image_id or 'e9363571cf2444aba422b17470285465'
plan_id = server.planid or 'swas.s.c2m1s30b1.linux'
period = server.period or 1
# 调用阿里云API创建实例
result = create_aliyun_instance(plan_id, image_id, period, region_id)
# 打印result到后台日志
jingrow.log_error("阿里云创建实例结果", f"订单 {order_name} 的创建结果: {result}")
if not result or not result.get('success'):
raise Exception(f"阿里云创建失败: {result.get('message', '未知错误')}")
# 阿里云API返回的是instance_ids数组取第一个实例ID
instance_id = result['data']['instance_ids'][0]
# 更新服务器记录状态
server.status = "Running"
server.instance_id = instance_id
server.save(ignore_permissions=True)
# 更新订单状态
order.status = "交易成功"
order.save(ignore_permissions=True)
jingrow.db.commit()
return True
except Exception as e:
jingrow.log_error("服务器创建失败", f"订单 {order_name}: {str(e)}")
raise e
@jingrow.whitelist()
def create_server_key_pair(instance_id):
"""为实例创建密钥对并保存私钥"""
try:
# 查找对应的服务器记录获取region_id
server = jingrow.get_pg("Jsite Server", {"instance_id": instance_id})
if not server:
jingrow.log_error("找不到对应的服务器记录")
region_id = server.region
uuid_suffix = str(uuid.uuid4())[:4]
key_pair_name = f"{region_id}-{instance_id[:8]}-{uuid_suffix}"
# 直接调用管理器方法创建密钥对
manager = _get_manager()
key_pair_result = manager.create_instance_key_pair(instance_id, key_pair_name, region_id)
if not key_pair_result or not key_pair_result.get('success'):
error_msg = key_pair_result.get('message', '未知错误') if key_pair_result else '返回结果为空'
jingrow.log_error(f"密钥对创建失败: {error_msg}")
return {"success": False, "message": f"密钥对创建失败: {error_msg}"}
# 获取私钥 - 直接访问data字段
key_pair_data = key_pair_result.get('data', {})
private_key = key_pair_data.get('private_key')
if private_key:
# 保存私钥到服务器记录
server.key_pair_name = key_pair_name
server.private_key = private_key
server.save(ignore_permissions=True)
jingrow.db.commit()
return {
"success": True,
"message": "密钥对创建成功",
"key_pair_name": key_pair_name,
"private_key": private_key
}
else:
jingrow.log_error("密钥对创建成功但未获取到私钥", f"完整的key_pair_result: {key_pair_result}")
return {"success": False, "message": "密钥对创建成功但未获取到私钥"}
except Exception as e:
jingrow.log_error("创建密钥对失败", f"实例 {instance_id}: {str(e)}")
return {"success": False, "message": str(e)}
@jingrow.whitelist()
def update_server_record(instance_ids):
"""根据阿里云实例信息更新Jsite Server记录"""
try:
# 解析instance_idsJSON字符串格式
if isinstance(instance_ids, str):
instance_ids = json.loads(instance_ids)
# 获取第一个实例ID
instance_id = instance_ids[0]
# 查找对应的Jsite Server记录获取region_id
server = jingrow.get_pg("Jsite Server", {"instance_id": instance_id})
if not server:
jingrow.log_error("更新服务器信息失败", f"找不到实例ID为 {instance_id} 的Jsite Server记录")
return {"success": False, "message": f"找不到实例ID为 {instance_id} 的Jsite Server记录"}
region_id = server.region
# 获取阿里云实例详细信息
instance_details = get_aliyun_instance_details(instance_ids, region_id)
if not instance_details or not instance_details.get('success'):
error_msg = f"获取阿里云实例信息失败: {instance_details.get('message', '未知错误')}"
jingrow.log_error("更新服务器信息失败", error_msg)
return {"success": False, "message": error_msg}
# 解析实例信息
instances = instance_details.get('data', {}).get('instances', [])
if not instances:
jingrow.log_error("更新服务器信息失败", "未找到实例信息")
return {"success": False, "message": "未找到实例信息"}
instance_info = instances[0] # 取第一个实例
# 更新public_ip
public_ip = instance_info.get('public_ip_address')
if public_ip:
server.public_ip = public_ip
# 更新end_date从expired_time转换
expired_time = instance_info.get('expired_time')
if expired_time:
try:
expired_datetime = datetime.fromisoformat(expired_time.replace('Z', '+00:00'))
server.end_date = expired_datetime.strftime('%Y-%m-%d %H:%M:%S')
except Exception as e:
jingrow.log_error("时间格式转换失败", f"转换expired_time {expired_time} 时发生错误: {str(e)}")
# 更新其他相关信息
server.status = instance_info.get('status', '')
# 更新系统信息
image_info = instance_info.get('image', {})
if image_info:
image_name = image_info.get('image_name', '')
image_version = image_info.get('image_version', '')
server.system = f"{image_name} {image_version}".strip()
# 更新资源规格信息
resource_spec = instance_info.get('resource_spec', {})
if resource_spec:
server.cpu = resource_spec.get('cpu')
server.memory = resource_spec.get('memory')
server.disk_size = resource_spec.get('disk_size')
server.bandwidth = resource_spec.get('bandwidth')
# 保存更新
server.save(ignore_permissions=True)
jingrow.db.commit()
return {
"success": True,
"message": "服务器信息更新成功",
"updated_fields": {
"public_ip": public_ip,
"end_date": server.end_date,
"status": server.status,
"system": server.system
}
}
except Exception as e:
jingrow.log_error("更新服务器信息失败", f"更新服务器信息时发生错误: {str(e)}")
return {"success": False, "message": str(e)}
@jingrow.whitelist()
def reset_server_key_pair(instance_id):
"""重置服务器密钥对:先删除旧的,再创建新的"""
try:
# 第一步:删除旧的密钥对(如果存在)
delete_aliyun_instance_key_pair(instance_id)
# 第二步:创建新的密钥对
result = create_server_key_pair(instance_id)
return result
except Exception as e:
jingrow.log_error("重置密钥对失败", f"重置实例 {instance_id} 密钥对时发生错误: {str(e)}")
return {"success": False, "message": str(e)}