jcloud/jcloud/api/client.py

540 lines
13 KiB
Python

# Copyright (c) 2023, JINGROW
# MIT License. See license.txt
from __future__ import annotations
import inspect
import typing
import jingrow
from jingrow.client import set_value as _set_value
from jingrow.handler import run_pg_method as _run_pg_method
from jingrow.model import child_table_fields, default_fields
from jingrow.model.base_document import get_controller
from jingrow.utils import cstr
from pypika.queries import QueryBuilder
from jcloud.exceptions import TeamHeaderNotInRequestError
from jcloud.utils import has_role
if typing.TYPE_CHECKING:
from jingrow.model.meta import Meta
# Page types that may be reached through this API module. `check_permissions`
# rejects every pagetype that is not listed here.
ALLOWED_PAGETYPES = [
"Site",
"Site App",
"Site Domain",
"Site Backup",
"Site Activity",
"Site Config",
"Site Plan",
"Site Update",
"Site Group Deploy",
"Invoice",
"Balance Transaction",
"Stripe Payment Method",
"Bench",
"Bench App",
"Bench Dependency Version",
"Release Group",
"Release Group App",
"Release Group Dependency",
"Cluster",
"Jcloud Permission Group",
"Jcloud Role",
"Jcloud Role Permission",
"Team",
"Product Trial Request",
"Deploy Candidate",
"Deploy Candidate Difference",
"Deploy Candidate Difference App",
"Agent Job",
"Agent Job Type",
"Common Site Config",
"Server",
"Database Server",
"Ansible Play",
"Server Plan",
"Release Group Variable",
"Resource Tag",
"Jcloud Tag",
"Partner Approval Request",
"Marketplace App",
"Subscription",
"Marketplace App Version",
"Marketplace App Plan",
"App Release",
"Payout Order",
"App Patch",
"Product Trial",
"Jcloud Notification",
"User SSH Key",
"Jingrow Version",
"Dashboard Banner",
"App Release Approval Request",
"Jcloud Webhook",
"SQL Playground Log",
"Site Database User",
"Jcloud Settings",
"Mpesa Payment Record",
"Jsite Server",
]
# Page types for which `check_document_access` lets a user holding the
# "Jcloud Support Agent" role skip the team-ownership comparison.
ALLOWED_PAGETYPES_FOR_SUPPORT = [
"Site",
"Bench",
"Release Group",
]
# Functions registered by `dashboard_whitelist`; `check_dashboard_actions`
# only lets a document method run when it is a member of this set.
whitelisted_methods = set()
@jingrow.whitelist()
def get_list(
    pagetype: str,
    fields: list | None = None,
    filters: dict | None = None,
    order_by: str | None = None,
    start: int = 0,
    limit: int = 20,
    parent: str | None = None,
    debug: bool = False,
):
    """Return a page of `pagetype` records visible to the current team.

    Requested fields and filters are reduced to the allowed ones, a team
    filter is added when the pagetype has a ``team`` field, and the
    controller's optional ``get_list_query`` hook may refine or replace
    the query.

    Args:
        pagetype: Page type to list; must pass `check_permissions`.
        fields: Requested field names.
        filters: Requested filters. Child tables require ``parenttype``
            and ``parent`` entries.
        order_by: Ordering forwarded to the query builder.
        start: Offset of the first row.
        limit: Maximum number of rows.
        parent: Forwarded untouched to the controller hook.
        debug: Forwarded to the query's ``run`` call and to the hook.

    Returns:
        The query result as a list of dicts, the list returned by the
        controller hook, or an empty list.
    """
    filters = {} if filters is None else filters

    # These page types have no team field to filter by, yet they are used by
    # `get` and `run_pg_method`, so listing them always yields nothing.
    if pagetype in ["Team", "User SSH Key"]:
        return []

    check_permissions(pagetype)
    valid_fields = validate_fields(pagetype, fields)
    valid_filters = validate_filters(pagetype, filters)

    meta = jingrow.get_meta(pagetype)
    has_parent_reference = filters.get("parenttype") and filters.get("parent")
    if meta.istable and not has_parent_reference:
        jingrow.throw("parenttype and parent are required to get child records")

    # Only system users and support agents may opt out of the team filter.
    skip_team_filter = filters.get("skip_team_filter_for_system_user_and_support_agent") and (
        jingrow.local.system_user() or has_role("Jcloud Support Agent")
    )
    if not skip_team_filter and meta.has_field("team"):
        valid_filters.team = jingrow.local.team().name

    query = get_list_query(
        pagetype, meta, filters, valid_filters, valid_fields, start, limit, order_by
    )

    # The controller hook receives the raw (unvalidated) request arguments.
    list_args = {
        "fields": fields,
        "filters": jingrow._dict(filters or {}),
        "order_by": order_by,
        "start": start,
        "limit": limit,
        "parent": parent,
        "debug": debug,
    }
    query = apply_custom_filters(pagetype, query, **list_args)

    if isinstance(query, QueryBuilder):
        return query.run(as_dict=1, debug=debug)
    return query if isinstance(query, list) else []
