import hmac
import json

import jingrow
import jingrow.utils
from jingrow.rate_limiter import rate_limit

from press.api.developer import raise_invalid_key_error
from press.utils import mask_email


class SaasApiHandler:
    """Resolve a SaaS app subscription from its secret key and expose read helpers.

    Constructing the handler validates ``secret_key``; an invalid key is
    reported through ``raise_invalid_key_error()``.
    """

    def __init__(self, secret_key):
        self.secret_key = secret_key
        self.validate_secret_key()

    def validate_secret_key(self):
        """Validate secret_key and set app subscription name and pg"""
        if not self.secret_key or not isinstance(self.secret_key, str):
            raise_invalid_key_error()

        app_subscription_name = jingrow.db.exists(
            "Saas App Subscription", {"secret_key": self.secret_key}
        )
        if not app_subscription_name:
            raise_invalid_key_error()

        self.app_subscription_name = app_subscription_name
        self.set_subscription_pg()

    def set_subscription_pg(self):
        """To be called after `secret_key` validation"""
        self.app_subscription_pg = jingrow.get_pg(
            "Saas App Subscription", self.app_subscription_name
        )

    def get_subscription_status(self):
        """Return the ``status`` field of the loaded subscription."""
        return self.app_subscription_pg.status

    def get_subscription_info(self):
        """Return the subscription document loaded during validation."""
        # The constructor already loaded this exact document; reuse it instead
        # of issuing a second identical fetch.
        return self.app_subscription_pg

    def get_plan_config(self):
        """Return the plan's ``config`` field parsed from JSON.

        An unset/empty config yields an empty dict instead of failing inside
        ``json.loads``.
        """
        raw_config = jingrow.get_pg(
            "Saas App Plan", self.app_subscription_pg.saas_app_plan
        ).config
        if not raw_config:
            return {}
        return json.loads(raw_config)

    def get_login_url(self):
        """Build the remote-login URL for the subscription's team.

        An unexpired "Attempted" ``Saas Remote Login`` token for the team is
        reused when one exists; otherwise a new record with a fresh token is
        inserted.
        """
        team = self.app_subscription_pg.team

        # check for active tokens
        # Resolve the record name once, then load by name. Running the filter
        # query twice (each with its own now()) could find a record in the
        # first call that no longer matches in the second.
        active_login = jingrow.db.exists(
            "Saas Remote Login",
            {
                "team": team,
                "status": "Attempted",
                "expires_on": (">", jingrow.utils.now()),
            },
        )
        if active_login:
            token = jingrow.get_pg("Saas Remote Login", active_login).token
        else:
            token = jingrow.generate_hash("Saas Remote Login", 50)
            jingrow.get_pg(
                {
                    "doctype": "Saas Remote Login",
                    "team": team,
                    "token": token,
                }
            ).insert(ignore_permissions=True)

        domain = jingrow.db.get_value(
            "Saas App", self.app_subscription_pg.app, "custom_domain"
        )
        return (
            f"https://{domain}/api/method/press.api.saas.login_via_token"
            f"?token={token}&team={team}"
        )

    def get_trial_expiry(self):
        """Return ``trial_end_date`` of the site linked to the subscription."""
        return jingrow.db.get_value(
            "Site", self.app_subscription_pg.site, "trial_end_date"
        )


# ------------------------------------------------------------
# API ENDPOINTS
# ------------------------------------------------------------
@jingrow.whitelist(allow_guest=True)
def ping():
    """Liveness probe; always returns ``"pong"``."""
    return "pong"


@jingrow.whitelist(allow_guest=True)
def get_subscription_status(secret_key):
    """Return the status of the subscription identified by ``secret_key``."""
    api_handler = SaasApiHandler(secret_key)
    return api_handler.get_subscription_status()


@jingrow.whitelist(allow_guest=True)
def get_plan_config(secret_key):
    """Return the parsed plan config for the subscription's plan."""
    api_handler = SaasApiHandler(secret_key)
    return api_handler.get_plan_config()


