jcloud/jcloud/api/billing.py

2034 lines
62 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2020, JINGROW
# For license information, please see license.txt
from __future__ import annotations
from itertools import groupby
import urllib.parse
import json
import segno
import io
import base64
import jingrow
from jingrow import _ # Import this for translation functionality
from jingrow.core.utils import find
from jingrow.utils import fmt_money, get_request_site_address, getdate, add_months
import random
from datetime import datetime
from jcloud.api.regional_payments.mpesa.utils import (
create_invoice_partner_site,
create_payment_partner_transaction,
fetch_param_value,
get_details_from_request_log,
get_mpesa_setup_for_team,
get_payment_gateway,
sanitize_mobile_number,
update_tax_id_or_phone_no,
)
from jcloud.jcloud.pagetype.mpesa_setup.mpesa_connector import MpesaConnector
from jcloud.jcloud.pagetype.team.team import (
has_unsettled_invoices,
)
from jcloud.utils import get_current_team
from jcloud.utils.billing import (
GSTIN_FORMAT,
clear_setup_intent,
get_publishable_key,
get_razorpay_client,
get_setup_intent,
get_stripe,
make_formatted_pg,
states_with_tin,
validate_gstin_check_digit,
)
from jcloud.utils.mpesa_utils import create_mpesa_request_log
from jcloud.api.payment.wechatpay import WeChatPayAPI
from jcloud.api.payment.alipay import AlipayAPI
@jingrow.whitelist()
def get_publishable_key_and_setup_intent():
team = get_current_team()
return {
"publishable_key": get_publishable_key(),
"setup_intent": get_setup_intent(team),
}
@jingrow.whitelist()
def upcoming_invoice():
team = get_current_team(True)
invoice = team.get_upcoming_invoice()
if invoice:
upcoming_invoice = invoice.as_dict()
upcoming_invoice.formatted = make_formatted_pg(invoice, ["Currency"])
else:
upcoming_invoice = None
return {
"upcoming_invoice": upcoming_invoice,
"available_credits": fmt_money(team.get_balance(), 2, team.currency),
}
@jingrow.whitelist()
def get_balance_credit():
team = get_current_team(True)
return team.get_balance()
@jingrow.whitelist()
def past_invoices():
return get_current_team(True).get_past_invoices()
@jingrow.whitelist()
def invoices_and_payments():
team = get_current_team(True)
return team.get_past_invoices()
@jingrow.whitelist()
def refresh_invoice_link(invoice):
pg = jingrow.get_pg("Invoice", invoice)
return pg.refresh_stripe_payment_link()
@jingrow.whitelist()
def balances():
team = get_current_team()
has_bought_credits = jingrow.db.get_all(
"Balance Transaction",
filters={
"source": ("in", ("Prepaid Credits", "Transferred Credits", "Free Credits")),
"team": team,
"pagestatus": 1,
"type": ("!=", "Partnership Fee"),
},
limit=1,
)
if not has_bought_credits:
return []
bt = jingrow.qb.PageType("Balance Transaction")
inv = jingrow.qb.PageType("Invoice")
query = (
jingrow.qb.from_(bt)
.left_join(inv)
.on(bt.invoice == inv.name)
.select(
bt.name,
bt.creation,
bt.amount,
bt.currency,
bt.source,
bt.type,
bt.ending_balance,
bt.description,
inv.period_start,
)
.where((bt.pagestatus == 1) & (bt.team == team))
.orderby(bt.creation, order=jingrow.qb.desc)
)
data = query.run(as_dict=True)
for d in data:
d.formatted = dict(
amount=fmt_money(d.amount, 2, d.currency),
ending_balance=fmt_money(d.ending_balance, 2, d.currency),
)
if d.period_start:
d.formatted["invoice_for"] = d.period_start.strftime("%B %Y")
return data
def get_processed_balance_transactions(transactions: list[dict]):
"""Cleans up transactions and adjusts ending balances accordingly"""
cleaned_up_transations = get_cleaned_up_transactions(transactions)
processed_balance_transactions = []
for bt in reversed(cleaned_up_transations):
if is_added_credits_bt(bt) and len(processed_balance_transactions) < 1:
processed_balance_transactions.append(bt)
elif is_added_credits_bt(bt):
bt.ending_balance += processed_balance_transactions[
-1
].ending_balance # Adjust the ending balance
processed_balance_transactions.append(bt)
elif bt.type == "Applied To Invoice":
processed_balance_transactions.append(bt)
return list(reversed(processed_balance_transactions))
def get_cleaned_up_transactions(transactions: list[dict]):
"""Only picks Balance transactions that the users care about"""
cleaned_up_transations = []
for bt in transactions:
if is_added_credits_bt(bt):
cleaned_up_transations.append(bt)
continue
if bt.type == "Applied To Invoice" and not find(
cleaned_up_transations, lambda x: x.invoice == bt.invoice
):
cleaned_up_transations.append(bt)
continue
return cleaned_up_transations
def is_added_credits_bt(bt):
"""Returns `true` if credits were added and not some reverse transaction"""
if not (
bt.type == "Adjustment"
and bt.source
in (
"Prepaid Credits",
"Free Credits",
"Transferred Credits",
) # Might need to re-think this
):
return False
# Is not a reverse of a previous balance transaction
bt.description = bt.description or ""
return not bt.description.startswith("Reverse")
@jingrow.whitelist()
def details():
team = get_current_team(True)
address = None
if team.billing_address:
address = jingrow.get_pg("Address", team.billing_address)
address_parts = [
address.address_line1,
address.city,
address.state,
address.country,
address.pincode,
]
billing_address = ", ".join([d for d in address_parts if d])
else:
billing_address = ""
return {
"billing_name": team.billing_name,
"billing_address": billing_address,
"gstin": address.gstin if address else None,
}
@jingrow.whitelist()
def get_customer_details(team):
"""This method is called by jingrow.com for creating Customer and Address"""
team_pg = jingrow.db.get_value("Team", team, "*")
return {
"team": team_pg,
"address": jingrow.get_pg("Address", team_pg.billing_address),
}
@jingrow.whitelist()
def create_payment_intent_for_micro_debit(payment_method_name):
team = get_current_team(True)
stripe = get_stripe()
micro_debit_charge_field = (
"micro_debit_charge_usd" if team.currency == "USD" else "micro_debit_charge_cny"
)
amount = jingrow.db.get_single_value("Jcloud Settings", micro_debit_charge_field)
intent = stripe.PaymentIntent.create(
amount=int(amount * 100),
currency=team.currency.lower(),
customer=team.stripe_customer_id,
description="Micro-Debit Card Test Charge",
metadata={
"payment_for": "micro_debit_test_charge",
"payment_method_name": payment_method_name,
},
)
return {"client_secret": intent["client_secret"]}
@jingrow.whitelist()
def create_payment_intent_for_partnership_fees():
team = get_current_team(True)
jcloud_settings = jingrow.get_cached_pg("Jcloud Settings")
metadata = {"payment_for": "partnership_fee"}
fee_amount = jcloud_settings.partnership_fee_usd
if team.currency == "CNY":
fee_amount = jcloud_settings.partnership_fee_cny
gst_amount = fee_amount * jcloud_settings.gst_percentage
fee_amount += gst_amount
metadata.update({"gst": round(gst_amount, 2)})
stripe = get_stripe()
intent = stripe.PaymentIntent.create(
amount=int(fee_amount * 100),
currency=team.currency.lower(),
customer=team.stripe_customer_id,
description="Partnership Fee",
metadata=metadata,
)
return {
"client_secret": intent["client_secret"],
"publishable_key": get_publishable_key(),
}
@jingrow.whitelist()
def create_payment_intent_for_buying_credits(amount):
team = get_current_team(True)
metadata = {"payment_for": "prepaid_credits"}
total_unpaid = total_unpaid_amount()
if amount < total_unpaid and not team.jerp_partner:
jingrow.throw(f"Amount {amount} is less than the total unpaid amount {total_unpaid}.")
if team.currency == "CNY":
gst_amount = amount * jingrow.db.get_single_value("Jcloud Settings", "gst_percentage")
amount += gst_amount
metadata.update({"gst": round(gst_amount, 2)})
amount = round(amount, 2)
stripe = get_stripe()
intent = stripe.PaymentIntent.create(
amount=int(amount * 100),
currency=team.currency.lower(),
customer=team.stripe_customer_id,
description="Prepaid Credits",
metadata=metadata,
)
return {
"client_secret": intent["client_secret"],
"publishable_key": get_publishable_key(),
}
@jingrow.whitelist()
def create_payment_intent_for_prepaid_app(amount, metadata):
stripe = get_stripe()
team = get_current_team(True)
payment_method = jingrow.get_value(
"Stripe Payment Method", team.default_payment_method, "stripe_payment_method_id"
)
try:
if not payment_method:
intent = stripe.PaymentIntent.create(
amount=amount * 100,
currency=team.currency.lower(),
customer=team.stripe_customer_id,
description="Prepaid App Purchase",
metadata=metadata,
)
else:
intent = stripe.PaymentIntent.create(
amount=amount * 100,
currency=team.currency.lower(),
customer=team.stripe_customer_id,
description="Prepaid App Purchase",
off_session=True,
confirm=True,
metadata=metadata,
payment_method=payment_method,
payment_method_options={"card": {"request_three_d_secure": "any"}},
)
return {
"payment_method": payment_method,
"client_secret": intent["client_secret"],
"publishable_key": get_publishable_key(),
}
except stripe.error.CardError as e:
err = e.error
if err.code == "authentication_required":
# Bring the customer back on-session to authenticate the purchase
return {
"error": "authentication_required",
"payment_method": err.payment_method.id,
"amount": amount,
"card": err.payment_method.card,
"publishable_key": get_publishable_key(),
"client_secret": err.payment_intent.client_secret,
}
if err.code:
# The card was declined for other reasons (e.g. insufficient funds)
# Bring the customer back on-session to ask them for a new payment method
return {
"error": err.code,
"payment_method": err.payment_method.id,
"publishable_key": get_publishable_key(),
"client_secret": err.payment_intent.client_secret,
}
@jingrow.whitelist()
def get_payment_methods():
team = get_current_team()
return jingrow.get_pg("Team", team).get_payment_methods()
@jingrow.whitelist()
def set_as_default(name):
payment_method = jingrow.get_pg("Stripe Payment Method", {"name": name, "team": get_current_team()})
payment_method.set_default()
@jingrow.whitelist()
def remove_payment_method(name):
team = get_current_team()
payment_method_count = jingrow.db.count("Stripe Payment Method", {"team": team})
if has_unsettled_invoices(team) and payment_method_count == 1:
return "Unpaid Invoices"
payment_method = jingrow.get_pg("Stripe Payment Method", {"name": name, "team": team})
payment_method.delete()
return None
@jingrow.whitelist()
def finalize_invoices():
unsettled_invoices = jingrow.get_all(
"Invoice",
{"team": get_current_team(), "status": ("in", ("Draft", "Unpaid"))},
pluck="name",
)
for inv in unsettled_invoices:
inv_pg = jingrow.get_pg("Invoice", inv)
inv_pg.finalize_invoice()
@jingrow.whitelist()
def unpaid_invoices():
team = get_current_team()
return jingrow.db.get_all(
"Invoice",
{
"team": team,
"status": ("in", ["Draft", "Unpaid", "Invoice Created"]),
"type": "Subscription",
},
["name", "status", "period_end", "currency", "amount_due", "total"],
order_by="creation asc",
)
@jingrow.whitelist()
def get_unpaid_invoices():
team = get_current_team()
unpaid_invoices = jingrow.db.get_all(
"Invoice",
{
"team": team,
"status": "Unpaid",
"type": "Subscription",
},
["name", "status", "period_end", "currency", "amount_due", "total"],
order_by="creation asc",
)
if len(unpaid_invoices) == 1:
return jingrow.get_pg("Invoice", unpaid_invoices[0].name)
return unpaid_invoices
@jingrow.whitelist()
def change_payment_mode(mode):
team = get_current_team(get_pg=True)
team.payment_mode = mode
if team.partner_email and mode == "Paid By Partner" and not team.billing_team:
team.billing_team = jingrow.db.get_value(
"Team",
{"enabled": 1, "jerp_partner": 1, "partner_email": team.partner_email},
"name",
)
if team.billing_team and mode != "Paid By Partner":
team.billing_team = ""
team.save()
return None
@jingrow.whitelist()
def prepaid_credits_via_onboarding():
"""When prepaid credits are bought, the balance is not immediately reflected.
This method will check balance every second and then set payment_mode"""
from time import sleep
team = get_current_team(get_pg=True)
seconds = 0
# block until balance is updated
while team.get_balance() == 0 or seconds > 20:
seconds += 1
sleep(1)
jingrow.db.rollback()
team.payment_mode = "Prepaid Credits"
team.save()
@jingrow.whitelist()
def get_invoice_usage(invoice):
team = get_current_team()
# apply team filter for safety
pg = jingrow.get_pg("Invoice", {"name": invoice, "team": team})
out = pg.as_dict()
# a dict with formatted currency values for display
out.formatted = make_formatted_pg(pg)
out.invoice_pdf = pg.invoice_pdf or (pg.currency == "USD" and pg.get_pdf())
return out
@jingrow.whitelist()
def get_summary():
team = get_current_team()
invoices = jingrow.get_all(
"Invoice",
filters={"team": team, "status": ("in", ["Paid", "Unpaid"])},
fields=[
"name",
"status",
"period_end",
"payment_mode",
"type",
"currency",
"amount_paid",
],
order_by="creation desc",
)
invoice_names = [x.name for x in invoices]
grouped_invoice_items = get_grouped_invoice_items(invoice_names)
for invoice in invoices:
invoice.items = grouped_invoice_items.get(invoice.name, [])
return invoices
def get_grouped_invoice_items(invoices: list[str]) -> dict:
"""Takes a list of invoices (invoice names) and returns a dict of the form:
{ "<invoice_name1>": [<invoice_items>], "<invoice_name2>": [<invoice_items>], }
"""
invoice_items = jingrow.get_all(
"Invoice Item",
filters={"parent": ("in", invoices)},
fields=[
"amount",
"document_name AS name",
"document_type AS type",
"parent",
"quantity",
"rate",
"plan",
],
)
grouped_items = groupby(invoice_items, key=lambda x: x["parent"])
invoice_items_map = {}
for invoice_name, items in grouped_items:
invoice_items_map[invoice_name] = list(items)
return invoice_items_map
@jingrow.whitelist()
def after_card_add():
clear_setup_intent()
@jingrow.whitelist()
def setup_intent_success(setup_intent, address=None):
setup_intent = jingrow._dict(setup_intent)
# refetching the setup intent to get mandate_id from stripe
stripe = get_stripe()
setup_intent = stripe.SetupIntent.retrieve(setup_intent.id)
team = get_current_team(True)
clear_setup_intent()
mandate_reference = setup_intent.payment_method_options.card.mandate_options.reference
payment_method = team.create_payment_method(
setup_intent.payment_method,
setup_intent.id,
setup_intent.mandate,
mandate_reference,
set_default=True,
)
if address:
address = jingrow._dict(address)
team.update_billing_details(address)
return {"payment_method_name": payment_method.name}
@jingrow.whitelist()
def validate_gst(address, method=None):
# 保留函数以维持代码结构
return
@jingrow.whitelist()
def get_latest_unpaid_invoice():
team = get_current_team()
unpaid_invoices = jingrow.get_all(
"Invoice",
{"team": team, "status": "Unpaid", "payment_attempt_count": (">", 0)},
pluck="name",
order_by="creation desc",
limit=1,
)
if unpaid_invoices:
unpaid_invoice = jingrow.db.get_value(
"Invoice",
unpaid_invoices[0],
["amount_due", "payment_mode", "amount_due", "currency"],
as_dict=True,
)
if unpaid_invoice.payment_mode == "Prepaid Credits" and team_has_balance_for_invoice(unpaid_invoice):
return None
return unpaid_invoice
return None
def team_has_balance_for_invoice(prepaid_mode_invoice):
team = get_current_team(get_pg=True)
return team.get_balance() >= prepaid_mode_invoice.amount_due
@jingrow.whitelist()
def create_razorpay_order(amount, type=None):
client = get_razorpay_client()
team = get_current_team(get_pg=True)
if team.currency == "CNY":
gst_amount = amount * jingrow.db.get_single_value("Jcloud Settings", "gst_percentage")
amount += gst_amount
amount = round(amount, 2)
data = {
"amount": int(amount * 100),
"currency": team.currency,
"notes": {
"Description": "Order for Jingrow Prepaid Credits",
"Team (Jingrow ID)": team.name,
"gst": gst_amount if team.currency == "CNY" else 0,
},
}
if type and type == "Partnership Fee":
data.get("notes").update({"Type": type})
order = client.order.create(data=data)
payment_record = jingrow.get_pg(
{"pagetype": "Razorpay Payment Record", "order_id": order.get("id"), "team": team.name, "type": type}
).insert(ignore_permissions=True)
return {
"order_id": order.get("id"),
"key_id": client.auth[0],
"payment_record": payment_record.name,
}
@jingrow.whitelist()
def handle_razorpay_payment_success(response):
client = get_razorpay_client()
client.utility.verify_payment_signature(response)
payment_record = jingrow.get_pg(
"Razorpay Payment Record",
{"order_id": response.get("razorpay_order_id")},
for_update=True,
)
payment_record.update(
{
"payment_id": response.get("razorpay_payment_id"),
"signature": response.get("razorpay_signature"),
"status": "Captured",
}
)
payment_record.save(ignore_permissions=True)
@jingrow.whitelist()
def handle_razorpay_payment_failed(response):
payment_record = jingrow.get_pg(
"Razorpay Payment Record",
{"order_id": response["error"]["metadata"].get("order_id")},
for_update=True,
)
payment_record.status = "Failed"
payment_record.failure_reason = response["error"]["description"]
payment_record.save(ignore_permissions=True)
@jingrow.whitelist()
def total_unpaid_amount():
team = get_current_team(get_pg=True)
balance = team.get_balance()
negative_balance = -1 * balance if balance < 0 else 0
return (
jingrow.get_all(
"Invoice",
{"status": "Unpaid", "team": team.name, "type": "Subscription", "pagestatus": ("!=", 2)},
["sum(amount_due) as total"],
pluck="total",
)[0]
or 0
) + negative_balance
# Mpesa integrations, mpesa exjcloud
"""Send stk push to the user"""
def generate_stk_push(**kwargs):
"""Generate stk push by making a API call to the stk push API."""
args = jingrow._dict(kwargs)
partner_value = args.partner
# Fetch the team document based on the extracted partner value
partner = jingrow.get_all("Team", filters={"user": partner_value, "jerp_partner": 1}, pluck="name")
if not partner:
jingrow.throw(_(f"Partner team {partner_value} not found"), title=_("Mpesa Exjcloud Error"))
# Get Mpesa settings for the partner's team
mpesa_setup = get_mpesa_setup_for_team(partner[0])
try:
callback_url = (
get_request_site_address(True) + "/api/action/jcloud.api.billing.verify_m_pesa_transaction"
)
env = "production" if not mpesa_setup.sandbox else "sandbox"
# for sandbox, business shortcode is same as till number
business_shortcode = (
mpesa_setup.business_shortcode if env == "production" else mpesa_setup.till_number
)
connector = MpesaConnector(
env=env,
app_key=mpesa_setup.consumer_key,
app_secret=mpesa_setup.get_password("consumer_secret"),
)
mobile_number = sanitize_mobile_number(args.sender)
response = connector.stk_push(
business_shortcode=business_shortcode,
amount=args.amount_with_tax,
passcode=mpesa_setup.get_password("pass_key"),
callback_url=callback_url,
reference_code=mpesa_setup.till_number,
phone_number=mobile_number,
description="Jingrow Payment",
)
return response # noqa: RET504
except Exception as e:
jingrow.log_error(f"Mpesa Exjcloud Transaction Error")
jingrow.throw(
_("Issue detected with Mpesa configuration, check the error logs for more details"),
title=_("Mpesa Exjcloud Error"),
)
@jingrow.whitelist(allow_guest=True)
def verify_m_pesa_transaction(**kwargs):
"""Verify the transaction result received via callback from STK."""
transaction_response, request_id = parse_transaction_response(kwargs)
status = handle_transaction_result(transaction_response, request_id)
return {"status": status, "ResultDesc": transaction_response.get("ResultDesc")}
def parse_transaction_response(kwargs):
"""Parse and validate the transaction response."""
if "Body" not in kwargs or "stkCallback" not in kwargs["Body"]:
jingrow.log_error(title="Invalid transaction response format", message=kwargs)
jingrow.throw(_("Invalid transaction response format"))
transaction_response = jingrow._dict(kwargs["Body"]["stkCallback"])
checkout_id = getattr(transaction_response, "CheckoutRequestID", "")
if not isinstance(checkout_id, str):
jingrow.throw(_("Invalid Checkout Request ID"))
return transaction_response, checkout_id
def handle_transaction_result(transaction_response, integration_request):
"""Handle the logic based on ResultCode in the transaction response."""
result_code = transaction_response.get("ResultCode")
status = None
if result_code == 0:
try:
status = "Completed"
create_mpesa_request_log(
transaction_response, "Host", "Mpesa Exjcloud", integration_request, None, status
)
create_mpesa_payment_record(transaction_response)
except Exception as e:
status = "Failed"
create_mpesa_request_log(
transaction_response, "Host", "Mpesa Exjcloud", integration_request, None, status
)
elif result_code == 1037: # User unreachable (Phone off or timeout)
status = "Failed"
create_mpesa_request_log(
transaction_response, "Host", "Mpesa Exjcloud", integration_request, None, status
)
elif result_code == 1032: # User cancelled the request
status = "Cancelled"
create_mpesa_request_log(
transaction_response, "Host", "Mpesa Exjcloud", integration_request, None, status
)
else: # Other failure codes
status = "Failed"
create_mpesa_request_log(
transaction_response, "Host", "Mpesa Exjcloud", integration_request, None, status
)
return status
@jingrow.whitelist()
def request_for_payment(**kwargs):
"""request for payments"""
team = get_current_team()
kwargs.setdefault("team", team)
args = jingrow._dict(kwargs)
update_tax_id_or_phone_no(team, args.tax_id, args.phone_number)
amount = args.request_amount
args.request_amount = jingrow.utils.rounded(amount, 2)
response = jingrow._dict(generate_stk_push(**args))
handle_api_mpesa_response("CheckoutRequestID", args, response)
return response
def handle_api_mpesa_response(global_id, request_dict, response):
"""Response received from API calls returns a global identifier for each transaction, this code is returned during the callback."""
# check error response
if response.requestId:
req_name = response.requestId
error = response
else:
# global checkout id used as request name
req_name = getattr(response, global_id)
error = None
create_mpesa_request_log(request_dict, "Host", "Mpesa Exjcloud", req_name, error, output=response)
if error:
jingrow.throw(_(response.errorMessage), title=_("Transaction Error"))
def create_mpesa_payment_record(transaction_response):
"""Create a new entry in the Mpesa Payment Record for a successful transaction."""
item_response = transaction_response.get("CallbackMetadata", {}).get("Item", [])
mpesa_receipt_number = fetch_param_value(item_response, "MpesaReceiptNumber", "Name")
transaction_time = fetch_param_value(item_response, "TransactionDate", "Name")
phone_number = fetch_param_value(item_response, "PhoneNumber", "Name")
transaction_id = transaction_response.get("CheckoutRequestID")
amount = fetch_param_value(item_response, "Amount", "Name")
merchant_request_id = transaction_response.get("MerchantRequestID")
info = get_details_from_request_log(transaction_id)
gateway_name = get_payment_gateway(info.partner)
# Create a new entry in M-Pesa Payment Record
data = {
"transaction_id": transaction_id,
"amount": amount,
"team": jingrow.get_value("Team", info.team, "user"),
"default_currency": "KES",
"rate": info.requested_amount,
}
mpesa_invoice, invoice_name = create_invoice_partner_site(data, gateway_name)
payment_record = jingrow.get_pg(
{
"pagetype": "Mpesa Payment Record",
"transaction_id": transaction_id,
"transaction_time": parse_datetime(transaction_time),
"transaction_type": "Mpesa Exjcloud",
"team": info.team,
"phone_number": str(phone_number),
"amount": info.requested_amount,
"grand_total": amount,
"merchant_request_id": merchant_request_id,
"payment_partner": info.partner,
"amount_usd": info.amount_usd,
"exchange_rate": info.exchange_rate,
"local_invoice": mpesa_invoice,
"mpesa_receipt_number": mpesa_receipt_number,
}
)
payment_record.insert(ignore_permissions=True)
payment_record.submit()
"""create payment partner transaction which will then create balance transaction"""
create_payment_partner_transaction(
info.team, info.partner, info.exchange_rate, info.amount_usd, info.requested_amount, gateway_name
)
mpesa_details = {
"mpesa_receipt_number": mpesa_receipt_number,
"mpesa_merchant_id": merchant_request_id,
"mpesa_payment_record": payment_record.name,
"mpesa_request_id": transaction_id,
"mpesa_invoice": invoice_name,
}
create_balance_transaction_and_invoice(info.team, info.amount_usd, mpesa_details)
jingrow.msgprint(_("Mpesa Payment Record entry created successfully"))
def create_balance_transaction_and_invoice(team, amount, mpesa_details):
balance_transaction = jingrow.get_pg(
pagetype="Balance Transaction",
team=team,
source="Prepaid Credits",
type="Adjustment",
amount=amount,
description=mpesa_details.get("mpesa_payment_record"),
paid_via_local_pg=1,
)
balance_transaction.insert(ignore_permissions=True)
balance_transaction.submit()
invoice = jingrow.get_pg(
pagetype="Invoice",
team=team,
type="Prepaid Credits",
status="Paid",
total=amount,
amount_due=amount,
amount_paid=amount,
amount_due_with_tax=amount,
due_date=jingrow.utils.nowdate(),
mpesa_merchant_id=mpesa_details.get("mpesa_merchant_id", ""),
mpesa_receipt_number=mpesa_details.get("mpesa_receipt_number", ""),
mpesa_request_id=mpesa_details.get("mpesa_request_id", ""),
mpesa_payment_record=mpesa_details.get("mpesa_payment_record", ""),
mpesa_invoice=mpesa_details.get("mpesa_invoice", ""),
)
invoice.append(
"items",
{
"description": "Prepaid Credits",
"document_type": "Balance Transaction",
"document_name": balance_transaction.name,
"quantity": 1,
"rate": amount,
},
)
invoice.insert(ignore_permissions=True)
invoice.submit()
def parse_datetime(date):
from datetime import datetime
return datetime.strptime(str(date), "%Y%m%d%H%M%S")
@jingrow.whitelist(allow_guest=True)
def handle_alipay_notification():
"""处理支付宝支付通知回调"""
try:
# 获取支付宝通知数据
alipay_data = jingrow.request.form.to_dict()
# 获取必要参数
order_id = alipay_data.get("out_trade_no")
trade_no = alipay_data.get("trade_no")
trade_status = alipay_data.get("trade_status")
team_name = urllib.parse.unquote(alipay_data.get("passback_params", ""))
# 检查订单号是否存在
if not order_id:
jingrow.log_error("订单号为空", "支付宝错误")
return "fail"
# 查找支付记录
payment_record = jingrow.db.get_value(
"Order",
{"order_id": order_id},
["name", "total_amount", "status", "team"],
as_dict=True
)
if not payment_record:
jingrow.log_error(f"未找到支付记录: {order_id}", "支付宝错误")
return "fail"
# 检查记录状态,避免重复处理
if payment_record.status == "交易成功":
return "success"
# 只有交易成功时才更新状态
if trade_status == "TRADE_SUCCESS":
# 更新订单记录状态
jingrow.db.set_value(
"Order",
payment_record.name,
{
"status": "已支付",
"trade_no": trade_no,
"payment_method": "支付宝"
}
)
# 执行支付完成后的业务逻辑
handle_order_payment_complete(order_id)
# 立即提交数据库事务
jingrow.db.commit()
return "success"
return "success"
except Exception as e:
jingrow.log_error("支付宝错误", f"处理失败: {str(e)}")
return "fail"
@jingrow.whitelist(allow_guest=True)
def handle_wechatpay_notification():
"""处理微信支付通知回调"""
try:
# 获取请求数据
headers = jingrow.local.request.headers
body = jingrow.request.get_data()
# 初始化微信支付API
wechat_pay = WeChatPayAPI()
# 使用SDK的方法解密回调数据
try:
# 调用SDK提供的decrypt_callback方法解密数据
decrypted_data = wechat_pay.wxpay.decrypt_callback(headers, body)
# 如果返回值是字符串则解析为JSON
if isinstance(decrypted_data, str):
decrypted_data = json.loads(decrypted_data)
# 获取关键字段
order_id = decrypted_data.get("out_trade_no")
trade_no = decrypted_data.get("transaction_id")
trade_state = decrypted_data.get("trade_state")
total_amount = decrypted_data.get("amount", {}).get("total", 0) / 100 # 转换为元
team_name = decrypted_data.get("attach", "")
# 确认交易状态为成功
if trade_state != "SUCCESS":
jingrow.log_error(f"微信支付交易状态不成功: {trade_state}", "微信支付通知")
return "SUCCESS" # 返回成功以避免微信重复通知
# 查询支付记录
payment_record = jingrow.db.get_value(
"Order",
{"order_id": order_id},
["name", "total_amount", "status", "team"],
as_dict=True
)
if not payment_record:
jingrow.log_error(f"未找到支付记录: {order_id}", "微信支付错误")
return "SUCCESS"
# 检查记录状态,避免重复处理
if payment_record.status == "交易成功":
jingrow.log_error(f"订单已处理: {order_id}", "微信支付通知")
return "SUCCESS"
# 更新订单记录状态
jingrow.db.set_value(
"Order",
payment_record.name,
{
"status": "已支付",
"trade_no": trade_no,
"payment_method": "微信支付"
}
)
# 执行支付完成后的业务逻辑
handle_order_payment_complete(order_id)
# 立即提交数据库事务
jingrow.db.commit()
return "SUCCESS"
except Exception as e:
jingrow.log_error("微信支付解密错误", f"处理微信支付通知数据失败: {str(e)}\n请求头: {headers}\n请求体: {body}")
return "SUCCESS" # 返回成功避免微信重复发送通知
except Exception as e:
jingrow.log_error("微信支付错误", f"处理微信支付通知失败: {str(e)}")
return "SUCCESS" # 返回成功避免微信重复发送通知
def handle_order_payment_complete(order_id):
"""处理订单支付完成后的业务逻辑"""
try:
order = jingrow.get_pg("Order", {"order_id": order_id})
# 根据订单类型执行不同的业务逻辑
if order.order_type == "余额充值":
process_balance_recharge(order)
elif order.order_type == "网站续费":
process_site_renew(order_id)
elif order.order_type == "服务器续费":
# 异步续费服务器
jingrow.enqueue('jcloud.api.aliyun_server_light.renew_server', order_name=order.name)
elif order.order_type == "服务器升级":
# 异步升级服务器
jingrow.enqueue('jcloud.api.aliyun_server_light.upgrade_server', order_name=order.name)
elif order.order_type == "新建服务器":
# 异步创建服务器
jingrow.enqueue('jcloud.api.aliyun_server_light.create_aliyun_server', order_name=order.name)
elif order.order_type == "域名注册":
# 异步注册域名
jingrow.enqueue('jcloud.api.domain_west.register_domain_from_order', order_name=order.name)
elif order.order_type == "域名续费":
# 异步续费域名
jingrow.enqueue('jcloud.api.domain_west.renew_domain_from_order', order_name=order.name)
return True
except Exception as e:
jingrow.log_error("订单处理错误", f"处理订单 {order_id} 支付完成事件失败: {str(e)}")
return False
def process_balance_recharge(order):
"""处理余额充值业务逻辑"""
try:
balance_transaction = jingrow.get_pg({
"pagetype": "Balance Transaction",
"team": order.team,
"type": "Adjustment",
"source": "Prepaid Credits",
"amount": order.total_amount,
"description": f"{order.payment_method}充值 (订单: {order.order_id})",
})
balance_transaction.flags.ignore_permissions = True
balance_transaction.insert()
balance_transaction.submit()
# 更新订单状态为交易成功
jingrow.db.set_value("Order", order.name, "status", "交易成功")
jingrow.db.commit()
except Exception as e:
jingrow.log_error("余额充值错误", f"余额充值失败: 团队 {order.team}, 金额 {order.total_amount}, 错误: {str(e)}")
raise
def process_site_renew(order_id):
"""处理网站续费,更新到期时间"""
# 获取订单文档
order = jingrow.get_pg("Order", {"order_id": order_id})
# 检查订单状态,避免重复处理
if order.status == "交易成功":
jingrow.log_error(
message=f"订单 {order_id} 已处理完成,跳过重复处理",
title="站点续费提示"
)
return {
"name": order.title,
"url": order.title,
"new_end_date": jingrow.db.get_value("Site", order.title, "site_end_date"),
"already_processed": True
}
# 从订单中提取信息
site_name = order.title # 网站URL保存在订单的title字段中
renewal_months = int(order.description) # 续费月数保存在订单的description字段中
# 获取站点文档
site = jingrow.get_pg("Site", site_name)
# 计算新的到期日期
current_end_date = getdate(site.site_end_date or jingrow.utils.today())
if current_end_date < getdate(jingrow.utils.today()):
current_end_date = getdate(jingrow.utils.today())
new_end_date = add_months(current_end_date, renewal_months)
# 更新站点到期日期
site.site_end_date = new_end_date
site.save(ignore_permissions=True)
# 续费后自动激活站点(如有需要)
if site.status in ["Inactive", "Suspended"]:
try:
site.activate()
except Exception as e:
jingrow.log_error("站点自动激活失败", f"站点 {site_name} 续费后自动激活失败: {str(e)}")
# 更新订单状态为交易成功,防止重复处理
jingrow.db.set_value("Order", order.name, "status", "交易成功")
return {
"name": site.name,
"url": site_name,
"new_end_date": new_end_date
}
@jingrow.whitelist()
def check_payment_status(order_id, payment_type):
"""检查支付状态"""
if payment_type == "alipay":
pagetype = "Order"
elif payment_type == "wechatpay":
pagetype = "Order"
else:
jingrow.throw("不支持的支付类型")
team = get_current_team()
payment_record = jingrow.db.get_value(
pagetype,
{"order_id": order_id, "team": team},
["status", "total_amount"],
as_dict=True
)
if not payment_record:
return {"status": "not_found"}
return payment_record
@jingrow.whitelist()
def create_alipay_order_for_recharge(amount):
"""创建支付宝订单用于购买预付费信用额度"""
team = get_current_team(True)
# 金额取整到两位小数
total_amount = round(float(amount), 2)
# 生成唯一订单号
order_id = datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + ''.join(random.choices('0123456789', k=6))
# 创建订单记录
payment_record = jingrow.get_pg({
"pagetype": "Order",
"order_id": order_id,
"order_type": "余额充值",
"team": team.name,
"total_amount": float(total_amount),
"status": "待支付",
"payment_method": "支付宝"
})
payment_record.insert(ignore_permissions=True)
jingrow.db.commit()
# 直接使用AlipayAPI类生成支付链接
api = AlipayAPI()
try:
# 生成支付链接使用API类中已配置的默认URL
payment_url = api.generate_payment_url(
order_id=order_id,
amount=amount,
subject="Jingrow 余额充值",
team_name=team.name
)
return {
"payment_url": payment_url,
"order_id": order_id,
"payment_record": payment_record.name
}
except Exception as e:
jingrow.log_error("Order", f"创建支付宝订单失败: {str(e)}")
jingrow.throw(f"创建支付宝订单失败: {str(e)}")
def generate_qr_code(payment_url):
"""生成二维码图片的Base64字符串"""
qr = segno.make(payment_url)
buffer = io.BytesIO()
qr.save(buffer, kind='png', scale=6)
img_str = base64.b64encode(buffer.getvalue()).decode("utf-8")
return f"data:image/png;base64,{img_str}"
@jingrow.whitelist()
def create_wechatpay_order_for_recharge(amount):
"""创建微信支付订单用于购买预付费信用额度"""
team = get_current_team(True)
total_amount = round(float(amount), 2)
# 生成唯一订单号
order_id = datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + ''.join(random.choices('0123456789', k=6))
# 创建订单记录
payment_record = jingrow.get_pg({
"pagetype": "Order",
"order_id": order_id,
"order_type": "余额充值",
"team": team.name,
"total_amount": float(total_amount),
"status": "待支付",
"payment_method": "微信支付"
})
payment_record.insert(ignore_permissions=True)
jingrow.db.commit()
# 使用WeChatPayAPI生成支付链接
wechat_pay = WeChatPayAPI()
try:
qr_code_url = wechat_pay.generate_payment_url(
order_id=order_id,
amount=amount,
subject="Jingrow 余额充值",
team_name=team.name
)
# 检查URL是否为空
if not qr_code_url:
jingrow.log_error("微信支付错误", "微信支付URL生成为空")
# 使用提供的函数生成二维码图片
qr_code_image = generate_qr_code(qr_code_url)
result = {
"qr_code_url": qr_code_url,
"qr_code_image": qr_code_image,
"order_id": order_id,
"payment_record": payment_record.name
}
return result
except Exception as e:
jingrow.log_error("微信支付错误", f"创建微信支付订单失败: {str(e)}")
jingrow.throw(f"创建微信支付订单失败")
@jingrow.whitelist()
def create_order(**kwargs):
"""创建站点订单"""
try:
# 从kwargs中获取参数更灵活且使用正确的字段名
title = kwargs.get('title')
description = kwargs.get('description')
total_amount = kwargs.get('total_amount')
order_type = kwargs.get('order_type')
# 参数验证
if not title or not description or not total_amount:
jingrow.throw("必须提供标题、描述和金额")
# 获取当前用户团队
team = get_current_team(True)
# 生成唯一订单号
order_id = datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + ''.join(random.choices('0123456789', k=6))
# 创建订单记录
order = jingrow.get_pg({
"pagetype": "Order",
"order_id": order_id,
"order_type": order_type,
"title": title,
"description": description,
"team": team.name,
"total_amount": float(total_amount),
"status": "待支付",
})
order.insert(ignore_permissions=True)
jingrow.db.commit()
return {
"success": True,
"order": order.as_dict()
}
except Exception as e:
jingrow.log_error("订单错误", f"创建站点订单失败: {str(e)}")
return {
"success": False,
"message": f"创建订单失败: {str(e)}"
}
@jingrow.whitelist()
def create_renewal_order(site, renewal_months=1):
"""创建网站续费订单"""
try:
# 验证输入
site_pg = jingrow.get_pg("Site", site)
site_url = f"{site_pg.subdomain}.{site_pg.domain}"
team = site_pg.team
# 验证当前用户权限
current_team = get_current_team(True)
if current_team.name != team:
jingrow.throw("您没有权限为此站点创建续费订单")
# 获取当前计划 - 使用正确的字段名 plan 而非 current_plan
current_plan = jingrow.get_pg("Site Plan", site_pg.plan)
# 计算续费金额
renewal_months = int(renewal_months)
team_currency = jingrow.db.get_value("Team", team, "currency")
if renewal_months == 12:
# 年付9折
amount = round(current_plan.price_cny * 12 * 0.9) if team_currency == "CNY" else round(current_plan.price_usd * 12 * 0.9)
else:
amount = current_plan.price_cny * renewal_months if team_currency == "CNY" else current_plan.price_usd * renewal_months
# 生成唯一订单号
order_id = datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3] + ''.join(random.choices('0123456789', k=6))
# 创建订单记录
order = jingrow.get_pg({
"pagetype": "Order",
"order_id": order_id,
"order_type": "网站续费",
"team": team,
"status": "待支付",
"total_amount": amount,
"title": site_url,
"description": str(renewal_months) # 简单存储续费月数
})
order.insert(ignore_permissions=True)
jingrow.db.commit()
return {
"success": True,
"order": order.as_dict()
}
except Exception as e:
jingrow.log_error("续费订单错误", f"创建续费订单失败: {str(e)}")
return {
"success": False,
"message": f"创建续费订单失败: {str(e)}"
}
@jingrow.whitelist()
def process_balance_payment_for_order(order_id):
"""使用账户余额支付订单"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取订单信息
order = jingrow.get_pg("Order", {"order_id": order_id})
if not order:
jingrow.throw(f"找不到订单: {order_id}")
# 验证订单是否属于当前团队
if order.team != team.name:
jingrow.throw("您没有权限支付此订单")
# 检查订单状态
if order.status != "待支付":
return {
"success": False,
"message": "该订单已支付或已取消"
}
# 使用 Team 类的 get_balance 方法获取余额
balance = team.get_balance()
# 检查余额是否足够
if balance < order.total_amount:
return {
"success": False,
"message": "余额不足"
}
# 创建余额交易记录(扣款)
balance_transaction = jingrow.get_pg({
"pagetype": "Balance Transaction",
"team": team.name,
"type": "Adjustment",
"source": "Prepaid Credits",
"amount": -1 * float(order.total_amount), # 使用负数表示扣减
"description": f"{order.order_type}-{order.title}",
"paid_via_local_pg": 1
})
balance_transaction.flags.ignore_permissions = True
balance_transaction.insert()
balance_transaction.submit()
# 更新订单状态
order.status = "已支付"
order.payment_method = "余额支付"
order.save(ignore_permissions=True)
jingrow.db.commit()
return {
"status": "Success",
"message": "支付成功",
"order": order.as_dict()
}
except Exception as e:
jingrow.log_error("支付错误", f"余额支付失败: {str(e)}")
return {
"status": "Error",
"message": f"余额支付失败: {str(e)}"
}
@jingrow.whitelist()
def process_balance_payment_for_renew_order(order_id):
"""使用账户余额支付网站续费订单"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取订单信息
order = jingrow.get_pg("Order", {"order_id": order_id})
if not order:
jingrow.throw(_(f"找不到订单: {order_id}"))
# 验证订单是否属于当前团队
if order.team != team.name:
jingrow.throw(_("您没有权限支付此订单"))
# 验证订单类型
if order.order_type != "网站续费":
jingrow.throw(_("此订单不是网站续费订单"))
# 检查订单状态
if order.status != "待支付":
return {
"success": False,
"message": _("该订单已支付或已取消")
}
# 使用 Team 类的 get_balance 方法获取余额
balance = team.get_balance()
# 检查余额是否足够
if balance < order.total_amount:
return {
"success": False,
"message": _("余额不足")
}
# 开始数据库事务
jingrow.db.begin()
try:
# 创建余额交易记录(扣款)
balance_transaction = jingrow.get_pg({
"pagetype": "Balance Transaction",
"team": team.name,
"type": "Adjustment",
"source": "Prepaid Credits",
"amount": -1 * float(order.total_amount), # 使用负数表示扣减
"description": f"网站续费: {order.title}",
"paid_via_local_pg": 1
})
balance_transaction.flags.ignore_permissions = True
balance_transaction.insert()
balance_transaction.submit()
# 更新订单状态
order.status = "已支付"
order.payment_method = "余额支付"
order.save(ignore_permissions=True)
# 提取网站URL和续费周期
site_name = order.title # 网站URL就是name字段
renewal_months = int(order.description)
# 调用网站续期处理函数
renew_result = process_site_renew(order_id)
# 提交数据库事务
jingrow.db.commit()
return {
"success": True,
"status": "Success",
"message": _("支付成功,网站已续费"),
"order": order.as_dict(),
"site": renew_result
}
except Exception as inner_error:
# 回滚事务
jingrow.db.rollback()
raise inner_error
except Exception as e:
jingrow.log_error("续费支付错误", f"余额支付续费失败: {str(e)}")
return {
"success": False,
"status": "Error",
"message": _(f"余额支付失败: {str(e)}")
}
@jingrow.whitelist()
def process_balance_payment_for_server_order(order_id):
"""使用账户余额支付订单"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取订单信息
order = jingrow.get_pg("Order", {"order_id": order_id})
if not order:
jingrow.throw(f"找不到订单: {order_id}")
# 验证订单是否属于当前团队
if order.team != team.name:
jingrow.throw("您没有权限支付此订单")
# 检查订单状态
if order.status != "待支付":
return {
"success": False,
"message": "该订单已支付或已取消"
}
# 使用 Team 类的 get_balance 方法获取余额
balance = team.get_balance()
# 检查余额是否足够
if balance < order.total_amount:
return {
"success": False,
"message": "余额不足"
}
# 创建余额交易记录(扣款)
balance_transaction = jingrow.get_pg({
"pagetype": "Balance Transaction",
"team": team.name,
"type": "Adjustment",
"source": "Prepaid Credits",
"amount": -1 * float(order.total_amount), # 使用负数表示扣减
"description": f"{order.order_type}-{order.title}",
"paid_via_local_pg": 1
})
balance_transaction.flags.ignore_permissions = True
balance_transaction.insert()
balance_transaction.submit()
# 更新订单状态
order.status = "已支付"
order.payment_method = "余额支付"
order.save(ignore_permissions=True)
jingrow.db.commit()
# 支付成功后,调用订单完成处理函数
handle_order_payment_complete(order_id)
return {
"status": "Success",
"message": "支付成功",
"order": order.as_dict()
}
except Exception as e:
jingrow.log_error("支付错误", f"余额支付失败: {str(e)}")
return {
"status": "Error",
"message": f"余额支付失败: {str(e)}"
}
@jingrow.whitelist()
def process_alipay_order(order_id):
"""创建支付宝订单支付链接"""
team = get_current_team(True)
# 获取订单信息
order = jingrow.get_pg("Order", {"order_id": order_id})
if not order:
jingrow.throw(f"找不到订单: {order_id}")
# 验证订单是否属于当前团队
if order.team != team.name:
jingrow.throw("您没有权限支付此订单")
# 检查订单状态
if order.status != "待支付":
jingrow.throw("该订单已支付或已取消")
# 金额取整到两位小数
amount = round(float(order.total_amount), 2)
# 直接使用AlipayAPI类生成支付链接
api = AlipayAPI()
try:
# 生成支付链接
payment_url = api.generate_payment_url(
order_id=order_id,
amount=amount,
subject=order.title or "Jingrow 站点订单",
team_name=team.name
)
# 更新订单支付方式
order.payment_method = "支付宝"
order.save(ignore_permissions=True)
return {
"payment_url": payment_url,
"order_id": order_id,
"success": True
}
except Exception as e:
jingrow.log_error("Order", f"创建支付宝订单失败: {str(e)}")
jingrow.throw(f"创建支付宝订单失败: {str(e)}")
@jingrow.whitelist()
def process_wechatpay_order(order_id):
"""创建微信支付订单"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取订单信息
order = jingrow.get_pg("Order", {"order_id": order_id})
if not order:
jingrow.throw(f"找不到订单: {order_id}")
# 验证订单是否属于当前团队
if order.team != team.name:
jingrow.throw("您没有权限支付此订单")
# 检查订单状态
if order.status != "待支付":
jingrow.throw("该订单已支付或已取消")
# 获取金额
amount = order.total_amount
# 创建微信支付客户端
wechat_pay = WeChatPayAPI()
try:
# 生成支付URL
qr_code_url = wechat_pay.generate_payment_url(
order_id=order_id,
amount=amount,
subject=order.title or "Jingrow 站点订单",
team_name=team.name
)
# 检查URL是否为空
if not qr_code_url:
jingrow.log_error("微信支付错误", "微信支付URL生成为空")
jingrow.throw("生成支付URL失败")
# 生成二维码图片
qr_code_image = generate_qr_code(qr_code_url)
# 更新订单支付方式
order.payment_method = "微信支付"
order.save(ignore_permissions=True)
return {
"payment_url": qr_code_url,
"qr_code_image": qr_code_image,
"order_id": order_id,
"success": True
}
except Exception as e:
jingrow.log_error("微信支付错误", f"创建微信支付订单失败: {str(e)}")
jingrow.throw(f"创建微信支付订单失败")
except Exception as e:
jingrow.log_error("微信支付错误", f"创建微信支付订单失败: {str(e)}")
jingrow.throw(f"创建微信支付订单失败")
@jingrow.whitelist()
def check_order_payment_status(order_id):
"""检查订单支付状态"""
try:
# 获取订单信息
order = jingrow.get_pg("Order", {"order_id": order_id})
if not order:
jingrow.throw(f"找不到订单: {order_id}")
return {
"success": True,
"status": order.status,
"order": order.as_dict()
}
except Exception as e:
jingrow.log_error("订单错误", f"检查订单状态失败: {str(e)}")
return {
"success": False,
"message": f"检查订单状态失败: {str(e)}"
}
@jingrow.whitelist()
def get_orders(page=1, page_size=20, search=None):
try:
# 获取当前用户团队
team = get_current_team(True)
# 构建过滤条件
filters = {"team": team.name}
# 添加搜索条件
if search:
filters = [
["Order", "team", "=", team.name],
[
"Order",
"name|order_id|title|description",
"like",
f"%{search}%"
]
]
# 计算分页参数
page = int(page)
page_size = int(page_size)
start = (page - 1) * page_size
# 获取订单总数
total_count = jingrow.db.count("Order", filters=filters)
# 获取分页后的订单列表 - 根据需求修改字段列表
orders = jingrow.get_all(
"Order",
filters=filters,
fields=[
"title",
"order_id",
"trade_no",
"order_type",
"payment_method",
"description",
"total_amount as total",
"status",
"creation"
],
order_by="creation desc",
start=start,
limit=page_size
)
# 记录日志以便调试
jingrow.logger().info(f"获取订单成功: 团队={team.name}, 总数={total_count}")
return {
"orders": orders,
"total": total_count
}
except Exception as e:
jingrow.log_error("订单列表错误", f"获取订单列表失败: {str(e)}")
return {
"orders": [],
"total": 0,
"error": str(e)
}
@jingrow.whitelist()
def get_order_details(name):
"""
获取订单详情
参数:
name (str): 订单名称
返回:
dict: 包含订单详情
"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取订单详情
order = jingrow.get_pg("Order", name)
# 验证订单是否属于当前团队
if order.team != team.name:
jingrow.throw("您无权查看此订单")
# 构建返回数据
order_dict = order.as_dict()
return {
"order": order_dict
}
except Exception as e:
jingrow.log_error("订单详情错误", f"获取订单详情失败: {str(e)}")
return {
"error": str(e)
}
@jingrow.whitelist()
def get_balance_transactions(page=1, page_size=20, search=None):
"""
获取余额变动记录列表
参数:
page (int): 页码默认为1
page_size (int): 每页记录数默认为20
search (str): 搜索关键词,可选
返回:
dict: 包含transactions列表和total总数
"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 构建基础过滤条件
filters = {"team": team.name, "pagestatus": 1} # 已提交的文档
# 添加搜索条件(如果提供)
if search:
filters = [
["Balance Transaction", "team", "=", team.name],
["Balance Transaction", "pagestatus", "=", 1],
[
"Balance Transaction",
"description|source|type|invoice",
"like",
f"%{search}%"
]
]
# 计算分页参数
page = int(page)
page_size = int(page_size)
start = (page - 1) * page_size
# 获取总记录数
total_count = jingrow.db.count("Balance Transaction", filters=filters)
# 获取分页数据
transactions = jingrow.get_all(
"Balance Transaction",
filters=filters,
fields=[
"name",
"creation",
"description",
"amount",
"ending_balance",
"type",
"source",
"invoice"
],
order_by="creation desc",
start=start,
limit=page_size
)
return {
"transactions": transactions,
"total": total_count
}
except Exception as e:
jingrow.log_error("余额记录错误", f"获取余额记录失败: {str(e)}")
return {
"transactions": [],
"total": 0,
"error": str(e)
}
@jingrow.whitelist()
def process_balance_payment_for_domain_order(order_id):
"""处理域名订单的余额支付"""
try:
# 获取当前用户团队
team = get_current_team(True)
# 获取订单信息
order = jingrow.get_pg("Order", {"order_id": order_id})
if not order:
jingrow.throw(f"找不到订单: {order_id}")
# 验证订单是否属于当前团队
if order.team != team.name:
jingrow.throw("您没有权限支付此订单")
# 检查订单状态
if order.status != "待支付":
return {
"success": False,
"message": "该订单已支付或已取消"
}
# 使用 Team 类的 get_balance 方法获取余额
balance = team.get_balance()
# 检查余额是否足够
if balance < order.total_amount:
return {
"success": False,
"message": "余额不足"
}
# 创建余额交易记录(扣款)
balance_transaction = jingrow.get_pg({
"pagetype": "Balance Transaction",
"team": team.name,
"type": "Adjustment",
"source": "Prepaid Credits",
"amount": -1 * float(order.total_amount), # 使用负数表示扣减
"description": f"{order.order_type}-{order.title}",
"paid_via_local_pg": 1
})
balance_transaction.flags.ignore_permissions = True
balance_transaction.insert()
balance_transaction.submit()
# 更新订单状态
order.status = "已支付"
order.payment_method = "余额支付"
order.save(ignore_permissions=True)
jingrow.db.commit()
# 支付成功,订单状态已更新
# 调用统一的订单支付完成处理函数
handle_order_payment_complete(order_id)
return {
"status": "Success",
"message": "支付成功",
"order": order.as_dict()
}
except Exception as e:
jingrow.log_error("支付错误", f"域名订单余额支付失败: {str(e)}")
return {
"status": "Error",
"message": f"余额支付失败: {str(e)}"
}