def get_list_query(
    pagetype: str,
    meta: "Meta",
    filters: dict,
    valid_filters: jingrow._dict,
    valid_fields: list | None,
    start: int,
    limit: int,
    order_by: str | None,
):
    """Build the base list query for `pagetype`.

    Args:
        pagetype: Page type being queried.
        meta: Meta of `pagetype`.
        filters: Raw request filters; only ``parenttype`` is read here.
        valid_filters: Filters that already passed `validate_filters`.
        valid_fields: Fields that already passed `validate_fields`.
        start: Row offset.
        limit: Maximum number of rows.
        order_by: Ordering expression.

    Returns:
        A query-builder object. For child tables whose parent pagetype has
        a ``team`` field the parent is joined and restricted to the current
        team. When `check_role_permissions` returns roles, the query is
        additionally joined to "Jcloud Role Permission" and made distinct.
    """
    from jcloud.jcloud.pagetype.jcloud_role.jcloud_role import check_role_permissions

    query = jingrow.qb.get_query(
        pagetype,
        filters=valid_filters,
        fields=valid_fields,
        offset=start,
        limit=limit,
        order_by=order_by,
    )

    if meta.istable and jingrow.get_meta(filters.get("parenttype")).has_field("team"):
        # Child rows carry no team of their own; scope them through the parent.
        ParentDocType = jingrow.qb.PageType(filters.get("parenttype"))
        ChildDocType = jingrow.qb.PageType(pagetype)
        query = (
            query.join(ParentDocType)
            .on(ParentDocType.name == ChildDocType.parent)
            .where(ParentDocType.team == jingrow.local.team().name)
        )

    if roles := check_role_permissions(pagetype):
        JcloudRolePermission = jingrow.qb.PageType("Jcloud Role Permission")
        QueriedDocType = jingrow.qb.PageType(pagetype)

        # The permission table column is derived from the pagetype name,
        # e.g. "Release Group" -> "release_group".
        field = pagetype.lower().replace(" ", "_")
        query = (
            query.join(JcloudRolePermission)
            .on(
                # `&` binds tighter than `==`, so the comparison must be
                # parenthesised to mean "(link matches) AND (role in roles)".
                (JcloudRolePermission[field] == QueriedDocType.name)
                & JcloudRolePermission.role.isin(roles)
            )
            .distinct()
        )

    return query
@jingrow.whitelist()
def get(pagetype, name):
    """Return the dashboard view of a single document.

    The result holds the framework's default fields plus the document's
    ``dashboard_fields`` (when it defines them), merged with whatever dict
    the document's own ``get_pg`` hook returns.

    Raises a permission error when the document belongs to another team and
    the caller is neither a system user nor a support agent. A missing
    document is delegated to the controller's ``on_not_found`` hook when
    one exists, otherwise the original error propagates.
    """
    from jcloud.jcloud.pagetype.jcloud_role.jcloud_role import check_role_permissions

    check_permissions(pagetype)

    try:
        pg = jingrow.get_pg(pagetype, name)
    except jingrow.DoesNotExistError:
        controller = get_controller(pagetype)
        if not hasattr(controller, "on_not_found"):
            raise
        return controller.on_not_found(name)

    is_privileged = jingrow.local.system_user() or has_role("Jcloud Support Agent")
    if (
        not is_privileged
        and jingrow.get_meta(pagetype).has_field("team")
        and pg.team != jingrow.local.team().name
    ):
        raise_not_permitted()

    check_role_permissions(pagetype, name)

    exposed_fields = tuple(default_fields)
    if hasattr(pg, "dashboard_fields"):
        exposed_fields = exposed_fields + tuple(pg.dashboard_fields)

    _pg = jingrow._dict({fieldname: pg.get(fieldname) for fieldname in exposed_fields})

    if hasattr(pg, "get_pg"):
        # The document may contribute extra, computed values for the dashboard.
        extra = pg.get_pg(_pg)
        if isinstance(extra, dict):
            _pg.update(extra)

    return _pg
