# jcloude/press/overrides.py
# 2025-12-23 21:23:54 +08:00
#
# 236 lines
# 6.1 KiB
# Python

# Copyright (c) 2019, JINGROW
# For license information, please see license.txt
import os
from functools import partial
import jingrow
from jingrow.core.doctype.user.user import User
from jingrow.handler import is_whitelisted
from jingrow.utils import cint
from jcloude.access.support_access import has_support_access
from jcloude.runner import constants
from jcloude.utils import _get_current_team, _system_user
@jingrow.whitelist()
def upload_file():
    """Handle a file upload request.

    Upload parameters are read from ``jingrow.form_dict`` and the payload,
    when present, from the ``"file"`` entry of ``jingrow.request.files``.
    The raw content and filename are always stored on ``jingrow.local``
    (as ``uploaded_file`` / ``uploaded_filename``), even when no file was
    sent, in which case both are ``None``.

    If the ``method`` form parameter is set, it is resolved with
    ``jingrow.get_attr``, passed through ``is_whitelisted`` and called; its
    return value is the response. Otherwise a ``File`` document is built
    from the form parameters, saved and returned.
    """
    form = jingrow.form_dict
    handler_path = form.method

    content, filename = None, None
    uploads = jingrow.request.files
    if "file" in uploads:
        upload = uploads["file"]
        content = upload.stream.read()
        filename = upload.filename

    jingrow.local.uploaded_file = content
    jingrow.local.uploaded_filename = filename

    if handler_path:
        # Delegate to a custom handler; is_whitelisted() is called on it
        # before it is invoked.
        handler = jingrow.get_attr(handler_path)
        is_whitelisted(handler)
        return handler()

    file_pg = jingrow.get_pg(
        {
            "doctype": "File",
            "attached_to_doctype": form.doctype,
            "attached_to_name": form.docname,
            "attached_to_field": form.fieldname,
            "folder": form.folder or "Home",
            "file_name": filename,
            "file_url": form.file_url,
            "is_private": cint(form.is_private),
            "content": content,
        }
    )
    file_pg.save()
    return file_pg
def on_session_creation():
    """Add the current team's login route to the response as ``dashboard_route``.

    Does nothing for a "System User" session whose user has no ``Team``
    record. For everyone else the current team is loaded and its
    ``get_route_on_login()`` result is written into ``jingrow.local.response``.
    """
    from jcloude.utils import get_current_team

    owns_team = jingrow.db.exists("Team", {"user": jingrow.session.user})
    if not owns_team and jingrow.session.data.user_type == "System User":
        return

    try:
        route = get_current_team(get_pg=True).get_route_on_login()
        jingrow.local.response.update({"dashboard_route": route})
    except Exception:
        # Best effort: any failure while resolving the team or its route is
        # deliberately ignored and the response is left untouched.
        pass
def on_login(login_manager):
    """Run post-login checks for non-system users.

    1. Sessions whose ``user_type`` is "System User" are skipped entirely.
    2. If the logging-in user has an enabled ``User 2FA`` record, its
       ``last_verified_at`` must be set and must not be older than the
       10-second cutoff computed with ``add_to_date``; otherwise
       ``jingrow.throw`` aborts the login.
    3. If the session user has no enabled ``Team`` but does have a disabled
       one, that team is re-enabled and the change committed.

    Args:
        login_manager: object exposing the logging-in user as ``.user``.
    """
    session = jingrow.session
    if session.user and session.data and session.data.user_type == "System User":
        return

    two_factor = jingrow.db.get_value(
        "User 2FA",
        {"user": login_manager.user, "enabled": 1},
        ["last_verified_at"],
        as_dict=True,
    )
    if two_factor:
        verified_at = two_factor.get("last_verified_at")
        # The cutoff is add_to_date(None, seconds=-10), presumably "now minus
        # 10 seconds"; a missing or older verification is rejected.
        if not verified_at or verified_at < jingrow.utils.add_to_date(None, seconds=-10):
            jingrow.throw("Please re-login to verify your identity.")

    if jingrow.db.exists("Team", {"user": jingrow.session.user, "enabled": 1}):
        return

    if jingrow.db.exists("Team", {"user": jingrow.session.user, "enabled": 0}):
        # No enabled team but a disabled one exists: re-enable it on login.
        jingrow.db.set_value("Team", {"user": jingrow.session.user, "enabled": 0}, "enabled", 1)
        jingrow.db.commit()
