jcloud/jcloud/api/domain_west.py

3448 lines
126 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2024, JINGROW
# For license information, please see license.txt
import jingrow
import requests
import time
import hashlib
import json
import random
from datetime import datetime
from urllib.parse import urlencode
from typing import Dict, Any, Optional, List
from jcloud.utils import get_current_team
from pypinyin import lazy_pinyin
import base64
class WestDomain:
"""西部数码域名API客户端"""
def __init__(self, username: str, password: str):
"""
初始化西部数码API客户端
Args:
username: 西部数码用户名
password: 西部数码API密码
"""
self.username = username.strip()
self.password = password.strip()
self.api_base_url = "https://api.west.cn/api/v2"
self.time = None
self.token = None
def _generate_token(self) -> str:
"""生成认证token"""
self.time = self._get_current_timestamp()
token_string = f"{self.username}{self.password}{self.time}"
return hashlib.md5(token_string.encode('utf-8')).hexdigest()
def _get_current_timestamp(self) -> int:
"""获取当前时间戳(毫秒)"""
return int(time.time() * 1000)
def _generate_common_parameters(self) -> Dict[str, str]:
"""生成公共参数"""
self.token = self._generate_token()
return {
'username': self.username,
'time': str(self.time),
'token': self.token,
}
def _make_request(self, action: str, method: str = 'GET',
query_params: Optional[Dict] = None,
body_params: Optional[Dict] = None) -> Dict[str, Any]:
"""
发送API请求
Args:
action: API动作路径
method: 请求方法 (GET/POST)
query_params: 查询参数
body_params: 请求体参数
Returns:
API响应结果
"""
# 构建URL
common_params = self._generate_common_parameters()
param_string = urlencode(common_params)
url = f"{self.api_base_url}{action}"
if '?' in action:
url += f"&{param_string}"
else:
url += f"?{param_string}"
# 添加查询参数
if query_params:
url += f"&{urlencode(query_params)}"
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
try:
if method.upper() == 'POST':
data = body_params or {}
# 确保中文字符正确编码
response = requests.post(url, data=data, headers=headers, timeout=30)
else:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
try:
result = response.json()
except json.JSONDecodeError:
result = {"status": "error", "message": "无法解析API响应"}
return result
except requests.exceptions.RequestException as e:
return {"status": "error", "message": f"API请求失败: {str(e)}"}
def check_balance(self) -> Dict[str, Any]:
"""获取账户可用余额"""
return self._make_request('/info/?act=checkbalance', 'GET')
def get_domain_price(self, domain: str, year: int = 1) -> Dict[str, Any]:
"""
获取域名价格
Args:
domain: 域名
year: 注册年限
"""
body_params = {
'type': 'domain',
'value': domain,
'year': year,
}
return self._make_request('/info/?act=getprice', 'POST', body_params=body_params)
def get_domain_renew_price(self, domain: str, year: int = 1) -> Dict[str, Any]:
"""
获取域名续费价格
Args:
domain: 域名
year: 续费年限
"""
body_params = {
'act': 'getrenprice',
'domain': domain,
'year': year,
}
return self._make_request('/domain/', 'POST', body_params=body_params)
def query_domain(self, domain: str, suffix: str = '.com') -> Dict[str, Any]:
"""
域名查询
Args:
domain: 域名前缀
suffix: 域名后缀
"""
body_params = {
'domain': domain,
'suffix': suffix,
}
return self._make_request('/domain/query/', 'POST', body_params=body_params)
def register_domain(self, domain: str, regyear: int = 1,
domainpwd: Optional[str] = None,
dns_host1: Optional[str] = None,
dns_host2: Optional[str] = None,
dns_host3: Optional[str] = None,
dns_host4: Optional[str] = None,
dns_host5: Optional[str] = None,
dns_host6: Optional[str] = None,
c_sysid: Optional[str] = None,
client_price: Optional[str] = None,
premium: Optional[str] = None,
domchannel: Optional[str] = None,
westusechn: Optional[str] = None) -> Dict[str, Any]:
"""
注册域名
Args:
domain: 域名(多个域名用英文逗号分隔)
regyear: 注册年限
domainpwd: 域名密码,不填则系统随机生成
dns_host1: 主DNS必填
dns_host2: 辅DNS必填
dns_host3-6: 可选DNS
c_sysid: 模板ID必填
client_price: 价格保护代理商用户购买价格如果价格低于其成本价激活失败防止亏损不需要保护传99999
premium: 普通域名不需要传,溢价域名必须传"yes"才能注册
domchannel: 仅对.cn域名有效"hk"为特价渠道,空为默认,"cn"为普通渠道
westusechn: 空为国内渠道,如需国际合作渠道传"hk",此功能支持:.com(需特殊权限)、.top、.cyou、.icu、.vip、.xyz、.site、.shop、.co
"""
body_params = {
'act': 'regdomain',
'domain': domain,
'regyear': regyear,
}
# 添加可选参数
if domainpwd:
body_params['domainpwd'] = domainpwd
if dns_host1:
body_params['dns_host1'] = dns_host1
if dns_host2:
body_params['dns_host2'] = dns_host2
if dns_host3:
body_params['dns_host3'] = dns_host3
if dns_host4:
body_params['dns_host4'] = dns_host4
if dns_host5:
body_params['dns_host5'] = dns_host5
if dns_host6:
body_params['dns_host6'] = dns_host6
if c_sysid:
body_params['c_sysid'] = c_sysid
if client_price:
body_params['client_price'] = client_price
if premium:
body_params['premium'] = premium
if domchannel:
body_params['domchannel'] = domchannel
if westusechn:
body_params['westusechn'] = westusechn
return self._make_request('/audit/', 'POST', body_params=body_params)
def renew_domain(self, domain: str, year: int = 1,
expire_date: Optional[str] = None,
client_price: Optional[int] = None) -> Dict[str, Any]:
"""
域名续费
Args:
domain: 域名
year: 续费年限
expire_date: 到期时间
client_price: 客户价格
"""
body_params = {
'domain': domain,
'year': year,
}
if expire_date:
body_params['expiredate'] = expire_date
if client_price:
body_params['client_price'] = client_price
return self._make_request('/domain/?act=renew', 'POST', body_params=body_params)
def get_domain_list(self, limit: int = 10, page: int = 1) -> Dict[str, Any]:
"""
获取域名列表
Args:
limit: 每页数量
page: 页码
"""
query_params = {
'limit': limit,
'page': page,
}
return self._make_request('/domain/?act=getdomains', 'GET', query_params=query_params)
def get_domain_info(self, domain: str) -> Dict[str, Any]:
"""
获取域名详细信息
Args:
domain: 域名
"""
body_params = {
'domain': domain,
}
return self._make_request('/domain/?act=getinfo', 'POST', body_params=body_params)
def get_dns_records(self, domain: str) -> Dict[str, Any]:
"""
获取域名DNS记录
Args:
domain: 域名
"""
body_params = {
'domain': domain,
}
return self._make_request('/domain/?act=getdns', 'POST', body_params=body_params)
def get_dns_records_paginated(self, domain: str, limit: int = 20, pageno: int = 1) -> Dict[str, Any]:
"""
获取域名解析记录(支持分页)
Args:
domain: 域名
limit: 每页大小默认20
pageno: 第几页默认为1
"""
body_params = {
'act': 'getdnsrecord',
'domain': domain,
'limit': limit,
'pageno': pageno,
}
return self._make_request('/domain/', 'POST', body_params=body_params)
def modify_dns_records(self, domain: str, records: List[Dict]) -> Dict[str, Any]:
"""
修改域名DNS记录
Args:
domain: 域名
records: DNS记录列表
"""
body_params = {
'domain': domain,
'records': json.dumps(records),
}
return self._make_request('/domain/?act=modifydns', 'POST', body_params=body_params)
def add_dns_record(self, domain: str, record_type: str,
host: str, value: str, ttl: int = 600, level: int = 10, line: str = "") -> Dict[str, Any]:
"""
添加DNS记录
Args:
domain: 域名
record_type: 记录类型 (A, AAAA, CNAME, MX, NS, TXT, SRV)
host: 主机记录
value: 记录值
ttl: TTL值 (60~86400秒默认900)
level: 优先级MX记录使用1-100默认10
line: 线路 (默认="", 电信="LTEL", 联通="LCNC", 移动="LMOB", 教育网="LEDU", 搜索引擎="LSEO")
"""
body_params = {
'act': 'adddnsrecord',
'domain': domain,
'host': host,
'type': record_type,
'value': value,
'ttl': ttl,
'level': level,
'line': line,
}
return self._make_request('/domain/', 'POST', body_params=body_params)
def modify_dns_record(self, domain: str, value: str, ttl: int = 600, level: int = 10,
record_id: Optional[str] = None, host: Optional[str] = None,
record_type: Optional[str] = None, line: str = "", old_value: Optional[str] = None) -> Dict[str, Any]:
"""
修改DNS记录
Args:
domain: 域名
value: 新的解析值
ttl: TTL值 (60~86400秒默认900)
level: 优先级MX记录使用1-100默认10
record_id: 解析记录编号(优先使用)
host: 主机头当record_id未提供时必填
record_type: 解析类型当record_id未提供时必填
line: 线路 (默认="", 电信="LTEL", 联通="LCNC", 移动="LMOB", 教育网="LEDU", 搜索引擎="LSEO")
old_value: 旧解析值(可选,用于确定唯一记录)
"""
body_params = {
'act': 'moddnsrecord',
'domain': domain,
'value': value,
'ttl': ttl,
'level': level,
'line': line,
}
# 优先使用record_id
if record_id:
body_params['id'] = record_id
else:
# 必须提供host和type
if not host or not record_type:
return {"status": "error", "message": "当未提供记录ID时主机头和解析类型为必填项"}
body_params['host'] = host
body_params['type'] = record_type
# 添加旧值(可选)
if old_value:
body_params['oldvalue'] = old_value
return self._make_request('/domain/', 'POST', body_params=body_params)
def delete_dns_record(self, domain: str, record_id: Optional[str] = None,
host: Optional[str] = None, record_type: Optional[str] = None,
value: Optional[str] = None, line: str = "") -> Dict[str, Any]:
"""
删除DNS记录
Args:
domain: 域名
record_id: 解析记录ID优先使用
host: 主机头当record_id未提供时必填
record_type: 解析类型当record_id未提供时必填
value: 解析值(可选)
line: 线路 (默认="", 电信="LTEL", 联通="LCNC", 移动="LMOB", 教育网="LEDU", 搜索引擎="LSEO")
"""
body_params = {
'act': 'deldnsrecord',
'domain': domain,
'line': line,
}
# 优先使用record_id
if record_id:
body_params['id'] = record_id
else:
# 必须提供host和type
if not host or not record_type:
return {"status": "error", "message": "当未提供记录ID时主机头和解析类型为必填项"}
body_params['host'] = host
body_params['type'] = record_type
if value:
body_params['value'] = value
return self._make_request('/domain/', 'POST', body_params=body_params)
def transfer_domain(self, domain: str, auth_code: str) -> Dict[str, Any]:
"""
域名转入
Args:
domain: 域名
auth_code: 转移授权码
"""
body_params = {
'domain': domain,
'auth_code': auth_code,
}
return self._make_request('/domain/?act=transfer', 'POST', body_params=body_params)
def lock_domain(self, domain: str, lock: bool = True) -> Dict[str, Any]:
"""
锁定/解锁域名
Args:
domain: 域名
lock: 是否锁定
"""
action = '/domain/?act=lock' if lock else '/domain/?act=unlock'
body_params = {
'domain': domain,
}
return self._make_request(action, 'POST', body_params=body_params)
def get_template_list(self, limit: int = 10, page: int = 1) -> Dict[str, Any]:
"""
获取域名模板列表
Args:
limit: 每页数量
page: 页码
"""
body_params = {
'act': 'gettemplates',
'limit': limit,
'page': page,
}
return self._make_request('/audit/', 'POST', body_params=body_params)
def get_template_detail(self, template_id: str) -> Dict[str, Any]:
"""
获取指定模板详情
Args:
template_id: 模板ID
"""
body_params = {
'act': 'auditinfo',
'c_sysid': template_id,
}
return self._make_request('/audit/', 'POST', body_params=body_params)
def create_contact_template(self, template_data: Dict[str, Any]) -> Dict[str, Any]:
"""
创建域名模板
Args:
template_data: 模板数据,包含所有必要的字段
"""
body_params = {
'act': 'auditsub',
**template_data
}
# 对body_params进行GBK编码
encoded_data = urlencode(body_params, encoding='gbk').encode('gbk')
return self._make_request('/audit/', 'POST', body_params=encoded_data)
def modify_contact_template(self, template_data: Dict[str, Any]) -> Dict[str, Any]:
"""
修改实名模板资料
Args:
template_data: 模板数据必须包含c_sysid字段
"""
if not template_data.get('c_sysid'):
return {"status": "error", "message": "缺少必要参数c_sysid"}
body_params = {
'act': 'auditmod',
**template_data
}
# 对body_params进行GBK编码
encoded_data = urlencode(body_params, encoding='gbk').encode('gbk')
return self._make_request('/audit/', 'POST', body_params=encoded_data)
def delete_contact_template(self, template_id: str) -> Dict[str, Any]:
"""
删除指定模板
Args:
template_id: 模板标识(c_sysid)
"""
if not template_id:
return {"status": "error", "message": "缺少必要参数template_id"}
body_params = {
'act': 'auditdel',
'c_sysid': template_id,
}
return self._make_request('/audit/', 'POST', body_params=body_params)
def get_domain_real_info(self, domain: str) -> Dict[str, Any]:
"""
获取域名实名信息
Args:
domain: 域名
Returns:
域名实名信息,包含所有者、管理联系人、技术联系人、缴费联系人等信息
"""
body_params = {
'act': 'domaininfo',
'domain': domain,
}
return self._make_request('/audit/', 'POST', body_params=body_params)
def get_upload_token(self, c_sysid: str, f_type_org: str, f_code_org: str,
f_type_lxr: Optional[str] = None, f_code_lxr: Optional[str] = None) -> Dict[str, Any]:
"""
获取实名上传token
Args:
c_sysid: 模板标识
f_type_org: 证件类型,详情见附录
f_code_org: 证件号码
f_type_lxr: 联系人证件类型(企业时填写)
f_code_lxr: 联系人证件号码(企业时填写)
Returns:
API响应结果包含上传token
"""
body_params = {
'act': 'uploadwcftoken',
'c_sysid': c_sysid,
'f_type_org': f_type_org,
'f_code_org': f_code_org,
}
# 添加可选参数
if f_type_lxr:
body_params['f_type_lxr'] = f_type_lxr
if f_code_lxr:
body_params['f_code_lxr'] = f_code_lxr
return self._make_request('/audit/', 'POST', body_params=body_params)
def upload_real_name_files(self, token: str, file_org: str, file_lxr: Optional[str] = None) -> Dict[str, Any]:
"""
模板实名资料上传
Args:
token: 实名上传Token
file_org: 图片完整的base64
file_lxr: 企业联系人图片完整的base64(只有token中设置了联系人的才上传)
Returns:
API响应结果
"""
upload_url = "https://netservice.vhostgo.com/wcfservice/Service1.svc/Wcf_AuditUploadFile"
# 构建请求数据
data = {
'token': token,
'file_org': file_org,
}
# 添加可选参数
if file_lxr:
data['file_lxr'] = file_lxr
headers = {
'Content-Type': 'application/json'
}
try:
# 发送JSON格式的POST请求
response = requests.post(upload_url, json=data, headers=headers, timeout=30)
response.raise_for_status()
try:
result = response.json()
except json.JSONDecodeError:
result = {"status": "error", "message": "无法解析API响应"}
return result
except requests.exceptions.RequestException as e:
return {"status": "error", "message": f"API请求失败: {str(e)}"}
def modify_dns_server(self, domain: str, dns1: str, dns2: str,
dns3: Optional[str] = None, dns4: Optional[str] = None,
dns5: Optional[str] = None, dns6: Optional[str] = None) -> Dict[str, Any]:
"""
修改域名DNS服务器
Args:
domain: 要修改DNS的域名
dns1: 主DNS服务器
dns2: 辅DNS服务器
dns3: 第三个DNS服务器可选
dns4: 第四个DNS服务器可选
dns5: 第五个DNS服务器可选
dns6: 第六个DNS服务器可选
Returns:
API响应结果
"""
body_params = {
'act': 'moddns',
'domain': domain,
'dns1': dns1,
'dns2': dns2,
}
# 添加可选的DNS服务器
if dns3:
body_params['dns3'] = dns3
if dns4:
body_params['dns4'] = dns4
if dns5:
body_params['dns5'] = dns5
if dns6:
body_params['dns6'] = dns6
return self._make_request('/domain/', 'POST', body_params=body_params)
def format_date(date):
"""格式化域名到期时间"""
if not date:
return None
if isinstance(date, str):
return date
return date.strftime('%Y-%m-%d')
def get_west_client() -> WestDomain:
"""获取西部数码域名API客户端实例"""
try:
# 从Jcloud Settings获取配置
settings = jingrow.get_single("Jcloud Settings")
if settings:
username = settings.get("west_username")
password = settings.get_password("west_api_password") if settings.get("west_api_password") else None
if username and password:
return WestDomain(username, password)
else:
return None
else:
return None
except Exception as e:
return None
# API端点函数
@jingrow.whitelist()
def check_west_balance():
"""检查西部数码账户余额"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
return client.check_balance()
@jingrow.whitelist()
def check_domain(domain: str, suffix: str = '.com'):
"""查询域名是否可注册"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
if not domain:
return {"status": "error", "message": "缺少域名参数"}
response = client.query_domain(domain, suffix)
if response.get("status") == "error":
return response
try:
# 直接检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
full_domain = domain + suffix
for item in response.get("data", []):
if item.get("name") == full_domain:
return {
"available": item.get("avail", 0) == 1,
"domain": full_domain,
"message": "域名可用" if item.get("avail", 0) == 1 else "域名已被注册"
}
return {"status": "error", "message": f"未找到域名 {full_domain} 的查询结果"}
except Exception as e:
return {"status": "error", "message": "域名查询响应解析失败"}
@jingrow.whitelist()
def get_west_domain_price(domain: str, year: int = 1):
"""获取域名价格"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
if not domain:
return {"status": "error", "message": "缺少域名参数"}
response = client.get_domain_price(domain, year)
if response.get("status") == "error":
return response
try:
# 直接检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
data = response.get("data", {})
original_price = data.get("buyprice", 0)
# 统一增加10%的利润参考aliyun_server_light.py的方法
if original_price and original_price > 0:
# 确保10%利润率且价格为整数
adjusted_price = int(original_price / (1 - 0.1))
else:
adjusted_price = original_price
return {
"data": {
"price": adjusted_price,
"original_price": original_price, # 保留原价用于参考
"domain": domain,
"year": year
}
}
except Exception as e:
return {"status": "error", "message": "域名价格查询响应解析失败"}
@jingrow.whitelist()
def get_west_domain_renew_price(domain: str, year: int = 1):
"""获取域名续费价格"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
if not domain:
return {"status": "error", "message": "缺少域名参数"}
response = client.get_domain_renew_price(domain, year)
if response.get("status") == "error":
return response
try:
# 直接检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
data = response.get("data", {})
original_price = data.get("price", 0)
# 统一增加10%的利润参考aliyun_server_light.py的方法
if original_price and original_price > 0:
# 确保10%利润率且价格为整数
adjusted_price = int(original_price / (1 - 0.1))
else:
adjusted_price = original_price
return {
"data": {
"price": adjusted_price,
"domain": domain,
"year": year,
"ispremium": data.get("ispremium", "n")
}
}
except Exception as e:
return {"status": "error", "message": "域名续费价格查询响应解析失败"}
@jingrow.whitelist()
def west_domain_register(domain: str, regyear: int = 1, dns_host1: str = "ns1.myhostadmin.net",
dns_host2: str = "ns2.myhostadmin.net", c_sysid: str = None,
domainpwd: str = None, dns_host3: str = None, dns_host4: str = None,
dns_host5: str = None, dns_host6: str = None, client_price: str = None,
premium: str = None, domchannel: str = None, westusechn: str = None):
"""注册域名"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
return client.register_domain(
domain=domain,
regyear=regyear,
dns_host1=dns_host1,
dns_host2=dns_host2,
c_sysid=c_sysid,
domainpwd=domainpwd,
dns_host3=dns_host3,
dns_host4=dns_host4,
dns_host5=dns_host5,
dns_host6=dns_host6,
client_price=client_price,
premium=premium,
domchannel=domchannel,
westusechn=westusechn
)
@jingrow.whitelist()
def west_domain_renew(**data):
"""域名续费"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
year = data.get('year', 1)
expire_date = data.get('expire_date')
client_price = data.get('client_price')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
return client.renew_domain(domain, year, expire_date, client_price)
@jingrow.whitelist()
def west_domain_get_list(**data):
"""获取域名列表"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
limit = data.get('limit', 10)
page = data.get('page', 1)
return client.get_domain_list(limit, page)
@jingrow.whitelist()
def west_domain_get_info(**data):
"""获取域名信息"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
return client.get_domain_info(domain)
@jingrow.whitelist()
def west_domain_get_dns(**data):
"""获取域名DNS记录"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
return client.get_dns_records(domain)
@jingrow.whitelist()
def get_west_domain_dns_records(**data):
"""获取域名解析记录(支持分页)"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
limit = data.get('limit', 20)
pageno = data.get('pageno', 1)
if not domain:
return {"status": "error", "message": "缺少域名参数"}
# 获取所有记录(不分页)
response = client.get_dns_records_paginated(domain, 1000, 1) # 获取足够多的记录
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
data = response.get("data", {})
all_items = data.get("items", [])
# 对所有记录按类型排序
sorted_items = sorted(all_items, key=lambda x: x.get('type', ''))
# 计算分页
total = len(sorted_items)
pagecount = (total + limit - 1) // limit
start_index = (pageno - 1) * limit
end_index = start_index + limit
# 返回当前页的记录
current_page_items = sorted_items[start_index:end_index]
# 返回格式化的解析记录信息
return {
"status": "success",
"data": {
"pageno": pageno,
"limit": limit,
"total": total,
"pagecount": pagecount,
"items": current_page_items
}
}
except Exception as e:
return {"status": "error", "message": "域名解析记录查询响应解析失败"}
@jingrow.whitelist()
def get_jingrow_domain_dns_records(**data):
"""获取域名解析记录(从本地数据库获取,支持分页)"""
try:
domain = data.get('domain')
limit = data.get('limit', 20)
pageno = data.get('pageno', 1)
if not domain:
return {"status": "error", "message": "缺少域名参数"}
# 查找对应的域名记录
domain_records = jingrow.get_all(
"Jsite Domain",
{"domain": domain},
["name"]
)
if not domain_records:
return {"status": "error", "message": "未找到指定的域名记录"}
domain_record = domain_records[0]
# 获取该域名的所有DNS解析记录
dns_records = jingrow.get_all(
"Dns Resolution",
{"parent": domain_record.name},
["name", "host", "type", "value", "ttl", "level", "line", "record_id"]
)
# 转换为API格式的数据结构
all_items = []
for record in dns_records:
item = {
"id": record.record_id or record.name,
"host": record.host or "",
"type": record.type or "",
"value": record.value or "",
"ttl": int(record.ttl) if record.ttl else 600,
"level": int(record.level) if record.level else 10,
"line": record.line or "",
"record_id": record.record_id or record.name
}
all_items.append(item)
# 对所有记录按类型排序
sorted_items = sorted(all_items, key=lambda x: x.get('type', ''))
# 计算分页
total = len(sorted_items)
pagecount = (total + limit - 1) // limit
start_index = (pageno - 1) * limit
end_index = start_index + limit
# 返回当前页的记录
current_page_items = sorted_items[start_index:end_index]
# 返回格式化的解析记录信息
return {
"status": "success",
"data": {
"pageno": pageno,
"limit": limit,
"total": total,
"pagecount": pagecount,
"items": current_page_items
}
}
except Exception as e:
return {"status": "error", "message": f"域名解析记录查询失败: {str(e)}"}
@jingrow.whitelist()
def west_domain_modify_dns(**data):
"""修改域名DNS记录批量修改兼容旧版本"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
records = data.get('records')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
if not records:
return {"status": "error", "message": "缺少DNS记录参数"}
# 逐个修改记录
results = []
for record in records:
record_type = record.get('type')
host = record.get('host')
value = record.get('value')
ttl = record.get('ttl', 600)
level = record.get('level', 10)
line = record.get('line', '')
record_id = record.get('record_id')
old_value = record.get('old_value')
# 验证必要参数
if not value:
results.append({"status": "error", "message": "缺少解析值"})
continue
# 如果没有提供record_id则必须提供host和record_type
if not record_id and (not host or not record_type):
results.append({"status": "error", "message": "当未提供记录ID时主机头和解析类型为必填项"})
continue
# 验证TTL值
if ttl < 60 or ttl > 86400:
results.append({"status": "error", "message": "TTL值必须在60~86400秒之间"})
continue
# 验证优先级
if level < 1 or level > 100:
results.append({"status": "error", "message": "优先级必须在1~100之间"})
continue
# 验证记录类型
if record_type:
valid_types = ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'TXT', 'SRV']
if record_type not in valid_types:
results.append({"status": "error", "message": f"不支持的记录类型: {record_type}"})
continue
# 调用单个记录修改API
response = client.modify_dns_record(
domain, value, ttl, level, record_id, host, record_type, line, old_value
)
if response.get("status") == "error":
results.append(response)
elif response.get("result") != 200:
error_msg = response.get('msg', response.get('message', '未知错误'))
results.append({"status": "error", "message": f"修改DNS记录失败: {error_msg}"})
else:
results.append({
"status": "success",
"message": "DNS记录修改成功",
"data": {
"domain": domain,
"value": value,
"ttl": ttl,
"level": level,
"record_id": record_id,
"host": host,
"record_type": record_type,
"line": line
}
})
# 返回批量操作结果
success_count = sum(1 for r in results if r.get("status") == "success")
error_count = len(results) - success_count
return {
"status": "success" if error_count == 0 else "partial_success",
"message": f"批量修改完成,成功: {success_count},失败: {error_count}",
"results": results
}
@jingrow.whitelist()
def west_domain_add_dns_record(**data):
"""添加DNS记录"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
record_type = data.get('record_type')
host = data.get('host')
value = data.get('value')
ttl = data.get('ttl', 600)
level = data.get('level', 10)
line = data.get('line', '')
if not all([domain, record_type, host, value]):
return {"status": "error", "message": "缺少必要参数"}
# 验证记录类型
valid_types = ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'TXT', 'SRV']
if record_type not in valid_types:
return {"status": "error", "message": f"不支持的记录类型: {record_type},支持的类型: {', '.join(valid_types)}"}
# 验证TTL值
if ttl < 60 or ttl > 86400:
return {"status": "error", "message": "TTL值必须在60~86400秒之间"}
# 验证优先级
if level < 1 or level > 100:
return {"status": "error", "message": "优先级必须在1~100之间"}
response = client.add_dns_record(domain, record_type, host, value, ttl, level, line)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
error_msg = response.get('msg', response.get('message', '未知错误'))
return {"status": "error", "message": f"添加DNS记录失败: {error_msg}"}
# 获取新增记录的ID
record_id = response.get("data", {}).get("id")
# 同步添加记录到本地数据库
if record_id:
try:
# 查找对应的域名记录
domain_records = jingrow.get_all(
"Jsite Domain",
{"domain": domain},
["name"]
)
if domain_records:
domain_record = domain_records[0]
# 立即执行插入操作
# 先获取父记录
parent_pg = jingrow.get_pg("Jsite Domain", domain_record.name)
# 创建新的DNS记录作为子记录
dns_pg = jingrow.new_pg(
"Dns Resolution",
parent_pg=parent_pg,
parentfield="dns_resolution",
host=host,
type=record_type,
value=value,
ttl=str(ttl),
level=str(level),
line=line,
record_id=record_id
)
dns_pg.insert(ignore_permissions=True)
jingrow.db.commit()
except Exception as e:
jingrow.log_error(f"域名 {domain} DNS记录同步添加失败", f"错误: {str(e)}")
# 返回成功结果
return {
"status": "success",
"message": "DNS记录添加成功",
"data": {
"id": record_id,
"domain": domain,
"host": host,
"type": record_type,
"value": value,
"ttl": ttl,
"level": level,
"line": line
}
}
except Exception as e:
return {"status": "error", "message": "添加DNS记录响应解析失败"}
@jingrow.whitelist()
def west_domain_delete_dns_record(**data):
"""删除DNS记录"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
record_id = data.get('record_id')
host = data.get('host')
record_type = data.get('record_type')
value = data.get('value')
line = data.get('line', '')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
# 如果没有提供record_id则必须提供host和record_type
if not record_id and (not host or not record_type):
return {"status": "error", "message": "当未提供记录ID时主机头和解析类型为必填项"}
# 验证记录类型
if record_type:
valid_types = ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'TXT', 'SRV']
if record_type not in valid_types:
return {"status": "error", "message": f"不支持的记录类型: {record_type},支持的类型: {', '.join(valid_types)}"}
response = client.delete_dns_record(domain, record_id, host, record_type, value, line)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
error_msg = response.get('msg', response.get('message', '未知错误'))
return {"status": "error", "message": f"删除DNS记录失败: {error_msg}"}
# 同步删除本地数据库中的对应记录
if record_id:
try:
dns_records = jingrow.get_all(
"Dns Resolution",
{"record_id": record_id},
["name"]
)
if dns_records:
dns_record = dns_records[0]
# 立即执行删除操作
jingrow.delete_pg("Dns Resolution", dns_record.name, ignore_permissions=True)
jingrow.db.commit()
except Exception as e:
jingrow.log_error(f"域名 {domain} DNS记录同步删除失败", f"错误: {str(e)}")
# 返回成功结果
return {
"status": "success",
"message": "DNS记录删除成功"
}
except Exception as e:
return {"status": "error", "message": "删除DNS记录响应解析失败"}
@jingrow.whitelist()
def west_domain_delete_dns_records(**data):
"""批量删除DNS记录"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
record_ids = data.get('record_ids')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
if not record_ids:
return {"status": "error", "message": "缺少记录ID列表"}
# 处理record_ids参数支持字符串和列表格式
if isinstance(record_ids, str):
# 如果是逗号分隔的字符串,转换为列表
record_ids = [rid.strip() for rid in record_ids.split(',') if rid.strip()]
elif not isinstance(record_ids, list):
return {"status": "error", "message": "记录ID列表格式错误"}
if not record_ids:
return {"status": "error", "message": "记录ID列表不能为空"}
# 验证记录ID格式
for record_id in record_ids:
if not record_id or not isinstance(record_id, str):
return {"status": "error", "message": "记录ID格式错误"}
# 批量删除逻辑
results = []
success_count = 0
error_count = 0
for record_id in record_ids:
try:
result = client.delete_dns_record(domain, record_id)
if result.get("status") == "error" or result.get("result") != 200:
error_count += 1
error_msg = result.get('msg', result.get('message', '删除失败'))
results.append({
"record_id": record_id,
"status": "error",
"message": error_msg
})
else:
success_count += 1
results.append({
"record_id": record_id,
"status": "success"
})
# 同步删除本地数据库中的对应记录
try:
dns_records = jingrow.get_all(
"Dns Resolution",
{"record_id": record_id},
["name"]
)
if dns_records:
dns_record = dns_records[0]
# 立即执行删除操作
jingrow.delete_pg("Dns Resolution", dns_record.name, ignore_permissions=True)
jingrow.db.commit()
except Exception as e:
jingrow.log_error(f"域名 {domain} DNS记录同步删除失败", f"记录ID: {record_id}, 错误: {str(e)}")
except Exception as e:
error_count += 1
results.append({
"record_id": record_id,
"status": "error",
"message": str(e)
})
# 返回批量删除结果
if error_count == 0:
return {
"status": "success",
"message": f"批量删除成功,共删除 {success_count} 条记录"
}
elif success_count == 0:
return {
"status": "error",
"message": f"批量删除失败,共 {error_count} 条记录删除失败"
}
else:
return {
"status": "partial_success",
"message": f"批量删除部分成功,成功 {success_count} 条,失败 {error_count}"
}
@jingrow.whitelist()
def west_domain_transfer(**data):
"""域名转入"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
auth_code = data.get('auth_code')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
if not auth_code:
return {"status": "error", "message": "缺少转移授权码参数"}
return client.transfer_domain(domain, auth_code)
@jingrow.whitelist()
def west_domain_lock(**data):
"""锁定域名"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
lock = data.get('lock', True)
if not domain:
return {"status": "error", "message": "缺少域名参数"}
return client.lock_domain(domain, lock)
@jingrow.whitelist()
def west_domain_get_templates(**data):
"""获取域名模板列表"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
limit = data.get('limit', 10)
page = data.get('page', 1)
return client.get_template_list(limit, page)
@jingrow.whitelist()
def get_west_template_detail(**data):
"""获取指定模板详情"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
template_id = data.get('template_id')
if not template_id:
return {"status": "error", "message": "缺少模板ID参数"}
return client.get_template_detail(template_id)
@jingrow.whitelist()
def create_domain_order(domain, period=1, payment_method='balance', domain_owner=None):
"""创建域名注册订单"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 验证域名格式
if not domain or '.' not in domain:
return {"success": False, "message": "域名格式不正确"}
# 验证域名所有者
if not domain_owner:
return {"success": False, "message": "请选择域名所有者"}
# 检查域名所有者是否存在
owner_exists = jingrow.get_all(
"Domain Owner",
{"name": domain_owner, "team": team.name},
["name"]
)
if not owner_exists:
return {"success": False, "message": "域名所有者不存在"}
# 查询域名价格
client = get_west_client()
if not client:
return {"success": False, "message": "API客户端初始化失败"}
# 获取域名价格 - 使用统一的 get_west_domain_price 函数
price_result = get_west_domain_price(domain, 1)
if price_result.get("status") == "error":
return {"success": False, "message": "获取域名价格失败"}
# 计算总价格 - 使用与前端一致的价格字段
yearly_price = price_result.get("data", {}).get("price", 0) # 使用前端一致的价格字段
total_amount = yearly_price * period
# 生成订单号
order_id = f"{datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + ''.join(random.choices('0123456789', k=6))}"
# 获取域名所有者的c_sysid
domain_owner_pg = jingrow.get_pg("Domain Owner", domain_owner)
if not domain_owner_pg:
return {"success": False, "message": "域名所有者不存在"}
c_sysid = domain_owner_pg.c_sysid
if not c_sysid:
return {"success": False, "message": "域名所有者缺少系统ID请重新创建域名所有者"}
# 构建业务参数
biz_params = {
"domain": domain,
"period": period,
"domain_owner": domain_owner,
"yearly_price": yearly_price,
"auto_renew": False,
# 注册域名所需参数
"regyear": period,
"dns_host1": "ns1.myhostadmin.net",
"dns_host2": "ns2.myhostadmin.net",
"c_sysid": c_sysid,
"client_price": None
}
# 创建订单记录
order = jingrow.get_pg({
"pagetype": "Order",
"order_id": order_id,
"order_type": "域名注册",
"team": team.name,
"status": "待支付",
"total_amount": total_amount,
"title": domain,
"description": f"{period}",
"biz_params": json.dumps(biz_params, ensure_ascii=False)
})
order.insert(ignore_permissions=True)
jingrow.db.commit()
return {
"success": True,
"message": "订单创建成功",
"order": order.as_dict()
}
except Exception as e:
return {"success": False, "message": f"创建订单失败: {str(e)}"}
@jingrow.whitelist()
def create_domain_renew_order(**kwargs):
"""创建域名续费订单"""
try:
domain = kwargs.get('domain')
renewal_years = kwargs.get('renewal_years', 1)
if not domain:
jingrow.throw("缺少域名信息")
# 验证输入
domain_pg = jingrow.get_pg("Jsite Domain", domain)
if not domain_pg:
jingrow.throw("域名不存在")
team = domain_pg.team
# 验证当前用户权限
current_team = get_current_team(True)
if current_team.name != team:
jingrow.throw("您没有权限为此域名创建续费订单")
# 计算续费金额 - 使用现有的价格获取函数保持统一
renewal_years = int(renewal_years)
# 调用现有的价格获取函数
price_data = get_west_domain_renew_price(domain_pg.domain, 1)
if price_data.get("data", {}).get("price"):
yearly_price = price_data["data"]["price"]
else:
yearly_price = domain_pg.price or 0
total_amount = yearly_price * renewal_years
# 生成唯一订单号
order_id = f"{datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + ''.join(random.choices('0123456789', k=6))}"
# 构建业务参数
biz_params = {
"domain": domain_pg.domain,
"domain_name": domain,
"renewal_years": renewal_years,
"yearly_price": yearly_price,
"domain_owner": domain_pg.domain_owner,
# 续费域名所需参数
"year": renewal_years,
"expire_date": format_date(domain_pg.end_date),
"client_price": None
}
# 创建订单记录
order = jingrow.get_pg({
"pagetype": "Order",
"order_id": order_id,
"order_type": "域名续费",
"team": team,
"status": "待支付",
"total_amount": total_amount,
"title": domain_pg.domain,
"description": str(renewal_years), # 存储续费年数
"biz_params": json.dumps(biz_params, ensure_ascii=False)
})
order.insert(ignore_permissions=True)
jingrow.db.commit()
return {
"success": True,
"order": order.as_dict()
}
except Exception as e:
return {
"success": False,
"message": f"创建续费订单失败: {str(e)}"
}
def register_domain_from_order(order_name):
"""支付成功后异步注册域名"""
try:
order = jingrow.get_pg("Order", order_name)
if not order:
raise Exception("订单不存在")
# 从biz_params中读取业务参数
biz_params = json.loads(order.biz_params) if order.biz_params else {}
domain_name = biz_params.get("domain")
period = biz_params.get("period", 1)
domain_owner = biz_params.get("domain_owner")
if not domain_name:
raise Exception("订单中缺少域名信息")
# 调用西部数码API注册域名
client = get_west_client()
if not client:
raise Exception("API客户端初始化失败")
result = client.register_domain(
domain=domain_name,
regyear=biz_params.get("regyear", period),
dns_host1=biz_params.get("dns_host1", "ns1.myhostadmin.net"),
dns_host2=biz_params.get("dns_host2", "ns2.myhostadmin.net"),
c_sysid=biz_params.get("c_sysid"),
client_price=biz_params.get("client_price")
)
if not result or result.get('result') != 200:
error_msg = result.get('msg', result.get('message', '未知错误'))
raise Exception(f"域名注册失败: {error_msg}")
# 创建域名记录
domain_pg = jingrow.get_pg({
"pagetype": "Jsite Domain",
"domain": domain_name,
"team": order.team,
"order_id": order.order_id,
"period": period,
"domain_owner": domain_owner,
"registration_date": jingrow.utils.nowdate(),
"end_date": jingrow.utils.add_months(jingrow.utils.nowdate(), period * 12)
})
domain_pg.insert(ignore_permissions=True)
# 更新订单状态
order.status = "交易成功"
order.save(ignore_permissions=True)
jingrow.db.commit()
# 异步执行域名信息同步
try:
jingrow.enqueue(
"jcloud.api.domain_west.sync_domain_info_from_west",
domain=domain_name,
queue="default",
timeout=300
)
except Exception as e:
jingrow.log_error(f"域名 {domain_name} 异步同步任务创建失败", f"错误: {str(e)}")
return True
except Exception as e:
raise e
def renew_domain_from_order(order_name):
"""支付成功后异步续费域名"""
try:
order = jingrow.get_pg("Order", order_name)
if not order:
raise Exception("订单不存在")
# 从biz_params中读取业务参数
biz_params = json.loads(order.biz_params) if order.biz_params else {}
domain_name = biz_params.get("domain")
renewal_years = biz_params.get("renewal_years", 1)
domain_record_name = biz_params.get("domain_name") # 域名记录的名称
if not domain_name:
raise Exception("订单中缺少域名信息")
# 查找域名记录
domain = jingrow.get_pg("Jsite Domain", domain_record_name)
if not domain:
raise Exception("找不到对应的域名记录")
# 调用西部数码API续费域名
client = get_west_client()
if not client:
raise Exception("API客户端初始化失败")
# 验证域名到期时间
if not biz_params.get("expire_date"):
raise Exception("无法获取域名到期时间")
result = client.renew_domain(
domain_name,
biz_params.get("year", renewal_years),
biz_params.get("expire_date"),
biz_params.get("client_price")
)
if not result or result.get('result') != 200:
error_msg = result.get('msg', result.get('message', '未知错误'))
raise Exception(f"域名续费失败: {error_msg}")
# 更新域名到期时间
domain.end_date = jingrow.utils.add_months(domain.end_date or jingrow.utils.nowdate(), renewal_years * 12)
domain.save(ignore_permissions=True)
# 更新订单状态
order.status = "交易成功"
order.save(ignore_permissions=True)
jingrow.db.commit()
return True
except Exception as e:
raise e
@jingrow.whitelist()
def toggle_domain_auto_renew(pagetype, name, auto_renew):
"""切换域名自动续费状态"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取域名记录
domain = jingrow.get_pg(pagetype, name)
if not domain:
return {"success": False, "message": "找不到域名记录"}
# 验证权限
if domain.team != team.name:
return {"success": False, "message": "您没有权限操作此域名"}
# 更新自动续费状态
domain.auto_renew = bool(auto_renew)
domain.save(ignore_permissions=True)
return {"success": True, "message": "自动续费状态更新成功"}
except Exception as e:
return {"success": False, "message": f"操作失败: {str(e)}"}
@jingrow.whitelist()
def toggle_domain_whois_protection(pagetype, name, whois_protection):
"""切换域名隐私保护状态"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取域名记录
domain = jingrow.get_pg(pagetype, name)
if not domain:
return {"success": False, "message": "找不到域名记录"}
# 验证权限
if domain.team != team.name:
return {"success": False, "message": "您没有权限操作此域名"}
# 更新隐私保护状态
domain.whois_protection = bool(whois_protection)
domain.save(ignore_permissions=True)
return {"success": True, "message": "隐私保护状态更新成功"}
except Exception as e:
return {"success": False, "message": f"操作失败: {str(e)}"}
@jingrow.whitelist()
def delete_domain(pagetype, name):
"""删除域名记录"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取域名记录
domain = jingrow.get_pg(pagetype, name)
if not domain:
return {"success": False, "message": "找不到域名记录"}
# 验证权限
if domain.team != team.name:
return {"success": False, "message": "您没有权限操作此域名"}
# 删除域名记录
domain.delete(ignore_permissions=True)
return {"success": True, "message": "域名记录删除成功"}
except Exception as e:
return {"success": False, "message": f"删除失败: {str(e)}"}
@jingrow.whitelist()
def get_domain_owners(**data):
"""获取当前团队的域名所有者列表(支持分页、搜索和过滤)"""
try:
team = get_current_team()
if not team:
return {"status": "Error", "message": "未找到当前团队"}
# 获取分页参数
limit = data.get('limit', 20)
pageno = data.get('pageno', 1)
search_query = data.get('searchQuery', '').strip()
selected_status = data.get('selectedStatus', '')
# 确保参数类型正确
try:
limit = int(limit)
pageno = int(pageno)
except (ValueError, TypeError):
limit = 20
pageno = 1
# 确保参数在合理范围内
limit = max(1, min(limit, 100)) # 限制每页最多100条
pageno = max(1, pageno)
# 获取当前团队的所有域名所有者
domain_owners = jingrow.get_all(
"Domain Owner",
{"team": team},
["name", "title", "fullname", "c_regtype", "c_org_m", "c_ln_m", "c_fn_m",
"c_em", "c_ph", "c_st_m", "c_ct_m", "c_adr_m", "c_pc", "r_status",
"c_idtype_gswl", "c_idnum_gswl", "c_sysid"]
)
# 搜索过滤
if search_query:
search_query_lower = search_query.lower()
filtered_owners = []
for owner in domain_owners:
# 检查显示名称
display_name = ""
if owner.c_regtype == 'I':
if owner.fullname:
display_name = owner.fullname
else:
display_name = (owner.c_ln_m or '') + (owner.c_fn_m or '')
elif owner.c_regtype == 'E':
display_name = owner.c_org_m or owner.title or owner.name
else:
display_name = owner.title or owner.name
# 检查是否匹配搜索条件
if (search_query_lower in display_name.lower() or
search_query_lower in (owner.title or '').lower() or
search_query_lower in (owner.c_em or '').lower()):
filtered_owners.append(owner)
domain_owners = filtered_owners
# 状态过滤
if selected_status:
filtered_owners = []
for owner in domain_owners:
r_status = owner.r_status
is_verified = r_status == '1'
if str('1' if is_verified else '0') == selected_status:
filtered_owners.append(owner)
domain_owners = filtered_owners
# 计算分页信息
total = len(domain_owners)
pagecount = (total + limit - 1) // limit
start_index = (pageno - 1) * limit
end_index = start_index + limit
# 返回当前页的记录
current_page_items = domain_owners[start_index:end_index]
return {
"status": "Success",
"data": {
"pageno": pageno,
"limit": limit,
"total": total,
"pagecount": pagecount,
"items": current_page_items
}
}
except Exception as e:
return {"status": "Error", "message": f"获取域名所有者列表失败: {str(e)}"}
@jingrow.whitelist()
def get_domain_owner(name):
"""获取单个域名所有者信息"""
try:
if not name:
return {"status": "Error", "message": "域名所有者名称不能为空"}
# 获取指定的域名所有者
domain_owner = jingrow.get_pg("Domain Owner", name)
if not domain_owner:
return {"status": "Error", "message": "未找到指定的域名所有者"}
# 检查权限(只能查看当前团队的所有者)
team = get_current_team()
if not team or domain_owner.team != team:
return {"status": "Error", "message": "无权访问该域名所有者信息"}
# 返回所有者信息
owner_data = {
"name": domain_owner.name,
"r_status": domain_owner.r_status,
"title": domain_owner.title,
"fullname": domain_owner.fullname,
"c_regtype": domain_owner.c_regtype,
"c_org_m": domain_owner.c_org_m,
"c_ln_m": domain_owner.c_ln_m,
"c_fn_m": domain_owner.c_fn_m,
"c_em": domain_owner.c_em,
"c_ph": domain_owner.c_ph,
"c_st_m": domain_owner.c_st_m,
"c_ct_m": domain_owner.c_ct_m,
"c_adr_m": domain_owner.c_adr_m,
"c_pc": domain_owner.c_pc,
"c_idtype_gswl": domain_owner.c_idtype_gswl,
"c_idnum_gswl": domain_owner.c_idnum_gswl
}
return {
"status": "Success",
"data": owner_data
}
except Exception as e:
return {"status": "Error", "message": f"获取域名所有者信息失败: {str(e)}"}
@jingrow.whitelist()
def create_domain_template(**data):
"""创建域名模板"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
# 验证必填字段
required_fields = ['c_regtype', 'c_ln_m', 'c_fn_m', 'c_st_m', 'c_ct_m', 'c_adr_m', 'c_pc', 'c_em', 'c_idtype_gswl', 'c_idnum_gswl']
for field in required_fields:
if not data.get(field):
return {"status": "error", "message": f"字段 {field} 是必填项"}
# 验证电话号码字段根据c_ph_type判断
c_ph_type = data.get('c_ph_type', '0') # 默认为手机
if c_ph_type == '0': # 手机
if not data.get('c_ph'):
return {"status": "error", "message": "手机号码 c_ph 是必填项"}
elif c_ph_type == '1': # 座机
if not data.get('c_ph_code'):
return {"status": "error", "message": "座机区号 c_ph_code 是必填项"}
if not data.get('c_ph_num'):
return {"status": "error", "message": "座机号码 c_ph_num 是必填项"}
# 验证字段长度
if len(data.get('c_ln_m', '')) < 1 or len(data.get('c_ln_m', '')) > 16:
return {"status": "error", "message": "姓(中文)长度必须为116位"}
if len(data.get('c_fn_m', '')) < 1 or len(data.get('c_fn_m', '')) > 64:
return {"status": "error", "message": "名(中文)长度必须为164位"}
if len(data.get('c_st_m', '')) < 2 or len(data.get('c_st_m', '')) > 10:
return {"status": "error", "message": "省份(中文)长度必须为210位"}
# 验证完整姓名长度
fullname = data.get('c_ln_m', '') + data.get('c_fn_m', '')
if len(fullname) < 2 or len(fullname) > 64:
return {"status": "error", "message": "完整姓名(中文)长度必须为264位"}
# 生成英文姓名
if data.get('c_ln_m') and data.get('c_fn_m'):
last_name_pinyin = ' '.join(lazy_pinyin(data['c_ln_m']))
first_name_pinyin = ' '.join(lazy_pinyin(data['c_fn_m']))
data['c_ln'] = last_name_pinyin.title()
data['c_fn'] = first_name_pinyin.title()
else:
# 设置默认的英文姓名
data['c_ln'] = data.get('c_ln', '')
data['c_fn'] = data.get('c_fn', '')
# 生成英文地址
if data.get('c_adr_m'):
address_pinyin = ' '.join(lazy_pinyin(data['c_adr_m']))
data['c_adr'] = address_pinyin.title()
else:
data['c_adr'] = data.get('c_adr', '')
# 生成英文省份和城市
if data.get('c_st_m'):
state_pinyin = ' '.join(lazy_pinyin(data['c_st_m']))
data['c_st'] = state_pinyin.title()
else:
data['c_st'] = data.get('c_st', '')
if data.get('c_ct_m'):
city_pinyin = ' '.join(lazy_pinyin(data['c_ct_m']))
data['c_ct'] = city_pinyin.title()
else:
data['c_ct'] = data.get('c_ct', '')
# 生成英文单位名称
if data.get('c_org_m'):
org_pinyin = ' '.join(lazy_pinyin(data['c_org_m']))
data['c_org'] = org_pinyin.title()
else:
data['c_org'] = data.get('c_org', '')
# 设置默认值
template_data = {
'c_regtype': data['c_regtype'],
'c_ln_m': data['c_ln_m'],
'c_fn_m': data['c_fn_m'],
'c_co': data.get('c_co', 'CN'),
'cocode': data.get('cocode', '+86'),
'c_st_m': data['c_st_m'],
'c_ct_m': data['c_ct_m'],
'c_dt_m': data.get('c_dt_m', ''),
'c_adr_m': data['c_adr_m'],
'c_pc': data['c_pc'],
'c_ph_type': data.get('c_ph_type', '0'),
'c_em': data['c_em'],
'c_ln': data.get('c_ln', ''), # 使用生成的英文姓,如果没有则为空
'c_fn': data.get('c_fn', ''), # 使用生成的英文名,如果没有则为空
'c_st': data.get('c_st', ''), # 使用生成的英文省份,如果没有则为空
'c_ct': data.get('c_ct', ''), # 使用生成的英文城市,如果没有则为空
'c_adr': data.get('c_adr', ''), # 使用生成的英文地址,如果没有则为空
'c_idtype_gswl': data['c_idtype_gswl'],
'c_idnum_gswl': data['c_idnum_gswl'],
'fullname': data.get('c_ln_m', '') + data.get('c_fn_m', '') # 完整姓名
}
# 个人类型时不传入c_org_m参数
if data['c_regtype'] == 'E' and data.get('c_org_m'):
template_data['c_org_m'] = data['c_org_m']
template_data['c_org'] = data.get('c_org', '') # 英文单位名称
# 验证企业类型必须填写单位名称
if data['c_regtype'] == 'E' and not data.get('c_org_m'):
return {"status": "error", "message": "企业类型必须填写单位名称"}
# 添加区县信息
if data.get('c_dt_m'):
template_data['c_dt_m'] = data['c_dt_m']
# 添加香港域名相关字段
if data.get('c_idtype_hk'):
template_data['c_idtype_hk'] = data['c_idtype_hk']
if data.get('c_idnum_hk'):
template_data['c_idnum_hk'] = data['c_idnum_hk']
# 根据c_ph_type添加电话相关字段
if data.get('c_ph_type') == '1': # 座机
if data.get('c_ph_code'):
template_data['c_ph_code'] = data['c_ph_code']
if data.get('c_ph_num'):
template_data['c_ph_num'] = data['c_ph_num']
# 构建完整的电话号码格式
if data.get('c_ph_code') and data.get('c_ph_num'):
template_data['c_ph_all'] = f"{data.get('cocode', '+86')}.{data['c_ph_code']}-{data['c_ph_num']}"
else: # 手机
if data.get('c_ph'):
template_data['c_ph'] = data['c_ph']
try:
result = client.create_contact_template(template_data)
if result.get('result') == 200:
c_sysid = result.get('data', {}).get('c_sysid')
return {
"status": "success",
"message": "域名模板创建成功",
"data": {
"c_sysid": c_sysid
}
}
else:
error_msg = result.get('msg', result.get('message', '未知错误'))
return {"status": "error", "message": f"创建域名模板失败: {error_msg}"}
except Exception as e:
return {"status": "error", "message": f"创建域名模板失败: {str(e)}"}
@jingrow.whitelist()
def create_domain_owner(**data):
"""创建新的域名所有者"""
try:
team = get_current_team()
if not team:
return {"status": "Error", "message": "未找到当前团队"}
# 验证基本必填字段
basic_required_fields = ['c_regtype', 'c_ln_m', 'c_fn_m', 'c_st_m', 'c_ct_m', 'c_adr_m', 'c_pc', 'c_em']
for field in basic_required_fields:
if not data.get(field):
return {"status": "Error", "message": f"字段 {field} 是必填项"}
# 验证电话号码字段根据c_ph_type判断
c_ph_type = data.get('c_ph_type', '0') # 默认为手机
if c_ph_type == '0': # 手机
if not data.get('c_ph'):
return {"status": "Error", "message": "手机号码 c_ph 是必填项"}
elif c_ph_type == '1': # 座机
if not data.get('c_ph_code'):
return {"status": "Error", "message": "座机区号 c_ph_code 是必填项"}
if not data.get('c_ph_num'):
return {"status": "Error", "message": "座机号码 c_ph_num 是必填项"}
# 生成英文姓名
if data.get('c_ln_m') and data.get('c_fn_m'):
last_name_pinyin = ' '.join(lazy_pinyin(data['c_ln_m']))
first_name_pinyin = ' '.join(lazy_pinyin(data['c_fn_m']))
data['c_ln'] = last_name_pinyin.title()
data['c_fn'] = first_name_pinyin.title()
else:
# 设置默认的英文姓名
data['c_ln'] = data.get('c_ln', '')
data['c_fn'] = data.get('c_fn', '')
# 生成英文地址
if data.get('c_adr_m'):
address_pinyin = ' '.join(lazy_pinyin(data['c_adr_m']))
data['c_adr'] = address_pinyin.title()
else:
data['c_adr'] = data.get('c_adr', '')
# 生成英文省份和城市
if data.get('c_st_m'):
state_pinyin = ' '.join(lazy_pinyin(data['c_st_m']))
data['c_st'] = state_pinyin.title()
else:
data['c_st'] = data.get('c_st', '')
if data.get('c_ct_m'):
city_pinyin = ' '.join(lazy_pinyin(data['c_ct_m']))
data['c_ct'] = city_pinyin.title()
else:
data['c_ct'] = data.get('c_ct', '')
# 生成英文单位名称
if data.get('c_org_m'):
org_pinyin = ' '.join(lazy_pinyin(data['c_org_m']))
data['c_org'] = org_pinyin.title()
else:
data['c_org'] = data.get('c_org', '')
# 设置默认值
data['team'] = team
data['c_co'] = data.get('c_co', 'CN') # 中国
data['cocode'] = data.get('cocode', '+86')
data['c_ph_type'] = data.get('c_ph_type', '0') # 手机
data['reg_contact_type'] = data.get('reg_contact_type', 'cg') # 常规
# 确保c_sysid字段存在如果提供
if data.get('c_sysid'):
data['c_sysid'] = data['c_sysid']
# 生成完整姓名和标题
data['fullname'] = data.get('c_ln_m', '') + data.get('c_fn_m', '')
if data['c_regtype'] == 'I': # 个人
data['title'] = f"{data['c_ln_m']}{data['c_fn_m']}"
else: # 企业
data['title'] = f"{data['c_org_m']}"
# 创建域名所有者记录
domain_owner = jingrow.get_pg({
"pagetype": "Domain Owner",
**data
})
domain_owner.insert(ignore_permissions=True)
return {
"status": "Success",
"message": "域名所有者创建成功",
"data": {
"name": domain_owner.name,
"title": domain_owner.title
}
}
except Exception as e:
return {"status": "Error", "message": f"创建域名所有者失败: {str(e)}"}
@jingrow.whitelist()
def create_domain_owner_with_template(**data):
"""创建域名所有者(包含模板创建)"""
try:
# 第一步:创建域名模板
template_result = create_domain_template(**data)
if template_result.get("status") != "success":
return template_result
# 获取模板ID
c_sysid = template_result.get("data", {}).get("c_sysid")
if not c_sysid:
return {"status": "Error", "message": "创建域名模板成功但未返回模板ID"}
# 第二步:创建域名所有者记录
data['c_sysid'] = c_sysid
owner_result = create_domain_owner(**data)
if owner_result.get("status") != "Success":
return owner_result
# 成功完成
result = {
"status": "Success",
"message": "域名所有者创建成功",
"data": {
"c_sysid": c_sysid,
"owner_name": owner_result.get("data", {}).get("name"),
"owner_title": owner_result.get("data", {}).get("title")
}
}
return result
except Exception as e:
return {"status": "Error", "message": f"创建域名所有者失败: {str(e)}"}
@jingrow.whitelist()
def get_west_domain_real_info(**data):
"""获取域名实名信息"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
response = client.get_domain_real_info(domain)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
data = response.get("data", {})
# 返回格式化的实名信息
return {
"status": "success",
"data": {
"domain": data.get("domain"),
"c_sysid": data.get("c_sysid"),
"c_regtype": data.get("c_regtype"),
"c_status": data.get("c_status"),
"c_failinfo": data.get("c_failinfo"),
"status": data.get("status"),
"regdate": data.get("regdate"),
"rexpiredate": data.get("rexpiredate"),
"dns_hosts": {
"dns_host1": data.get("dns_host1"),
"dns_host2": data.get("dns_host2"),
"dns_host3": data.get("dns_host3"),
"dns_host4": data.get("dns_host4"),
"dns_host5": data.get("dns_host5"),
"dns_host6": data.get("dns_host6")
},
"owner": {
"dom_ln": data.get("dom_ln"),
"dom_fn": data.get("dom_fn"),
"dom_ln_m": data.get("dom_ln_m"),
"dom_fn_m": data.get("dom_fn_m"),
"dom_org": data.get("dom_org"),
"dom_org_m": data.get("dom_org_m"),
"dom_co": data.get("dom_co"),
"dom_st": data.get("dom_st"),
"dom_st_m": data.get("dom_st_m"),
"dom_ct": data.get("dom_ct"),
"dom_ct_m": data.get("dom_ct_m"),
"dom_adr1": data.get("dom_adr1"),
"dom_adr_m": data.get("dom_adr_m"),
"dom_pc": data.get("dom_pc"),
"dom_ph": data.get("dom_ph"),
"dom_fax": data.get("dom_fax"),
"dom_em": data.get("dom_em")
},
"admin": {
"admi_ln": data.get("admi_ln"),
"admi_fn": data.get("admi_fn"),
"admi_ln_m": data.get("admi_ln_m"),
"admi_fn_m": data.get("admi_fn_m"),
"admi_co": data.get("admi_co"),
"admi_st": data.get("admi_st"),
"admi_st_m": data.get("admi_st_m"),
"admi_ct": data.get("admi_ct"),
"admi_ct_m": data.get("admi_ct_m"),
"admi_adr1": data.get("admi_adr1"),
"admi_adr_m": data.get("admi_adr_m"),
"admi_pc": data.get("admi_pc"),
"admi_ph": data.get("admi_ph"),
"admi_fax": data.get("admi_fax"),
"admi_em": data.get("admi_em")
},
"tech": {
"tech_ln": data.get("tech_ln"),
"tech_fn": data.get("tech_fn"),
"tech_ln_m": data.get("tech_ln_m"),
"tech_fn_m": data.get("tech_fn_m"),
"tech_co": data.get("tech_co"),
"tech_st": data.get("tech_st"),
"tech_st_m": data.get("tech_st_m"),
"tech_ct": data.get("tech_ct"),
"tech_ct_m": data.get("tech_ct_m"),
"tech_adr1": data.get("tech_adr1"),
"tech_adr_m": data.get("tech_adr_m"),
"tech_pc": data.get("tech_pc"),
"tech_ph": data.get("tech_ph"),
"tech_fax": data.get("tech_fax"),
"tech_em": data.get("tech_em")
},
"billing": {
"bill_ln": data.get("bill_ln"),
"bill_fn": data.get("bill_fn"),
"bill_ln_m": data.get("bill_ln_m"),
"bill_fn_m": data.get("bill_fn_m"),
"bill_co": data.get("bill_co"),
"bill_st": data.get("bill_st"),
"bill_st_m": data.get("bill_st_m"),
"bill_ct": data.get("bill_ct"),
"bill_ct_m": data.get("bill_ct_m"),
"bill_adr1": data.get("bill_adr1"),
"bill_adr_m": data.get("bill_adr_m"),
"bill_pc": data.get("bill_pc"),
"bill_ph": data.get("bill_ph"),
"bill_fax": data.get("bill_fax"),
"bill_em": data.get("bill_em")
},
"real_name_status": {
"r_status": data.get("r_status"),
"r_failinfo": data.get("r_failinfo"),
"c_ispublic": data.get("c_ispublic")
},
"orgfile": data.get("orgfile", {})
}
}
except Exception as e:
return {"status": "error", "message": "域名实名信息查询响应解析失败"}
@jingrow.whitelist()
def get_west_upload_token(**data):
"""获取西部数码实名上传token"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
c_sysid = data.get('c_sysid')
f_type_org = data.get('f_type_org')
f_code_org = data.get('f_code_org')
f_type_lxr = data.get('f_type_lxr')
f_code_lxr = data.get('f_code_lxr')
if not all([c_sysid, f_type_org, f_code_org]):
return {"status": "error", "message": "缺少必要参数c_sysid, f_type_org, f_code_org"}
response = client.get_upload_token(c_sysid, f_type_org, f_code_org, f_type_lxr, f_code_lxr)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
return {
"status": "success",
"data": {
"token": response.get("data"),
"clientid": response.get("clientid")
}
}
except Exception as e:
return {"status": "error", "message": "获取实名上传token响应解析失败"}
@jingrow.whitelist()
def west_upload_real_name_files(**data):
"""西部数码模板实名资料上传"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
token = data.get('token')
file_org = data.get('file_org')
file_lxr = data.get('file_lxr')
if not all([token, file_org]):
return {"status": "error", "message": "缺少必要参数token, file_org"}
response = client.upload_real_name_files(token, file_org, file_lxr)
if response.get("status") == "error":
return response
try:
# 解析响应格式
d = response.get("d", {})
result = d.get("Result")
msg = d.get("Msg", "")
if result == 200:
return {
"status": "success",
"message": "实名资料上传成功",
"data": {
"result": result,
"msg": msg
}
}
else:
return {
"status": "error",
"message": f"实名资料上传失败: {msg}"
}
except Exception as e:
return {"status": "error", "message": "实名资料上传响应解析失败"}
@jingrow.whitelist()
def west_domain_modify_dns_record(**data):
"""修改DNS记录 - 只修改可修改的字段值、TTL、优先级、线路"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
value = data.get('value')
ttl = data.get('ttl', 600)
level = data.get('level', 10)
record_id = data.get('record_id')
line = data.get('line', '')
# 验证TTL值
if ttl < 60 or ttl > 86400:
return {"status": "error", "message": "TTL值必须在60~86400秒之间"}
# 验证优先级
if level < 1 or level > 100:
return {"status": "error", "message": "优先级必须在1~100之间"}
# 只传递可修改的字段给底层API
response = client.modify_dns_record(
domain, value, ttl, level, record_id, line
)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
error_msg = response.get('msg', response.get('message', '未知错误'))
return {"status": "error", "message": f"修改DNS记录失败: {error_msg}"}
# 同步更新本地数据库中的对应记录
if record_id:
try:
# 查找对应的DNS记录
dns_records = jingrow.get_all(
"Dns Resolution",
{"record_id": record_id},
["name"]
)
if dns_records:
dns_record = dns_records[0]
# 立即执行更新操作
jingrow.set_value("Dns Resolution", dns_record.name, {
"value": value,
"ttl": str(ttl),
"level": str(level),
"line": line
})
jingrow.db.commit()
except Exception as e:
jingrow.log_error(f"域名 {domain} DNS记录同步更新失败", f"错误: {str(e)}")
# 返回成功结果
return {
"status": "success",
"message": "DNS记录修改成功",
"data": {
"domain": domain,
"value": value,
"ttl": ttl,
"level": level,
"record_id": record_id,
"line": line
}
}
except Exception as e:
return {"status": "error", "message": "修改DNS记录响应解析失败"}
@jingrow.whitelist()
def test_dns_record_management():
"""测试DNS记录管理功能"""
try:
# 测试参数
test_domain = "test.com"
test_host = "www"
test_value = "127.0.0.1"
test_record_type = "A"
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
# 测试1: 添加DNS记录
add_result = client.add_dns_record(
domain=test_domain,
record_type=test_record_type,
host=test_host,
value=test_value,
ttl=900,
level=10,
line=""
)
# 测试2: 获取DNS记录列表
list_result = client.get_dns_records_paginated(test_domain, 20, 1)
# 测试3: 修改DNS记录如果添加成功
if add_result.get("result") == 200:
record_id = add_result.get("data", {}).get("id")
if record_id:
modify_result = client.modify_dns_record(
domain=test_domain,
value="127.0.0.2",
ttl=600,
level=10,
record_id=record_id,
line=""
)
# 测试4: 删除DNS记录
delete_result = client.delete_dns_record(
domain=test_domain,
record_id=record_id
)
return {
"status": "success",
"message": "DNS记录管理功能测试完成",
"data": {
"add_result": add_result,
"list_result": list_result
}
}
except Exception as e:
return {"status": "error", "message": f"DNS记录管理功能测试失败: {str(e)}"}
@jingrow.whitelist()
def west_domain_modify_dns_server(**data):
"""修改域名DNS服务器"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
dns1 = data.get('dns1')
dns2 = data.get('dns2')
dns3 = data.get('dns3')
dns4 = data.get('dns4')
dns5 = data.get('dns5')
dns6 = data.get('dns6')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
if not dns1:
return {"status": "error", "message": "缺少主DNS服务器参数"}
if not dns2:
return {"status": "error", "message": "缺少辅DNS服务器参数"}
# 验证DNS服务器格式
dns_servers = [dns1, dns2]
if dns3:
dns_servers.append(dns3)
if dns4:
dns_servers.append(dns4)
if dns5:
dns_servers.append(dns5)
if dns6:
dns_servers.append(dns6)
# 验证DNS服务器格式简单验证
for dns in dns_servers:
if dns and not ('.' in dns and len(dns) > 3):
return {"status": "error", "message": f"DNS服务器格式不正确: {dns}"}
response = client.modify_dns_server(
domain=domain,
dns1=dns1,
dns2=dns2,
dns3=dns3,
dns4=dns4,
dns5=dns5,
dns6=dns6
)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
error_msg = response.get('msg', response.get('message', '未知错误'))
return {"status": "error", "message": f"修改DNS服务器失败: {error_msg}"}
# 异步更新本地域名记录的DNS服务器字段
try:
domain_records = jingrow.get_all(
"Jsite Domain",
{"domain": domain},
["name"]
)
if domain_records:
jingrow.enqueue(
"jingrow.client.set_value",
pagetype="Jsite Domain",
name=domain_records[0].name,
fieldname={
"dns_host1": dns1,
"dns_host2": dns2,
"dns_host3": dns3,
"dns_host4": dns4,
"dns_host5": dns5,
"dns_host6": dns6
}
)
except Exception as e:
jingrow.log_error(f"域名 {domain} DNS服务器更新失败", f"错误: {str(e)}")
# 返回成功结果
return {
"status": "success",
"message": "DNS服务器修改成功",
"data": {
"domain": domain,
"dns_servers": {
"dns1": dns1,
"dns2": dns2,
"dns3": dns3,
"dns4": dns4,
"dns5": dns5,
"dns6": dns6
},
"clientid": response.get("clientid")
}
}
except Exception as e:
return {"status": "error", "message": "修改DNS服务器响应解析失败"}
@jingrow.whitelist()
def sync_domain_info_from_west(**data):
"""从西部数据同步域名信息"""
try:
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
# 获取域名DNS记录 - 参考get_west_domain_dns_records使用分页API
dns_response = client.get_dns_records_paginated(domain, 1000, 1) # 获取足够多的记录
# 获取域名实名信息
real_info_response = client.get_domain_real_info(domain)
# 处理DNS记录 - 参考get_west_domain_dns_records的处理方式
dns_data = {}
if dns_response.get("result") == 200:
data = dns_response.get("data", {})
all_items = data.get("items", [])
# 对所有记录按类型排序
sorted_items = sorted(all_items, key=lambda x: x.get('type', ''))
dns_data = {
"items": sorted_items,
"total": len(sorted_items)
}
real_data = real_info_response.get("data", {}) if real_info_response.get("result") == 200 else {}
# 获取当前域名记录的团队信息
domain_pg = jingrow.get_pg("Jsite Domain", {"domain": domain})
if not domain_pg:
return {"status": "error", "message": "本地未找到该域名记录"}
# 从域名记录获取团队信息
team_name = domain_pg.team
if not team_name:
return {"status": "error", "message": "域名记录中未找到团队信息"}
team = jingrow.get_pg("Team", team_name)
if not team:
return {"status": "error", "message": "未找到团队信息"}
# 同步Domain Owner信息
domain_owner_name = None
if real_data and real_data.get("r_status") == 1:
# 从实名信息中提取c_sysid和所有者信息
c_sysid = real_data.get("c_sysid")
owner_info = real_data.get("owner", {})
if c_sysid:
# 如果owner_info为空使用real_data中的所有者字段
if not owner_info:
owner_info = real_data
if owner_info:
# 根据c_sysid查找是否已存在Domain Owner - 只按c_sysid查找不限制团队
existing_owners = jingrow.get_all(
"Domain Owner",
{"c_sysid": c_sysid},
["name", "team"]
)
if existing_owners:
# 更新已存在的Domain Owner
domain_owner_name = existing_owners[0].name
existing_team = existing_owners[0].team
try:
domain_owner_pg = jingrow.get_pg("Domain Owner", domain_owner_name)
if domain_owner_pg:
# 更新团队信息(如果不同或为空)
if not domain_owner_pg.team or domain_owner_pg.team != team.name:
domain_owner_pg.team = team.name
# 更新所有者信息
owner_name = owner_info.get("dom_org_m") or owner_info.get("dom_ln_m", "") + owner_info.get("dom_fn_m", "")
domain_owner_pg.fullname = owner_name
domain_owner_pg.c_status = real_data.get("c_status")
domain_owner_pg.r_status = real_data.get("r_status")
# 更新国家代码和电话代码
domain_owner_pg.c_co = owner_info.get("dom_co", "CN")
domain_owner_pg.cocode = owner_info.get("dom_ph", "").split('.')[0] if '.' in owner_info.get("dom_ph", "") else "+86" # 从手机号提取电话代码
# 根据类型更新不同字段
c_regtype = "E" if owner_info.get("dom_org_m") else "I"
domain_owner_pg.c_regtype = c_regtype
if c_regtype == "E": # 企业
domain_owner_pg.c_org_m = owner_info.get("dom_org_m", "")
domain_owner_pg.c_org = owner_info.get("dom_org", "")
domain_owner_pg.c_ln_m = owner_info.get("dom_ln_m", "")
domain_owner_pg.c_fn_m = owner_info.get("dom_fn_m", "")
domain_owner_pg.c_ln = owner_info.get("dom_ln", "")
domain_owner_pg.c_fn = owner_info.get("dom_fn", "")
domain_owner_pg.title = owner_info.get("dom_org_m", "")
else: # 个人
domain_owner_pg.c_ln_m = owner_info.get("dom_ln_m", "")
domain_owner_pg.c_fn_m = owner_info.get("dom_fn_m", "")
domain_owner_pg.c_ln = owner_info.get("dom_ln", "")
domain_owner_pg.c_fn = owner_info.get("dom_fn", "")
domain_owner_pg.title = owner_info.get("dom_ln_m", "") + owner_info.get("dom_fn_m", "")
# 更新地址信息
domain_owner_pg.c_st_m = owner_info.get("dom_st_m", "")
domain_owner_pg.c_ct_m = owner_info.get("dom_ct_m", "")
domain_owner_pg.c_adr_m = owner_info.get("dom_adr_m", "")
domain_owner_pg.c_pc = owner_info.get("dom_pc", "")
domain_owner_pg.c_st = owner_info.get("dom_st", "")
domain_owner_pg.c_ct = owner_info.get("dom_ct", "")
domain_owner_pg.c_adr = owner_info.get("dom_adr", "")
domain_owner_pg.c_em = owner_info.get("dom_em", "")
domain_owner_pg.c_ph = owner_info.get("dom_ph", "")
domain_owner_pg.c_ph_type = "0" # 默认为手机
# 更新证件信息从orgfile中获取
orgfile = real_data.get("orgfile", {})
if orgfile.get("f_code"):
domain_owner_pg.c_idnum_gswl = orgfile.get("f_code")
if orgfile.get("f_type"):
domain_owner_pg.c_idtype_gswl = str(orgfile.get("f_type"))
domain_owner_pg.save(ignore_permissions=True)
except Exception as e:
jingrow.log_error(f"更新Domain Owner失败", f"域名: {domain}, c_sysid: {c_sysid}, 错误: {str(e)}")
else:
# 创建新的Domain Owner
try:
# 判断所有者类型(个人或企业)
c_regtype = "E" if owner_info.get("dom_org_m") else "I"
# 生成所有者名称
owner_name = owner_info.get("dom_org_m") or owner_info.get("dom_ln_m", "") + owner_info.get("dom_fn_m", "")
# 构建Domain Owner数据
owner_data = {
"pagetype": "Domain Owner",
"team": team.name, # 使用当前团队
"c_sysid": c_sysid,
"c_regtype": c_regtype,
"fullname": owner_name,
"c_co": owner_info.get("dom_co", "CN"), # 从数据中获取国家代码
"cocode": owner_info.get("dom_ph", "").split('.')[0] if '.' in owner_info.get("dom_ph", "") else "+86", # 从手机号提取电话代码
"reg_contact_type": "cg",
"c_status": real_data.get("c_status"),
"r_status": real_data.get("r_status"),
"title": owner_name
}
# 根据类型设置不同字段
if c_regtype == "E": # 企业
owner_data.update({
"c_org_m": owner_info.get("dom_org_m", ""),
"c_org": owner_info.get("dom_org", ""),
"c_ln_m": owner_info.get("dom_ln_m", ""),
"c_fn_m": owner_info.get("dom_fn_m", ""),
"c_ln": owner_info.get("dom_ln", ""),
"c_fn": owner_info.get("dom_fn", ""),
"title": owner_info.get("dom_org_m", "")
})
else: # 个人
owner_data.update({
"c_ln_m": owner_info.get("dom_ln_m", ""),
"c_fn_m": owner_info.get("dom_fn_m", ""),
"c_ln": owner_info.get("dom_ln", ""),
"c_fn": owner_info.get("dom_fn", ""),
"title": owner_info.get("dom_ln_m", "") + owner_info.get("dom_fn_m", "")
})
# 设置地址信息
owner_data.update({
"c_st_m": owner_info.get("dom_st_m", ""),
"c_ct_m": owner_info.get("dom_ct_m", ""),
"c_adr_m": owner_info.get("dom_adr_m", ""),
"c_pc": owner_info.get("dom_pc", ""),
"c_st": owner_info.get("dom_st", ""),
"c_ct": owner_info.get("dom_ct", ""),
"c_adr": owner_info.get("dom_adr", ""),
"c_em": owner_info.get("dom_em", ""),
"c_ph": owner_info.get("dom_ph", ""),
"c_ph_type": "0" # 默认为手机
})
# 设置证件信息从orgfile中获取
orgfile = real_data.get("orgfile", {})
if orgfile.get("f_code"):
owner_data["c_idnum_gswl"] = orgfile.get("f_code")
if orgfile.get("f_type"):
owner_data["c_idtype_gswl"] = str(orgfile.get("f_type"))
# 创建Domain Owner记录
domain_owner_pg = jingrow.get_pg(owner_data)
domain_owner_pg.insert(ignore_permissions=True)
domain_owner_name = domain_owner_pg.name
except Exception as e:
jingrow.log_error(f"创建Domain Owner失败", f"域名: {domain}, c_sysid: {c_sysid}, 错误: {str(e)}")
domain_owner_name = None
# 更新本地域名记录
try:
# domain_pg 已经在前面获取过了,直接使用
# 更新基本信息 - 从实名信息中获取
if real_data:
domain_pg.registration_date = real_data.get("regdate")
domain_pg.end_date = real_data.get("rexpiredate")
domain_pg.status = real_data.get("status")
# 更新DNS服务器信息
domain_pg.dns_host1 = real_data.get("dns_host1", "")
domain_pg.dns_host2 = real_data.get("dns_host2", "")
domain_pg.dns_host3 = real_data.get("dns_host3", "")
domain_pg.dns_host4 = real_data.get("dns_host4", "")
domain_pg.dns_host5 = real_data.get("dns_host5", "")
domain_pg.dns_host6 = real_data.get("dns_host6", "")
# 更新Domain Owner
if domain_owner_name:
domain_pg.domain_owner = domain_owner_name
# 更新DNS解析记录
if dns_data and "items" in dns_data:
# 清空现有DNS解析记录
domain_pg.dns_resolution = []
# 添加新的DNS解析记录
for record in dns_data.get("items", []):
# 确保必需字段有值 - 注意字段名是item不是host
host = record.get("item", "") # 修正字段名
record_type = record.get("type", "")
value = record.get("value", "")
# 只有当必需字段都有值时才添加记录
if host and record_type and value:
domain_pg.append("dns_resolution", {
"type": record_type,
"host": host,
"value": value,
"ttl": record.get("ttl", 600),
"level": record.get("level", 10),
"line": record.get("line", ""),
"record_id": record.get("id", ""),
"record_status": record.get("status", "")
})
# 保存更新
domain_pg.save()
return {
"status": "success",
"message": "域名信息同步成功",
"data": {
"dns_records": dns_data,
"real_info": real_data,
"domain_owner": domain_owner_name
}
}
except Exception as e:
return {"status": "error", "message": f"更新本地记录失败: {str(e)}"}
except Exception as e:
return {"status": "error", "message": f"同步失败: {str(e)}"}
@jingrow.whitelist()
def upload_domain_real_name_files(**data):
"""上传域名实名资料文件"""
import base64
# 尝试从jingrow.request中获取参数
try:
# 检查是否是FormData格式
if jingrow.request.content_type and 'multipart/form-data' in jingrow.request.content_type:
# 处理FormData格式
request_data = {}
for key in jingrow.request.form:
request_data[key] = jingrow.request.form[key]
else:
# 处理JSON格式
request_data = jingrow.request.get_json() if jingrow.request.is_json else {}
except Exception as e:
request_data = {}
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
# 从多个来源提取参数
domain = (data.get('domain') or
request_data.get('domain') or
data.get('domainDoc.domain'))
id_type = (data.get('idType') or
request_data.get('idType') or
'idcard')
id_number = (data.get('idNumber') or
request_data.get('idNumber'))
if not all([domain, id_number]):
return {"status": "error", "message": f"缺少必要参数domain={domain}, idNumber={id_number}"}
try:
# 获取域名信息
domain_list = jingrow.get_list("Jsite Domain", filters={"domain": domain})
if domain_list:
domain_name = domain_list[0].get('name')
# 通过name获取完整的域名信息
domain_info = jingrow.get_pg("Jsite Domain", domain_name)
else:
return {"status": "error", "message": f"未找到主域名{domain}"}
# 获取域名所有者信息
owner_name = domain_info.get('domain_owner') or domain_info.get('owner')
if not owner_name:
return {"status": "error", "message": "域名所有者信息不存在"}
owner_info = jingrow.get_pg("Domain Owner", owner_name)
if not owner_info:
return {"status": "error", "message": "域名所有者信息不存在"}
# 获取上传token
c_sysid = owner_info.get('c_sysid')
if not c_sysid:
return {"status": "error", "message": "域名所有者未关联所有者模板ID"}
# 证件类型映射到西部数码参数
id_type_map = {
"SFZ": "1", # 身份证
"HZ": "5", # 护照
"GAJMTX": "6", # 港澳居民来往内地通行证
"TWJMTX": "11", # 台湾居民来往大陆通行证
"WJLSFZ": "12", # 外国人永久居留身份证
"GAJZZ": "30", # 港澳台居民居住证
"ORG": "2", # 组织机构代码证
"YYZZ": "3", # 工商营业执照
"TYDM": "4" # 统一社会信用代码证书
}
f_type_org = id_type_map.get(id_type, "1") # 默认使用身份证
f_code_org = id_number
# 获取上传token
token_response = client.get_upload_token(c_sysid, f_type_org, f_code_org)
if token_response.get("status") == "error":
return token_response
if token_response.get("result") != 200:
return {"status": "error", "message": "获取上传token失败"}
token = token_response.get("data")
if not token:
return {"status": "error", "message": "获取上传token失败"}
# 处理文件上传 - 从FormData中提取文件并转换为base64
files = []
# 从jingrow.request.files中获取文件
if hasattr(jingrow.request, 'files') and jingrow.request.files:
for key in jingrow.request.files:
file_obj = jingrow.request.files[key]
if hasattr(file_obj, 'read'):
# 读取文件内容并转换为base64
file_content = file_obj.read()
file_base64 = base64.b64encode(file_content).decode('utf-8')
files.append(file_base64)
# 如果从request.files中没有找到文件尝试从data中获取
if not files:
for key, value in data.items():
if key.startswith('files_') and hasattr(value, 'read'):
# 读取文件内容并转换为base64
file_content = value.read()
file_base64 = base64.b64encode(file_content).decode('utf-8')
files.append(file_base64)
# 如果还是没有文件尝试从data.files中获取前端传递的文件数组
if not files and 'files' in data:
for file_obj in data['files']:
if hasattr(file_obj, 'read'):
# 读取文件内容并转换为base64
file_content = file_obj.read()
file_base64 = base64.b64encode(file_content).decode('utf-8')
files.append(file_base64)
# 如果还是没有文件尝试从jingrow.form_dict中获取
if not files and hasattr(jingrow, 'form_dict'):
for key, value in jingrow.form_dict.items():
if key.startswith('files_') and hasattr(value, 'read'):
# 读取文件内容并转换为base64
file_content = value.read()
file_base64 = base64.b64encode(file_content).decode('utf-8')
files.append(file_base64)
if not files:
return {"status": "error", "message": "请上传证件材料"}
# 上传文件到西部数码
file_org = files[0] # 使用第一个文件作为证件文件
file_lxr = files[1] if len(files) > 1 else None # 第二个文件作为联系人文件(可选)
upload_response = client.upload_real_name_files(token, file_org, file_lxr)
if upload_response.get("status") == "error":
return upload_response
# 解析响应
d = upload_response.get("d", {})
result = d.get("Result")
msg = d.get("Msg", "")
if result == 200:
response_data = {
"status": "success",
"message": "实名资料上传成功,请等待审核",
"data": {
"result": result,
"msg": msg
}
}
return response_data
else:
response_data = {
"status": "error",
"message": f"实名资料上传失败: {msg}"
}
return response_data
except Exception as e:
return {"status": "error", "message": "上传实名资料失败,请重试"}
@jingrow.whitelist()
def modify_west_contact_template(**data):
"""
修改西部数码实名模板资料
必填参数:
- c_sysid: 模板标识
可选参数:
- c_ln_m: (中文)联系人姓
- c_fn_m: (中文)联系人名
- c_co: 所属国家简称(CN: 中国, US: 美国)
- cocode: 国家电话代码(+86: 中国, +1: 美国)
- c_st_m: (中文)所属省
- c_ct_m: (中文)所属市
- c_dt_m: (中文)所属县
- c_adr_m: (中文)通讯地址
- c_pc: 邮编
- c_ph_type: 联系电话类型(0:手机, 1:座机)
- c_ph: 手机号码(当c_ph_type为0时必填)
- c_ph_code: 座机号码区号(当c_ph_type为1时必填)
- c_ph_num: 座机号码(当c_ph_type为1时必填)
- c_ph_fj: 座机号码分机号(可选)
- c_em: 所有者电子邮箱
- c_ln: (英文)联系人姓
- c_fn: (英文)联系人名
- c_st: (英文)省份
- c_ct: (英文)城市
- c_adr: (英文)通讯地址
- reg_contact_type: 适用范围(hk,gswl等)
- c_idtype_hk: hk域名证件类型
- c_idnum_hk: hk域名证件号码
- c_idtype_gswl: 特殊中文域名证件类型
- c_idnum_gswl: 特殊中文域名证件号码
"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
# 验证必填字段
c_sysid = data.get('c_sysid')
if not c_sysid:
return {"status": "error", "message": "缺少必要参数c_sysid"}
# 验证电话号码字段根据c_ph_type判断
c_ph_type = data.get('c_ph_type', '0') # 默认为手机
if c_ph_type == '0' and data.get('c_ph'): # 手机
# 如果提供了手机号码,验证格式
pass
elif c_ph_type == '1' and (data.get('c_ph_code') or data.get('c_ph_num')): # 座机
# 如果提供了座机信息,验证完整性
if data.get('c_ph_code') and not data.get('c_ph_num'):
return {"status": "error", "message": "座机号码 c_ph_num 是必填项"}
if data.get('c_ph_num') and not data.get('c_ph_code'):
return {"status": "error", "message": "座机区号 c_ph_code 是必填项"}
# 验证字段长度
if data.get('c_ln_m'):
if len(data['c_ln_m']) < 1 or len(data['c_ln_m']) > 16:
return {"status": "error", "message": "姓(中文)长度必须为116位"}
if data.get('c_fn_m'):
if len(data['c_fn_m']) < 1 or len(data['c_fn_m']) > 64:
return {"status": "error", "message": "名(中文)长度必须为164位"}
if data.get('c_st_m'):
if len(data['c_st_m']) < 2 or len(data['c_st_m']) > 10:
return {"status": "error", "message": "省份(中文)长度必须为210位"}
# 验证完整姓名长度
if data.get('c_ln_m') and data.get('c_fn_m'):
fullname = data['c_ln_m'] + data['c_fn_m']
if len(fullname) < 2 or len(fullname) > 64:
return {"status": "error", "message": "完整姓名(中文)长度必须为264位"}
# 生成英文姓名(如果提供了中文姓名)
if data.get('c_ln_m') and data.get('c_fn_m'):
try:
from pypinyin import lazy_pinyin
last_name_pinyin = ' '.join(lazy_pinyin(data['c_ln_m']))
first_name_pinyin = ' '.join(lazy_pinyin(data['c_fn_m']))
data['c_ln'] = last_name_pinyin.title()
data['c_fn'] = first_name_pinyin.title()
except ImportError:
# 如果没有pypinyin库使用默认值
data['c_ln'] = data.get('c_ln', '')
data['c_fn'] = data.get('c_fn', '')
# 生成英文地址(如果提供了中文地址)
if data.get('c_adr_m'):
try:
from pypinyin import lazy_pinyin
address_pinyin = ' '.join(lazy_pinyin(data['c_adr_m']))
data['c_adr'] = address_pinyin.title()
except ImportError:
data['c_adr'] = data.get('c_adr', '')
# 生成英文省份和城市(如果提供了中文省份和城市)
if data.get('c_st_m'):
try:
from pypinyin import lazy_pinyin
state_pinyin = ' '.join(lazy_pinyin(data['c_st_m']))
data['c_st'] = state_pinyin.title()
except ImportError:
data['c_st'] = data.get('c_st', '')
if data.get('c_ct_m'):
try:
from pypinyin import lazy_pinyin
city_pinyin = ' '.join(lazy_pinyin(data['c_ct_m']))
data['c_ct'] = city_pinyin.title()
except ImportError:
data['c_ct'] = data.get('c_ct', '')
# 设置默认值
template_data = {
'c_sysid': c_sysid,
'c_co': data.get('c_co', 'CN'),
'cocode': data.get('cocode', '+86'),
'c_ph_type': data.get('c_ph_type', '0'),
}
# 添加可选字段(只添加有值的字段)
optional_fields = [
'c_ln_m', 'c_fn_m', 'c_st_m', 'c_ct_m', 'c_dt_m', 'c_adr_m', 'c_pc',
'c_em', 'c_ln', 'c_fn', 'c_st', 'c_ct', 'c_adr', 'reg_contact_type',
'c_idtype_hk', 'c_idnum_hk', 'c_idtype_gswl', 'c_idnum_gswl'
]
for field in optional_fields:
if data.get(field):
template_data[field] = data[field]
# 根据c_ph_type添加电话相关字段
if data.get('c_ph_type') == '1': # 座机
if data.get('c_ph_code'):
template_data['c_ph_code'] = data['c_ph_code']
if data.get('c_ph_num'):
template_data['c_ph_num'] = data['c_ph_num']
if data.get('c_ph_fj'):
template_data['c_ph_fj'] = data['c_ph_fj']
else: # 手机
if data.get('c_ph'):
template_data['c_ph'] = data['c_ph']
try:
result = client.modify_contact_template(template_data)
if result.get('result') == 200:
return {
"status": "success",
"message": "实名模板资料修改成功",
"data": {
"c_sysid": result.get('data', {}).get('c_sysid', c_sysid)
}
}
else:
error_msg = result.get('msg', result.get('message', '未知错误'))
return {"status": "error", "message": f"修改实名模板资料失败: {error_msg}"}
except Exception as e:
return {"status": "error", "message": f"修改实名模板资料失败: {str(e)}"}
@jingrow.whitelist()
def update_domain_owner(name, **data):
"""更新域名所有者信息"""
try:
if not name:
return {"status": "Error", "message": "域名所有者名称不能为空"}
# 获取指定的域名所有者
domain_owner = jingrow.get_pg("Domain Owner", name)
if not domain_owner:
return {"status": "Error", "message": "未找到指定的域名所有者"}
# 检查权限(只能更新当前团队的所有者)
team = get_current_team()
if not team:
return {"status": "Error", "message": "未找到当前团队"}
if domain_owner.team != team:
return {"status": "Error", "message": "无权更新该域名所有者信息"}
# 检查是否有 c_sysid西部数码模板标识
c_sysid = getattr(domain_owner, 'c_sysid', None)
if not c_sysid:
return {"status": "Error", "message": "域名所有者没有模板标识,无法更新"}
# 构建西部数码模板更新数据
template_data = {
'c_sysid': c_sysid
}
# 添加需要更新的字段
field_mapping = {
'c_regtype': 'c_regtype',
'c_org_m': 'c_org_m',
'c_ln_m': 'c_ln_m',
'c_fn_m': 'c_fn_m',
'c_em': 'c_em',
'c_ph': 'c_ph',
'c_st_m': 'c_st_m',
'c_ct_m': 'c_ct_m',
'c_adr_m': 'c_adr_m',
'c_pc': 'c_pc',
'c_ph_type': 'c_ph_type',
'c_ph_code': 'c_ph_code',
'c_ph_num': 'c_ph_num',
'c_ph_fj': 'c_ph_fj',
'reg_contact_type': 'reg_contact_type',
'c_idtype_gswl': 'c_idtype_gswl',
'c_idnum_gswl': 'c_idnum_gswl'
}
for local_field, template_field in field_mapping.items():
if local_field in data:
template_data[template_field] = data[local_field]
# 添加现有字段(如果新数据中没有提供)
for local_field, template_field in field_mapping.items():
if template_field not in template_data and hasattr(domain_owner, local_field):
current_value = getattr(domain_owner, local_field, None)
if current_value:
template_data[template_field] = current_value
# 调用西部数码模板更新接口
try:
west_result = modify_west_contact_template(**template_data)
if west_result.get('status') != 'success':
error_msg = west_result.get('message', '所有者模板更新失败')
return {"status": "Error", "message": f"所有者模板更新失败: {error_msg}"}
except Exception as e:
return {"status": "Error", "message": f"所有者模板更新失败: {str(e)}"}
# 西部数码模板更新成功后,更新本地记录
updated_fields = []
for key, value in data.items():
if hasattr(domain_owner, key):
old_value = getattr(domain_owner, key, None)
setattr(domain_owner, key, value)
updated_fields.append(f"{key}: {old_value} -> {value}")
# 保存更新,使用 ignore_permissions=True 忽略权限检查
domain_owner.save(ignore_permissions=True)
jingrow.db.commit()
return {
"status": "Success",
"message": "域名所有者信息更新成功",
"data": {
"name": domain_owner.name,
"title": domain_owner.title
}
}
except Exception as e:
return {"status": "Error", "message": f"更新域名所有者信息失败: {str(e)}"}
@jingrow.whitelist()
def delete_domain_owner(name):
"""删除域名所有者模板"""
try:
if not name:
return {"status": "Error", "message": "域名所有者名称不能为空"}
# 获取指定的域名所有者
domain_owner = jingrow.get_pg("Domain Owner", name)
if not domain_owner:
return {"status": "Error", "message": "未找到指定的域名所有者"}
# 检查权限(只能删除当前团队的所有者)
team = get_current_team()
if not team:
return {"status": "Error", "message": "未找到当前团队"}
if domain_owner.team != team:
return {"status": "Error", "message": "无权删除该域名所有者信息"}
# 检查是否有 c_sysid西部数码模板标识
c_sysid = getattr(domain_owner, 'c_sysid', None)
if not c_sysid:
return {"status": "Error", "message": "域名所有者没有模板标识,无法删除"}
# 先删除西部数码的模板
try:
client = get_west_client()
if not client:
return {"status": "Error", "message": "API客户端初始化失败"}
# 调用西部数码删除模板接口
west_result = client.delete_contact_template(c_sysid)
if west_result.get('result') != 200:
error_msg = west_result.get('msg', west_result.get('message', '所有者模板删除失败'))
return {"status": "Error", "message": f"所有者模板删除失败: {error_msg}"}
except Exception as e:
return {"status": "Error", "message": f"所有者模板删除失败: {str(e)}"}
# 西部数码模板删除成功后,删除本地记录
try:
domain_owner.delete(ignore_permissions=True)
return {"status": "Success", "message": "域名所有者模板删除成功"}
except Exception as e:
return {"status": "Error", "message": f"本地记录删除失败: {str(e)}"}
except Exception as e:
return {"status": "Error", "message": f"删除失败: {str(e)}"}