@jingrow.whitelist(methods=["POST", "PUT"])
def insert(pg=None):
    """Insert a document, or append a child row to its parent.

    Args:
        pg: Mapping describing the new document; ``pagetype`` is required.
            Child-table rows additionally need ``parenttype``, ``parent``
            and ``parentfield``.

    Returns:
        The dashboard view (see `get`) of the inserted document, or of the
        parent document when a child row was appended.
    """
    if not (pg and pg.get("pagetype")):
        jingrow.throw(jingrow._("pg.pagetype is required"))

    check_permissions(pg.get("pagetype"))
    pg = jingrow._dict(pg)

    if not jingrow.is_table(pg.pagetype):
        new_pg = jingrow.get_pg(pg)

        if jingrow.get_meta(pg.pagetype).has_field("team"):
            if not new_pg.team:
                # Default to the current team when none was supplied.
                new_pg.team = jingrow.local.team().name
            if not jingrow.local.system_user():
                # Dashboard users may never assign a different team.
                new_pg.team = jingrow.local.team().name

        new_pg.insert()
        return get(new_pg.pagetype, new_pg.name)

    # From here on we are inserting a child record through its parent.
    has_parent_reference = pg.parenttype and pg.parent and pg.parentfield
    if not has_parent_reference:
        jingrow.throw(jingrow._("Parenttype, Parent and Parentfield are required to insert a child record"))

    parent = jingrow.get_pg(pg.parenttype, pg.parent)
    parent_has_team = jingrow.get_meta(parent.pagetype).has_field("team")
    if parent_has_team and parent.team != jingrow.local.team().name:
        raise_not_permitted()

    parent.append(pg.parentfield, pg)
    parent.save()
    return get(parent.pagetype, parent.name)
@jingrow.whitelist(methods=["POST", "PUT"])
def set_value(pagetype: str, name: str, fieldname: dict | str, value: str | None = None):
    """Set one or more fields on a document and return its dashboard view.

    Args:
        pagetype: Page type of the document; must pass `check_permissions`.
        name: Name of the document; must pass `check_document_access`.
        fieldname: Either a single field name or a mapping of field names
            to new values.
        value: New value when `fieldname` is a single field name.

    Returns:
        The result of `get(pagetype, name)` after the update.

    Raises:
        A permission error (via `raise_not_permitted`) when any targeted
        field is rejected by `is_allowed_field`.
    """
    check_permissions(pagetype)
    check_document_access(pagetype, name)

    # Fields mentioned in dashboard_fields are allowed to be set via
    # set_value; any field rejected by `is_allowed_field` aborts the request.
    for field in _fieldnames_to_set(fieldname, value):
        if not is_allowed_field(pagetype, field):
            raise_not_permitted()

    _set_value(pagetype, name, fieldname, value)

    # jingrow set_value returns just the pg and not jcloud's overridden `get_pg`
    return get(pagetype, name)


def _fieldnames_to_set(fieldname, value):
    """Return the list of field names a `set_value` call would write."""
    import json

    if isinstance(fieldname, dict):
        return list(fieldname)

    if isinstance(fieldname, str) and not value:
        # NOTE(review): the wrapped framework `set_value` is assumed to accept
        # a JSON-encoded mapping when no value is given -- confirm. Its keys
        # are validated here so that form cannot bypass the field check.
        try:
            decoded = json.loads(fieldname)
        except ValueError:
            decoded = None
        if isinstance(decoded, dict):
            return list(decoded)

    # A plain field name: validate it as a whole, not character by character.
    return [fieldname]