def before_job():
    """Expose the team and system-user helpers on ``jingrow.local``.

    Sets ``jingrow.local.team`` to ``_get_current_team`` and
    ``jingrow.local.system_user`` to ``_system_user`` (the callables
    themselves, not their results).
    NOTE(review): presumably registered as the background-job hook; confirm
    in hooks.py.
    """
    local = jingrow.local
    local.system_user = _system_user
    local.team = _get_current_team
def before_request():
    """Expose the team and system-user helpers on ``jingrow.local``.

    Sets ``jingrow.local.team`` to ``_get_current_team`` and
    ``jingrow.local.system_user`` to ``_system_user`` (the callables
    themselves, not their results).
    NOTE(review): presumably registered as the per-request hook; confirm in
    hooks.py.
    """
    local = jingrow.local
    local.system_user = _system_user
    local.team = _get_current_team
def cleanup_ansible_tmp_files():
    """Delete stale sub-directories of Ansible's local temp directory.

    The directory cleaned is the parent of ``constants.DEFAULT_LOCAL_TMP``.
    Sub-directories whose mtime is older than one hour are removed; when the
    ``FRAPPE_BACKGROUND_WORKERS_NOFORK`` environment variable is set the
    cutoff is two days instead. Nothing happens if ``constants`` has no
    ``DEFAULT_LOCAL_TMP`` attribute.

    Raises:
        AssertionError: if the resolved temp directory is not strictly
            inside ``~/.ansible``. This guard prevents ``rmtree`` from
            running on an unexpected location.
    """
    import pathlib
    import shutil
    import time

    if not hasattr(constants, "DEFAULT_LOCAL_TMP"):
        return

    max_age = 60 * 60  # >One hour old
    if os.environ.get("FRAPPE_BACKGROUND_WORKERS_NOFORK"):
        # Long running processes: keep their temp dirs for much longer.
        max_age = 2 * 24 * 60 * 60  # >2 days old
    threshold = time.time() - max_age

    temp_dir = pathlib.Path(constants.DEFAULT_LOCAL_TMP).parent
    ansible_dir = pathlib.Path.home() / ".ansible"

    # Avoid clearing unknown directories. This is an explicit raise rather
    # than an `assert` so the check is not stripped under `python -O`; the
    # exception type is kept for backward compatibility.
    if temp_dir == ansible_dir or not temp_dir.is_relative_to(ansible_dir):
        raise AssertionError(f"Refusing to clean up unexpected directory: {temp_dir}")

    for folder in temp_dir.iterdir():
        if folder.is_dir() and folder.stat().st_mtime < threshold:
            shutil.rmtree(folder)
def update_website_context(context):
    """Hide ``/docs`` pages unless docs publishing is enabled.

    When there is an active request whose path starts with ``/docs`` and the
    ``publish_docs`` value of "Jcloude Settings" is falsy,
    ``jingrow.DoesNotExistError`` is raised. ``context`` itself is not read
    or modified.

    Raises:
        jingrow.DoesNotExistError: for ``/docs`` requests while docs are
            unpublished.
    """
    request = jingrow.request
    if not request or not request.path.startswith("/docs"):
        return

    if not jingrow.db.get_single_value("Jcloude Settings", "publish_docs"):
        raise jingrow.DoesNotExistError
def has_permission(pg, ptype, user):
    """Decide whether ``user`` may perform ``ptype`` on the team-owned ``pg``.

    Access is granted when any of the following holds, checked in order:

    - the user's ``user_type`` is "System User";
    - ``ptype`` is ``"create"``;
    - ``pg.team`` is the current team;
    - ``pg.team`` is a ``Team`` whose ``parent_team`` is the current team;
    - ``has_support_access(pg.doctype, pg.name)`` is truthy.

    Args:
        pg: document being checked; must expose ``team``, ``doctype``, ``name``.
        ptype: permission type being requested.
        user: user name; falls back to ``jingrow.session.user`` when falsy.

    Returns:
        bool: ``True`` if access is allowed, ``False`` otherwise.
    """
    from jcloude.utils import get_current_team

    if not user:
        user = jingrow.session.user

    user_type = jingrow.db.get_value("User", user, "user_type", cache=True)
    if user_type == "System User" or ptype == "create":
        return True

    team = get_current_team()
    # Check direct ownership first so the child-team query below is skipped
    # when the document belongs to the current team.
    if pg.team == team:
        return True

    child_teams = jingrow.db.get_all("Team", {"parent_team": team}, ["name"])
    if any(child.name == pg.team for child in child_teams):
        return True

    return bool(has_support_access(pg.doctype, pg.name))
