239 lines
6.0 KiB
Python
239 lines
6.0 KiB
Python
# Copyright (c) 2019, JINGROW
|
|
# For license information, please see license.txt
|
|
|
|
|
|
from functools import partial
|
|
|
|
import jingrow
|
|
from ansible.utils.path import cleanup_tmp_file
|
|
from jingrow.core.pagetype.user.user import User, ask_pass_update
|
|
from jingrow.handler import is_whitelisted
|
|
from jingrow.utils import cint
|
|
|
|
from jcloud.runner import constants
|
|
from jcloud.utils import _get_current_team, _system_user
|
|
|
|
|
|
@jingrow.whitelist(allow_guest=True)
|
|
def upload_file():
|
|
if jingrow.session.user == "Guest":
|
|
return None
|
|
|
|
files = jingrow.request.files
|
|
is_private = jingrow.form_dict.is_private
|
|
pagetype = jingrow.form_dict.pagetype
|
|
docname = jingrow.form_dict.docname
|
|
fieldname = jingrow.form_dict.fieldname
|
|
file_url = jingrow.form_dict.file_url
|
|
folder = jingrow.form_dict.folder or "Home"
|
|
method = jingrow.form_dict.method
|
|
content = None
|
|
filename = None
|
|
|
|
if "file" in files:
|
|
file = files["file"]
|
|
content = file.stream.read()
|
|
filename = file.filename
|
|
|
|
jingrow.local.uploaded_file = content
|
|
jingrow.local.uploaded_filename = filename
|
|
|
|
if method:
|
|
method = jingrow.get_attr(method)
|
|
is_whitelisted(method)
|
|
return method()
|
|
ret = jingrow.get_pg(
|
|
{
|
|
"pagetype": "File",
|
|
"attached_to_pagetype": pagetype,
|
|
"attached_to_name": docname,
|
|
"attached_to_field": fieldname,
|
|
"folder": folder,
|
|
"file_name": filename,
|
|
"file_url": file_url,
|
|
"is_private": cint(is_private),
|
|
"content": content,
|
|
}
|
|
)
|
|
ret.save()
|
|
return ret
|
|
|
|
|
|
def on_session_creation():
|
|
from jcloud.utils import get_current_team
|
|
|
|
if (
|
|
not jingrow.db.exists("Team", {"user": jingrow.session.user})
|
|
and jingrow.session.data.user_type == "System User"
|
|
):
|
|
return
|
|
|
|
try:
|
|
team = get_current_team(get_pg=True)
|
|
route = team.get_route_on_login()
|
|
jingrow.local.response.update({"dashboard_route": route})
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def before_job():
|
|
jingrow.local.team = _get_current_team
|
|
jingrow.local.system_user = _system_user
|
|
|
|
|
|
def before_request():
|
|
jingrow.local.team = _get_current_team
|
|
jingrow.local.system_user = _system_user
|
|
|
|
|
|
def cleanup_ansible_tmp_files():
|
|
if hasattr(constants, "DEFAULT_LOCAL_TMP"):
|
|
cleanup_tmp_file(constants.DEFAULT_LOCAL_TMP)
|
|
|
|
|
|
def after_job():
|
|
cleanup_ansible_tmp_files()
|
|
|
|
|
|
def update_website_context(context):
|
|
if (jingrow.request and jingrow.request.path.startswith("/docs")) and not jingrow.db.get_single_value(
|
|
"Jcloud Settings", "publish_docs"
|
|
):
|
|
raise jingrow.DoesNotExistError
|
|
|
|
|
|
def has_permission(pg, ptype, user):
|
|
from jcloud.utils import get_current_team, has_role
|
|
|
|
if not user:
|
|
user = jingrow.session.user
|
|
|
|
user_type = jingrow.db.get_value("User", user, "user_type", cache=True)
|
|
if user_type == "System User":
|
|
return True
|
|
|
|
if ptype == "create":
|
|
return True
|
|
|
|
if has_role("Jcloud Support Agent", user) and ptype == "read":
|
|
return True
|
|
|
|
team = get_current_team()
|
|
child_team_members = [d.name for d in jingrow.db.get_all("Team", {"parent_team": team}, ["name"])]
|
|
if pg.team == team or pg.team in child_team_members:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def get_permission_query_conditions_for_pagetype_and_user(pagetype, user):
|
|
from jcloud.utils import get_current_team
|
|
|
|
if not user:
|
|
user = jingrow.session.user
|
|
|
|
user_type = jingrow.db.get_value("User", user, "user_type", cache=True)
|
|
if user_type == "System User":
|
|
return ""
|
|
|
|
team = get_current_team()
|
|
|
|
return f"(`tab{pagetype}`.`team` = {jingrow.db.escape(team)})"
|
|
|
|
|
|
def get_permission_query_conditions_for_pagetype(pagetype):
|
|
return partial(get_permission_query_conditions_for_pagetype_and_user, pagetype)
|
|
|
|
|
|
class CustomUser(User):
|
|
dashboard_fields = ("full_name", "email", "user_image", "enabled", "user_type")
|
|
|
|
@staticmethod
|
|
def get_list_query(query):
|
|
team = jingrow.local.team()
|
|
allowed_users = [d.user for d in team.team_members]
|
|
User = jingrow.qb.PageType("User")
|
|
return query.where(User.name.isin(allowed_users))
|
|
|
|
def autoname(self):
|
|
"""set name as Email Address"""
|
|
if self.get("is_admin") or self.get("is_guest"):
|
|
self.name = self.first_name
|
|
else:
|
|
self.name = self.username
|
|
|
|
def validate(self):
|
|
# clear new password
|
|
self.__new_password = self.new_password
|
|
self.new_password = ""
|
|
|
|
if not jingrow.flags.in_test:
|
|
self.password_strength_test()
|
|
|
|
self.populate_role_profile_roles()
|
|
self.check_roles_added()
|
|
self.set_system_user()
|
|
self.set_full_name()
|
|
self.check_enable_disable()
|
|
self.ensure_unique_roles()
|
|
self.remove_all_roles_for_guest()
|
|
self.validate_username()
|
|
self.remove_disabled_roles()
|
|
self.validate_user_email_inbox()
|
|
ask_pass_update()
|
|
self.validate_allowed_modules()
|
|
self.validate_user_image()
|
|
self.set_time_zone()
|
|
|
|
if self.language == "Loading...":
|
|
self.language = None
|
|
|
|
if (self.name not in ["Administrator", "Guest"]) and (not self.get_social_login_userid("jingrow")):
|
|
self.set_social_login_userid("jingrow", jingrow.generate_hash(length=39))
|
|
|
|
def after_rename(self, old_name, new_name, merge=False):
|
|
"""
|
|
Changes:
|
|
- Excluding update operations on MyISAM tables
|
|
"""
|
|
myisam_tables = jingrow.db.sql_list(
|
|
"""SELECT
|
|
TABLE_NAME FROM information_schema.TABLES
|
|
WHERE
|
|
ENGINE='MyISAM'
|
|
AND TABLE_SCHEMA NOT IN ('mysql','information_schema','performance_schema')
|
|
"""
|
|
)
|
|
tables = [x for x in jingrow.db.get_tables() if x not in myisam_tables]
|
|
|
|
for tab in tables:
|
|
desc = jingrow.db.get_table_columns_description(tab)
|
|
has_fields = []
|
|
for d in desc:
|
|
if d.get("name") in ["owner", "modified_by"]:
|
|
has_fields.append(d.get("name"))
|
|
for field in has_fields:
|
|
jingrow.db.sql(
|
|
"""UPDATE `{}`
|
|
SET `{}` = {}
|
|
WHERE `{}` = {}""".format(tab, field, "%s", field, "%s"),
|
|
(new_name, old_name),
|
|
)
|
|
|
|
for dt in ["Chat Profile", "Notification Settings"]:
|
|
if jingrow.db.exists(dt, old_name):
|
|
jingrow.rename_pg(dt, old_name, new_name, force=True, show_alert=False)
|
|
|
|
# set username
|
|
jingrow.db.sql(
|
|
"""UPDATE `tabUser`
|
|
SET username = %s
|
|
WHERE name = %s""",
|
|
(new_name, new_name),
|
|
)
|
|
|
|
|
|
def before_after_migrate():
|
|
# jingrow.clear_cache() on jcloud doesn't clear everything. See hooks.py
|
|
jingrow.cache.flushall()
|