@jingrow.whitelist(methods=["DELETE", "POST"])
def delete(pagetype: str, name: str):
    """Delete a document by running its ``delete`` method.

    The pagetype must be allowed, the caller must have access to the
    document, and the document's ``delete`` method must be registered in
    `whitelisted_methods`.
    """
    check_permissions(pagetype)
    check_document_access(pagetype, name)
    check_dashboard_actions(pagetype, name, "delete")
    _run_pg_method(dt=pagetype, dn=name, method="delete", args=None)
@jingrow.whitelist()
def run_pg_method(dt: str, dn: str, method: str, args: dict | None = None):
    """Run a dashboard-whitelisted method on a document.

    Args:
        dt: Page type of the document.
        dn: Name of the document.
        method: Name of the document method to run.
        args: Arguments for the method; passed through `fix_args` first.

    After the call, the refreshed dashboard view of the document is stored
    in ``jingrow.response.docs``.
    """
    check_permissions(dt)
    check_document_access(dt, dn)
    check_dashboard_actions(dt, dn, method)

    cleaned_args = fix_args(method, args)
    _run_pg_method(dt=dt, dn=dn, method=method, args=cleaned_args)

    jingrow.response.docs = [get(dt, dn)]
@jingrow.whitelist()
def search_link(
    pagetype: str,
    query: str | None = None,
    filters: dict | None = None,
    order_by: str | None = None,
    page_length: int | None = None,
):
    """Search documents of `pagetype` for link-field style pickers.

    Args:
        pagetype: Page type to search; must pass `check_permissions`.
            "Team" may only be searched by system users.
        query: Optional text matched (with ``LIKE %query%``) against the
            name and, when the meta defines one, the title field.
        filters: Optional filters; reduced by `validate_filters`.
        order_by: Ordering; defaults to ``"modified desc"``.
        page_length: Maximum number of rows; defaults to 10.

    Returns:
        A list of dicts containing ``value`` (the document name) and, when
        the pagetype has a title field, ``label``.
    """
    check_permissions(pagetype)
    if pagetype == "Team" and not jingrow.local.system_user():
        raise_not_permitted()

    # `get_list_query` reads `filters.get(...)` for child tables, so a missing
    # argument must behave like "no filters" instead of raising AttributeError.
    filters = filters or {}

    meta = jingrow.get_meta(pagetype)
    if meta.istable and not filters.get("parenttype"):
        # Child rows are scoped through their parent; without it the team
        # restriction in `get_list_query` cannot be built.
        jingrow.throw("parenttype is required to search child records")

    PageType = jingrow.qb.PageType(pagetype)
    valid_filters = validate_filters(pagetype, filters)
    valid_fields = validate_fields(pagetype, ["name", meta.title_field or "name"])

    q = get_list_query(
        pagetype,
        meta,
        filters,
        valid_filters,
        valid_fields,
        0,
        page_length or 10,
        order_by or "modified desc",
    )

    q = q.select(PageType.name.as_("value"))
    if meta.title_field:
        q = q.select(PageType[meta.title_field].as_("label"))

    # Hide records flagged as not enabled / disabled.
    if meta.has_field("enabled"):
        q = q.where(PageType.enabled == 1)
    if meta.has_field("disabled"):
        q = q.where(PageType.disabled != 1)

    # NOTE: the team filter applies to every caller, system users included.
    # Relax it only after confirming that system users should be able to
    # search across teams.
    if meta.has_field("team"):
        q = q.where(PageType.team == jingrow.local.team().name)

    if query:
        pattern = f"%{query}%"
        condition = PageType.name.like(pattern)
        if meta.title_field:
            condition = condition | PageType[meta.title_field].like(pattern)
        q = q.where(condition)

    return q.run(as_dict=1)
