# Copyright (c) 2024, JINGROW # For license information, please see license.txt import jingrow import requests import time import hashlib import json import random from datetime import datetime from urllib.parse import urlencode from typing import Dict, Any, Optional, List from jcloud.utils import get_current_team from pypinyin import lazy_pinyin class WestDomain: """西部数码域名API客户端""" def __init__(self, username: str, password: str): """ 初始化西部数码API客户端 Args: username: 西部数码用户名 password: 西部数码API密码 """ self.username = username.strip() self.password = password.strip() self.api_base_url = "https://api.west.cn/api/v2" self.time = None self.token = None def _generate_token(self) -> str: """生成认证token""" self.time = self._get_current_timestamp() token_string = f"{self.username}{self.password}{self.time}" return hashlib.md5(token_string.encode('utf-8')).hexdigest() def _get_current_timestamp(self) -> int: """获取当前时间戳(毫秒)""" return int(time.time() * 1000) def _generate_common_parameters(self) -> Dict[str, str]: """生成公共参数""" self.token = self._generate_token() return { 'username': self.username, 'time': str(self.time), 'token': self.token, } def _make_request(self, action: str, method: str = 'GET', query_params: Optional[Dict] = None, body_params: Optional[Dict] = None) -> Dict[str, Any]: """ 发送API请求 Args: action: API动作路径 method: 请求方法 (GET/POST) query_params: 查询参数 body_params: 请求体参数 Returns: API响应结果 """ # 构建URL common_params = self._generate_common_parameters() param_string = urlencode(common_params) url = f"{self.api_base_url}{action}" if '?' in action: url += f"&{param_string}" else: url += f"?{param_string}" # 添加查询参数 if query_params: url += f"&{urlencode(query_params)}" headers = { 'Content-Type': 'application/x-www-form-urlencoded' } try: if method.upper() == 'POST': data = body_params or {} # 确保中文字符正确编码 response = requests.post(url, data=data, headers=headers, timeout=30) else: response = requests.get(url, headers=headers, timeout=30) response.raise_for_status() try: result = response.json() except json.JSONDecodeError: result = {"status": "error", "message": "无法解析API响应"} return result except requests.exceptions.RequestException as e: return {"status": "error", "message": f"API请求失败: {str(e)}"} def check_balance(self) -> Dict[str, Any]: """获取账户可用余额""" return self._make_request('/info/?act=checkbalance', 'GET') def get_domain_price(self, domain: str, year: int = 1) -> Dict[str, Any]: """ 获取域名价格 Args: domain: 域名 year: 注册年限 """ body_params = { 'type': 'domain', 'value': domain, 'year': year, } return self._make_request('/info/?act=getprice', 'POST', body_params=body_params) def get_domain_renew_price(self, domain: str, year: int = 1) -> Dict[str, Any]: """ 获取域名续费价格 Args: domain: 域名 year: 续费年限 """ body_params = { 'act': 'getrenprice', 'domain': domain, 'year': year, } return self._make_request('/domain/', 'POST', body_params=body_params) def query_domain(self, domain: str, suffix: str = '.com') -> Dict[str, Any]: """ 域名查询 Args: domain: 域名前缀 suffix: 域名后缀 """ body_params = { 'domain': domain, 'suffix': suffix, } return self._make_request('/domain/query/', 'POST', body_params=body_params) def register_domain(self, domain: str, regyear: int = 1, domainpwd: Optional[str] = None, dns_host1: Optional[str] = None, dns_host2: Optional[str] = None, dns_host3: Optional[str] = None, dns_host4: Optional[str] = None, dns_host5: Optional[str] = None, dns_host6: Optional[str] = None, c_sysid: Optional[str] = None, client_price: Optional[str] = None, premium: Optional[str] = None, domchannel: Optional[str] = None, westusechn: Optional[str] = None) -> Dict[str, Any]: """ 注册域名 Args: domain: 域名(多个域名用英文逗号分隔) regyear: 注册年限 domainpwd: 域名密码,不填则系统随机生成 dns_host1: 主DNS,必填 dns_host2: 辅DNS,必填 dns_host3-6: 可选DNS c_sysid: 模板ID,必填 client_price: 价格保护,代理商用户购买价格,如果价格低于其成本价,激活失败防止亏损,不需要保护传99999 premium: 普通域名不需要传,溢价域名必须传"yes"才能注册 domchannel: 仅对.cn域名有效,传"hk"为特价渠道,空为默认,"cn"为普通渠道 westusechn: 空为国内渠道,如需国际合作渠道传"hk",此功能支持:.com(需特殊权限)、.top、.cyou、.icu、.vip、.xyz、.site、.shop、.co """ body_params = { 'act': 'regdomain', 'domain': domain, 'regyear': regyear, } # 添加可选参数 if domainpwd: body_params['domainpwd'] = domainpwd if dns_host1: body_params['dns_host1'] = dns_host1 if dns_host2: body_params['dns_host2'] = dns_host2 if dns_host3: body_params['dns_host3'] = dns_host3 if dns_host4: body_params['dns_host4'] = dns_host4 if dns_host5: body_params['dns_host5'] = dns_host5 if dns_host6: body_params['dns_host6'] = dns_host6 if c_sysid: body_params['c_sysid'] = c_sysid if client_price: body_params['client_price'] = client_price if premium: body_params['premium'] = premium if domchannel: body_params['domchannel'] = domchannel if westusechn: body_params['westusechn'] = westusechn return self._make_request('/audit/', 'POST', body_params=body_params) def renew_domain(self, domain: str, year: int = 1, expire_date: Optional[str] = None, client_price: Optional[int] = None) -> Dict[str, Any]: """ 域名续费 Args: domain: 域名 year: 续费年限 expire_date: 到期时间 client_price: 客户价格 """ body_params = { 'domain': domain, 'year': year, } if expire_date: body_params['expiredate'] = expire_date if client_price: body_params['client_price'] = client_price return self._make_request('/domain/?act=renew', 'POST', body_params=body_params) def get_domain_list(self, limit: int = 10, page: int = 1) -> Dict[str, Any]: """ 获取域名列表 Args: limit: 每页数量 page: 页码 """ query_params = { 'limit': limit, 'page': page, } return self._make_request('/domain/?act=getdomains', 'GET', query_params=query_params) def get_domain_info(self, domain: str) -> Dict[str, Any]: """ 获取域名详细信息 Args: domain: 域名 """ body_params = { 'domain': domain, } return self._make_request('/domain/?act=getinfo', 'POST', body_params=body_params) def get_dns_records(self, domain: str) -> Dict[str, Any]: """ 获取域名DNS记录 Args: domain: 域名 """ body_params = { 'domain': domain, } return self._make_request('/domain/?act=getdns', 'POST', body_params=body_params) def get_dns_records_paginated(self, domain: str, limit: int = 20, pageno: int = 1) -> Dict[str, Any]: """ 获取域名解析记录(支持分页) Args: domain: 域名 limit: 每页大小,默认20 pageno: 第几页,默认为1 """ body_params = { 'act': 'getdnsrecord', 'domain': domain, 'limit': limit, 'pageno': pageno, } return self._make_request('/domain/', 'POST', body_params=body_params) def modify_dns_records(self, domain: str, records: List[Dict]) -> Dict[str, Any]: """ 修改域名DNS记录 Args: domain: 域名 records: DNS记录列表 """ body_params = { 'domain': domain, 'records': json.dumps(records), } return self._make_request('/domain/?act=modifydns', 'POST', body_params=body_params) def add_dns_record(self, domain: str, record_type: str, host: str, value: str, ttl: int = 600, level: int = 10, line: str = "") -> Dict[str, Any]: """ 添加DNS记录 Args: domain: 域名 record_type: 记录类型 (A, CNAME, MX, TXT, AAAA, SRV) host: 主机记录 value: 记录值 ttl: TTL值 (60~86400秒,默认900) level: 优先级(MX记录使用,1-100,默认10) line: 线路 (默认="", 电信="LTEL", 联通="LCNC", 移动="LMOB", 教育网="LEDU", 搜索引擎="LSEO") """ body_params = { 'act': 'adddnsrecord', 'domain': domain, 'host': host, 'type': record_type, 'value': value, 'ttl': ttl, 'level': level, 'line': line, } return self._make_request('/domain/', 'POST', body_params=body_params) def modify_dns_record(self, domain: str, value: str, ttl: int = 600, level: int = 10, record_id: Optional[str] = None, host: Optional[str] = None, record_type: Optional[str] = None, line: str = "", old_value: Optional[str] = None) -> Dict[str, Any]: """ 修改DNS记录 Args: domain: 域名 value: 新的解析值 ttl: TTL值 (60~86400秒,默认900) level: 优先级(MX记录使用,1-100,默认10) record_id: 解析记录编号(优先使用) host: 主机头(当record_id未提供时必填) record_type: 解析类型(当record_id未提供时必填) line: 线路 (默认="", 电信="LTEL", 联通="LCNC", 移动="LMOB", 教育网="LEDU", 搜索引擎="LSEO") old_value: 旧解析值(可选,用于确定唯一记录) """ body_params = { 'act': 'moddnsrecord', 'domain': domain, 'value': value, 'ttl': ttl, 'level': level, 'line': line, } # 优先使用record_id if record_id: body_params['id'] = record_id else: # 必须提供host和type if not host or not record_type: return {"status": "error", "message": "当未提供记录ID时,主机头和解析类型为必填项"} body_params['host'] = host body_params['type'] = record_type # 添加旧值(可选) if old_value: body_params['oldvalue'] = old_value return self._make_request('/domain/', 'POST', body_params=body_params) def delete_dns_record(self, domain: str, record_id: Optional[str] = None, host: Optional[str] = None, record_type: Optional[str] = None, value: Optional[str] = None, line: str = "") -> Dict[str, Any]: """ 删除DNS记录 Args: domain: 域名 record_id: 解析记录ID(优先使用) host: 主机头(当record_id未提供时必填) record_type: 解析类型(当record_id未提供时必填) value: 解析值(可选) line: 线路 (默认="", 电信="LTEL", 联通="LCNC", 移动="LMOB", 教育网="LEDU", 搜索引擎="LSEO") """ body_params = { 'act': 'deldnsrecord', 'domain': domain, 'line': line, } # 优先使用record_id if record_id: body_params['id'] = record_id else: # 必须提供host和type if not host or not record_type: return {"status": "error", "message": "当未提供记录ID时,主机头和解析类型为必填项"} body_params['host'] = host body_params['type'] = record_type if value: body_params['value'] = value return self._make_request('/domain/', 'POST', body_params=body_params) def transfer_domain(self, domain: str, auth_code: str) -> Dict[str, Any]: """ 域名转入 Args: domain: 域名 auth_code: 转移授权码 """ body_params = { 'domain': domain, 'auth_code': auth_code, } return self._make_request('/domain/?act=transfer', 'POST', body_params=body_params) def lock_domain(self, domain: str, lock: bool = True) -> Dict[str, Any]: """ 锁定/解锁域名 Args: domain: 域名 lock: 是否锁定 """ action = '/domain/?act=lock' if lock else '/domain/?act=unlock' body_params = { 'domain': domain, } return self._make_request(action, 'POST', body_params=body_params) def get_template_list(self, limit: int = 10, page: int = 1) -> Dict[str, Any]: """ 获取域名模板列表 Args: limit: 每页数量 page: 页码 """ body_params = { 'act': 'gettemplates', 'limit': limit, 'page': page, } return self._make_request('/audit/', 'POST', body_params=body_params) def get_template_detail(self, template_id: str) -> Dict[str, Any]: """ 获取指定模板详情 Args: template_id: 模板ID """ body_params = { 'act': 'auditinfo', 'c_sysid': template_id, } return self._make_request('/audit/', 'POST', body_params=body_params) def create_contact_template(self, template_data: Dict[str, Any]) -> Dict[str, Any]: """ 创建域名模板 Args: template_data: 模板数据,包含所有必要的字段 """ body_params = { 'act': 'auditsub', **template_data } # 对body_params进行GBK编码 encoded_data = urlencode(body_params, encoding='gbk').encode('gbk') return self._make_request('/audit/', 'POST', body_params=encoded_data) def get_domain_real_info(self, domain: str) -> Dict[str, Any]: """ 获取域名实名信息 Args: domain: 域名 Returns: 域名实名信息,包含所有者、管理联系人、技术联系人、缴费联系人等信息 """ body_params = { 'act': 'domaininfo', 'domain': domain, } return self._make_request('/audit/', 'POST', body_params=body_params) def get_upload_token(self, c_sysid: str, f_type_org: str, f_code_org: str, f_type_lxr: Optional[str] = None, f_code_lxr: Optional[str] = None) -> Dict[str, Any]: """ 获取实名上传token Args: c_sysid: 模板标识 f_type_org: 证件类型,详情见附录 f_code_org: 证件号码 f_type_lxr: 联系人证件类型(企业时填写) f_code_lxr: 联系人证件号码(企业时填写) Returns: API响应结果,包含上传token """ body_params = { 'act': 'uploadwcftoken', 'c_sysid': c_sysid, 'f_type_org': f_type_org, 'f_code_org': f_code_org, } # 添加可选参数 if f_type_lxr: body_params['f_type_lxr'] = f_type_lxr if f_code_lxr: body_params['f_code_lxr'] = f_code_lxr return self._make_request('/audit/', 'POST', body_params=body_params) def upload_real_name_files(self, token: str, file_org: str, file_lxr: Optional[str] = None) -> Dict[str, Any]: """ 模板实名资料上传 Args: token: 实名上传Token file_org: 图片完整的base64 file_lxr: 企业联系人图片完整的base64(只有token中设置了联系人的才上传) Returns: API响应结果 """ upload_url = "https://netservice.vhostgo.com/wcfservice/Service1.svc/Wcf_AuditUploadFile" # 构建请求数据 data = { 'token': token, 'file_org': file_org, } # 添加可选参数 if file_lxr: data['file_lxr'] = file_lxr headers = { 'Content-Type': 'application/json' } try: # 发送JSON格式的POST请求 response = requests.post(upload_url, json=data, headers=headers, timeout=30) response.raise_for_status() try: result = response.json() except json.JSONDecodeError: result = {"status": "error", "message": "无法解析API响应"} return result except requests.exceptions.RequestException as e: return {"status": "error", "message": f"API请求失败: {str(e)}"} def modify_dns_server(self, domain: str, dns1: str, dns2: str, dns3: Optional[str] = None, dns4: Optional[str] = None, dns5: Optional[str] = None, dns6: Optional[str] = None) -> Dict[str, Any]: """ 修改域名DNS服务器 Args: domain: 要修改DNS的域名 dns1: 主DNS服务器 dns2: 辅DNS服务器 dns3: 第三个DNS服务器(可选) dns4: 第四个DNS服务器(可选) dns5: 第五个DNS服务器(可选) dns6: 第六个DNS服务器(可选) Returns: API响应结果 """ body_params = { 'act': 'moddns', 'domain': domain, 'dns1': dns1, 'dns2': dns2, } # 添加可选的DNS服务器 if dns3: body_params['dns3'] = dns3 if dns4: body_params['dns4'] = dns4 if dns5: body_params['dns5'] = dns5 if dns6: body_params['dns6'] = dns6 return self._make_request('/domain/', 'POST', body_params=body_params) def format_date(date): """格式化域名到期时间""" if not date: return None if isinstance(date, str): return date return date.strftime('%Y-%m-%d') def get_west_client() -> WestDomain: """获取西部数码域名API客户端实例""" try: # 从Jcloud Settings获取配置 settings = jingrow.get_single("Jcloud Settings") if settings: username = settings.get("west_username") password = settings.get_password("west_api_password") if settings.get("west_api_password") else None if username and password: return WestDomain(username, password) else: return None else: return None except Exception as e: return None # API端点函数 @jingrow.whitelist() def check_west_balance(): """检查西部数码账户余额""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} return client.check_balance() @jingrow.whitelist() def check_domain(domain: str, suffix: str = '.com'): """查询域名是否可注册""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} if not domain: return {"status": "error", "message": "缺少域名参数"} response = client.query_domain(domain, suffix) if response.get("status") == "error": return response try: # 直接检查响应格式 if response.get("result") != 200: return {"status": "error", "message": "API查询失败"} full_domain = domain + suffix for item in response.get("data", []): if item.get("name") == full_domain: return { "available": item.get("avail", 0) == 1, "domain": full_domain, "message": "域名可用" if item.get("avail", 0) == 1 else "域名已被注册" } return {"status": "error", "message": f"未找到域名 {full_domain} 的查询结果"} except Exception as e: return {"status": "error", "message": "域名查询响应解析失败"} @jingrow.whitelist() def get_west_domain_price(domain: str, year: int = 1): """获取域名价格""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} if not domain: return {"status": "error", "message": "缺少域名参数"} response = client.get_domain_price(domain, year) if response.get("status") == "error": return response try: # 直接检查响应格式 if response.get("result") != 200: return {"status": "error", "message": "API查询失败"} data = response.get("data", {}) original_price = data.get("buyprice", 0) # 统一增加10%的利润,参考aliyun_server_light.py的方法 if original_price and original_price > 0: # 确保10%利润率且价格为整数 adjusted_price = int(original_price / (1 - 0.1)) else: adjusted_price = original_price return { "data": { "price": adjusted_price, "original_price": original_price, # 保留原价用于参考 "domain": domain, "year": year } } except Exception as e: return {"status": "error", "message": "域名价格查询响应解析失败"} @jingrow.whitelist() def get_west_domain_renew_price(domain: str, year: int = 1): """获取域名续费价格""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} if not domain: return {"status": "error", "message": "缺少域名参数"} response = client.get_domain_renew_price(domain, year) if response.get("status") == "error": return response try: # 直接检查响应格式 if response.get("result") != 200: return {"status": "error", "message": "API查询失败"} data = response.get("data", {}) original_price = data.get("price", 0) # 统一增加10%的利润,参考aliyun_server_light.py的方法 if original_price and original_price > 0: # 确保10%利润率且价格为整数 adjusted_price = int(original_price / (1 - 0.1)) else: adjusted_price = original_price return { "data": { "price": adjusted_price, "domain": domain, "year": year, "ispremium": data.get("ispremium", "n") } } except Exception as e: return {"status": "error", "message": "域名续费价格查询响应解析失败"} @jingrow.whitelist() def west_domain_register(domain: str, regyear: int = 1, dns_host1: str = "ns1.myhostadmin.net", dns_host2: str = "ns2.myhostadmin.net", c_sysid: str = None, domainpwd: str = None, dns_host3: str = None, dns_host4: str = None, dns_host5: str = None, dns_host6: str = None, client_price: str = None, premium: str = None, domchannel: str = None, westusechn: str = None): """注册域名""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} return client.register_domain( domain=domain, regyear=regyear, dns_host1=dns_host1, dns_host2=dns_host2, c_sysid=c_sysid, domainpwd=domainpwd, dns_host3=dns_host3, dns_host4=dns_host4, dns_host5=dns_host5, dns_host6=dns_host6, client_price=client_price, premium=premium, domchannel=domchannel, westusechn=westusechn ) @jingrow.whitelist() def west_domain_renew(**data): """域名续费""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') year = data.get('year', 1) expire_date = data.get('expire_date') client_price = data.get('client_price') if not domain: return {"status": "error", "message": "缺少域名参数"} return client.renew_domain(domain, year, expire_date, client_price) @jingrow.whitelist() def west_domain_get_list(**data): """获取域名列表""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} limit = data.get('limit', 10) page = data.get('page', 1) return client.get_domain_list(limit, page) @jingrow.whitelist() def west_domain_get_info(**data): """获取域名信息""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') if not domain: return {"status": "error", "message": "缺少域名参数"} return client.get_domain_info(domain) @jingrow.whitelist() def west_domain_get_dns(**data): """获取域名DNS记录""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') if not domain: return {"status": "error", "message": "缺少域名参数"} return client.get_dns_records(domain) @jingrow.whitelist() def get_west_domain_dns_records(**data): """获取域名解析记录(支持分页)""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') limit = data.get('limit', 20) pageno = data.get('pageno', 1) if not domain: return {"status": "error", "message": "缺少域名参数"} response = client.get_dns_records_paginated(domain, limit, pageno) if response.get("status") == "error": return response try: # 检查响应格式 if response.get("result") != 200: return {"status": "error", "message": "API查询失败"} data = response.get("data", {}) # 返回格式化的解析记录信息 return { "status": "success", "data": { "pageno": data.get("pageno", 1), "limit": data.get("limit", 20), "total": data.get("total", 0), "pagecount": data.get("pagecount", 0), "items": data.get("items", []) } } except Exception as e: return {"status": "error", "message": "域名解析记录查询响应解析失败"} @jingrow.whitelist() def west_domain_modify_dns(**data): """修改域名DNS记录(批量修改,兼容旧版本)""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') records = data.get('records') if not domain: return {"status": "error", "message": "缺少域名参数"} if not records: return {"status": "error", "message": "缺少DNS记录参数"} # 逐个修改记录 results = [] for record in records: record_type = record.get('type') host = record.get('host') value = record.get('value') ttl = record.get('ttl', 600) level = record.get('level', 10) line = record.get('line', '') record_id = record.get('record_id') old_value = record.get('old_value') # 验证必要参数 if not value: results.append({"status": "error", "message": "缺少解析值"}) continue # 如果没有提供record_id,则必须提供host和record_type if not record_id and (not host or not record_type): results.append({"status": "error", "message": "当未提供记录ID时,主机头和解析类型为必填项"}) continue # 验证TTL值 if ttl < 60 or ttl > 86400: results.append({"status": "error", "message": "TTL值必须在60~86400秒之间"}) continue # 验证优先级 if level < 1 or level > 100: results.append({"status": "error", "message": "优先级必须在1~100之间"}) continue # 验证记录类型 if record_type: valid_types = ['A', 'CNAME', 'MX', 'TXT', 'AAAA', 'SRV'] if record_type not in valid_types: results.append({"status": "error", "message": f"不支持的记录类型: {record_type}"}) continue # 调用单个记录修改API response = client.modify_dns_record( domain, value, ttl, level, record_id, host, record_type, line, old_value ) if response.get("status") == "error": results.append(response) elif response.get("result") != 200: error_msg = response.get('msg', response.get('message', '未知错误')) results.append({"status": "error", "message": f"修改DNS记录失败: {error_msg}"}) else: results.append({ "status": "success", "message": "DNS记录修改成功", "data": { "domain": domain, "value": value, "ttl": ttl, "level": level, "record_id": record_id, "host": host, "record_type": record_type, "line": line } }) # 返回批量操作结果 success_count = sum(1 for r in results if r.get("status") == "success") error_count = len(results) - success_count return { "status": "success" if error_count == 0 else "partial_success", "message": f"批量修改完成,成功: {success_count},失败: {error_count}", "results": results } @jingrow.whitelist() def west_domain_add_dns_record(**data): """添加DNS记录""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') record_type = data.get('record_type') host = data.get('host') value = data.get('value') ttl = data.get('ttl', 600) level = data.get('level', 10) line = data.get('line', '') if not all([domain, record_type, host, value]): return {"status": "error", "message": "缺少必要参数"} # 验证记录类型 valid_types = ['A', 'CNAME', 'MX', 'TXT', 'AAAA', 'SRV'] if record_type not in valid_types: return {"status": "error", "message": f"不支持的记录类型: {record_type},支持的类型: {', '.join(valid_types)}"} # 验证TTL值 if ttl < 60 or ttl > 86400: return {"status": "error", "message": "TTL值必须在60~86400秒之间"} # 验证优先级 if level < 1 or level > 100: return {"status": "error", "message": "优先级必须在1~100之间"} response = client.add_dns_record(domain, record_type, host, value, ttl, level, line) if response.get("status") == "error": return response try: # 检查响应格式 if response.get("result") != 200: error_msg = response.get('msg', response.get('message', '未知错误')) return {"status": "error", "message": f"添加DNS记录失败: {error_msg}"} # 返回成功结果 return { "status": "success", "message": "DNS记录添加成功", "data": { "id": response.get("data", {}).get("id"), "domain": domain, "host": host, "type": record_type, "value": value, "ttl": ttl, "level": level, "line": line } } except Exception as e: return {"status": "error", "message": "添加DNS记录响应解析失败"} @jingrow.whitelist() def west_domain_delete_dns_record(**data): """删除DNS记录""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') record_id = data.get('record_id') host = data.get('host') record_type = data.get('record_type') value = data.get('value') line = data.get('line', '') if not domain: return {"status": "error", "message": "缺少域名参数"} # 如果没有提供record_id,则必须提供host和record_type if not record_id and (not host or not record_type): return {"status": "error", "message": "当未提供记录ID时,主机头和解析类型为必填项"} # 验证记录类型 if record_type: valid_types = ['A', 'CNAME', 'MX', 'TXT', 'AAAA', 'SRV'] if record_type not in valid_types: return {"status": "error", "message": f"不支持的记录类型: {record_type},支持的类型: {', '.join(valid_types)}"} response = client.delete_dns_record(domain, record_id, host, record_type, value, line) if response.get("status") == "error": return response try: # 检查响应格式 if response.get("result") != 200: error_msg = response.get('msg', response.get('message', '未知错误')) return {"status": "error", "message": f"删除DNS记录失败: {error_msg}"} # 返回成功结果 return { "status": "success", "message": "DNS记录删除成功", "data": { "domain": domain, "record_id": record_id, "host": host, "record_type": record_type, "value": value, "line": line } } except Exception as e: return {"status": "error", "message": "删除DNS记录响应解析失败"} @jingrow.whitelist() def west_domain_transfer(**data): """域名转入""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') auth_code = data.get('auth_code') if not domain: return {"status": "error", "message": "缺少域名参数"} if not auth_code: return {"status": "error", "message": "缺少转移授权码参数"} return client.transfer_domain(domain, auth_code) @jingrow.whitelist() def west_domain_lock(**data): """锁定域名""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') lock = data.get('lock', True) if not domain: return {"status": "error", "message": "缺少域名参数"} return client.lock_domain(domain, lock) @jingrow.whitelist() def west_domain_get_templates(**data): """获取域名模板列表""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} limit = data.get('limit', 10) page = data.get('page', 1) return client.get_template_list(limit, page) @jingrow.whitelist() def get_west_template_detail(**data): """获取指定模板详情""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} template_id = data.get('template_id') if not template_id: return {"status": "error", "message": "缺少模板ID参数"} return client.get_template_detail(template_id) @jingrow.whitelist() def create_domain_order(domain, period=1, payment_method='balance', domain_owner=None): """创建域名注册订单""" try: # 获取当前用户团队 team = get_current_team(True) # 验证域名格式 if not domain or '.' not in domain: return {"success": False, "message": "域名格式不正确"} # 验证域名所有者 if not domain_owner: return {"success": False, "message": "请选择域名所有者"} # 检查域名所有者是否存在 owner_exists = jingrow.get_all( "Domain Owner", {"name": domain_owner, "team": team.name}, ["name"] ) if not owner_exists: return {"success": False, "message": "域名所有者不存在"} # 查询域名价格 client = get_west_client() if not client: return {"success": False, "message": "API客户端初始化失败"} # 获取域名价格 - 使用统一的 get_west_domain_price 函数 price_result = get_west_domain_price(domain, 1) if price_result.get("status") == "error": return {"success": False, "message": "获取域名价格失败"} # 计算总价格 - 使用与前端一致的价格字段 yearly_price = price_result.get("data", {}).get("price", 0) # 使用前端一致的价格字段 total_amount = yearly_price * period # 生成订单号 order_id = f"{datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + ''.join(random.choices('0123456789', k=6))}" # 获取域名所有者的c_sysid domain_owner_pg = jingrow.get_pg("Domain Owner", domain_owner) if not domain_owner_pg: return {"success": False, "message": "域名所有者不存在"} c_sysid = domain_owner_pg.c_sysid if not c_sysid: return {"success": False, "message": "域名所有者缺少系统ID,请重新创建域名所有者"} # 构建业务参数 biz_params = { "domain": domain, "period": period, "domain_owner": domain_owner, "yearly_price": yearly_price, "auto_renew": False, "whois_protection": True, # 注册域名所需参数 "regyear": period, "dns_host1": "ns1.myhostadmin.net", "dns_host2": "ns2.myhostadmin.net", "c_sysid": c_sysid, "client_price": None } # 创建订单记录 order = jingrow.get_pg({ "pagetype": "Order", "order_id": order_id, "order_type": "域名注册", "team": team.name, "status": "待支付", "total_amount": total_amount, "title": domain, "description": f"{period}年", "biz_params": json.dumps(biz_params, ensure_ascii=False) }) order.insert(ignore_permissions=True) jingrow.db.commit() return { "success": True, "message": "订单创建成功", "order": order.as_dict() } except Exception as e: return {"success": False, "message": f"创建订单失败: {str(e)}"} @jingrow.whitelist() def create_domain_renew_order(**kwargs): """创建域名续费订单""" try: domain = kwargs.get('domain') renewal_years = kwargs.get('renewal_years', 1) if not domain: jingrow.throw("缺少域名信息") # 验证输入 domain_pg = jingrow.get_pg("Jsite Domain", domain) if not domain_pg: jingrow.throw("域名不存在") team = domain_pg.team # 验证当前用户权限 current_team = get_current_team(True) if current_team.name != team: jingrow.throw("您没有权限为此域名创建续费订单") # 计算续费金额 - 使用现有的价格获取函数保持统一 renewal_years = int(renewal_years) # 调用现有的价格获取函数 price_data = get_west_domain_renew_price(domain_pg.domain, 1) if price_data.get("data", {}).get("price"): yearly_price = price_data["data"]["price"] else: yearly_price = domain_pg.price or 0 total_amount = yearly_price * renewal_years # 生成唯一订单号 order_id = f"{datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + ''.join(random.choices('0123456789', k=6))}" # 构建业务参数 biz_params = { "domain": domain_pg.domain, "domain_name": domain, "renewal_years": renewal_years, "yearly_price": yearly_price, "domain_owner": domain_pg.domain_owner, # 续费域名所需参数 "year": renewal_years, "expire_date": format_date(domain_pg.end_date), "client_price": None } # 创建订单记录 order = jingrow.get_pg({ "pagetype": "Order", "order_id": order_id, "order_type": "域名续费", "team": team, "status": "待支付", "total_amount": total_amount, "title": domain_pg.domain, "description": str(renewal_years), # 存储续费年数 "biz_params": json.dumps(biz_params, ensure_ascii=False) }) order.insert(ignore_permissions=True) jingrow.db.commit() return { "success": True, "order": order.as_dict() } except Exception as e: return { "success": False, "message": f"创建续费订单失败: {str(e)}" } def register_domain_from_order(order_name): """支付成功后异步注册域名""" try: order = jingrow.get_pg("Order", order_name) if not order: raise Exception("订单不存在") # 从biz_params中读取业务参数 biz_params = json.loads(order.biz_params) if order.biz_params else {} domain_name = biz_params.get("domain") period = biz_params.get("period", 1) domain_owner = biz_params.get("domain_owner") yearly_price = biz_params.get("yearly_price", 0) auto_renew = biz_params.get("auto_renew", False) whois_protection = biz_params.get("whois_protection", True) if not domain_name: raise Exception("订单中缺少域名信息") # 调用西部数码API注册域名 client = get_west_client() if not client: raise Exception("API客户端初始化失败") result = client.register_domain( domain=domain_name, regyear=biz_params.get("regyear", period), dns_host1=biz_params.get("dns_host1", "ns1.myhostadmin.net"), dns_host2=biz_params.get("dns_host2", "ns2.myhostadmin.net"), c_sysid=biz_params.get("c_sysid"), client_price=biz_params.get("client_price") ) if not result or result.get('result') != 200: error_msg = result.get('msg', result.get('message', '未知错误')) raise Exception(f"域名注册失败: {error_msg}") # 创建域名记录 domain_pg = jingrow.get_pg({ "pagetype": "Jsite Domain", "domain": domain_name, "team": order.team, "order_id": order.order_id, "status": "Active", "price": yearly_price, "period": period, "domain_owner": domain_owner, "auto_renew": auto_renew, "whois_protection": whois_protection, "registration_date": jingrow.utils.nowdate(), "end_date": jingrow.utils.add_months(jingrow.utils.nowdate(), period * 12) }) domain_pg.insert(ignore_permissions=True) # 更新订单状态 order.status = "交易成功" order.save(ignore_permissions=True) jingrow.db.commit() return True except Exception as e: raise e def renew_domain_from_order(order_name): """支付成功后异步续费域名""" try: order = jingrow.get_pg("Order", order_name) if not order: raise Exception("订单不存在") # 从biz_params中读取业务参数 biz_params = json.loads(order.biz_params) if order.biz_params else {} domain_name = biz_params.get("domain") renewal_years = biz_params.get("renewal_years", 1) domain_record_name = biz_params.get("domain_name") # 域名记录的名称 if not domain_name: raise Exception("订单中缺少域名信息") # 查找域名记录 domain = jingrow.get_pg("Jsite Domain", domain_record_name) if not domain: raise Exception("找不到对应的域名记录") # 调用西部数码API续费域名 client = get_west_client() if not client: raise Exception("API客户端初始化失败") # 验证域名到期时间 if not biz_params.get("expire_date"): raise Exception("无法获取域名到期时间") result = client.renew_domain( domain_name, biz_params.get("year", renewal_years), biz_params.get("expire_date"), biz_params.get("client_price") ) if not result or result.get('result') != 200: error_msg = result.get('msg', result.get('message', '未知错误')) raise Exception(f"域名续费失败: {error_msg}") # 更新域名到期时间 domain.end_date = jingrow.utils.add_months(domain.end_date or jingrow.utils.nowdate(), renewal_years * 12) domain.save(ignore_permissions=True) # 更新订单状态 order.status = "交易成功" order.save(ignore_permissions=True) jingrow.db.commit() return True except Exception as e: raise e @jingrow.whitelist() def toggle_domain_auto_renew(pagetype, name, auto_renew): """切换域名自动续费状态""" try: # 获取当前用户团队 team = get_current_team(True) # 获取域名记录 domain = jingrow.get_pg(pagetype, name) if not domain: return {"success": False, "message": "找不到域名记录"} # 验证权限 if domain.team != team.name: return {"success": False, "message": "您没有权限操作此域名"} # 更新自动续费状态 domain.auto_renew = bool(auto_renew) domain.save(ignore_permissions=True) return {"success": True, "message": "自动续费状态更新成功"} except Exception as e: return {"success": False, "message": f"操作失败: {str(e)}"} @jingrow.whitelist() def toggle_domain_whois_protection(pagetype, name, whois_protection): """切换域名隐私保护状态""" try: # 获取当前用户团队 team = get_current_team(True) # 获取域名记录 domain = jingrow.get_pg(pagetype, name) if not domain: return {"success": False, "message": "找不到域名记录"} # 验证权限 if domain.team != team.name: return {"success": False, "message": "您没有权限操作此域名"} # 更新隐私保护状态 domain.whois_protection = bool(whois_protection) domain.save(ignore_permissions=True) return {"success": True, "message": "隐私保护状态更新成功"} except Exception as e: return {"success": False, "message": f"操作失败: {str(e)}"} @jingrow.whitelist() def delete_domain(pagetype, name): """删除域名记录""" try: # 获取当前用户团队 team = get_current_team(True) # 获取域名记录 domain = jingrow.get_pg(pagetype, name) if not domain: return {"success": False, "message": "找不到域名记录"} # 验证权限 if domain.team != team.name: return {"success": False, "message": "您没有权限操作此域名"} # 删除域名记录 domain.delete(ignore_permissions=True) return {"success": True, "message": "域名记录删除成功"} except Exception as e: return {"success": False, "message": f"删除失败: {str(e)}"} @jingrow.whitelist() def get_domain_owners(): """获取当前团队的域名所有者列表""" try: team = get_current_team() if not team: return {"status": "Error", "message": "未找到当前团队"} # 获取当前团队的所有域名所有者 domain_owners = jingrow.get_all( "Domain Owner", {"team": team}, ["name", "title", "fullname", "c_regtype", "c_org_m", "c_ln_m", "c_fn_m"] ) return { "status": "Success", "data": domain_owners } except Exception as e: return {"status": "Error", "message": f"获取域名所有者列表失败: {str(e)}"} @jingrow.whitelist() def get_domain_owner(name): """获取单个域名所有者信息""" try: if not name: return {"status": "Error", "message": "域名所有者名称不能为空"} # 获取指定的域名所有者 domain_owner = jingrow.get_pg("Domain Owner", name) if not domain_owner: return {"status": "Error", "message": "未找到指定的域名所有者"} # 检查权限(只能查看当前团队的所有者) team = get_current_team() if not team or domain_owner.team != team: return {"status": "Error", "message": "无权访问该域名所有者信息"} # 返回所有者信息 owner_data = { "name": domain_owner.name, "r_status": domain_owner.r_status, "title": domain_owner.title, "fullname": domain_owner.fullname, "c_regtype": domain_owner.c_regtype, "c_org_m": domain_owner.c_org_m, "c_ln_m": domain_owner.c_ln_m, "c_fn_m": domain_owner.c_fn_m, "c_em": domain_owner.c_em, "c_ph": domain_owner.c_ph, "c_st_m": domain_owner.c_st_m, "c_ct_m": domain_owner.c_ct_m, "c_adr_m": domain_owner.c_adr_m, "c_pc": domain_owner.c_pc } return { "status": "Success", "data": owner_data } except Exception as e: return {"status": "Error", "message": f"获取域名所有者信息失败: {str(e)}"} @jingrow.whitelist() def create_domain_template(**data): """创建域名模板""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} # 验证必填字段 required_fields = ['c_regtype', 'c_ln_m', 'c_fn_m', 'c_st_m', 'c_ct_m', 'c_adr_m', 'c_pc', 'c_em', 'c_idtype_gswl', 'c_idnum_gswl'] for field in required_fields: if not data.get(field): return {"status": "error", "message": f"字段 {field} 是必填项"} # 验证电话号码字段(根据c_ph_type判断) c_ph_type = data.get('c_ph_type', '0') # 默认为手机 if c_ph_type == '0': # 手机 if not data.get('c_ph'): return {"status": "error", "message": "手机号码 c_ph 是必填项"} elif c_ph_type == '1': # 座机 if not data.get('c_ph_code'): return {"status": "error", "message": "座机区号 c_ph_code 是必填项"} if not data.get('c_ph_num'): return {"status": "error", "message": "座机号码 c_ph_num 是必填项"} # 验证字段长度 if len(data.get('c_ln_m', '')) < 1 or len(data.get('c_ln_m', '')) > 16: return {"status": "error", "message": "姓(中文)长度必须为1~16位"} if len(data.get('c_fn_m', '')) < 1 or len(data.get('c_fn_m', '')) > 64: return {"status": "error", "message": "名(中文)长度必须为1~64位"} if len(data.get('c_st_m', '')) < 2 or len(data.get('c_st_m', '')) > 10: return {"status": "error", "message": "省份(中文)长度必须为2~10位"} # 验证完整姓名长度 fullname = data.get('c_ln_m', '') + data.get('c_fn_m', '') if len(fullname) < 2 or len(fullname) > 64: return {"status": "error", "message": "完整姓名(中文)长度必须为2~64位"} # 生成英文姓名 if data.get('c_ln_m') and data.get('c_fn_m'): last_name_pinyin = ' '.join(lazy_pinyin(data['c_ln_m'])) first_name_pinyin = ' '.join(lazy_pinyin(data['c_fn_m'])) data['c_ln'] = last_name_pinyin.title() data['c_fn'] = first_name_pinyin.title() else: # 设置默认的英文姓名 data['c_ln'] = data.get('c_ln', '') data['c_fn'] = data.get('c_fn', '') # 生成英文地址 if data.get('c_adr_m'): address_pinyin = ' '.join(lazy_pinyin(data['c_adr_m'])) data['c_adr'] = address_pinyin.title() else: data['c_adr'] = data.get('c_adr', '') # 生成英文省份和城市 if data.get('c_st_m'): state_pinyin = ' '.join(lazy_pinyin(data['c_st_m'])) data['c_st'] = state_pinyin.title() else: data['c_st'] = data.get('c_st', '') if data.get('c_ct_m'): city_pinyin = ' '.join(lazy_pinyin(data['c_ct_m'])) data['c_ct'] = city_pinyin.title() else: data['c_ct'] = data.get('c_ct', '') # 生成英文单位名称 if data.get('c_org_m'): org_pinyin = ' '.join(lazy_pinyin(data['c_org_m'])) data['c_org'] = org_pinyin.title() else: data['c_org'] = data.get('c_org', '') # 设置默认值 template_data = { 'c_regtype': data['c_regtype'], 'c_ln_m': data['c_ln_m'], 'c_fn_m': data['c_fn_m'], 'c_co': data.get('c_co', 'CN'), 'cocode': data.get('cocode', '+86'), 'c_st_m': data['c_st_m'], 'c_ct_m': data['c_ct_m'], 'c_dt_m': data.get('c_dt_m', ''), 'c_adr_m': data['c_adr_m'], 'c_pc': data['c_pc'], 'c_ph_type': data.get('c_ph_type', '0'), 'c_em': data['c_em'], 'c_ln': data.get('c_ln', ''), # 使用生成的英文姓,如果没有则为空 'c_fn': data.get('c_fn', ''), # 使用生成的英文名,如果没有则为空 'c_st': data.get('c_st', ''), # 使用生成的英文省份,如果没有则为空 'c_ct': data.get('c_ct', ''), # 使用生成的英文城市,如果没有则为空 'c_adr': data.get('c_adr', ''), # 使用生成的英文地址,如果没有则为空 'c_idtype_gswl': data['c_idtype_gswl'], 'c_idnum_gswl': data['c_idnum_gswl'], 'fullname': data.get('c_ln_m', '') + data.get('c_fn_m', '') # 完整姓名 } # 个人类型时不传入c_org_m参数 if data['c_regtype'] == 'E' and data.get('c_org_m'): template_data['c_org_m'] = data['c_org_m'] template_data['c_org'] = data.get('c_org', '') # 英文单位名称 # 验证企业类型必须填写单位名称 if data['c_regtype'] == 'E' and not data.get('c_org_m'): return {"status": "error", "message": "企业类型必须填写单位名称"} # 添加区县信息 if data.get('c_dt_m'): template_data['c_dt_m'] = data['c_dt_m'] # 添加香港域名相关字段 if data.get('c_idtype_hk'): template_data['c_idtype_hk'] = data['c_idtype_hk'] if data.get('c_idnum_hk'): template_data['c_idnum_hk'] = data['c_idnum_hk'] # 根据c_ph_type添加电话相关字段 if data.get('c_ph_type') == '1': # 座机 if data.get('c_ph_code'): template_data['c_ph_code'] = data['c_ph_code'] if data.get('c_ph_num'): template_data['c_ph_num'] = data['c_ph_num'] # 构建完整的电话号码格式 if data.get('c_ph_code') and data.get('c_ph_num'): template_data['c_ph_all'] = f"{data.get('cocode', '+86')}.{data['c_ph_code']}-{data['c_ph_num']}" else: # 手机 if data.get('c_ph'): template_data['c_ph'] = data['c_ph'] try: result = client.create_contact_template(template_data) if result.get('result') == 200: c_sysid = result.get('data', {}).get('c_sysid') return { "status": "success", "message": "域名模板创建成功", "data": { "c_sysid": c_sysid } } else: error_msg = result.get('msg', result.get('message', '未知错误')) return {"status": "error", "message": f"创建域名模板失败: {error_msg}"} except Exception as e: return {"status": "error", "message": f"创建域名模板失败: {str(e)}"} @jingrow.whitelist() def create_domain_owner(**data): """创建新的域名所有者""" try: team = get_current_team() if not team: return {"status": "Error", "message": "未找到当前团队"} # 验证基本必填字段 basic_required_fields = ['c_regtype', 'c_ln_m', 'c_fn_m', 'c_st_m', 'c_ct_m', 'c_adr_m', 'c_pc', 'c_em'] for field in basic_required_fields: if not data.get(field): return {"status": "Error", "message": f"字段 {field} 是必填项"} # 验证电话号码字段(根据c_ph_type判断) c_ph_type = data.get('c_ph_type', '0') # 默认为手机 if c_ph_type == '0': # 手机 if not data.get('c_ph'): return {"status": "Error", "message": "手机号码 c_ph 是必填项"} elif c_ph_type == '1': # 座机 if not data.get('c_ph_code'): return {"status": "Error", "message": "座机区号 c_ph_code 是必填项"} if not data.get('c_ph_num'): return {"status": "Error", "message": "座机号码 c_ph_num 是必填项"} # 生成英文姓名 if data.get('c_ln_m') and data.get('c_fn_m'): last_name_pinyin = ' '.join(lazy_pinyin(data['c_ln_m'])) first_name_pinyin = ' '.join(lazy_pinyin(data['c_fn_m'])) data['c_ln'] = last_name_pinyin.title() data['c_fn'] = first_name_pinyin.title() else: # 设置默认的英文姓名 data['c_ln'] = data.get('c_ln', '') data['c_fn'] = data.get('c_fn', '') # 生成英文地址 if data.get('c_adr_m'): address_pinyin = ' '.join(lazy_pinyin(data['c_adr_m'])) data['c_adr'] = address_pinyin.title() else: data['c_adr'] = data.get('c_adr', '') # 生成英文省份和城市 if data.get('c_st_m'): state_pinyin = ' '.join(lazy_pinyin(data['c_st_m'])) data['c_st'] = state_pinyin.title() else: data['c_st'] = data.get('c_st', '') if data.get('c_ct_m'): city_pinyin = ' '.join(lazy_pinyin(data['c_ct_m'])) data['c_ct'] = city_pinyin.title() else: data['c_ct'] = data.get('c_ct', '') # 生成英文单位名称 if data.get('c_org_m'): org_pinyin = ' '.join(lazy_pinyin(data['c_org_m'])) data['c_org'] = org_pinyin.title() else: data['c_org'] = data.get('c_org', '') # 设置默认值 data['team'] = team data['c_co'] = data.get('c_co', 'CN') # 中国 data['cocode'] = data.get('cocode', '+86') data['c_ph_type'] = data.get('c_ph_type', '0') # 手机 data['reg_contact_type'] = data.get('reg_contact_type', 'cg') # 常规 # 确保c_sysid字段存在(如果提供) if data.get('c_sysid'): data['c_sysid'] = data['c_sysid'] # 生成完整姓名和标题 data['fullname'] = data.get('c_ln_m', '') + data.get('c_fn_m', '') if data['c_regtype'] == 'I': # 个人 data['title'] = f"{data['c_ln_m']}{data['c_fn_m']}" else: # 企业 data['title'] = f"{data['c_org_m']}" # 创建域名所有者记录 domain_owner = jingrow.get_pg({ "pagetype": "Domain Owner", **data }) domain_owner.insert(ignore_permissions=True) return { "status": "Success", "message": "域名所有者创建成功", "data": { "name": domain_owner.name, "title": domain_owner.title } } except Exception as e: return {"status": "Error", "message": f"创建域名所有者失败: {str(e)}"} @jingrow.whitelist() def create_domain_owner_with_template(**data): """创建域名所有者(包含模板创建)""" try: # 第一步:创建域名模板 template_result = create_domain_template(**data) if template_result.get("status") != "success": return template_result # 获取模板ID c_sysid = template_result.get("data", {}).get("c_sysid") if not c_sysid: return {"status": "Error", "message": "创建域名模板成功但未返回模板ID"} # 第二步:创建域名所有者记录 data['c_sysid'] = c_sysid owner_result = create_domain_owner(**data) if owner_result.get("status") != "Success": return owner_result # 成功完成 result = { "status": "Success", "message": "域名所有者创建成功", "data": { "c_sysid": c_sysid, "owner_name": owner_result.get("data", {}).get("name"), "owner_title": owner_result.get("data", {}).get("title") } } return result except Exception as e: return {"status": "Error", "message": f"创建域名所有者失败: {str(e)}"} @jingrow.whitelist() def test_create_domain_owner_with_template(): """测试创建域名所有者(包含模板创建)的API端点""" try: # 硬编码的测试参数 test_data = { 'c_regtype': 'I', 'c_ln_m': '李', 'c_fn_m': '长生', 'c_co': 'CN', 'cocode': '+86', 'c_st_m': '广东省', 'c_ct_m': '广州市', 'c_dt_m': '花都区', 'c_adr_m': '狮岭镇龙头市场2栋', 'c_pc': '510000', 'c_ph_type': '0', 'c_ph': '13926598569', 'c_em': '13926598569@139.com', 'c_idtype_gswl': 'SFZ', 'c_idnum_gswl': '430524198506259568' } # 调用create_domain_owner_with_template函数 result = create_domain_template(**test_data) return result except Exception as e: return {"status": "Error", "message": f"测试API调用失败: {str(e)}"} @jingrow.whitelist() def get_west_domain_real_info(**data): """获取域名实名信息""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') if not domain: return {"status": "error", "message": "缺少域名参数"} response = client.get_domain_real_info(domain) if response.get("status") == "error": return response try: # 检查响应格式 if response.get("result") != 200: return {"status": "error", "message": "API查询失败"} data = response.get("data", {}) # 返回格式化的实名信息 return { "status": "success", "data": { "domain": data.get("domain"), "c_sysid": data.get("c_sysid"), "c_regtype": data.get("c_regtype"), "c_status": data.get("c_status"), "c_failinfo": data.get("c_failinfo"), "status": data.get("status"), "regdate": data.get("regdate"), "rexpiredate": data.get("rexpiredate"), "dns_hosts": { "dns_host1": data.get("dns_host1"), "dns_host2": data.get("dns_host2"), "dns_host3": data.get("dns_host3"), "dns_host4": data.get("dns_host4"), "dns_host5": data.get("dns_host5"), "dns_host6": data.get("dns_host6") }, "owner": { "dom_ln": data.get("dom_ln"), "dom_fn": data.get("dom_fn"), "dom_ln_m": data.get("dom_ln_m"), "dom_fn_m": data.get("dom_fn_m"), "dom_org": data.get("dom_org"), "dom_org_m": data.get("dom_org_m"), "dom_co": data.get("dom_co"), "dom_st": data.get("dom_st"), "dom_st_m": data.get("dom_st_m"), "dom_ct": data.get("dom_ct"), "dom_ct_m": data.get("dom_ct_m"), "dom_adr1": data.get("dom_adr1"), "dom_adr_m": data.get("dom_adr_m"), "dom_pc": data.get("dom_pc"), "dom_ph": data.get("dom_ph"), "dom_fax": data.get("dom_fax"), "dom_em": data.get("dom_em") }, "admin": { "admi_ln": data.get("admi_ln"), "admi_fn": data.get("admi_fn"), "admi_ln_m": data.get("admi_ln_m"), "admi_fn_m": data.get("admi_fn_m"), "admi_co": data.get("admi_co"), "admi_st": data.get("admi_st"), "admi_st_m": data.get("admi_st_m"), "admi_ct": data.get("admi_ct"), "admi_ct_m": data.get("admi_ct_m"), "admi_adr1": data.get("admi_adr1"), "admi_adr_m": data.get("admi_adr_m"), "admi_pc": data.get("admi_pc"), "admi_ph": data.get("admi_ph"), "admi_fax": data.get("admi_fax"), "admi_em": data.get("admi_em") }, "tech": { "tech_ln": data.get("tech_ln"), "tech_fn": data.get("tech_fn"), "tech_ln_m": data.get("tech_ln_m"), "tech_fn_m": data.get("tech_fn_m"), "tech_co": data.get("tech_co"), "tech_st": data.get("tech_st"), "tech_st_m": data.get("tech_st_m"), "tech_ct": data.get("tech_ct"), "tech_ct_m": data.get("tech_ct_m"), "tech_adr1": data.get("tech_adr1"), "tech_adr_m": data.get("tech_adr_m"), "tech_pc": data.get("tech_pc"), "tech_ph": data.get("tech_ph"), "tech_fax": data.get("tech_fax"), "tech_em": data.get("tech_em") }, "billing": { "bill_ln": data.get("bill_ln"), "bill_fn": data.get("bill_fn"), "bill_ln_m": data.get("bill_ln_m"), "bill_fn_m": data.get("bill_fn_m"), "bill_co": data.get("bill_co"), "bill_st": data.get("bill_st"), "bill_st_m": data.get("bill_st_m"), "bill_ct": data.get("bill_ct"), "bill_ct_m": data.get("bill_ct_m"), "bill_adr1": data.get("bill_adr1"), "bill_adr_m": data.get("bill_adr_m"), "bill_pc": data.get("bill_pc"), "bill_ph": data.get("bill_ph"), "bill_fax": data.get("bill_fax"), "bill_em": data.get("bill_em") }, "real_name_status": { "r_status": data.get("r_status"), "r_failinfo": data.get("r_failinfo"), "c_ispublic": data.get("c_ispublic") }, "orgfile": data.get("orgfile", {}) } } except Exception as e: return {"status": "error", "message": "域名实名信息查询响应解析失败"} @jingrow.whitelist() def get_west_upload_token(**data): """获取西部数码实名上传token""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} c_sysid = data.get('c_sysid') f_type_org = data.get('f_type_org') f_code_org = data.get('f_code_org') f_type_lxr = data.get('f_type_lxr') f_code_lxr = data.get('f_code_lxr') if not all([c_sysid, f_type_org, f_code_org]): return {"status": "error", "message": "缺少必要参数:c_sysid, f_type_org, f_code_org"} response = client.get_upload_token(c_sysid, f_type_org, f_code_org, f_type_lxr, f_code_lxr) if response.get("status") == "error": return response try: # 检查响应格式 if response.get("result") != 200: return {"status": "error", "message": "API查询失败"} return { "status": "success", "data": { "token": response.get("data"), "clientid": response.get("clientid") } } except Exception as e: return {"status": "error", "message": "获取实名上传token响应解析失败"} @jingrow.whitelist() def west_upload_real_name_files(**data): """西部数码模板实名资料上传""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} token = data.get('token') file_org = data.get('file_org') file_lxr = data.get('file_lxr') if not all([token, file_org]): return {"status": "error", "message": "缺少必要参数:token, file_org"} response = client.upload_real_name_files(token, file_org, file_lxr) if response.get("status") == "error": return response try: # 解析响应格式 d = response.get("d", {}) result = d.get("Result") msg = d.get("Msg", "") if result == 200: return { "status": "success", "message": "实名资料上传成功", "data": { "result": result, "msg": msg } } else: return { "status": "error", "message": f"实名资料上传失败: {msg}" } except Exception as e: return {"status": "error", "message": "实名资料上传响应解析失败"} @jingrow.whitelist() def west_domain_modify_dns_record(**data): """修改DNS记录""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') value = data.get('value') ttl = data.get('ttl', 600) level = data.get('level', 10) record_id = data.get('record_id') host = data.get('host') record_type = data.get('record_type') line = data.get('line', '') old_value = data.get('old_value') if not domain: return {"status": "error", "message": "缺少域名参数"} if not value: return {"status": "error", "message": "缺少新的解析值"} # 验证TTL值 if ttl < 60 or ttl > 86400: return {"status": "error", "message": "TTL值必须在60~86400秒之间"} # 验证优先级 if level < 1 or level > 100: return {"status": "error", "message": "优先级必须在1~100之间"} # 如果没有提供record_id,则必须提供host和record_type if not record_id and (not host or not record_type): return {"status": "error", "message": "当未提供记录ID时,主机头和解析类型为必填项"} # 验证记录类型 if record_type: valid_types = ['A', 'CNAME', 'MX', 'TXT', 'AAAA', 'SRV'] if record_type not in valid_types: return {"status": "error", "message": f"不支持的记录类型: {record_type},支持的类型: {', '.join(valid_types)}"} response = client.modify_dns_record( domain, value, ttl, level, record_id, host, record_type, line, old_value ) if response.get("status") == "error": return response try: # 检查响应格式 if response.get("result") != 200: error_msg = response.get('msg', response.get('message', '未知错误')) return {"status": "error", "message": f"修改DNS记录失败: {error_msg}"} # 返回成功结果 return { "status": "success", "message": "DNS记录修改成功", "data": { "domain": domain, "value": value, "ttl": ttl, "level": level, "record_id": record_id, "host": host, "record_type": record_type, "line": line } } except Exception as e: return {"status": "error", "message": "修改DNS记录响应解析失败"} @jingrow.whitelist() def test_dns_record_management(): """测试DNS记录管理功能""" try: # 测试参数 test_domain = "test.com" test_host = "www" test_value = "127.0.0.1" test_record_type = "A" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} # 测试1: 添加DNS记录 add_result = client.add_dns_record( domain=test_domain, record_type=test_record_type, host=test_host, value=test_value, ttl=900, level=10, line="" ) # 测试2: 获取DNS记录列表 list_result = client.get_dns_records_paginated(test_domain, 20, 1) # 测试3: 修改DNS记录(如果添加成功) if add_result.get("result") == 200: record_id = add_result.get("data", {}).get("id") if record_id: modify_result = client.modify_dns_record( domain=test_domain, value="127.0.0.2", ttl=600, level=10, record_id=record_id, line="" ) # 测试4: 删除DNS记录 delete_result = client.delete_dns_record( domain=test_domain, record_id=record_id ) return { "status": "success", "message": "DNS记录管理功能测试完成", "data": { "add_result": add_result, "list_result": list_result } } except Exception as e: return {"status": "error", "message": f"DNS记录管理功能测试失败: {str(e)}"} @jingrow.whitelist() def west_domain_modify_dns_server(**data): """修改域名DNS服务器""" client = get_west_client() if not client: return {"status": "error", "message": "API客户端初始化失败"} domain = data.get('domain') dns1 = data.get('dns1') dns2 = data.get('dns2') dns3 = data.get('dns3') dns4 = data.get('dns4') dns5 = data.get('dns5') dns6 = data.get('dns6') if not domain: return {"status": "error", "message": "缺少域名参数"} if not dns1: return {"status": "error", "message": "缺少主DNS服务器参数"} if not dns2: return {"status": "error", "message": "缺少辅DNS服务器参数"} # 验证DNS服务器格式 dns_servers = [dns1, dns2] if dns3: dns_servers.append(dns3) if dns4: dns_servers.append(dns4) if dns5: dns_servers.append(dns5) if dns6: dns_servers.append(dns6) # 验证DNS服务器格式(简单验证) for dns in dns_servers: if dns and not ('.' in dns and len(dns) > 3): return {"status": "error", "message": f"DNS服务器格式不正确: {dns}"} response = client.modify_dns_server( domain=domain, dns1=dns1, dns2=dns2, dns3=dns3, dns4=dns4, dns5=dns5, dns6=dns6 ) if response.get("status") == "error": return response try: # 检查响应格式 if response.get("result") != 200: error_msg = response.get('msg', response.get('message', '未知错误')) return {"status": "error", "message": f"修改DNS服务器失败: {error_msg}"} # 异步更新本地域名记录的DNS服务器字段 try: domain_records = jingrow.get_all( "Jsite Domain", {"domain": domain}, ["name"] ) if domain_records: jingrow.enqueue_pg( "Jsite Domain", domain_records[0].name, "update_dns_servers", dns1=dns1, dns2=dns2, dns3=dns3, dns4=dns4, dns5=dns5, dns6=dns6 ) except Exception as e: jingrow.log_error(f"域名 {domain} DNS服务器更新失败", f"错误: {str(e)}") # 返回成功结果 return { "status": "success", "message": "DNS服务器修改成功", "data": { "domain": domain, "dns_servers": { "dns1": dns1, "dns2": dns2, "dns3": dns3, "dns4": dns4, "dns5": dns5, "dns6": dns6 }, "clientid": response.get("clientid") } } except Exception as e: return {"status": "error", "message": "修改DNS服务器响应解析失败"}