import json from datetime import datetime import jingrow import requests from jingrow import _ from jingrow.query_builder import PageType from jingrow.utils.password import get_decrypted_password from jcloude.utils import get_current_team supported_mpesa_currencies = ["KES"] @jingrow.whitelist() def update_mpesa_setup(mpesa_details): """Create Mpesa Settings for the team.""" mpesa_info = jingrow._dict(mpesa_details) team = get_current_team() try: if not jingrow.db.exists("Mpesa Setup", {"team": team}): mpesa_setup = jingrow.get_pg( { "pagetype": "Mpesa Setup", "team": team, "mpesa_setup_id": mpesa_info.mpesa_setup_id, "api_type": "Mpesa Express", "consumer_key": mpesa_info.consumer_key, "consumer_secret": mpesa_info.consumer_secret, "business_shortcode": mpesa_info.short_code, "till_number": mpesa_info.till_number, "pass_key": mpesa_info.pass_key, "security_credential": mpesa_info.security_credential, "initiator_name": mpesa_info.initiator_name, "sandbox": 1 if mpesa_info.sandbox else 0, } ) mpesa_setup.insert(ignore_permissions=True) else: mpesa_setup = jingrow.get_pg("Mpesa Setup", {"team": team}) mpesa_setup.mpesa_setup_id = mpesa_info.mpesa_setup_id mpesa_setup.consumer_key = mpesa_info.consumer_key mpesa_setup.consumer_secret = mpesa_info.consumer_secret mpesa_setup.business_shortcode = mpesa_info.short_code mpesa_setup.till_number = mpesa_info.till_number mpesa_setup.pass_key = mpesa_info.pass_key mpesa_setup.security_credential = mpesa_info.security_credential mpesa_setup.initiator_name = mpesa_info.initiator_name mpesa_setup.sandbox = 1 if mpesa_info.sandbox else 0 mpesa_setup.save() mpesa_setup.reload() return mpesa_setup.name except Exception as e: jingrow.log_error( message=f"Error creating Mpesa Settings: {e!s}", title="MPesa Settings Creation Error" ) return None @jingrow.whitelist() def fetch_mpesa_setup(): team = get_current_team() if jingrow.db.exists("Mpesa Setup", {"team": team}): mpesa_setup = jingrow.get_pg("Mpesa Setup", {"team": team}) return { "mpesa_setup_id": mpesa_setup.mpesa_setup_id, "consumer_key": mpesa_setup.consumer_key, "consumer_secret": mpesa_setup.consumer_secret, "business_shortcode": mpesa_setup.business_shortcode, "till_number": mpesa_setup.till_number, "pass_key": mpesa_setup.pass_key, "initiator_name": mpesa_setup.initiator_name, "security_credential": mpesa_setup.security_credential, "api_type": mpesa_setup.api_type, } return None @jingrow.whitelist() def display_invoices_by_partner(): """Display the list of invoices by partner.""" team = get_current_team() invoices = jingrow.get_all( "Mpesa Payment Record", filters={"team": team}, fields=[ "name", "posting_date", "amount", "local_invoice", "payment_partner", "amount_usd", "exchange_rate", ], ) return invoices # noqa: RET504 @jingrow.whitelist() def get_exchange_rate(from_currency, to_currency): """Get the latest exchange rate for the given currencies.""" exchange_rate = jingrow.db.get_value( "Currency Exchange", {"from_currency": from_currency, "to_currency": to_currency}, "exchange_rate", order_by="creation DESC", ) return exchange_rate # noqa: RET504 @jingrow.whitelist() def update_payment_gateway_settings(gateway_details): """Create Payment Gateway Settings for the team.""" team = get_current_team() gateway_data = jingrow._dict(gateway_details) try: if jingrow.db.exists("Payment Gateway", {"team": team}): payment_gateway = jingrow.get_pg("Payment Gateway", {"team": team}) payment_gateway.update( { "gateway": gateway_data.gateway_name, "currency": gateway_data.currency, "gateway_settings": gateway_data.gateway_setting, "gateway_controller": gateway_data.gateway_controller, "url": gateway_data.url, "api_key": gateway_data.api_key, "api_secret": gateway_data.api_secret, "taxes_and_charges": gateway_data.taxes_and_charges, "print_format": gateway_data.print_format, } ) return payment_gateway.save().name payment_gateway_settings = jingrow.get_pg( { "pagetype": "Payment Gateway", "team": team, "gateway": gateway_data.gateway_name, "currency": gateway_data.currency, "gateway_settings": gateway_data.gateway_setting, "gateway_controller": gateway_data.gateway_controller, "url": gateway_data.url, "api_key": gateway_data.api_key, "api_secret": gateway_data.api_secret, "taxes_and_charges": gateway_data.taxes_and_charges, "print_format": gateway_data.print_format, } ) payment_gateway_settings.insert(ignore_permissions=True) return payment_gateway_settings except Exception as e: jingrow.log_error( message=f"Error creating Payment Gateway Settings: {e!s}", title="Payment Gateway Settings Creation Error", ) return None @jingrow.whitelist() def get_payment_gateway_details(): team = get_current_team() if jingrow.db.exists("Payment Gateway", {"team": team}): payment_gateway = jingrow.get_pg("Payment Gateway", {"team": team}) return { "gateway_name": payment_gateway.gateway, "currency": payment_gateway.currency, "gateway_settings": payment_gateway.gateway_settings, "gateway_controller": payment_gateway.gateway_controller, "url": payment_gateway.url, "api_key": payment_gateway.api_key, "api_secret": payment_gateway.api_secret, "taxes_and_charges": payment_gateway.taxes_and_charges, "print_format": payment_gateway.print_format, } return None @jingrow.whitelist() def get_gateway_controller(): # """Get the list of controllers for the given pagetype.""" team = get_current_team(get_pg=True) gateway_setting = "Mpesa Setup" if team.country == "Kenya" else None if gateway_setting: return jingrow.db.get_value(gateway_setting, {"team": team.name}, "name") return None @jingrow.whitelist() def get_tax_percentage(payment_partner): team = jingrow.db.get_value("Team", {"user": payment_partner}, "name") mpesa_setups = jingrow.get_all("Mpesa Setup", {"api_type": "Mpesa Express", "team": team}, pluck="name") taxes_and_charges = 0 for mpesa_setup in mpesa_setups: payment_gateways = jingrow.get_all( "Payment Gateway", filters={"gateway_settings": "Mpesa Setup", "gateway_controller": mpesa_setup}, fields=["taxes_and_charges"], ) if payment_gateways: taxes_and_charges = payment_gateways[0].taxes_and_charges break # we don't need the loop entirely return taxes_and_charges def update_tax_id_or_phone_no(team, tax_id, phone_number): """Update the tax ID or phone number for the team, only if they are different from existing values.""" team_pg = jingrow.get_pg("Team", team) # Check if updates are needed new_tax_id = tax_id and team_pg.mpesa_tax_id != tax_id new_phone_number = phone_number and team_pg.mpesa_phone_number != phone_number # Update only if at least one value needs updating if new_tax_id or new_phone_number: if tax_id: team_pg.mpesa_tax_id = tax_id if phone_number: team_pg.mpesa_phone_number = phone_number team_pg.save() @jingrow.whitelist() def display_mpesa_payment_partners(): """Display the list of partners in the system with Mpesa integration enabled.""" Team = PageType("Team") MpesaSetup = PageType("Mpesa Setup") query = ( jingrow.qb.from_(Team) .join(MpesaSetup) .on(Team.name == MpesaSetup.team) .select(Team.user) .where(Team.country == "Kenya") # (MpesaSetup.sandbox == 1) ) mpesa_partners = query.run(as_dict=True) return [partner["user"] for partner in mpesa_partners] @jingrow.whitelist() def display_payment_partners(): """Display the list of partners in the system.""" Team = PageType("Team") query = jingrow.qb.from_(Team).select(Team.user).where(Team.erpnext_partner == 1) partners = query.run(as_dict=True) return [partner["user"] for partner in partners] @jingrow.whitelist() def display_payment_gateway(): """Display the payment gateway for the partner.""" gateways = jingrow.get_all("Payment Gateway", filters={}, fields=["gateway"]) return [gateway["gateway"] for gateway in gateways] def get_details_from_request_log(transaction_id): """Get the team and partner associated with the Mpesa Request Log.""" request_log = jingrow.get_pg("Mpesa Request Log", {"request_id": transaction_id, "status": "Queued"}) request_data = request_log.data team = partner = None # Parse the request_data as a dictionary if request_data: try: request_data_dict = json.loads(request_data) team = request_data_dict.get("team") partner_ = request_data_dict.get("partner") partner = jingrow.get_value("Team", {"user": partner_, "erpnext_partner": 1, "enabled": 1}, "name") requested_amount = request_data_dict.get("request_amount") amount_usd = request_data_dict.get("amount_usd") exchange_rate = request_data_dict.get("exchange_rate") except json.JSONDecodeError: jingrow.throw(_("Invalid JSON format in request_data")) team = None partner = None return jingrow._dict( { "team": team, "partner": partner, "requested_amount": requested_amount, "amount_usd": amount_usd, "exchange_rate": exchange_rate, } ) def get_payment_gateway(partner_value): """Get the payment gateway for the partner.""" partner = jingrow.get_pg("Team", partner_value) mpesa_setup = get_mpesa_setup_for_team(partner.name) payment_gateway = jingrow.get_all( "Payment Gateway", filters={"gateway_settings": "Mpesa Setup", "gateway_controller": mpesa_setup.name}, pluck="name", ) if not payment_gateway: jingrow.throw(_("Payment Gateway not found"), title=_("Mpesa Express Error")) gateway = jingrow.get_pg("Payment Gateway", payment_gateway[0]) return gateway.name def get_mpesa_setup_for_team(team_name): """Fetch Mpesa setup for a given team.""" mpesa_setup = jingrow.get_all("Mpesa Setup", {"team": team_name}, pluck="name") if not mpesa_setup: jingrow.throw( _(f"Mpesa Setup not configured for the team {team_name}"), title=_("Mpesa Express Error") ) return jingrow.get_pg("Mpesa Setup", mpesa_setup[0]) def sanitize_mobile_number(number): """ensures number take the right format""" """Add country code and strip leading zeroes from the phone number.""" return "254" + str(number).lstrip("0") def fetch_param_value(response, key, key_field): """Fetch the specified key from list of dictionary. Key is identified via the key field.""" for param in response: if param[key_field] == key: return param["Value"] return None @jingrow.whitelist() def create_exchange_rate(**kwargs): """Create a new exchange rate record.""" try: from_currency = kwargs.get("from_currency", {}).get("value") to_currency = kwargs.get("to_currency", {}).get("value") exchange_rate = kwargs.get("exchange_rate") if not from_currency or not to_currency or not exchange_rate: raise ValueError("Missing required fields.") exchange_rate_pg = jingrow.get_pg( { "pagetype": "Currency Exchange", "from_currency": from_currency, "to_currency": to_currency, "exchange_rate": exchange_rate, "date": jingrow.utils.today(), } ) exchange_rate_pg.insert(ignore_permissions=True) return exchange_rate_pg.name except Exception as e: jingrow.log_error("Error creating exchange rate") raise e def create_payment_partner_transaction( team, payment_partner, exchange_rate, amount, paid_amount, payment_gateway, payload=None ): """Create a Payment Partner Transaction record.""" try: transaction_pg = jingrow.get_pg( { "pagetype": "Payment Partner Transaction", "team": team, "payment_partner": payment_partner, "exchange_rate": exchange_rate, "payment_gateway": payment_gateway, "amount": amount, "actual_amount": paid_amount, "payment_transaction_details": payload, } ) transaction_pg.insert(ignore_permissions=True) transaction_pg.submit() except Exception: jingrow.log_error("Error creating Payment Partner Transaction") raise return transaction_pg.name @jingrow.whitelist() def fetch_payments(payment_gateway, partner, from_date, to_date): partner = ( partner if jingrow.db.exists("Team", partner) else jingrow.get_value("Team", {"user": partner}, "name") ) filters = { "pagestatus": 1, "submitted_to_jingrow": 0, "payment_gateway": payment_gateway, "payment_partner": partner, } from_date = convert_string_to_date(from_date) to_date = convert_string_to_date(to_date) if from_date and to_date: filters["posting_date"] = ["between", [from_date, to_date]] partner_payments = jingrow.get_all( "Payment Partner Transaction", filters=filters, fields=["name", "amount", "posting_date"] ) return partner_payments # noqa: RET504 @jingrow.whitelist() def fetch_percentage_commission(partner): """Fetch the percentage commission for the partner.""" return jingrow.get_value("Team", {"user": partner}, "partner_commission") @jingrow.whitelist() def create_invoice_partner_site(data, gateway_controller): gateway = jingrow.get_pg("Payment Gateway", gateway_controller) api_url_ = gateway.url api_key = gateway.api_key api_secret = get_decrypted_password("Payment Gateway", gateway.name, fieldname="api_secret") transaction_id = data.get("transaction_id") amount = data.get("amount") team = data.get("team") default_currency = data.get("default_currency") rate = data.get("rate") tax_id = data.get("tax_id") # Validate the necessary fields if not transaction_id or not amount: jingrow.throw(_("Invalid transaction data received")) api_url = api_url_ headers = { "Authorization": f"token {api_key}:{api_secret}", } # Define the payload to send with the POST request payload = { "transaction_id": transaction_id, "amount": amount, "team": team, "default_currency": default_currency, "rate": rate, "tax_id": tax_id, } # Make the POST request to your API try: response = requests.post(api_url, data=payload, headers=headers) if response.status_code == 200: response_data = response.json() download_link = response_data.get("message", "") invoice_name = response_data.get("invoice_name", "") return download_link, invoice_name jingrow.log_error(f"API Error: {response.status_code} - {response.text}") jingrow.throw(_("Failed to create the invoice via API")) except requests.exceptions.RequestException as e: jingrow.log_error(f"Error calling API: {e}") jingrow.throw(_("There was an issue connecting to the API.")) @jingrow.whitelist() def display_payment_gateways(payment_partner): """Display the list of payment gateways for the partner.""" Team = PageType("Team") PaymentGateway = PageType("Payment Gateway") query = ( jingrow.qb.from_(Team) .join(PaymentGateway) .on(Team.name == PaymentGateway.team) .select(PaymentGateway.name) .where(Team.user == payment_partner) ) payment_gateways = query.run(as_dict=True) return [gateway["name"] for gateway in payment_gateways] @jingrow.whitelist() def fetch_payouts(): team = get_current_team() payouts = jingrow.get_all( "Partner Payment Payout", filters={"partner": team}, fields=["name", "total_amount", "commission", "net_amount", "posting_date"], ) print("here", len(payouts)) return payouts def convert_string_to_date(date_string): return datetime.strptime(date_string, "%Y-%m-%d").date()