def check_document_access(pagetype: str, name: str):
    """Raise a permission error unless the caller may access the document.

    System users always pass, as do support agents for the page types in
    `ALLOWED_PAGETYPES_FOR_SUPPORT`. Otherwise the owning team is looked up
    from the document's ``team`` field, or from the Bench / Release Group it
    references through a ``bench`` / ``group`` field, and compared with the
    current team. Page types with none of these fields are not restricted.
    """
    if jingrow.local.system_user():
        return

    if has_role("Jcloud Support Agent") and pagetype in ALLOWED_PAGETYPES_FOR_SUPPORT:
        return

    meta = jingrow.get_meta(pagetype)
    if meta.has_field("team"):
        owner_team = jingrow.db.get_value(pagetype, name, "team")
    elif meta.has_field("bench"):
        bench_name = jingrow.db.get_value(pagetype, name, "bench")
        owner_team = jingrow.db.get_value("Bench", bench_name, "team")
    elif meta.has_field("group"):
        group_name = jingrow.db.get_value(pagetype, name, "group")
        owner_team = jingrow.db.get_value("Release Group", group_name, "team")
    else:
        # Nothing links this pagetype to a team, so there is nothing to check.
        return

    if owner_team != jingrow.local.team().name:
        raise_not_permitted()
def check_dashboard_actions(pagetype, name, method):
    """Raise a permission error unless `method` is a dashboard action.

    Args:
        pagetype: Page type of the document.
        name: Name of the document.
        method: Name of the document attribute the caller wants to invoke.

    The attribute must exist on the document, be callable, and its
    underlying function must be a member of `whitelisted_methods`.
    """
    pg = jingrow.get_pg(pagetype, name)

    # The method name comes from the request: an unknown name, or the name of
    # a plain data attribute, is a forbidden action rather than a server error.
    method_obj = getattr(pg, method, None)
    if not callable(method_obj):
        raise_not_permitted()

    # Bound methods wrap the function object that was registered.
    fn = getattr(method_obj, "__func__", method_obj)
    if fn not in whitelisted_methods:
        raise_not_permitted()
def apply_custom_filters(pagetype, query, **list_args):
    """Let the pagetype's controller refine the list query.

    When the controller defines ``get_list_query`` it is called with the
    query, plus `list_args` if its signature accepts ``**kwargs``, and its
    return value is used. Otherwise the query is returned unchanged.
    """
    controller = get_controller(pagetype)
    if not hasattr(controller, "get_list_query"):
        return query

    accepts_kwargs = inspect.getfullargspec(controller.get_list_query).varkw
    if accepts_kwargs:
        return controller.get_list_query(query, **list_args)
    return controller.get_list_query(query)
def validate_filters(pagetype, filters):
    """Return only the filters whose field name passes `is_allowed_field`.

    A missing or empty `filters` yields an empty result.
    """
    requested = filters if filters else {}
    return jingrow._dict(
        {
            fieldname: value
            for fieldname, value in requested.items()
            if is_allowed_field(pagetype, fieldname)
        }
    )
def validate_fields(pagetype, fields):
    """Return only the fields that pass `is_allowed_field`.

    A falsy `fields` value (``None`` or an empty list) is returned as is.
    """
    if not fields:
        return fields
    return [field for field in fields if is_allowed_field(pagetype, field)]
def is_allowed_field(pagetype, field):
    """Tell whether `field` may be used with `pagetype`.

    A field is allowed when it is listed in the controller's
    ``dashboard_fields``, is an allowed ``link.field`` expression, is an
    allowed child-table mapping, or is one of the framework's default /
    child-table fields. Empty values are never allowed.
    """
    if not field:
        return False

    exposed = getattr(get_controller(pagetype), "dashboard_fields", ())
    return (
        field in exposed
        or ("." in field and is_allowed_linked_field(pagetype, field))
        or (isinstance(field, dict) and is_allowed_table_field(pagetype, field))
        or field in [*default_fields, *child_table_fields]
    )
def is_allowed_linked_field(pagetype, field):
    """Tell whether a ``link.field`` (optionally ``... as alias``) is allowed.

    Args:
        pagetype: Page type the expression is evaluated against.
        field: Expression of the form ``"<link_field>.<fieldname>"`` with an
            optional ``" as <alias>"`` suffix.

    Returns:
        True when the link field is allowed on `pagetype` and the target
        field is allowed on the pagetype the link points to. Malformed
        expressions, and link fields whose target pagetype cannot be
        determined, yield False instead of raising.
    """
    # Only the part before the alias is validated.
    expression, _, alias = field.partition(" as ")
    if " as " in alias:
        # More than one alias separator: not a shape we understand.
        return False

    parts = expression.split(".")
    if len(parts) != 2:
        # Exactly one hop ("link.field") is supported.
        return False
    linked_field, linked_field_fieldname = parts

    if not is_allowed_field(pagetype, linked_field):
        return False

    # The meta may have no definition (or no target) for an otherwise allowed
    # field name; there is then nothing to follow.
    link_definition = jingrow.get_meta(pagetype).get_field(linked_field)
    linked_field_pagetype = getattr(link_definition, "options", None)
    if not linked_field_pagetype:
        return False

    return is_allowed_field(linked_field_pagetype, linked_field_fieldname)
