jcloud/jcloud/api/domain_west.py

2447 lines
86 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2024, JINGROW
# For license information, please see license.txt
import jingrow
import requests
import time
import hashlib
import json
import random
from datetime import datetime
from urllib.parse import urlencode
from typing import Dict, Any, Optional, List
from jcloud.utils import get_current_team
from pypinyin import lazy_pinyin
class WestDomain:
"""西部数码域名API客户端"""
def __init__(self, username: str, password: str):
"""
初始化西部数码API客户端
Args:
username: 西部数码用户名
password: 西部数码API密码
"""
self.username = username.strip()
self.password = password.strip()
self.api_base_url = "https://api.west.cn/api/v2"
self.time = None
self.token = None
def _generate_token(self) -> str:
"""生成认证token"""
self.time = self._get_current_timestamp()
token_string = f"{self.username}{self.password}{self.time}"
return hashlib.md5(token_string.encode('utf-8')).hexdigest()
def _get_current_timestamp(self) -> int:
"""获取当前时间戳(毫秒)"""
return int(time.time() * 1000)
def _generate_common_parameters(self) -> Dict[str, str]:
"""生成公共参数"""
self.token = self._generate_token()
return {
'username': self.username,
'time': str(self.time),
'token': self.token,
}
def _make_request(self, action: str, method: str = 'GET',
query_params: Optional[Dict] = None,
body_params: Optional[Dict] = None) -> Dict[str, Any]:
"""
发送API请求
Args:
action: API动作路径
method: 请求方法 (GET/POST)
query_params: 查询参数
body_params: 请求体参数
Returns:
API响应结果
"""
# 构建URL
common_params = self._generate_common_parameters()
param_string = urlencode(common_params)
url = f"{self.api_base_url}{action}"
if '?' in action:
url += f"&{param_string}"
else:
url += f"?{param_string}"
# 添加查询参数
if query_params:
url += f"&{urlencode(query_params)}"
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
try:
if method.upper() == 'POST':
data = body_params or {}
# 确保中文字符正确编码
jingrow.log_error("西部数码API调试", f"发送POST请求URL: {url}, 数据: {data}")
response = requests.post(url, data=data, headers=headers, timeout=30)
else:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
try:
result = response.json()
except json.JSONDecodeError:
jingrow.log_error("西部数码API响应解析失败", response_text=response.text)
result = {"status": "error", "message": "无法解析API响应"}
return result
except requests.exceptions.RequestException as e:
jingrow.log_error("西部数码API请求失败", error=str(e), url=url)
return {"status": "error", "message": f"API请求失败: {str(e)}"}
def check_balance(self) -> Dict[str, Any]:
"""获取账户可用余额"""
return self._make_request('/info/?act=checkbalance', 'GET')
def get_domain_price(self, domain: str, year: int = 1) -> Dict[str, Any]:
"""
获取域名价格
Args:
domain: 域名
year: 注册年限
"""
body_params = {
'type': 'domain',
'value': domain,
'year': year,
}
return self._make_request('/info/?act=getprice', 'POST', body_params=body_params)
def get_domain_renew_price(self, domain: str, year: int = 1) -> Dict[str, Any]:
"""
获取域名续费价格
Args:
domain: 域名
year: 续费年限
"""
body_params = {
'act': 'getrenprice',
'domain': domain,
'year': year,
}
return self._make_request('/domain/', 'POST', body_params=body_params)
def query_domain(self, domain: str, suffix: str = '.com') -> Dict[str, Any]:
"""
域名查询
Args:
domain: 域名前缀
suffix: 域名后缀
"""
body_params = {
'domain': domain,
'suffix': suffix,
}
return self._make_request('/domain/query/', 'POST', body_params=body_params)
def register_domain(self, domain: str, regyear: int = 1,
domainpwd: Optional[str] = None,
dns_host1: Optional[str] = None,
dns_host2: Optional[str] = None,
dns_host3: Optional[str] = None,
dns_host4: Optional[str] = None,
dns_host5: Optional[str] = None,
dns_host6: Optional[str] = None,
c_sysid: Optional[str] = None,
client_price: Optional[str] = None,
premium: Optional[str] = None,
domchannel: Optional[str] = None,
westusechn: Optional[str] = None) -> Dict[str, Any]:
"""
注册域名
Args:
domain: 域名(多个域名用英文逗号分隔)
regyear: 注册年限
domainpwd: 域名密码,不填则系统随机生成
dns_host1: 主DNS必填
dns_host2: 辅DNS必填
dns_host3-6: 可选DNS
c_sysid: 模板ID必填
client_price: 价格保护代理商用户购买价格如果价格低于其成本价激活失败防止亏损不需要保护传99999
premium: 普通域名不需要传,溢价域名必须传"yes"才能注册
domchannel: 仅对.cn域名有效"hk"为特价渠道,空为默认,"cn"为普通渠道
westusechn: 空为国内渠道,如需国际合作渠道传"hk",此功能支持:.com(需特殊权限)、.top、.cyou、.icu、.vip、.xyz、.site、.shop、.co
"""
body_params = {
'act': 'regdomain',
'domain': domain,
'regyear': regyear,
}
# 添加可选参数
if domainpwd:
body_params['domainpwd'] = domainpwd
if dns_host1:
body_params['dns_host1'] = dns_host1
if dns_host2:
body_params['dns_host2'] = dns_host2
if dns_host3:
body_params['dns_host3'] = dns_host3
if dns_host4:
body_params['dns_host4'] = dns_host4
if dns_host5:
body_params['dns_host5'] = dns_host5
if dns_host6:
body_params['dns_host6'] = dns_host6
if c_sysid:
body_params['c_sysid'] = c_sysid
if client_price:
body_params['client_price'] = client_price
if premium:
body_params['premium'] = premium
if domchannel:
body_params['domchannel'] = domchannel
if westusechn:
body_params['westusechn'] = westusechn
return self._make_request('/audit/', 'POST', body_params=body_params)
def renew_domain(self, domain: str, year: int = 1,
expire_date: Optional[str] = None,
client_price: Optional[int] = None) -> Dict[str, Any]:
"""
域名续费
Args:
domain: 域名
year: 续费年限
expire_date: 到期时间
client_price: 客户价格
"""
body_params = {
'domain': domain,
'year': year,
}
if expire_date:
body_params['expiredate'] = expire_date
if client_price:
body_params['client_price'] = client_price
return self._make_request('/domain/?act=renew', 'POST', body_params=body_params)
def get_domain_list(self, limit: int = 10, page: int = 1) -> Dict[str, Any]:
"""
获取域名列表
Args:
limit: 每页数量
page: 页码
"""
query_params = {
'limit': limit,
'page': page,
}
return self._make_request('/domain/?act=getdomains', 'GET', query_params=query_params)
def get_domain_info(self, domain: str) -> Dict[str, Any]:
"""
获取域名详细信息
Args:
domain: 域名
"""
body_params = {
'domain': domain,
}
return self._make_request('/domain/?act=getinfo', 'POST', body_params=body_params)
def get_dns_records(self, domain: str) -> Dict[str, Any]:
"""
获取域名DNS记录
Args:
domain: 域名
"""
body_params = {
'domain': domain,
}
return self._make_request('/domain/?act=getdns', 'POST', body_params=body_params)
def get_dns_records_paginated(self, domain: str, limit: int = 20, pageno: int = 1) -> Dict[str, Any]:
"""
获取域名解析记录(支持分页)
Args:
domain: 域名
limit: 每页大小默认20
pageno: 第几页默认为1
"""
body_params = {
'act': 'getdnsrecord',
'domain': domain,
'limit': limit,
'pageno': pageno,
}
return self._make_request('/domain/', 'POST', body_params=body_params)
def modify_dns_records(self, domain: str, records: List[Dict]) -> Dict[str, Any]:
"""
修改域名DNS记录
Args:
domain: 域名
records: DNS记录列表
"""
body_params = {
'domain': domain,
'records': json.dumps(records),
}
return self._make_request('/domain/?act=modifydns', 'POST', body_params=body_params)
def add_dns_record(self, domain: str, record_type: str,
host: str, value: str, ttl: int = 600, level: int = 10, line: str = "") -> Dict[str, Any]:
"""
添加DNS记录
Args:
domain: 域名
record_type: 记录类型 (A, CNAME, MX, TXT, AAAA, SRV)
host: 主机记录
value: 记录值
ttl: TTL值 (60~86400秒默认900)
level: 优先级MX记录使用1-100默认10
line: 线路 (默认="", 电信="LTEL", 联通="LCNC", 移动="LMOB", 教育网="LEDU", 搜索引擎="LSEO")
"""
body_params = {
'act': 'adddnsrecord',
'domain': domain,
'host': host,
'type': record_type,
'value': value,
'ttl': ttl,
'level': level,
'line': line,
}
return self._make_request('/domain/', 'POST', body_params=body_params)
def modify_dns_record(self, domain: str, value: str, ttl: int = 600, level: int = 10,
record_id: Optional[str] = None, host: Optional[str] = None,
record_type: Optional[str] = None, line: str = "", old_value: Optional[str] = None) -> Dict[str, Any]:
"""
修改DNS记录
Args:
domain: 域名
value: 新的解析值
ttl: TTL值 (60~86400秒默认900)
level: 优先级MX记录使用1-100默认10
record_id: 解析记录编号(优先使用)
host: 主机头当record_id未提供时必填
record_type: 解析类型当record_id未提供时必填
line: 线路 (默认="", 电信="LTEL", 联通="LCNC", 移动="LMOB", 教育网="LEDU", 搜索引擎="LSEO")
old_value: 旧解析值(可选,用于确定唯一记录)
"""
body_params = {
'act': 'moddnsrecord',
'domain': domain,
'value': value,
'ttl': ttl,
'level': level,
'line': line,
}
# 优先使用record_id
if record_id:
body_params['id'] = record_id
else:
# 必须提供host和type
if not host or not record_type:
return {"status": "error", "message": "当未提供记录ID时主机头和解析类型为必填项"}
body_params['host'] = host
body_params['type'] = record_type
# 添加旧值(可选)
if old_value:
body_params['oldvalue'] = old_value
return self._make_request('/domain/', 'POST', body_params=body_params)
def delete_dns_record(self, domain: str, record_id: Optional[str] = None,
host: Optional[str] = None, record_type: Optional[str] = None,
value: Optional[str] = None, line: str = "") -> Dict[str, Any]:
"""
删除DNS记录
Args:
domain: 域名
record_id: 解析记录ID优先使用
host: 主机头当record_id未提供时必填
record_type: 解析类型当record_id未提供时必填
value: 解析值(可选)
line: 线路 (默认="", 电信="LTEL", 联通="LCNC", 移动="LMOB", 教育网="LEDU", 搜索引擎="LSEO")
"""
body_params = {
'act': 'deldnsrecord',
'domain': domain,
'line': line,
}
# 优先使用record_id
if record_id:
body_params['id'] = record_id
else:
# 必须提供host和type
if not host or not record_type:
return {"status": "error", "message": "当未提供记录ID时主机头和解析类型为必填项"}
body_params['host'] = host
body_params['type'] = record_type
if value:
body_params['value'] = value
return self._make_request('/domain/', 'POST', body_params=body_params)
def transfer_domain(self, domain: str, auth_code: str) -> Dict[str, Any]:
"""
域名转入
Args:
domain: 域名
auth_code: 转移授权码
"""
body_params = {
'domain': domain,
'auth_code': auth_code,
}
return self._make_request('/domain/?act=transfer', 'POST', body_params=body_params)
def lock_domain(self, domain: str, lock: bool = True) -> Dict[str, Any]:
"""
锁定/解锁域名
Args:
domain: 域名
lock: 是否锁定
"""
action = '/domain/?act=lock' if lock else '/domain/?act=unlock'
body_params = {
'domain': domain,
}
return self._make_request(action, 'POST', body_params=body_params)
def get_template_list(self, limit: int = 10, page: int = 1) -> Dict[str, Any]:
"""
获取域名模板列表
Args:
limit: 每页数量
page: 页码
"""
body_params = {
'act': 'gettemplates',
'limit': limit,
'page': page,
}
return self._make_request('/audit/', 'POST', body_params=body_params)
def get_template_detail(self, template_id: str) -> Dict[str, Any]:
"""
获取指定模板详情
Args:
template_id: 模板ID
"""
body_params = {
'act': 'auditinfo',
'c_sysid': template_id,
}
return self._make_request('/audit/', 'POST', body_params=body_params)
def create_contact_template(self, template_data: Dict[str, Any]) -> Dict[str, Any]:
"""
创建域名模板
Args:
template_data: 模板数据,包含所有必要的字段
"""
body_params = {
'act': 'auditsub',
**template_data
}
# 对body_params进行GBK编码
encoded_data = urlencode(body_params, encoding='gbk').encode('gbk')
return self._make_request('/audit/', 'POST', body_params=encoded_data)
def get_domain_real_info(self, domain: str) -> Dict[str, Any]:
"""
获取域名实名信息
Args:
domain: 域名
Returns:
域名实名信息,包含所有者、管理联系人、技术联系人、缴费联系人等信息
"""
body_params = {
'act': 'domaininfo',
'domain': domain,
}
return self._make_request('/audit/', 'POST', body_params=body_params)
def get_upload_token(self, c_sysid: str, f_type_org: str, f_code_org: str,
f_type_lxr: Optional[str] = None, f_code_lxr: Optional[str] = None) -> Dict[str, Any]:
"""
获取实名上传token
Args:
c_sysid: 模板标识
f_type_org: 证件类型,详情见附录
f_code_org: 证件号码
f_type_lxr: 联系人证件类型(企业时填写)
f_code_lxr: 联系人证件号码(企业时填写)
Returns:
API响应结果包含上传token
"""
body_params = {
'act': 'uploadwcftoken',
'c_sysid': c_sysid,
'f_type_org': f_type_org,
'f_code_org': f_code_org,
}
# 添加可选参数
if f_type_lxr:
body_params['f_type_lxr'] = f_type_lxr
if f_code_lxr:
body_params['f_code_lxr'] = f_code_lxr
return self._make_request('/audit/', 'POST', body_params=body_params)
def upload_real_name_files(self, token: str, file_org: str, file_lxr: Optional[str] = None) -> Dict[str, Any]:
"""
模板实名资料上传
Args:
token: 实名上传Token
file_org: 图片完整的base64
file_lxr: 企业联系人图片完整的base64(只有token中设置了联系人的才上传)
Returns:
API响应结果
"""
upload_url = "https://netservice.vhostgo.com/wcfservice/Service1.svc/Wcf_AuditUploadFile"
# 构建请求数据
data = {
'token': token,
'file_org': file_org,
}
# 添加可选参数
if file_lxr:
data['file_lxr'] = file_lxr
headers = {
'Content-Type': 'application/json'
}
try:
# 发送JSON格式的POST请求
jingrow.log_error("西部数码实名上传调试", f"发送POST请求URL: {upload_url}, 数据: {data}")
response = requests.post(upload_url, json=data, headers=headers, timeout=30)
response.raise_for_status()
try:
result = response.json()
except json.JSONDecodeError:
jingrow.log_error("西部数码实名上传响应解析失败", response_text=response.text)
result = {"status": "error", "message": "无法解析API响应"}
return result
except requests.exceptions.RequestException as e:
jingrow.log_error("西部数码实名上传请求失败", error=str(e), url=upload_url)
return {"status": "error", "message": f"API请求失败: {str(e)}"}
def modify_dns_server(self, domain: str, dns1: str, dns2: str,
dns3: Optional[str] = None, dns4: Optional[str] = None,
dns5: Optional[str] = None, dns6: Optional[str] = None) -> Dict[str, Any]:
"""
修改域名DNS服务器
Args:
domain: 要修改DNS的域名
dns1: 主DNS服务器
dns2: 辅DNS服务器
dns3: 第三个DNS服务器可选
dns4: 第四个DNS服务器可选
dns5: 第五个DNS服务器可选
dns6: 第六个DNS服务器可选
Returns:
API响应结果
"""
body_params = {
'act': 'moddns',
'domain': domain,
'dns1': dns1,
'dns2': dns2,
}
# 添加可选的DNS服务器
if dns3:
body_params['dns3'] = dns3
if dns4:
body_params['dns4'] = dns4
if dns5:
body_params['dns5'] = dns5
if dns6:
body_params['dns6'] = dns6
return self._make_request('/domain/', 'POST', body_params=body_params)
def format_date(date):
"""格式化域名到期时间"""
if not date:
return None
if isinstance(date, str):
return date
return date.strftime('%Y-%m-%d')
def get_west_client() -> WestDomain:
"""获取西部数码域名API客户端实例"""
try:
# 从Jcloud Settings获取配置
settings = jingrow.get_single("Jcloud Settings")
if settings:
username = settings.get("west_username")
password = settings.get_password("west_api_password") if settings.get("west_api_password") else None
if username and password:
return WestDomain(username, password)
else:
jingrow.log_error("西部数码域名API: 缺少用户名或密码配置")
return None
else:
jingrow.log_error("西部数码域名API: Jcloud Settings 尚未配置")
return None
except Exception as e:
jingrow.log_error(f"西部数码域名API客户端初始化失败: {str(e)}")
return None
# API端点函数
@jingrow.whitelist()
def check_west_balance():
"""检查西部数码账户余额"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
return client.check_balance()
@jingrow.whitelist()
def check_domain(domain: str, suffix: str = '.com'):
"""查询域名是否可注册"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
if not domain:
return {"status": "error", "message": "缺少域名参数"}
response = client.query_domain(domain, suffix)
# 添加调试日志
jingrow.log_error("域名查询调试", f"domain={domain}, suffix={suffix}, response={response}")
if response.get("status") == "error":
return response
try:
# 直接检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
full_domain = domain + suffix
for item in response.get("data", []):
if item.get("name") == full_domain:
return {
"available": item.get("avail", 0) == 1,
"domain": full_domain,
"message": "域名可用" if item.get("avail", 0) == 1 else "域名已被注册"
}
return {"status": "error", "message": f"未找到域名 {full_domain} 的查询结果"}
except Exception as e:
jingrow.log_error("域名查询响应解析失败", error=str(e))
return {"status": "error", "message": "域名查询响应解析失败"}
@jingrow.whitelist()
def get_west_domain_price(domain: str, year: int = 1):
"""获取域名价格"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
if not domain:
return {"status": "error", "message": "缺少域名参数"}
response = client.get_domain_price(domain, year)
if response.get("status") == "error":
return response
try:
# 直接检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
data = response.get("data", {})
original_price = data.get("buyprice", 0)
# 统一增加10%的利润参考aliyun_server_light.py的方法
if original_price and original_price > 0:
# 确保10%利润率且价格为整数
adjusted_price = int(original_price / (1 - 0.1))
else:
adjusted_price = original_price
return {
"data": {
"price": adjusted_price,
"original_price": original_price, # 保留原价用于参考
"domain": domain,
"year": year
}
}
except Exception as e:
jingrow.log_error("域名价格查询响应解析失败", error=str(e))
return {"status": "error", "message": "域名价格查询响应解析失败"}
@jingrow.whitelist()
def get_west_domain_renew_price(domain: str, year: int = 1):
"""获取域名续费价格"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
if not domain:
return {"status": "error", "message": "缺少域名参数"}
response = client.get_domain_renew_price(domain, year)
if response.get("status") == "error":
return response
try:
# 直接检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
data = response.get("data", {})
original_price = data.get("price", 0)
# 统一增加10%的利润参考aliyun_server_light.py的方法
if original_price and original_price > 0:
# 确保10%利润率且价格为整数
adjusted_price = int(original_price / (1 - 0.1))
else:
adjusted_price = original_price
return {
"data": {
"price": adjusted_price,
"domain": domain,
"year": year,
"ispremium": data.get("ispremium", "n")
}
}
except Exception as e:
jingrow.log_error("域名续费价格查询响应解析失败", error=str(e))
return {"status": "error", "message": "域名续费价格查询响应解析失败"}
@jingrow.whitelist()
def west_domain_register(domain: str, regyear: int = 1, dns_host1: str = "ns1.myhostadmin.net",
dns_host2: str = "ns2.myhostadmin.net", c_sysid: str = None,
domainpwd: str = None, dns_host3: str = None, dns_host4: str = None,
dns_host5: str = None, dns_host6: str = None, client_price: str = None,
premium: str = None, domchannel: str = None, westusechn: str = None):
"""注册域名"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
return client.register_domain(
domain=domain,
regyear=regyear,
dns_host1=dns_host1,
dns_host2=dns_host2,
c_sysid=c_sysid,
domainpwd=domainpwd,
dns_host3=dns_host3,
dns_host4=dns_host4,
dns_host5=dns_host5,
dns_host6=dns_host6,
client_price=client_price,
premium=premium,
domchannel=domchannel,
westusechn=westusechn
)
@jingrow.whitelist()
def west_domain_renew(**data):
"""域名续费"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
year = data.get('year', 1)
expire_date = data.get('expire_date')
client_price = data.get('client_price')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
return client.renew_domain(domain, year, expire_date, client_price)
@jingrow.whitelist()
def west_domain_get_list(**data):
"""获取域名列表"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
limit = data.get('limit', 10)
page = data.get('page', 1)
return client.get_domain_list(limit, page)
@jingrow.whitelist()
def west_domain_get_info(**data):
"""获取域名信息"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
return client.get_domain_info(domain)
@jingrow.whitelist()
def west_domain_get_dns(**data):
"""获取域名DNS记录"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
return client.get_dns_records(domain)
@jingrow.whitelist()
def get_west_domain_dns_records(**data):
"""获取域名解析记录(支持分页)"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
limit = data.get('limit', 20)
pageno = data.get('pageno', 1)
if not domain:
return {"status": "error", "message": "缺少域名参数"}
response = client.get_dns_records_paginated(domain, limit, pageno)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
data = response.get("data", {})
# 返回格式化的解析记录信息
return {
"status": "success",
"data": {
"pageno": data.get("pageno", 1),
"limit": data.get("limit", 20),
"total": data.get("total", 0),
"pagecount": data.get("pagecount", 0),
"items": data.get("items", [])
}
}
except Exception as e:
jingrow.log_error("域名解析记录查询响应解析失败", error=str(e))
return {"status": "error", "message": "域名解析记录查询响应解析失败"}
@jingrow.whitelist()
def west_domain_modify_dns(**data):
"""修改域名DNS记录批量修改兼容旧版本"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
records = data.get('records')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
if not records:
return {"status": "error", "message": "缺少DNS记录参数"}
# 逐个修改记录
results = []
for record in records:
record_type = record.get('type')
host = record.get('host')
value = record.get('value')
ttl = record.get('ttl', 600)
level = record.get('level', 10)
line = record.get('line', '')
record_id = record.get('record_id')
old_value = record.get('old_value')
# 验证必要参数
if not value:
results.append({"status": "error", "message": "缺少解析值"})
continue
# 如果没有提供record_id则必须提供host和record_type
if not record_id and (not host or not record_type):
results.append({"status": "error", "message": "当未提供记录ID时主机头和解析类型为必填项"})
continue
# 验证TTL值
if ttl < 60 or ttl > 86400:
results.append({"status": "error", "message": "TTL值必须在60~86400秒之间"})
continue
# 验证优先级
if level < 1 or level > 100:
results.append({"status": "error", "message": "优先级必须在1~100之间"})
continue
# 验证记录类型
if record_type:
valid_types = ['A', 'CNAME', 'MX', 'TXT', 'AAAA', 'SRV']
if record_type not in valid_types:
results.append({"status": "error", "message": f"不支持的记录类型: {record_type}"})
continue
# 调用单个记录修改API
response = client.modify_dns_record(
domain, value, ttl, level, record_id, host, record_type, line, old_value
)
if response.get("status") == "error":
results.append(response)
elif response.get("result") != 200:
error_msg = response.get('msg', response.get('message', '未知错误'))
results.append({"status": "error", "message": f"修改DNS记录失败: {error_msg}"})
else:
results.append({
"status": "success",
"message": "DNS记录修改成功",
"data": {
"domain": domain,
"value": value,
"ttl": ttl,
"level": level,
"record_id": record_id,
"host": host,
"record_type": record_type,
"line": line
}
})
# 返回批量操作结果
success_count = sum(1 for r in results if r.get("status") == "success")
error_count = len(results) - success_count
return {
"status": "success" if error_count == 0 else "partial_success",
"message": f"批量修改完成,成功: {success_count},失败: {error_count}",
"results": results
}
@jingrow.whitelist()
def west_domain_add_dns_record(**data):
"""添加DNS记录"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
record_type = data.get('record_type')
host = data.get('host')
value = data.get('value')
ttl = data.get('ttl', 600)
level = data.get('level', 10)
line = data.get('line', '')
if not all([domain, record_type, host, value]):
return {"status": "error", "message": "缺少必要参数"}
# 验证记录类型
valid_types = ['A', 'CNAME', 'MX', 'TXT', 'AAAA', 'SRV']
if record_type not in valid_types:
return {"status": "error", "message": f"不支持的记录类型: {record_type},支持的类型: {', '.join(valid_types)}"}
# 验证TTL值
if ttl < 60 or ttl > 86400:
return {"status": "error", "message": "TTL值必须在60~86400秒之间"}
# 验证优先级
if level < 1 or level > 100:
return {"status": "error", "message": "优先级必须在1~100之间"}
response = client.add_dns_record(domain, record_type, host, value, ttl, level, line)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
error_msg = response.get('msg', response.get('message', '未知错误'))
return {"status": "error", "message": f"添加DNS记录失败: {error_msg}"}
# 返回成功结果
return {
"status": "success",
"message": "DNS记录添加成功",
"data": {
"id": response.get("data", {}).get("id"),
"domain": domain,
"host": host,
"type": record_type,
"value": value,
"ttl": ttl,
"level": level,
"line": line
}
}
except Exception as e:
jingrow.log_error("添加DNS记录响应解析失败", error=str(e))
return {"status": "error", "message": "添加DNS记录响应解析失败"}
@jingrow.whitelist()
def west_domain_delete_dns_record(**data):
"""删除DNS记录"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
record_id = data.get('record_id')
host = data.get('host')
record_type = data.get('record_type')
value = data.get('value')
line = data.get('line', '')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
# 如果没有提供record_id则必须提供host和record_type
if not record_id and (not host or not record_type):
return {"status": "error", "message": "当未提供记录ID时主机头和解析类型为必填项"}
# 验证记录类型
if record_type:
valid_types = ['A', 'CNAME', 'MX', 'TXT', 'AAAA', 'SRV']
if record_type not in valid_types:
return {"status": "error", "message": f"不支持的记录类型: {record_type},支持的类型: {', '.join(valid_types)}"}
response = client.delete_dns_record(domain, record_id, host, record_type, value, line)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
error_msg = response.get('msg', response.get('message', '未知错误'))
return {"status": "error", "message": f"删除DNS记录失败: {error_msg}"}
# 返回成功结果
return {
"status": "success",
"message": "DNS记录删除成功",
"data": {
"domain": domain,
"record_id": record_id,
"host": host,
"record_type": record_type,
"value": value,
"line": line
}
}
except Exception as e:
jingrow.log_error("删除DNS记录响应解析失败", error=str(e))
return {"status": "error", "message": "删除DNS记录响应解析失败"}
@jingrow.whitelist()
def west_domain_transfer(**data):
"""域名转入"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
auth_code = data.get('auth_code')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
if not auth_code:
return {"status": "error", "message": "缺少转移授权码参数"}
return client.transfer_domain(domain, auth_code)
@jingrow.whitelist()
def west_domain_lock(**data):
"""锁定域名"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
lock = data.get('lock', True)
if not domain:
return {"status": "error", "message": "缺少域名参数"}
return client.lock_domain(domain, lock)
@jingrow.whitelist()
def west_domain_get_templates(**data):
"""获取域名模板列表"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
limit = data.get('limit', 10)
page = data.get('page', 1)
return client.get_template_list(limit, page)
@jingrow.whitelist()
def get_west_template_detail(**data):
"""获取指定模板详情"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
template_id = data.get('template_id')
if not template_id:
return {"status": "error", "message": "缺少模板ID参数"}
return client.get_template_detail(template_id)
@jingrow.whitelist()
def create_domain_order(domain, period=1, payment_method='balance', domain_owner=None):
"""创建域名注册订单"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 验证域名格式
if not domain or '.' not in domain:
return {"success": False, "message": "域名格式不正确"}
# 验证域名所有者
if not domain_owner:
return {"success": False, "message": "请选择域名所有者"}
# 检查域名所有者是否存在
owner_exists = jingrow.get_all(
"Domain Owner",
{"name": domain_owner, "team": team.name},
["name"]
)
if not owner_exists:
return {"success": False, "message": "域名所有者不存在"}
# 查询域名价格
client = get_west_client()
if not client:
return {"success": False, "message": "API客户端初始化失败"}
# 获取域名价格 - 使用统一的 get_west_domain_price 函数
price_result = get_west_domain_price(domain, 1)
if price_result.get("status") == "error":
return {"success": False, "message": "获取域名价格失败"}
# 添加调试日志
jingrow.log_error("域名价格调试", f"域名 {domain} 的价格结果: {price_result}")
# 计算总价格 - 使用与前端一致的价格字段
yearly_price = price_result.get("data", {}).get("price", 0) # 使用前端一致的价格字段
total_amount = yearly_price * period
# 生成订单号
order_id = f"{datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + ''.join(random.choices('0123456789', k=6))}"
# 获取域名所有者的c_sysid
domain_owner_pg = jingrow.get_pg("Domain Owner", domain_owner)
if not domain_owner_pg:
return {"success": False, "message": "域名所有者不存在"}
c_sysid = domain_owner_pg.c_sysid
if not c_sysid:
return {"success": False, "message": "域名所有者缺少系统ID请重新创建域名所有者"}
# 构建业务参数
biz_params = {
"domain": domain,
"period": period,
"domain_owner": domain_owner,
"yearly_price": yearly_price,
"auto_renew": False,
"whois_protection": True,
# 注册域名所需参数
"regyear": period,
"dns_host1": "ns1.myhostadmin.net",
"dns_host2": "ns2.myhostadmin.net",
"c_sysid": c_sysid,
"client_price": None
}
# 创建订单记录
order = jingrow.get_pg({
"pagetype": "Order",
"order_id": order_id,
"order_type": "域名注册",
"team": team.name,
"status": "待支付",
"total_amount": total_amount,
"title": domain,
"description": f"{period}",
"biz_params": json.dumps(biz_params, ensure_ascii=False)
})
order.insert(ignore_permissions=True)
jingrow.db.commit()
return {
"success": True,
"message": "订单创建成功",
"order": order.as_dict()
}
except Exception as e:
jingrow.log_error("域名订单", f"创建域名订单失败: {str(e)}")
return {"success": False, "message": f"创建订单失败: {str(e)}"}
@jingrow.whitelist()
def create_domain_renew_order(**kwargs):
"""创建域名续费订单"""
try:
domain = kwargs.get('domain')
renewal_years = kwargs.get('renewal_years', 1)
if not domain:
jingrow.throw("缺少域名信息")
# 验证输入
domain_pg = jingrow.get_pg("Jsite Domain", domain)
if not domain_pg:
jingrow.throw("域名不存在")
team = domain_pg.team
# 验证当前用户权限
current_team = get_current_team(True)
if current_team.name != team:
jingrow.throw("您没有权限为此域名创建续费订单")
# 计算续费金额 - 使用现有的价格获取函数保持统一
renewal_years = int(renewal_years)
# 调用现有的价格获取函数
price_data = get_west_domain_renew_price(domain_pg.domain, 1)
if price_data.get("data", {}).get("price"):
yearly_price = price_data["data"]["price"]
else:
yearly_price = domain_pg.price or 0
total_amount = yearly_price * renewal_years
# 生成唯一订单号
order_id = f"{datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + ''.join(random.choices('0123456789', k=6))}"
# 构建业务参数
biz_params = {
"domain": domain_pg.domain,
"domain_name": domain,
"renewal_years": renewal_years,
"yearly_price": yearly_price,
"domain_owner": domain_pg.domain_owner,
# 续费域名所需参数
"year": renewal_years,
"expire_date": format_date(domain_pg.end_date),
"client_price": None
}
# 创建订单记录
order = jingrow.get_pg({
"pagetype": "Order",
"order_id": order_id,
"order_type": "域名续费",
"team": team,
"status": "待支付",
"total_amount": total_amount,
"title": domain_pg.domain,
"description": str(renewal_years), # 存储续费年数
"biz_params": json.dumps(biz_params, ensure_ascii=False)
})
order.insert(ignore_permissions=True)
jingrow.db.commit()
return {
"success": True,
"order": order.as_dict()
}
except Exception as e:
jingrow.log_error("域名续费订单错误", f"创建域名续费订单失败: {str(e)}")
return {
"success": False,
"message": f"创建续费订单失败: {str(e)}"
}
def register_domain_from_order(order_name):
"""支付成功后异步注册域名"""
try:
order = jingrow.get_pg("Order", order_name)
if not order:
raise Exception("订单不存在")
# 从biz_params中读取业务参数
biz_params = json.loads(order.biz_params) if order.biz_params else {}
domain_name = biz_params.get("domain")
period = biz_params.get("period", 1)
domain_owner = biz_params.get("domain_owner")
yearly_price = biz_params.get("yearly_price", 0)
auto_renew = biz_params.get("auto_renew", False)
whois_protection = biz_params.get("whois_protection", True)
if not domain_name:
raise Exception("订单中缺少域名信息")
# 调用西部数码API注册域名
client = get_west_client()
if not client:
raise Exception("API客户端初始化失败")
result = client.register_domain(
domain=domain_name,
regyear=biz_params.get("regyear", period),
dns_host1=biz_params.get("dns_host1", "ns1.myhostadmin.net"),
dns_host2=biz_params.get("dns_host2", "ns2.myhostadmin.net"),
c_sysid=biz_params.get("c_sysid"),
client_price=biz_params.get("client_price")
)
# 打印result到后台日志
jingrow.log_error("西部数码域名注册结果", f"订单 {order_name} 的注册结果: {result}")
if not result or result.get('result') != 200:
error_msg = result.get('msg', result.get('message', '未知错误'))
raise Exception(f"域名注册失败: {error_msg}")
# 创建域名记录
domain_pg = jingrow.get_pg({
"pagetype": "Jsite Domain",
"domain": domain_name,
"team": order.team,
"order_id": order.order_id,
"status": "Active",
"price": yearly_price,
"period": period,
"domain_owner": domain_owner,
"auto_renew": auto_renew,
"whois_protection": whois_protection,
"registration_date": jingrow.utils.nowdate(),
"end_date": jingrow.utils.add_months(jingrow.utils.nowdate(), period * 12)
})
domain_pg.insert(ignore_permissions=True)
# 更新订单状态
order.status = "交易成功"
order.save(ignore_permissions=True)
jingrow.db.commit()
return True
except Exception as e:
jingrow.log_error("域名注册失败", f"订单 {order_name}: {str(e)}")
raise e
def renew_domain_from_order(order_name):
"""支付成功后异步续费域名"""
try:
order = jingrow.get_pg("Order", order_name)
if not order:
raise Exception("订单不存在")
# 从biz_params中读取业务参数
biz_params = json.loads(order.biz_params) if order.biz_params else {}
domain_name = biz_params.get("domain")
renewal_years = biz_params.get("renewal_years", 1)
domain_record_name = biz_params.get("domain_name") # 域名记录的名称
if not domain_name:
raise Exception("订单中缺少域名信息")
# 查找域名记录
domain = jingrow.get_pg("Jsite Domain", domain_record_name)
if not domain:
raise Exception("找不到对应的域名记录")
# 调用西部数码API续费域名
client = get_west_client()
if not client:
raise Exception("API客户端初始化失败")
# 验证域名到期时间
if not biz_params.get("expire_date"):
raise Exception("无法获取域名到期时间")
result = client.renew_domain(
domain_name,
biz_params.get("year", renewal_years),
biz_params.get("expire_date"),
biz_params.get("client_price")
)
# 打印result到后台日志
jingrow.log_error("西部数码域名续费结果", f"订单 {order_name} 的续费结果: {result}")
if not result or result.get('result') != 200:
error_msg = result.get('msg', result.get('message', '未知错误'))
raise Exception(f"域名续费失败: {error_msg}")
# 更新域名到期时间
domain.end_date = jingrow.utils.add_months(domain.end_date or jingrow.utils.nowdate(), renewal_years * 12)
domain.save(ignore_permissions=True)
# 更新订单状态
order.status = "交易成功"
order.save(ignore_permissions=True)
jingrow.db.commit()
return True
except Exception as e:
jingrow.log_error("域名续费失败", f"订单 {order_name}: {str(e)}")
raise e
@jingrow.whitelist()
def toggle_domain_auto_renew(pagetype, name, auto_renew):
"""切换域名自动续费状态"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取域名记录
domain = jingrow.get_pg(pagetype, name)
if not domain:
return {"success": False, "message": "找不到域名记录"}
# 验证权限
if domain.team != team.name:
return {"success": False, "message": "您没有权限操作此域名"}
# 更新自动续费状态
domain.auto_renew = bool(auto_renew)
domain.save(ignore_permissions=True)
return {"success": True, "message": "自动续费状态更新成功"}
except Exception as e:
jingrow.log_error("域名管理", f"切换自动续费状态失败: {str(e)}")
return {"success": False, "message": f"操作失败: {str(e)}"}
@jingrow.whitelist()
def toggle_domain_whois_protection(pagetype, name, whois_protection):
"""切换域名隐私保护状态"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取域名记录
domain = jingrow.get_pg(pagetype, name)
if not domain:
return {"success": False, "message": "找不到域名记录"}
# 验证权限
if domain.team != team.name:
return {"success": False, "message": "您没有权限操作此域名"}
# 更新隐私保护状态
domain.whois_protection = bool(whois_protection)
domain.save(ignore_permissions=True)
return {"success": True, "message": "隐私保护状态更新成功"}
except Exception as e:
jingrow.log_error("域名管理", f"切换隐私保护状态失败: {str(e)}")
return {"success": False, "message": f"操作失败: {str(e)}"}
@jingrow.whitelist()
def delete_domain(pagetype, name):
"""删除域名记录"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取域名记录
domain = jingrow.get_pg(pagetype, name)
if not domain:
return {"success": False, "message": "找不到域名记录"}
# 验证权限
if domain.team != team.name:
return {"success": False, "message": "您没有权限操作此域名"}
# 删除域名记录
domain.delete(ignore_permissions=True)
return {"success": True, "message": "域名记录删除成功"}
except Exception as e:
jingrow.log_error("域名管理", f"删除域名记录失败: {str(e)}")
return {"success": False, "message": f"删除失败: {str(e)}"}
@jingrow.whitelist()
def get_domain_owners():
"""获取当前团队的域名所有者列表"""
try:
team = get_current_team()
if not team:
return {"status": "Error", "message": "未找到当前团队"}
# 获取当前团队的所有域名所有者
domain_owners = jingrow.get_all(
"Domain Owner",
{"team": team},
["name", "title", "fullname", "c_regtype", "c_org_m", "c_ln_m", "c_fn_m"]
)
return {
"status": "Success",
"data": domain_owners
}
except Exception as e:
jingrow.log_error("获取域名所有者列表失败", str(e))
return {"status": "Error", "message": f"获取域名所有者列表失败: {str(e)}"}
@jingrow.whitelist()
def get_domain_owner(name):
"""获取单个域名所有者信息"""
try:
if not name:
return {"status": "Error", "message": "域名所有者名称不能为空"}
# 获取指定的域名所有者
domain_owner = jingrow.get_pg("Domain Owner", name)
if not domain_owner:
return {"status": "Error", "message": "未找到指定的域名所有者"}
# 检查权限(只能查看当前团队的所有者)
team = get_current_team()
if not team or domain_owner.team != team:
return {"status": "Error", "message": "无权访问该域名所有者信息"}
# 返回所有者信息
owner_data = {
"name": domain_owner.name,
"r_status": domain_owner.r_status,
"title": domain_owner.title,
"fullname": domain_owner.fullname,
"c_regtype": domain_owner.c_regtype,
"c_org_m": domain_owner.c_org_m,
"c_ln_m": domain_owner.c_ln_m,
"c_fn_m": domain_owner.c_fn_m,
"c_em": domain_owner.c_em,
"c_ph": domain_owner.c_ph,
"c_st_m": domain_owner.c_st_m,
"c_ct_m": domain_owner.c_ct_m,
"c_adr_m": domain_owner.c_adr_m,
"c_pc": domain_owner.c_pc
}
return {
"status": "Success",
"data": owner_data
}
except Exception as e:
jingrow.log_error("获取域名所有者信息失败", str(e))
return {"status": "Error", "message": f"获取域名所有者信息失败: {str(e)}"}
@jingrow.whitelist()
def create_domain_template(**data):
"""创建域名模板"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
# 验证必填字段
required_fields = ['c_regtype', 'c_ln_m', 'c_fn_m', 'c_st_m', 'c_ct_m', 'c_adr_m', 'c_pc', 'c_em', 'c_idtype_gswl', 'c_idnum_gswl']
for field in required_fields:
if not data.get(field):
return {"status": "error", "message": f"字段 {field} 是必填项"}
# 验证电话号码字段根据c_ph_type判断
c_ph_type = data.get('c_ph_type', '0') # 默认为手机
if c_ph_type == '0': # 手机
if not data.get('c_ph'):
return {"status": "error", "message": "手机号码 c_ph 是必填项"}
elif c_ph_type == '1': # 座机
if not data.get('c_ph_code'):
return {"status": "error", "message": "座机区号 c_ph_code 是必填项"}
if not data.get('c_ph_num'):
return {"status": "error", "message": "座机号码 c_ph_num 是必填项"}
# 验证字段长度
if len(data.get('c_ln_m', '')) < 1 or len(data.get('c_ln_m', '')) > 16:
return {"status": "error", "message": "姓(中文)长度必须为116位"}
if len(data.get('c_fn_m', '')) < 1 or len(data.get('c_fn_m', '')) > 64:
return {"status": "error", "message": "名(中文)长度必须为164位"}
if len(data.get('c_st_m', '')) < 2 or len(data.get('c_st_m', '')) > 10:
return {"status": "error", "message": "省份(中文)长度必须为210位"}
# 验证完整姓名长度
fullname = data.get('c_ln_m', '') + data.get('c_fn_m', '')
if len(fullname) < 2 or len(fullname) > 64:
return {"status": "error", "message": "完整姓名(中文)长度必须为264位"}
# 生成英文姓名
if data.get('c_ln_m') and data.get('c_fn_m'):
last_name_pinyin = ' '.join(lazy_pinyin(data['c_ln_m']))
first_name_pinyin = ' '.join(lazy_pinyin(data['c_fn_m']))
data['c_ln'] = last_name_pinyin.title()
data['c_fn'] = first_name_pinyin.title()
else:
# 设置默认的英文姓名
data['c_ln'] = data.get('c_ln', '')
data['c_fn'] = data.get('c_fn', '')
# 生成英文地址
if data.get('c_adr_m'):
address_pinyin = ' '.join(lazy_pinyin(data['c_adr_m']))
data['c_adr'] = address_pinyin.title()
else:
data['c_adr'] = data.get('c_adr', '')
# 生成英文省份和城市
if data.get('c_st_m'):
state_pinyin = ' '.join(lazy_pinyin(data['c_st_m']))
data['c_st'] = state_pinyin.title()
else:
data['c_st'] = data.get('c_st', '')
if data.get('c_ct_m'):
city_pinyin = ' '.join(lazy_pinyin(data['c_ct_m']))
data['c_ct'] = city_pinyin.title()
else:
data['c_ct'] = data.get('c_ct', '')
# 生成英文单位名称
if data.get('c_org_m'):
org_pinyin = ' '.join(lazy_pinyin(data['c_org_m']))
data['c_org'] = org_pinyin.title()
else:
data['c_org'] = data.get('c_org', '')
# 设置默认值
template_data = {
'c_regtype': data['c_regtype'],
'c_ln_m': data['c_ln_m'],
'c_fn_m': data['c_fn_m'],
'c_co': data.get('c_co', 'CN'),
'cocode': data.get('cocode', '+86'),
'c_st_m': data['c_st_m'],
'c_ct_m': data['c_ct_m'],
'c_dt_m': data.get('c_dt_m', ''),
'c_adr_m': data['c_adr_m'],
'c_pc': data['c_pc'],
'c_ph_type': data.get('c_ph_type', '0'),
'c_em': data['c_em'],
'c_ln': data.get('c_ln', ''), # 使用生成的英文姓,如果没有则为空
'c_fn': data.get('c_fn', ''), # 使用生成的英文名,如果没有则为空
'c_st': data.get('c_st', ''), # 使用生成的英文省份,如果没有则为空
'c_ct': data.get('c_ct', ''), # 使用生成的英文城市,如果没有则为空
'c_adr': data.get('c_adr', ''), # 使用生成的英文地址,如果没有则为空
'c_idtype_gswl': data['c_idtype_gswl'],
'c_idnum_gswl': data['c_idnum_gswl'],
'fullname': data.get('c_ln_m', '') + data.get('c_fn_m', '') # 完整姓名
}
# 个人类型时不传入c_org_m参数
if data['c_regtype'] == 'E' and data.get('c_org_m'):
template_data['c_org_m'] = data['c_org_m']
template_data['c_org'] = data.get('c_org', '') # 英文单位名称
# 验证企业类型必须填写单位名称
if data['c_regtype'] == 'E' and not data.get('c_org_m'):
return {"status": "error", "message": "企业类型必须填写单位名称"}
# 添加区县信息
if data.get('c_dt_m'):
template_data['c_dt_m'] = data['c_dt_m']
# 添加香港域名相关字段
if data.get('c_idtype_hk'):
template_data['c_idtype_hk'] = data['c_idtype_hk']
if data.get('c_idnum_hk'):
template_data['c_idnum_hk'] = data['c_idnum_hk']
# 根据c_ph_type添加电话相关字段
if data.get('c_ph_type') == '1': # 座机
if data.get('c_ph_code'):
template_data['c_ph_code'] = data['c_ph_code']
if data.get('c_ph_num'):
template_data['c_ph_num'] = data['c_ph_num']
# 构建完整的电话号码格式
if data.get('c_ph_code') and data.get('c_ph_num'):
template_data['c_ph_all'] = f"{data.get('cocode', '+86')}.{data['c_ph_code']}-{data['c_ph_num']}"
else: # 手机
if data.get('c_ph'):
template_data['c_ph'] = data['c_ph']
try:
jingrow.log_error("西部模板API发送", f"模板数据: {template_data}")
result = client.create_contact_template(template_data)
if result.get('result') == 200:
c_sysid = result.get('data', {}).get('c_sysid')
jingrow.log_error("西部模板API成功", f"模板ID: {c_sysid}")
return {
"status": "success",
"message": "域名模板创建成功",
"data": {
"c_sysid": c_sysid
}
}
else:
error_msg = result.get('msg', result.get('message', '未知错误'))
jingrow.log_error("西部模板API失败", f"错误: {error_msg}")
return {"status": "error", "message": f"创建域名模板失败: {error_msg}"}
except Exception as e:
jingrow.log_error("西部模板API异常", str(e))
return {"status": "error", "message": f"创建域名模板失败: {str(e)}"}
@jingrow.whitelist()
def create_domain_owner(**data):
"""创建新的域名所有者"""
try:
team = get_current_team()
if not team:
return {"status": "Error", "message": "未找到当前团队"}
# 验证基本必填字段
basic_required_fields = ['c_regtype', 'c_ln_m', 'c_fn_m', 'c_st_m', 'c_ct_m', 'c_adr_m', 'c_pc', 'c_em']
for field in basic_required_fields:
if not data.get(field):
return {"status": "Error", "message": f"字段 {field} 是必填项"}
# 验证电话号码字段根据c_ph_type判断
c_ph_type = data.get('c_ph_type', '0') # 默认为手机
if c_ph_type == '0': # 手机
if not data.get('c_ph'):
return {"status": "Error", "message": "手机号码 c_ph 是必填项"}
elif c_ph_type == '1': # 座机
if not data.get('c_ph_code'):
return {"status": "Error", "message": "座机区号 c_ph_code 是必填项"}
if not data.get('c_ph_num'):
return {"status": "Error", "message": "座机号码 c_ph_num 是必填项"}
# 生成英文姓名
if data.get('c_ln_m') and data.get('c_fn_m'):
last_name_pinyin = ' '.join(lazy_pinyin(data['c_ln_m']))
first_name_pinyin = ' '.join(lazy_pinyin(data['c_fn_m']))
data['c_ln'] = last_name_pinyin.title()
data['c_fn'] = first_name_pinyin.title()
else:
# 设置默认的英文姓名
data['c_ln'] = data.get('c_ln', '')
data['c_fn'] = data.get('c_fn', '')
# 生成英文地址
if data.get('c_adr_m'):
address_pinyin = ' '.join(lazy_pinyin(data['c_adr_m']))
data['c_adr'] = address_pinyin.title()
else:
data['c_adr'] = data.get('c_adr', '')
# 生成英文省份和城市
if data.get('c_st_m'):
state_pinyin = ' '.join(lazy_pinyin(data['c_st_m']))
data['c_st'] = state_pinyin.title()
else:
data['c_st'] = data.get('c_st', '')
if data.get('c_ct_m'):
city_pinyin = ' '.join(lazy_pinyin(data['c_ct_m']))
data['c_ct'] = city_pinyin.title()
else:
data['c_ct'] = data.get('c_ct', '')
# 生成英文单位名称
if data.get('c_org_m'):
org_pinyin = ' '.join(lazy_pinyin(data['c_org_m']))
data['c_org'] = org_pinyin.title()
else:
data['c_org'] = data.get('c_org', '')
# 设置默认值
data['team'] = team
data['c_co'] = data.get('c_co', 'CN') # 中国
data['cocode'] = data.get('cocode', '+86')
data['c_ph_type'] = data.get('c_ph_type', '0') # 手机
data['reg_contact_type'] = data.get('reg_contact_type', 'cg') # 常规
# 确保c_sysid字段存在如果提供
if data.get('c_sysid'):
data['c_sysid'] = data['c_sysid']
# 生成完整姓名和标题
data['fullname'] = data.get('c_ln_m', '') + data.get('c_fn_m', '')
if data['c_regtype'] == 'I': # 个人
data['title'] = f"{data['c_ln_m']}{data['c_fn_m']}"
else: # 企业
data['title'] = f"{data['c_org_m']}"
# 创建域名所有者记录
domain_owner = jingrow.get_pg({
"pagetype": "Domain Owner",
**data
})
domain_owner.insert(ignore_permissions=True)
jingrow.log_error("域名所有者创建成功", f"名称: {domain_owner.name}, 标题: {domain_owner.title}")
return {
"status": "Success",
"message": "域名所有者创建成功",
"data": {
"name": domain_owner.name,
"title": domain_owner.title
}
}
except Exception as e:
jingrow.log_error("域名所有者创建失败", str(e))
return {"status": "Error", "message": f"创建域名所有者失败: {str(e)}"}
@jingrow.whitelist()
def create_domain_owner_with_template(**data):
"""创建域名所有者(包含模板创建)"""
try:
# 打印接收到的参数
jingrow.log_error("create_domain_owner_with_template 接收到的参数", f"参数: {data}")
# 第一步:创建域名模板
template_result = create_domain_template(**data)
# 打印template_result
jingrow.log_error("create_domain_owner_with_template template_result", f"template_result: {template_result}")
if template_result.get("status") != "success":
return template_result
# 获取模板ID
c_sysid = template_result.get("data", {}).get("c_sysid")
if not c_sysid:
return {"status": "Error", "message": "创建域名模板成功但未返回模板ID"}
# 第二步:创建域名所有者记录
data['c_sysid'] = c_sysid
owner_result = create_domain_owner(**data)
if owner_result.get("status") != "Success":
return owner_result
# 成功完成
result = {
"status": "Success",
"message": "域名所有者创建成功",
"data": {
"c_sysid": c_sysid,
"owner_name": owner_result.get("data", {}).get("name"),
"owner_title": owner_result.get("data", {}).get("title")
}
}
jingrow.log_error("域名所有者完整创建成功", f"模板ID: {c_sysid}, 所有者: {owner_result.get('data', {}).get('name')}")
return result
except Exception as e:
jingrow.log_error("域名所有者完整创建失败", str(e))
return {"status": "Error", "message": f"创建域名所有者失败: {str(e)}"}
@jingrow.whitelist()
def test_create_domain_owner_with_template():
"""测试创建域名所有者包含模板创建的API端点"""
try:
# 硬编码的测试参数
test_data = {
'c_regtype': 'I',
'c_ln_m': '',
'c_fn_m': '长生',
'c_co': 'CN',
'cocode': '+86',
'c_st_m': '广东省',
'c_ct_m': '广州市',
'c_dt_m': '花都区',
'c_adr_m': '狮岭镇龙头市场2栋',
'c_pc': '510000',
'c_ph_type': '0',
'c_ph': '13926598569',
'c_em': '13926598569@139.com',
'c_idtype_gswl': 'SFZ',
'c_idnum_gswl': '430524198506259568'
}
# 调用create_domain_owner_with_template函数
result = create_domain_template(**test_data)
jingrow.log_error("测试API结果", f"结果: {result}")
return result
except Exception as e:
jingrow.log_error("测试API异常", str(e))
return {"status": "Error", "message": f"测试API调用失败: {str(e)}"}
@jingrow.whitelist()
def get_west_domain_real_info(**data):
"""获取域名实名信息"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
response = client.get_domain_real_info(domain)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
data = response.get("data", {})
# 返回格式化的实名信息
return {
"status": "success",
"data": {
"domain": data.get("domain"),
"c_sysid": data.get("c_sysid"),
"c_regtype": data.get("c_regtype"),
"c_status": data.get("c_status"),
"c_failinfo": data.get("c_failinfo"),
"status": data.get("status"),
"regdate": data.get("regdate"),
"rexpiredate": data.get("rexpiredate"),
"dns_hosts": {
"dns_host1": data.get("dns_host1"),
"dns_host2": data.get("dns_host2"),
"dns_host3": data.get("dns_host3"),
"dns_host4": data.get("dns_host4"),
"dns_host5": data.get("dns_host5"),
"dns_host6": data.get("dns_host6")
},
"owner": {
"dom_ln": data.get("dom_ln"),
"dom_fn": data.get("dom_fn"),
"dom_ln_m": data.get("dom_ln_m"),
"dom_fn_m": data.get("dom_fn_m"),
"dom_org": data.get("dom_org"),
"dom_org_m": data.get("dom_org_m"),
"dom_co": data.get("dom_co"),
"dom_st": data.get("dom_st"),
"dom_st_m": data.get("dom_st_m"),
"dom_ct": data.get("dom_ct"),
"dom_ct_m": data.get("dom_ct_m"),
"dom_adr1": data.get("dom_adr1"),
"dom_adr_m": data.get("dom_adr_m"),
"dom_pc": data.get("dom_pc"),
"dom_ph": data.get("dom_ph"),
"dom_fax": data.get("dom_fax"),
"dom_em": data.get("dom_em")
},
"admin": {
"admi_ln": data.get("admi_ln"),
"admi_fn": data.get("admi_fn"),
"admi_ln_m": data.get("admi_ln_m"),
"admi_fn_m": data.get("admi_fn_m"),
"admi_co": data.get("admi_co"),
"admi_st": data.get("admi_st"),
"admi_st_m": data.get("admi_st_m"),
"admi_ct": data.get("admi_ct"),
"admi_ct_m": data.get("admi_ct_m"),
"admi_adr1": data.get("admi_adr1"),
"admi_adr_m": data.get("admi_adr_m"),
"admi_pc": data.get("admi_pc"),
"admi_ph": data.get("admi_ph"),
"admi_fax": data.get("admi_fax"),
"admi_em": data.get("admi_em")
},
"tech": {
"tech_ln": data.get("tech_ln"),
"tech_fn": data.get("tech_fn"),
"tech_ln_m": data.get("tech_ln_m"),
"tech_fn_m": data.get("tech_fn_m"),
"tech_co": data.get("tech_co"),
"tech_st": data.get("tech_st"),
"tech_st_m": data.get("tech_st_m"),
"tech_ct": data.get("tech_ct"),
"tech_ct_m": data.get("tech_ct_m"),
"tech_adr1": data.get("tech_adr1"),
"tech_adr_m": data.get("tech_adr_m"),
"tech_pc": data.get("tech_pc"),
"tech_ph": data.get("tech_ph"),
"tech_fax": data.get("tech_fax"),
"tech_em": data.get("tech_em")
},
"billing": {
"bill_ln": data.get("bill_ln"),
"bill_fn": data.get("bill_fn"),
"bill_ln_m": data.get("bill_ln_m"),
"bill_fn_m": data.get("bill_fn_m"),
"bill_co": data.get("bill_co"),
"bill_st": data.get("bill_st"),
"bill_st_m": data.get("bill_st_m"),
"bill_ct": data.get("bill_ct"),
"bill_ct_m": data.get("bill_ct_m"),
"bill_adr1": data.get("bill_adr1"),
"bill_adr_m": data.get("bill_adr_m"),
"bill_pc": data.get("bill_pc"),
"bill_ph": data.get("bill_ph"),
"bill_fax": data.get("bill_fax"),
"bill_em": data.get("bill_em")
},
"real_name_status": {
"r_status": data.get("r_status"),
"r_failinfo": data.get("r_failinfo"),
"c_ispublic": data.get("c_ispublic")
},
"orgfile": data.get("orgfile", {})
}
}
except Exception as e:
jingrow.log_error("域名实名信息查询响应解析失败", error=str(e))
return {"status": "error", "message": "域名实名信息查询响应解析失败"}
@jingrow.whitelist()
def get_west_upload_token(**data):
"""获取西部数码实名上传token"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
c_sysid = data.get('c_sysid')
f_type_org = data.get('f_type_org')
f_code_org = data.get('f_code_org')
f_type_lxr = data.get('f_type_lxr')
f_code_lxr = data.get('f_code_lxr')
if not all([c_sysid, f_type_org, f_code_org]):
return {"status": "error", "message": "缺少必要参数c_sysid, f_type_org, f_code_org"}
response = client.get_upload_token(c_sysid, f_type_org, f_code_org, f_type_lxr, f_code_lxr)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
return {"status": "error", "message": "API查询失败"}
return {
"status": "success",
"data": {
"token": response.get("data"),
"clientid": response.get("clientid")
}
}
except Exception as e:
jingrow.log_error("获取实名上传token响应解析失败", error=str(e))
return {"status": "error", "message": "获取实名上传token响应解析失败"}
@jingrow.whitelist()
def west_upload_real_name_files(**data):
"""西部数码模板实名资料上传"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
token = data.get('token')
file_org = data.get('file_org')
file_lxr = data.get('file_lxr')
if not all([token, file_org]):
return {"status": "error", "message": "缺少必要参数token, file_org"}
response = client.upload_real_name_files(token, file_org, file_lxr)
if response.get("status") == "error":
return response
try:
# 解析响应格式
d = response.get("d", {})
result = d.get("Result")
msg = d.get("Msg", "")
if result == 200:
return {
"status": "success",
"message": "实名资料上传成功",
"data": {
"result": result,
"msg": msg
}
}
else:
return {
"status": "error",
"message": f"实名资料上传失败: {msg}"
}
except Exception as e:
jingrow.log_error("实名资料上传响应解析失败", error=str(e))
return {"status": "error", "message": "实名资料上传响应解析失败"}
@jingrow.whitelist()
def west_domain_modify_dns_record(**data):
"""修改DNS记录"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
value = data.get('value')
ttl = data.get('ttl', 600)
level = data.get('level', 10)
record_id = data.get('record_id')
host = data.get('host')
record_type = data.get('record_type')
line = data.get('line', '')
old_value = data.get('old_value')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
if not value:
return {"status": "error", "message": "缺少新的解析值"}
# 验证TTL值
if ttl < 60 or ttl > 86400:
return {"status": "error", "message": "TTL值必须在60~86400秒之间"}
# 验证优先级
if level < 1 or level > 100:
return {"status": "error", "message": "优先级必须在1~100之间"}
# 如果没有提供record_id则必须提供host和record_type
if not record_id and (not host or not record_type):
return {"status": "error", "message": "当未提供记录ID时主机头和解析类型为必填项"}
# 验证记录类型
if record_type:
valid_types = ['A', 'CNAME', 'MX', 'TXT', 'AAAA', 'SRV']
if record_type not in valid_types:
return {"status": "error", "message": f"不支持的记录类型: {record_type},支持的类型: {', '.join(valid_types)}"}
response = client.modify_dns_record(
domain, value, ttl, level, record_id, host, record_type, line, old_value
)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
error_msg = response.get('msg', response.get('message', '未知错误'))
return {"status": "error", "message": f"修改DNS记录失败: {error_msg}"}
# 返回成功结果
return {
"status": "success",
"message": "DNS记录修改成功",
"data": {
"domain": domain,
"value": value,
"ttl": ttl,
"level": level,
"record_id": record_id,
"host": host,
"record_type": record_type,
"line": line
}
}
except Exception as e:
jingrow.log_error("修改DNS记录响应解析失败", error=str(e))
return {"status": "error", "message": "修改DNS记录响应解析失败"}
@jingrow.whitelist()
def test_dns_record_management():
"""测试DNS记录管理功能"""
try:
# 测试参数
test_domain = "test.com"
test_host = "www"
test_value = "127.0.0.1"
test_record_type = "A"
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
# 测试1: 添加DNS记录
jingrow.log_error("DNS测试", "开始测试添加DNS记录")
add_result = client.add_dns_record(
domain=test_domain,
record_type=test_record_type,
host=test_host,
value=test_value,
ttl=900,
level=10,
line=""
)
jingrow.log_error("DNS测试", f"添加DNS记录结果: {add_result}")
# 测试2: 获取DNS记录列表
jingrow.log_error("DNS测试", "开始测试获取DNS记录列表")
list_result = client.get_dns_records_paginated(test_domain, 20, 1)
jingrow.log_error("DNS测试", f"获取DNS记录列表结果: {list_result}")
# 测试3: 修改DNS记录如果添加成功
if add_result.get("result") == 200:
record_id = add_result.get("data", {}).get("id")
if record_id:
jingrow.log_error("DNS测试", f"开始测试修改DNS记录记录ID: {record_id}")
modify_result = client.modify_dns_record(
domain=test_domain,
value="127.0.0.2",
ttl=600,
level=10,
record_id=record_id,
line=""
)
jingrow.log_error("DNS测试", f"修改DNS记录结果: {modify_result}")
# 测试4: 删除DNS记录
jingrow.log_error("DNS测试", "开始测试删除DNS记录")
delete_result = client.delete_dns_record(
domain=test_domain,
record_id=record_id
)
jingrow.log_error("DNS测试", f"删除DNS记录结果: {delete_result}")
return {
"status": "success",
"message": "DNS记录管理功能测试完成",
"data": {
"add_result": add_result,
"list_result": list_result
}
}
except Exception as e:
jingrow.log_error("DNS测试异常", str(e))
return {"status": "error", "message": f"DNS记录管理功能测试失败: {str(e)}"}
@jingrow.whitelist()
def west_domain_modify_dns_server(**data):
"""修改域名DNS服务器"""
client = get_west_client()
if not client:
return {"status": "error", "message": "API客户端初始化失败"}
domain = data.get('domain')
dns1 = data.get('dns1')
dns2 = data.get('dns2')
dns3 = data.get('dns3')
dns4 = data.get('dns4')
dns5 = data.get('dns5')
dns6 = data.get('dns6')
if not domain:
return {"status": "error", "message": "缺少域名参数"}
if not dns1:
return {"status": "error", "message": "缺少主DNS服务器参数"}
if not dns2:
return {"status": "error", "message": "缺少辅DNS服务器参数"}
# 验证DNS服务器格式
dns_servers = [dns1, dns2]
if dns3:
dns_servers.append(dns3)
if dns4:
dns_servers.append(dns4)
if dns5:
dns_servers.append(dns5)
if dns6:
dns_servers.append(dns6)
# 验证DNS服务器格式简单验证
for dns in dns_servers:
if dns and not ('.' in dns and len(dns) > 3):
return {"status": "error", "message": f"DNS服务器格式不正确: {dns}"}
response = client.modify_dns_server(
domain=domain,
dns1=dns1,
dns2=dns2,
dns3=dns3,
dns4=dns4,
dns5=dns5,
dns6=dns6
)
if response.get("status") == "error":
return response
try:
# 检查响应格式
if response.get("result") != 200:
error_msg = response.get('msg', response.get('message', '未知错误'))
return {"status": "error", "message": f"修改DNS服务器失败: {error_msg}"}
# 异步更新本地域名记录的DNS服务器字段
try:
# 查找对应的Jsite Domain记录
domain_records = jingrow.get_all(
"Jsite Domain",
{"domain": domain},
["name"]
)
if domain_records:
# 如果找到记录异步更新DNS服务器
jingrow.enqueue_pg(
"Jsite Domain",
domain_records[0].name,
"update_dns_servers",
dns1=dns1,
dns2=dns2,
dns3=dns3,
dns4=dns4,
dns5=dns5,
dns6=dns6
)
jingrow.log_error("DNS服务器异步更新", f"已为域名 {domain} 安排异步更新DNS服务器")
else:
jingrow.log_error("DNS服务器异步更新", f"未找到域名 {domain} 对应的Jsite Domain记录")
except Exception as e:
jingrow.log_error("DNS服务器异步更新失败", f"域名 {domain} 异步更新失败: {str(e)}")
# 返回成功结果
return {
"status": "success",
"message": "DNS服务器修改成功",
"data": {
"domain": domain,
"dns_servers": {
"dns1": dns1,
"dns2": dns2,
"dns3": dns3,
"dns4": dns4,
"dns5": dns5,
"dns6": dns6
},
"clientid": response.get("clientid")
}
}
except Exception as e:
jingrow.log_error("修改DNS服务器响应解析失败", error=str(e))
return {"status": "error", "message": "修改DNS服务器响应解析失败"}