@jingrow.whitelist(allow_guest=True)
def get_subscription_info(secret_key):
    """Return the subscription document identified by ``secret_key``."""
    api_handler = SaasApiHandler(secret_key)
    return api_handler.get_subscription_info()


@jingrow.whitelist(allow_guest=True)
def get_trial_expiry(secret_key):
    """Return the trial end date of the subscription's site."""
    api_handler = SaasApiHandler(secret_key)
    return api_handler.get_trial_expiry()


"""
NOTE: These mentioned apis are used for all type of saas sites to allow login to jingrow cloud
- send_verification_code
- verify_verification_code
- login_to_fc

Don't change the file name or the method names
It can potentially break the integrations.
"""


@jingrow.whitelist(allow_guest=True, methods=["POST"])
@rate_limit(limit=5, seconds=60 * 60)
def send_verification_code(domain: str, route: str = ""):
    """Email a one-time verification code to the owner of the team behind ``domain``.

    The domain must be an "Active" Site Domain whose site belongs to an
    enabled team that passes ``check_if_user_can_login``. Only a hash of the
    code is cached, for 10 minutes, keyed by the domain.

    Returns a dict with the masked recipient email and ``is_user_logged_in``
    (always ``False`` while the logged-in shortcut below stays disabled).
    ``route`` is currently only referenced by that disabled shortcut.
    """
    from press.utils.otp import generate_otp

    domain_info = jingrow.get_value(
        "Site Domain", domain, ["site", "status"], as_dict=True
    )
    if not domain_info or domain_info.get("status") != "Active":
        jingrow.throw("The domain is not active currently. Please try again.")

    site_info = jingrow.get_value(
        "Site",
        domain_info.get("site"),
        ["name", "team", "standby_for", "standby_for_product"],
        as_dict=True,
    )
    if not site_info:
        # A domain whose site record is missing cannot be logged into; report
        # it like an inactive domain instead of failing on `None.get(...)`.
        jingrow.throw("The domain is not active currently. Please try again.")

    team_name = site_info.get("team")
    team_info = jingrow.get_value(
        "Team", team_name, ["name", "enabled", "user", "enforce_2fa"], as_dict=True
    )
    if not team_info or not team_info.get("enabled"):
        jingrow.throw("Your Jingrow Cloud team is disabled currently.")

    check_if_user_can_login(team_info, site_info)

    # Disabled shortcut for users that already have an active session:
    # if is_user_logged_in(team_info.get("user")):
    # 	if route == "dashboard":
    # 		redirect_to = "/dashboard/"
    # 	elif route == "site-dashboard":
    # 		redirect_to = f"/dashboard/sites/{site_info.get('name')}"
    # 	return {"is_user_logged_in": True, "redirect_to": redirect_to}

    # generate otp and set in redis with 10 min expiry
    otp = generate_otp()
    jingrow.cache.set_value(
        f"otp_hash_for_fc_login_via_saas_flow:{domain}",
        jingrow.utils.sha256_hash(str(otp)),
        expires_in_sec=60 * 10,
    )

    email = team_info.get("user")
    send_email_with_verification_code(email, otp)

    return {
        "email": mask_email(email, 50),
        "is_user_logged_in": False,
    }


@jingrow.whitelist(allow_guest=True, methods=["POST"])
@rate_limit(limit=5, seconds=60 * 60)
def verify_verification_code(domain: str, verification_code: str, route: str = "dashboard"):
    """Check the submitted code for ``domain`` and issue a short-lived login token.

    On success the cached code hash is deleted and a 64-character
    ``login_token`` is placed in ``jingrow.response``. The token maps to the
    team user for 60 seconds; when ``route`` is ``"site-dashboard"`` it also
    maps to the domain for the same period.
    """
    otp_cache_key = f"otp_hash_for_fc_login_via_saas_flow:{domain}"
    otp_hash = jingrow.cache.get_value(otp_cache_key, expires=True)
    submitted_hash = jingrow.utils.sha256_hash(str(verification_code))

    # Constant-time comparison so response timing does not reveal how much of
    # the stored hash the submitted one matched.
    if not otp_hash or not hmac.compare_digest(
        str(otp_hash).encode("utf-8"), str(submitted_hash).encode("utf-8")
    ):
        jingrow.throw("Invalid Code. Please try again.")

    site = jingrow.get_value("Site Domain", domain, "site")
    team = jingrow.get_value("Site", site, "team")
    user = jingrow.get_value("Team", team, "user")

    # as otp is valid, delete the otp from redis
    jingrow.cache.delete_value(otp_cache_key)

    # login and generate a login_token to store sid
    login_token = jingrow.generate_hash(length=64)
    jingrow.cache.set_value(f"saas_fc_login_token:{login_token}", user, expires_in_sec=60)
    if route == "site-dashboard":
        jingrow.cache.set_value(
            f"saas_fc_login_site:{login_token}", domain, expires_in_sec=60
        )

    jingrow.response["login_token"] = login_token