def get_permission_query_conditions_for_doctype_and_user(doctype, user):
    """Build the SQL condition that limits ``doctype`` rows to the current team.

    Args:
        doctype: doctype name; used to form the ``tab<doctype>`` table name.
        user: user name; falls back to ``jingrow.session.user`` when falsy.

    Returns:
        str: an empty string (no restriction) when the user's ``user_type``
        is "System User", otherwise a condition comparing the table's
        ``team`` column with the escaped current team.
    """
    from jcloude.utils import get_current_team

    effective_user = user or jingrow.session.user
    if jingrow.db.get_value("User", effective_user, "user_type", cache=True) == "System User":
        return ""

    escaped_team = jingrow.db.escape(get_current_team())
    return f"(`tab{doctype}`.`team` = {escaped_team})"
def get_permission_query_conditions_for_doctype(doctype):
    """Return a ``user``-only callable producing the team filter for ``doctype``.

    The result is ``get_permission_query_conditions_for_doctype_and_user``
    with ``doctype`` pre-bound via ``functools.partial``.
    """
    conditions_for_user = partial(get_permission_query_conditions_for_doctype_and_user, doctype)
    return conditions_for_user
class CustomUser(User):
    """``User`` subclass with a team-scoped list query and a custom rename hook."""

    # NOTE(review): presumably the User fields exposed to the dashboard;
    # confirm against the code that reads `dashboard_fields`.
    dashboard_fields = ("full_name", "email", "user_image", "enabled", "user_type")

    @staticmethod
    def get_list_query(query):
        """Restrict ``query`` to users listed in the current team's ``team_members``.

        The team is obtained by calling ``jingrow.local.team()``.
        """
        current_team = jingrow.local.team()
        member_names = [member.user for member in current_team.team_members]
        user_table = jingrow.qb.DocType("User")
        return query.where(user_table.name.isin(member_names))

    def after_rename(self, old_name, new_name, merge=False):
        """Propagate a user rename across the database.

        - Rewrites ``owner`` / ``modified_by`` from ``old_name`` to
          ``new_name`` in every table returned by ``jingrow.db.get_tables()``,
          excluding tables reported as MyISAM by ``information_schema``.
        - Renames the matching "Chat Profile" and "Notification Settings"
          documents when they exist.
        - Sets the renamed user's ``email`` to ``new_name``.

        ``merge`` is accepted for signature compatibility and is not used.
        """
        myisam_tables = jingrow.db.sql_list(
            """SELECT
TABLE_NAME FROM information_schema.TABLES
WHERE
ENGINE='MyISAM'
AND TABLE_SCHEMA NOT IN ('mysql','information_schema','performance_schema')
"""
        )

        tracked_columns = ["owner", "modified_by"]
        for table in jingrow.db.get_tables():
            if table in myisam_tables:
                continue

            columns = jingrow.db.get_table_columns_description(table)
            present = [col.get("name") for col in columns if col.get("name") in tracked_columns]
            for column in present:
                # Identifiers are formatted in; the two values stay as %s
                # placeholders and are passed as query parameters.
                jingrow.db.sql(
                    """UPDATE `{}`
SET `{}` = {}
WHERE `{}` = {}""".format(table, column, "%s", column, "%s"),
                    (new_name, old_name),
                )

        for linked_doctype in ["Chat Profile", "Notification Settings"]:
            if jingrow.db.exists(linked_doctype, old_name):
                jingrow.rename_pg(linked_doctype, old_name, new_name, force=True, show_alert=False)

        # set email
        jingrow.db.sql(
            """UPDATE `tabUser`
SET email = %s
WHERE name = %s""",
            (new_name, new_name),
        )
def before_after_migrate():
    """Flush the whole cache by calling ``flushall()`` on ``jingrow.cache``."""
    # jingrow.clear_cache() on jcloude doesn't clear everything. See hooks.py
    cache = jingrow.cache
    cache.flushall()