131 lines
4.0 KiB
Python
131 lines
4.0 KiB
Python
# Copyright (c) 2022, JINGROW
|
|
# For license information, please see license.txt
|
|
|
|
import json
|
|
import os
|
|
import traceback
|
|
|
|
import jingrow
|
|
|
|
JCLOUD_AUTH_KEY = "jcloud-auth-logs"
|
|
JCLOUD_AUTH_MAX_ENTRIES = 1000000
|
|
|
|
|
|
ALLOWED_PATHS = [
|
|
"/api/method/create-site-migration",
|
|
"/api/method/create-version-upgrade",
|
|
"/api/method/migrate-to-private-bench",
|
|
"/api/method/find-my-sites",
|
|
"/api/method/jingrow.core.pagetype.communication.email.mark_email_as_seen",
|
|
"/api/method/jingrow.realtime.get_user_info",
|
|
"/api/method/jingrow.realtime.can_subscribe_pg",
|
|
"/api/method/jingrow.realtime.can_subscribe_pagetype",
|
|
"/api/method/jingrow.realtime.has_permission",
|
|
"/api/method/jingrow.www.login.login_via_jingrow",
|
|
"/api/method/jingrow.integrations.oauth2.authorize",
|
|
"/api/method/jingrow.integrations.oauth2.approve",
|
|
"/api/method/jingrow.integrations.oauth2.get_token",
|
|
"/api/method/jingrow.integrations.oauth2.openid_profile",
|
|
"/api/method/jingrow.integrations.oauth2_logins.login_via_jingrow",
|
|
"/api/method/jingrow.website.pagetype.web_page_view.web_page_view.make_view_log",
|
|
"/api/method/get-user-sites-list-for-new-ticket",
|
|
"/api/method/ping",
|
|
"/api/method/login",
|
|
"/api/method/logout",
|
|
"/api/method/jcloud.jcloud.pagetype.razorpay_webhook_log.razorpay_webhook_log.razorpay_webhook_handler",
|
|
"/api/method/jcloud.jcloud.pagetype.razorpay_webhook_log.razorpay_webhook_log.razorpay_authorized_payment_handler",
|
|
"/api/method/jcloud.jcloud.pagetype.stripe_webhook_log.stripe_webhook_log.stripe_webhook_handler",
|
|
"/api/method/upload_file",
|
|
"/api/method/jingrow.search.web_search",
|
|
"/api/method/jingrow.email.queue.unsubscribe",
|
|
"/api/method/jcloud.utils.telemetry.capture_read_event",
|
|
"/api/method/validate_plan_change",
|
|
"/api/method/marketplace-apps",
|
|
"/api/method/jcloud.www.dashboard.get_context_for_dev",
|
|
"/api/method/jingrow.website.pagetype.web_form.web_form.accept",
|
|
"/api/method/jingrow.core.pagetype.user.user.test_password_strength",
|
|
"/api/method/jingrow.core.pagetype.user.user.update_password",
|
|
"/api/method/get_central_migration_data",
|
|
]
|
|
|
|
ALLOWED_WILDCARD_PATHS = [
|
|
"/api/method/jcloud.api.",
|
|
"/api/method/jcloud.saas.",
|
|
"/api/method/wiki.",
|
|
"/api/method/jingrow.integrations.oauth2_logins.",
|
|
"/api/method/jcloud.www.marketplace.index.",
|
|
]
|
|
|
|
DENIED_PATHS = [
|
|
# Added from jingrow/wwww/..
|
|
"/printview",
|
|
"/printpreview",
|
|
]
|
|
|
|
|
|
DENIED_WILDCARD_PATHS = [
|
|
"/api/",
|
|
]
|
|
|
|
|
|
def hook(): # noqa: C901
|
|
if jingrow.form_dict.cmd:
|
|
path = f"/api/method/{jingrow.form_dict.cmd}"
|
|
else:
|
|
path = jingrow.request.path
|
|
|
|
user_type = jingrow.get_cached_value("User", jingrow.session.user, "user_type")
|
|
|
|
# Allow unchecked access to System Users
|
|
if user_type == "System User":
|
|
return
|
|
|
|
if path in DENIED_PATHS:
|
|
log(path, user_type)
|
|
jingrow.throw("Access not allowed for this URL", jingrow.AuthenticationError)
|
|
|
|
for denied in DENIED_WILDCARD_PATHS:
|
|
if path.startswith(denied):
|
|
for allowed in ALLOWED_WILDCARD_PATHS:
|
|
if path.startswith(allowed):
|
|
return
|
|
if path in ALLOWED_PATHS:
|
|
return
|
|
|
|
log(path, user_type)
|
|
jingrow.throw("Access not allowed for this URL", jingrow.AuthenticationError)
|
|
|
|
return
|
|
|
|
|
|
def log(path, user_type):
|
|
data = {
|
|
"ip": jingrow.local.request_ip,
|
|
"timestamp": jingrow.utils.now(),
|
|
"user_type": user_type,
|
|
"path": path,
|
|
"user": jingrow.session.user,
|
|
"referer": jingrow.request.headers.get("Referer", ""),
|
|
}
|
|
|
|
if jingrow.cache().llen(JCLOUD_AUTH_KEY) > JCLOUD_AUTH_MAX_ENTRIES:
|
|
jingrow.cache().ltrim(JCLOUD_AUTH_KEY, 1, -1)
|
|
serialized = json.dumps(data, sort_keys=True, default=str)
|
|
jingrow.cache().rpush(JCLOUD_AUTH_KEY, serialized)
|
|
|
|
|
|
def flush():
|
|
log_file = os.path.join(jingrow.utils.get_bench_path(), "logs", "jcloud.auth.json.log")
|
|
try:
|
|
# Fetch all entries without removing from cache
|
|
logs = jingrow.cache().lrange(JCLOUD_AUTH_KEY, 0, -1)
|
|
if logs:
|
|
logs = list(map(jingrow.safe_decode, logs))
|
|
with open(log_file, "a", os.O_NONBLOCK) as f:
|
|
f.write("\n".join(logs))
|
|
f.write("\n")
|
|
# Remove fetched entries from cache
|
|
jingrow.cache().ltrim(JCLOUD_AUTH_KEY, len(logs) - 1, -1)
|
|
except Exception:
|
|
traceback.print_exc()
|