@jingrow.whitelist(allow_guest=True)
@rate_limit(limit=5, seconds=60)
def login_to_fc(token: str):
    """Exchange a login token for a session and redirect to the dashboard.

    When the token maps to a user, the mapping is deleted and that user is
    logged in. The response is a redirect either way: to the site dashboard
    when the token also maps to a domain, otherwise to ``/dashboard/``.
    """
    email_cache_key = f"saas_fc_login_token:{token}"
    domain_cache_key = f"saas_fc_login_site:{token}"
    email = jingrow.cache.get_value(email_cache_key, expires=True)
    domain = jingrow.cache.get_value(domain_cache_key, expires=True)

    if email:
        # Tokens are single-use: drop the mapping before logging in.
        jingrow.cache.delete_value(email_cache_key)
        jingrow.local.login_manager.login_as(email)

    jingrow.response.type = "redirect"
    if domain:
        jingrow.cache.delete_value(domain_cache_key)
        jingrow.response.location = f"/dashboard/sites/{domain}"
    else:
        jingrow.response.location = "/dashboard/"


def is_user_logged_in(user):
    """Return True when ``user`` has at least one Sessions row with status "Active"."""
    Sessions = jingrow.qb.DocType("Sessions")

    active_sessions = (
        jingrow.qb.from_(Sessions)
        .select(Sessions.user)
        .where(Sessions.user == user)
        .where(Sessions.status == "Active")
        .run(as_dict=True)
    )
    return bool(active_sessions)


def check_if_user_can_login(team_info, site_info):
    """Throw unless the team user may use the SaaS verification-code login.

    Rejected cases: the team enforces 2FA; the team user is "Administrator"
    or its ``user_type`` is not "Website User"; the site has neither
    ``standby_for`` nor ``standby_for_product`` set.
    """
    if team_info.get("enforce_2fa"):
        jingrow.throw(
            "Sorry, you cannot login with this method as 2FA is enabled. Please visit https://jcloud.jingrow.com/dashboard to login."
        )

    team_user = team_info.get("user")
    if (
        team_user == "Administrator"
        or jingrow.db.get_value("User", team_user, "user_type") != "Website User"
    ):
        jingrow.throw(
            "Sorry, you cannot login with this method. Please contact support for more details."
        )

    # restrict to SaaS Site
    if not (site_info.get("standby_for") or site_info.get("standby_for_product")):
        jingrow.throw(
            "Only SaaS sites are allowed to login to Jingrow Cloud via current method."
        )


def send_email_with_verification_code(email, otp):
    """Deliver ``otp`` to ``email``.

    In developer mode the code is printed to stdout instead of being mailed;
    otherwise it is sent immediately using the
    ``verification_code_for_login`` template.
    """
    if jingrow.conf.developer_mode:
        print("\nVerification Code for login to Jingrow Cloud:")
        print(f"\nOTP for {email}:")
        print(otp)
        print()
    else:
        jingrow.sendmail(
            recipients=email,
            subject="Verification Code for Jingrow Cloud Login",
            template="verification_code_for_login",
            args={
                "full_name": jingrow.get_value("User", email, "full_name"),
                "otp": otp,
                "image_path": "http://git.jingrow.com/jingrow/gameplan/assets/9355208/447035d0-0686-41d2-910a-a3d21928ab94",
            },
            now=True,
        )