def is_allowed_table_field(pagetype, field):
    """Tell whether a child-table field mapping is allowed.

    `field` maps a table field name on `pagetype` to the child fields that
    are requested from it. Every table field must be allowed on `pagetype`
    and every child field must be allowed on the table's own pagetype.
    """
    for child_table, child_columns in field.items():
        if not is_allowed_field(pagetype, child_table):
            return False

        child_pagetype = jingrow.get_meta(pagetype).get_field(child_table).options
        if not all(is_allowed_field(child_pagetype, column) for column in child_columns):
            return False

    return True
def check_permissions(pagetype):
    """Ensure `pagetype` is exposed by this API and a current team is set.

    Raises a permission error for page types outside `ALLOWED_PAGETYPES`
    and `TeamHeaderNotInRequestError` when no team is available on
    ``jingrow.local``. Returns True otherwise.
    """
    if pagetype not in ALLOWED_PAGETYPES:
        raise_not_permitted()

    team_is_set = hasattr(jingrow.local, "team") and jingrow.local.team()
    if not team_is_set:
        jingrow.throw(
            "current_team is not set. Use X-JCLOUD-TEAM header in the request to set it.",
            TeamHeaderNotInRequestError,
        )

    return True
def is_owned_by_team(pagetype, docname, raise_exception=True):
    """Tell whether the document's ``team`` is the current team.

    Returns False when no current team is set. When the document belongs
    to another team, a permission error is raised unless `raise_exception`
    is false, in which case False is returned.
    """
    if not jingrow.local.team():
        return False

    document_team = jingrow.db.get_value(pagetype, cstr(docname), "team")
    owned = document_team == jingrow.local.team().name
    if raise_exception and not owned:
        raise_not_permitted()
    return owned
def raise_not_permitted():
    """Throw `jingrow.PermissionError` with the standard refusal message."""
    # The user-facing message is Chinese for "not allowed".
    message = "不允许"
    jingrow.throw(message, jingrow.PermissionError)
def dashboard_whitelist(allow_guest=False, xss_safe=False, methods=None):
    """Decorator factory marking a function as callable from the dashboard.

    The function is passed through `jingrow.whitelist` with the given
    options and the result is recorded in `whitelisted_methods`, which
    `check_dashboard_actions` consults before running a document method.

    Args:
        allow_guest: Forwarded to `jingrow.whitelist`.
        xss_safe: Forwarded to `jingrow.whitelist`.
        methods: Forwarded to `jingrow.whitelist`.

    Returns:
        A decorator that returns the function as produced by
        `jingrow.whitelist`.
    """

    def wrapper(func):
        decorated_func = jingrow.whitelist(allow_guest=allow_guest, xss_safe=xss_safe, methods=methods)(func)
        # The module-level set is only mutated, never rebound, so no `global`
        # declaration is required.
        whitelisted_methods.add(decorated_func)
        return decorated_func

    return wrapper
def fix_args(method, args):
    """Drop a duplicated ``method`` entry from `args`.

    This is a fixer function. Certain callers of `run_pg_method` pass
    duplicates of the passed kwargs in the `args` arg, which causes
    "got multiple values for argument 'method'".

    Args:
        method: Name of the method being invoked.
        args: Arguments supplied by the caller; anything that is not a dict
            is returned untouched.

    Returns:
        `args` itself when nothing needs fixing, otherwise a shallow copy
        without the ``method`` key. The caller's dict is never modified.
    """
    if not isinstance(args, dict):
        return args

    # Even if it doesn't match it'll probably throw down the call stack, but
    # in that case it's unexpected behavior and so it's better to error-out.
    if "method" not in args or args["method"] != method:
        return args

    cleaned = args.copy()
    del cleaned["method"]
    return cleaned