jcloude/press/api/client.py
2025-12-23 22:43:56 +08:00

543 lines
14 KiB
Python

# Copyright (c) 2023, Jingrow Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
from __future__ import annotations
import inspect
import typing
import jingrow
from jingrow.client import set_value as _set_value
from jingrow.handler import run_pg_method as _run_pg_method
from jingrow.model import child_table_fields, default_fields
from jingrow.model.base_document import get_controller
from jingrow.utils import cstr
from pypika.queries import QueryBuilder
from jcloude.access import dashboard_access_rules
from jcloude.access.support_access import has_support_access
from jcloude.exceptions import TeamHeaderNotInRequestError
from jcloude.guards import role_guard
from jcloude.utils import has_role
if typing.TYPE_CHECKING:
from jingrow.model.meta import Meta
ALLOWED_DOCTYPES = [
"Site",
"Site App",
"Site Domain",
"Site Backup",
"Site Activity",
"Server Activity",
"Site Config",
"Site Plan",
"Site Update",
"Site Group Deploy",
"Invoice",
"Balance Transaction",
"Stripe Payment Method",
"Bench",
"Bench App",
"Bench Dependency Version",
"Release Group",
"Release Group App",
"Release Group Dependency",
"Cluster",
"Jcloude Permission Group",
"Jcloude Role",
"Team",
"Product Trial Request",
"Deploy Candidate",
"Deploy Candidate Difference",
"Deploy Candidate Difference App",
"Agent Job",
"Agent Job Type",
"Common Site Config",
"Server",
"Database Server",
"Ansible Play",
"Server Plan",
"Release Group Variable",
"Resource Tag",
"Jcloude Tag",
"Partner Approval Request",
"Marketplace App",
"Subscription",
"Marketplace App Version",
"Marketplace App Plan",
"App Release",
"Payout Order",
"App Patch",
"Product Trial",
"Jcloude Notification",
"User SSH Key",
"Jingrow Version",
"Dashboard Banner",
"App Release Approval Request",
"Jcloude Webhook",
"SQL Playground Log",
"Site Database User",
"Jcloude Settings",
"Mpesa Payment Record",
"Partner Certificate",
"Partner Payment Payout",
"Deploy Candidate Build",
"Partner Lead",
"Partner Lead Type",
"Lead Followup",
"Partner Consent",
"Account Request",
"Server Snapshot",
"Server Snapshot Recovery",
"Support Access",
"Partner Lead Origin",
"Auto Scale Record",
]
whitelisted_methods = set()
@jingrow.whitelist()
def get_list(
pagetype: str,
fields: list | None = None,
filters: dict | None = None,
order_by: str | None = None,
start: int = 0,
limit: int = 20,
parent: str | None = None,
debug: bool = False,
):
if filters is None:
filters = {}
# these pagetypes doesn't have a team field to filter by but are used in get or run_pg_method
if pagetype in ["Team", "User SSH Key"]:
return []
check_permissions(pagetype)
valid_fields = validate_fields(pagetype, fields)
valid_filters = validate_filters(pagetype, filters)
meta = jingrow.get_meta(pagetype)
if meta.istable and not (filters.get("parenttype") and filters.get("parent")):
jingrow.throw("parenttype and parent are required to get child records")
apply_team_filter = not (
filters.get("skip_team_filter_for_system_user_and_support_agent")
and (jingrow.local.system_user() or has_role("Jcloude Support Agent"))
)
if apply_team_filter and meta.has_field("team"):
valid_filters.team = jingrow.local.team().name
query = get_list_query(
pagetype,
meta,
filters,
valid_filters,
valid_fields,
start,
limit,
order_by,
)
filters = jingrow._dict(filters or {})
list_args = dict(
fields=fields,
filters=filters,
order_by=order_by,
start=start,
limit=limit,
parent=parent,
debug=debug,
)
query = apply_custom_filters(pagetype, query, **list_args)
if isinstance(query, QueryBuilder):
return query.run(as_dict=1, debug=debug)
if isinstance(query, list):
return query
return []
@role_guard.document(
document_type=lambda args: str(args.get("pagetype")),
should_throw=False,
inject_values=True,
injection_key="document_options",
)
def get_list_query(
pagetype: str,
meta: "Meta",
filters: dict,
valid_filters: jingrow._dict,
valid_fields: list | None,
start: int,
limit: int,
order_by: str | None,
document_options=None,
):
query = jingrow.qb.get_query(
pagetype, filters=valid_filters, fields=valid_fields, offset=start, limit=limit, order_by=order_by
)
if meta.istable and jingrow.get_meta(filters.get("parenttype")).has_field("team"):
ParentPageType = jingrow.qb.PageType(filters.get("parenttype"))
ChildPageType = jingrow.qb.PageType(pagetype)
query = (
query.join(ParentPageType)
.on(ParentPageType.name == ChildPageType.parent)
.where(ParentPageType.team == jingrow.local.team().name)
)
if document_options and isinstance(document_options, list):
QueryPagetype = jingrow.qb.PageType(pagetype)
query = query.where(QueryPagetype.name.isin(document_options))
return query
@jingrow.whitelist()
@role_guard.document(
document_type=lambda args: str(args.get("pagetype")),
document_name=lambda args: str(args.get("name")),
)
def get(pagetype, name):
check_permissions(pagetype)
try:
pg = jingrow.get_pg(pagetype, name)
except jingrow.DoesNotExistError:
controller = get_controller(pagetype)
if hasattr(controller, "on_not_found"):
return controller.on_not_found(name)
raise
if (
not (jingrow.local.system_user() or has_support_access(pagetype, name))
and jingrow.get_meta(pagetype).has_field("team")
and pg.team != jingrow.local.team().name
):
raise_not_permitted()
fields = tuple(default_fields)
if hasattr(pg, "dashboard_fields"):
fields += tuple(pg.dashboard_fields)
_pg = jingrow._dict()
for fieldname in fields:
_pg[fieldname] = pg.get(fieldname)
if hasattr(pg, "get_pg"):
result = pg.get_pg(_pg)
if isinstance(result, dict):
_pg.update(result)
return dashboard_access_rules(_pg)
@jingrow.whitelist(methods=["POST", "PUT"])
def insert(pg=None):
if not pg or not pg.get("pagetype"):
jingrow.throw(jingrow._("pg.pagetype is required"))
check_permissions(pg.get("pagetype"))
pg = jingrow._dict(pg)
if jingrow.is_table(pg.pagetype):
if not (pg.parenttype and pg.parent and pg.parentfield):
jingrow.throw(jingrow._("Parenttype, Parent and Parentfield are required to insert a child record"))
# inserting a child record
parent = jingrow.get_pg(pg.parenttype, pg.parent)
if jingrow.get_meta(parent.pagetype).has_field("team") and parent.team != jingrow.local.team().name:
raise_not_permitted()
parent.append(pg.parentfield, pg)
parent.save()
return get(parent.pagetype, parent.name)
_pg = jingrow.get_pg(pg)
if jingrow.get_meta(pg.pagetype).has_field("team"):
if not _pg.team:
# set team if not set
_pg.team = jingrow.local.team().name
if not jingrow.local.system_user():
# don't allow dashboard user to set any other team
_pg.team = jingrow.local.team().name
_pg.insert()
return get(_pg.pagetype, _pg.name)
@jingrow.whitelist(methods=["POST", "PUT"])
def set_value(pagetype: str, name: str, fieldname: dict | str, value: str | None = None):
check_permissions(pagetype)
check_document_access(pagetype, name)
for field in fieldname:
# fields mentioned in dashboard_fields are allowed to be set via set_value
is_allowed_field(pagetype, field)
_set_value(pagetype, name, fieldname, value)
# jingrow set_value returns just the pg and not jcloude's overriden `get_pg`
return get(pagetype, name)
@jingrow.whitelist(methods=["DELETE", "POST"])
def delete(pagetype: str, name: str):
method = "delete"
check_permissions(pagetype)
check_document_access(pagetype, name)
check_dashboard_actions(pagetype, name, method)
_run_pg_method(dt=pagetype, dn=name, method=method, args=None)
@jingrow.whitelist()
def run_pg_method(dt: str, dn: str, method: str, args: dict | None = None):
check_permissions(dt)
check_document_access(dt, dn)
check_dashboard_actions(dt, dn, method)
_run_pg_method(
dt=dt,
dn=dn,
method=method,
args=fix_args(method, args),
)
jingrow.response.docs = [get(dt, dn)]
@jingrow.whitelist()
def search_link(
pagetype: str,
query: str | None = None,
filters: dict | None = None,
order_by: str | None = None,
page_length: int | None = None,
):
check_permissions(pagetype)
if pagetype == "Team" and not jingrow.local.system_user():
raise_not_permitted()
meta = jingrow.get_meta(pagetype)
PageType = jingrow.qb.PageType(pagetype)
valid_filters = validate_filters(pagetype, filters)
valid_fields = validate_fields(pagetype, ["name", meta.title_field or "name"])
q = get_list_query(
pagetype,
meta,
filters,
valid_filters,
valid_fields,
0,
page_length or 10,
order_by or "modified desc",
)
q = q.select(PageType.name.as_("value"))
if meta.title_field:
q = q.select(PageType[meta.title_field].as_("label"))
if meta.has_field("enabled"):
q = q.where(PageType.enabled == 1)
if meta.has_field("disabled"):
q = q.where(PageType.disabled != 1)
if meta.has_field("team") and (not jingrow.local.system_user() or 1):
q = q.where(PageType.team == jingrow.local.team().name)
if query:
condition = PageType.name.like(f"%{query}%")
if meta.title_field:
condition = condition | PageType[meta.title_field].like(f"%{query}%")
q = q.where(condition)
return q.run(as_dict=1)
def check_document_access(pagetype: str, name: str):
if jingrow.local.system_user():
return
team = ""
meta = jingrow.get_meta(pagetype)
if meta.has_field("team"):
team = jingrow.db.get_value(pagetype, name, "team")
elif meta.has_field("bench"):
bench = jingrow.db.get_value(pagetype, name, "bench")
team = jingrow.db.get_value("Bench", bench, "team")
elif meta.has_field("group"):
group = jingrow.db.get_value(pagetype, name, "group")
team = jingrow.db.get_value("Release Group", group, "team")
else:
return
if team == jingrow.local.team().name:
return
if has_support_access(pagetype, name):
return
raise_not_permitted()
def check_dashboard_actions(pagetype, name, method):
pg = jingrow.get_pg(pagetype, name)
method_obj = getattr(pg, method)
fn = getattr(method_obj, "__func__", method_obj)
if fn not in whitelisted_methods:
raise_not_permitted()
def apply_custom_filters(pagetype, query, **list_args):
"""Apply custom filters to query"""
controller = get_controller(pagetype)
if hasattr(controller, "get_list_query"):
if inspect.getfullargspec(controller.get_list_query).varkw:
return controller.get_list_query(query, **list_args)
return controller.get_list_query(query)
return query
def validate_filters(pagetype, filters):
"""Filter filters based on permissions"""
if not filters:
filters = {}
out = jingrow._dict()
for fieldname, value in filters.items():
if is_allowed_field(pagetype, fieldname):
out[fieldname] = value
return out
def validate_fields(pagetype, fields):
"""Filter fields based on permissions"""
if not fields:
return fields
filtered_fields = []
for field in fields:
if is_allowed_field(pagetype, field):
filtered_fields.append(field)
return filtered_fields
def is_allowed_field(pagetype, field):
"""Check if field is valid"""
if not field:
return False
controller = get_controller(pagetype)
dashboard_fields = getattr(controller, "dashboard_fields", ())
if field in dashboard_fields:
return True
if "." in field and is_allowed_linked_field(pagetype, field):
return True
if isinstance(field, dict) and is_allowed_table_field(pagetype, field):
return True
if field in [*default_fields, *child_table_fields]:
return True
return False
def is_allowed_linked_field(pagetype, field):
linked_field = linked_field_fieldname = None
if " as " in field:
linked_field, _ = field.split(" as ")
else:
linked_field = field
linked_field, linked_field_fieldname = linked_field.split(".")
if not is_allowed_field(pagetype, linked_field):
return False
linked_field_pagetype = jingrow.get_meta(pagetype).get_field(linked_field).options
if not is_allowed_field(linked_field_pagetype, linked_field_fieldname):
return False
return True
def is_allowed_table_field(pagetype, field):
for table_fieldname, table_fields in field.items():
if not is_allowed_field(pagetype, table_fieldname):
return False
table_pagetype = jingrow.get_meta(pagetype).get_field(table_fieldname).options
for table_field in table_fields:
if not is_allowed_field(table_pagetype, table_field):
return False
return True
def check_permissions(pagetype):
if pagetype not in ALLOWED_DOCTYPES:
raise_not_permitted()
if not hasattr(jingrow.local, "team") or not jingrow.local.team():
jingrow.throw(
"current_team is not set. Use X-JCLOUDE -TEAM header in the request to set it.",
TeamHeaderNotInRequestError,
)
return True
def is_owned_by_team(pagetype, docname, raise_exception=True):
if not jingrow.local.team():
return False
docname = cstr(docname)
owned = jingrow.db.get_value(pagetype, docname, "team") == jingrow.local.team().name
if not owned and raise_exception:
raise_not_permitted()
return owned
def raise_not_permitted():
jingrow.throw("Not permitted", jingrow.PermissionError)
def dashboard_whitelist(allow_guest=False, xss_safe=False, methods=None):
def wrapper(func):
global whitelisted_methods
decorated_func = jingrow.whitelist(allow_guest=allow_guest, xss_safe=xss_safe, methods=methods)(func)
def inner(*args, **kwargs):
return decorated_func(*args, **kwargs)
whitelisted_methods.add(decorated_func)
return decorated_func
return wrapper
def fix_args(method, args):
# This is a fixer function. Certain callers of `run_pg_method`
# pass duplicates of the passed kwargs in the `args` arg.
#
# This causes "got multiple values for argument 'method'"
if not isinstance(args, dict):
return args
# Even if it doesn't match it'll probably throw
# down the call stack, but in that case it's unexpected
# behavior and so it's better to error-out.
if args.get("method") == method:
del args["method"]
return args