911 lines
25 KiB
Python
911 lines
25 KiB
Python
import json
|
|
|
|
import jingrow
|
|
from jingrow import _
|
|
from jingrow.custom.pagetype.property_setter.property_setter import make_property_setter
|
|
from jingrow.desk.form.assign_to import set_status
|
|
from jingrow.model import no_value_fields
|
|
from jingrow.model.document import get_controller
|
|
from jingrow.utils import make_filter_tuple
|
|
from pypika import Criterion
|
|
|
|
from crm.api.views import get_views
|
|
from crm.fcrm.pagetype.crm_form_script.crm_form_script import get_form_script
|
|
from crm.utils import get_dynamic_linked_docs, get_linked_docs
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def sort_options(pagetype: str):
|
|
fields = jingrow.get_meta(pagetype).fields
|
|
fields = [field for field in fields if field.fieldtype not in no_value_fields]
|
|
fields = [
|
|
{
|
|
"label": _(field.label),
|
|
"value": field.fieldname,
|
|
"fieldname": field.fieldname,
|
|
}
|
|
for field in fields
|
|
if field.label and field.fieldname
|
|
]
|
|
|
|
standard_fields = [
|
|
{"label": "Name", "fieldname": "name"},
|
|
{"label": "Created On", "fieldname": "creation"},
|
|
{"label": "Last Modified", "fieldname": "modified"},
|
|
{"label": "Modified By", "fieldname": "modified_by"},
|
|
{"label": "Owner", "fieldname": "owner"},
|
|
]
|
|
|
|
for field in standard_fields:
|
|
field["label"] = _(field["label"])
|
|
field["value"] = field["fieldname"]
|
|
fields.append(field)
|
|
|
|
return fields
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def get_filterable_fields(pagetype: str):
|
|
allowed_fieldtypes = [
|
|
"Check",
|
|
"Data",
|
|
"Float",
|
|
"Int",
|
|
"Currency",
|
|
"Dynamic Link",
|
|
"Link",
|
|
"Long Text",
|
|
"Select",
|
|
"Small Text",
|
|
"Text Editor",
|
|
"Text",
|
|
"Duration",
|
|
"Date",
|
|
"Datetime",
|
|
]
|
|
|
|
c = get_controller(pagetype)
|
|
restricted_fields = []
|
|
if hasattr(c, "get_non_filterable_fields"):
|
|
restricted_fields = c.get_non_filterable_fields()
|
|
|
|
res = []
|
|
|
|
# append DocFields
|
|
DocField = jingrow.qb.PageType("DocField")
|
|
pg_fields = get_pagetype_fields_meta(DocField, pagetype, allowed_fieldtypes, restricted_fields)
|
|
res.extend(pg_fields)
|
|
|
|
# append Custom Fields
|
|
CustomField = jingrow.qb.PageType("Custom Field")
|
|
custom_fields = get_pagetype_fields_meta(CustomField, pagetype, allowed_fieldtypes, restricted_fields)
|
|
res.extend(custom_fields)
|
|
|
|
# append standard fields (getting error when using jingrow.model.std_fields)
|
|
standard_fields = [
|
|
{"fieldname": "name", "fieldtype": "Link", "label": "ID", "options": pagetype},
|
|
{"fieldname": "owner", "fieldtype": "Link", "label": "Created By", "options": "User"},
|
|
{
|
|
"fieldname": "modified_by",
|
|
"fieldtype": "Link",
|
|
"label": "Last Updated By",
|
|
"options": "User",
|
|
},
|
|
{"fieldname": "_user_tags", "fieldtype": "Data", "label": "Tags"},
|
|
{"fieldname": "_liked_by", "fieldtype": "Data", "label": "Like"},
|
|
{"fieldname": "_comments", "fieldtype": "Text", "label": "Comments"},
|
|
{"fieldname": "_assign", "fieldtype": "Text", "label": "Assigned To"},
|
|
{"fieldname": "creation", "fieldtype": "Datetime", "label": "Created On"},
|
|
{"fieldname": "modified", "fieldtype": "Datetime", "label": "Last Updated On"},
|
|
]
|
|
for field in standard_fields:
|
|
if field.get("fieldname") not in restricted_fields and field.get("fieldtype") in allowed_fieldtypes:
|
|
field["name"] = field.get("fieldname")
|
|
res.append(field)
|
|
|
|
for field in res:
|
|
field["label"] = _(field.get("label"))
|
|
field["value"] = field.get("fieldname")
|
|
|
|
return res
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def get_group_by_fields(pagetype: str):
|
|
allowed_fieldtypes = [
|
|
"Check",
|
|
"Data",
|
|
"Float",
|
|
"Int",
|
|
"Currency",
|
|
"Dynamic Link",
|
|
"Link",
|
|
"Select",
|
|
"Duration",
|
|
"Date",
|
|
"Datetime",
|
|
]
|
|
|
|
fields = jingrow.get_meta(pagetype).fields
|
|
fields = [
|
|
field
|
|
for field in fields
|
|
if field.fieldtype not in no_value_fields and field.fieldtype in allowed_fieldtypes
|
|
]
|
|
fields = [
|
|
{
|
|
"label": _(field.label),
|
|
"fieldname": field.fieldname,
|
|
}
|
|
for field in fields
|
|
if field.label and field.fieldname
|
|
]
|
|
|
|
standard_fields = [
|
|
{"label": "Name", "fieldname": "name"},
|
|
{"label": "Created On", "fieldname": "creation"},
|
|
{"label": "Last Modified", "fieldname": "modified"},
|
|
{"label": "Modified By", "fieldname": "modified_by"},
|
|
{"label": "Owner", "fieldname": "owner"},
|
|
{"label": "Liked By", "fieldname": "_liked_by"},
|
|
{"label": "Assigned To", "fieldname": "_assign"},
|
|
{"label": "Comments", "fieldname": "_comments"},
|
|
{"label": "Created On", "fieldname": "creation"},
|
|
{"label": "Modified On", "fieldname": "modified"},
|
|
]
|
|
|
|
for field in standard_fields:
|
|
field["label"] = _(field["label"])
|
|
fields.append(field)
|
|
|
|
return fields
|
|
|
|
|
|
def get_pagetype_fields_meta(DocField, pagetype, allowed_fieldtypes, restricted_fields):
|
|
parent = "parent" if DocField._table_name == "tabDocField" else "dt"
|
|
return (
|
|
jingrow.qb.from_(DocField)
|
|
.select(
|
|
DocField.fieldname,
|
|
DocField.fieldtype,
|
|
DocField.label,
|
|
DocField.name,
|
|
DocField.options,
|
|
)
|
|
.where(DocField[parent] == pagetype)
|
|
.where(DocField.hidden == False) # noqa: E712
|
|
.where(Criterion.any([DocField.fieldtype == i for i in allowed_fieldtypes]))
|
|
.where(Criterion.all([DocField.fieldname != i for i in restricted_fields]))
|
|
.run(as_dict=True)
|
|
)
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def get_quick_filters(pagetype: str, cached: bool = True):
|
|
meta = jingrow.get_meta(pagetype, cached)
|
|
quick_filters = []
|
|
|
|
if global_settings := jingrow.db.exists("CRM Global Settings", {"dt": pagetype, "type": "Quick Filters"}):
|
|
_quick_filters = jingrow.db.get_value("CRM Global Settings", global_settings, "json")
|
|
_quick_filters = json.loads(_quick_filters) or []
|
|
|
|
fields = []
|
|
|
|
for filter in _quick_filters:
|
|
if filter == "name":
|
|
fields.append({"label": "Name", "fieldname": "name", "fieldtype": "Data"})
|
|
else:
|
|
field = next((f for f in meta.fields if f.fieldname == filter), None)
|
|
if field:
|
|
fields.append(field)
|
|
|
|
else:
|
|
fields = [field for field in meta.fields if field.in_standard_filter]
|
|
|
|
for field in fields:
|
|
options = field.get("options")
|
|
if field.get("fieldtype") == "Select" and options and isinstance(options, str):
|
|
options = options.split("\n")
|
|
options = [{"label": option, "value": option} for option in options]
|
|
if not any([not option.get("value") for option in options]):
|
|
options.insert(0, {"label": "", "value": ""})
|
|
quick_filters.append(
|
|
{
|
|
"label": _(field.get("label")),
|
|
"fieldname": field.get("fieldname"),
|
|
"fieldtype": field.get("fieldtype"),
|
|
"options": options,
|
|
}
|
|
)
|
|
|
|
if pagetype == "CRM Lead":
|
|
quick_filters = [filter for filter in quick_filters if filter.get("fieldname") != "converted"]
|
|
|
|
return quick_filters
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def update_quick_filters(quick_filters: str, old_filters: str, pagetype: str):
|
|
quick_filters = json.loads(quick_filters)
|
|
old_filters = json.loads(old_filters)
|
|
|
|
new_filters = [filter for filter in quick_filters if filter not in old_filters]
|
|
removed_filters = [filter for filter in old_filters if filter not in quick_filters]
|
|
|
|
# update or create global quick filter settings
|
|
create_update_global_settings(pagetype, quick_filters)
|
|
|
|
# remove old filters
|
|
for filter in removed_filters:
|
|
update_in_standard_filter(filter, pagetype, 0)
|
|
|
|
# add new filters
|
|
for filter in new_filters:
|
|
update_in_standard_filter(filter, pagetype, 1)
|
|
|
|
|
|
def create_update_global_settings(pagetype, quick_filters):
|
|
if global_settings := jingrow.db.exists("CRM Global Settings", {"dt": pagetype, "type": "Quick Filters"}):
|
|
jingrow.db.set_value("CRM Global Settings", global_settings, "json", json.dumps(quick_filters))
|
|
else:
|
|
# create CRM Global Settings pg
|
|
pg = jingrow.new_pg("CRM Global Settings")
|
|
pg.dt = pagetype
|
|
pg.type = "Quick Filters"
|
|
pg.json = json.dumps(quick_filters)
|
|
pg.insert()
|
|
|
|
|
|
def update_in_standard_filter(fieldname, pagetype, value):
|
|
if property_name := jingrow.db.exists(
|
|
"Property Setter",
|
|
{"pg_type": pagetype, "field_name": fieldname, "property": "in_standard_filter"},
|
|
):
|
|
jingrow.db.set_value("Property Setter", property_name, "value", value)
|
|
else:
|
|
make_property_setter(
|
|
pagetype,
|
|
fieldname,
|
|
"in_standard_filter",
|
|
value,
|
|
"Check",
|
|
validate_fields_for_pagetype=False,
|
|
)
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def get_data(
|
|
pagetype: str,
|
|
filters: dict,
|
|
order_by: str,
|
|
page_length=20,
|
|
page_length_count=20,
|
|
column_field=None,
|
|
title_field=None,
|
|
columns=[],
|
|
rows=[],
|
|
kanban_columns=[],
|
|
kanban_fields=[],
|
|
view=None,
|
|
default_filters=None,
|
|
):
|
|
custom_view = False
|
|
filters = jingrow._dict(filters)
|
|
rows = jingrow.parse_json(rows or "[]")
|
|
columns = jingrow.parse_json(columns or "[]")
|
|
kanban_fields = jingrow.parse_json(kanban_fields or "[]")
|
|
kanban_columns = jingrow.parse_json(kanban_columns or "[]")
|
|
|
|
custom_view_name = view.get("custom_view_name") if view else None
|
|
view_type = view.get("view_type") if view else None
|
|
group_by_field = view.get("group_by_field") if view else None
|
|
|
|
for key in filters:
|
|
value = filters[key]
|
|
if isinstance(value, list):
|
|
if "@me" in value:
|
|
value[value.index("@me")] = jingrow.session.user
|
|
elif "%@me%" in value:
|
|
index = [i for i, v in enumerate(value) if v == "%@me%"]
|
|
for i in index:
|
|
value[i] = "%" + jingrow.session.user + "%"
|
|
elif value == "@me":
|
|
filters[key] = jingrow.session.user
|
|
|
|
if default_filters:
|
|
default_filters = jingrow.parse_json(default_filters)
|
|
filters.update(default_filters)
|
|
|
|
is_default = True
|
|
data = []
|
|
_list = get_controller(pagetype)
|
|
default_rows = []
|
|
if hasattr(_list, "default_list_data"):
|
|
default_rows = _list.default_list_data().get("rows")
|
|
|
|
meta = jingrow.get_meta(pagetype)
|
|
|
|
if view_type != "kanban":
|
|
if columns or rows:
|
|
custom_view = True
|
|
is_default = False
|
|
columns = jingrow.parse_json(columns)
|
|
rows = jingrow.parse_json(rows)
|
|
|
|
if not columns:
|
|
columns = [
|
|
{"label": "Name", "type": "Data", "key": "name", "width": "16rem"},
|
|
{"label": "Last Modified", "type": "Datetime", "key": "modified", "width": "8rem"},
|
|
]
|
|
|
|
if not rows:
|
|
rows = ["name"]
|
|
|
|
default_view_filters = {
|
|
"dt": pagetype,
|
|
"type": view_type or "list",
|
|
"is_standard": 1,
|
|
"user": jingrow.session.user,
|
|
}
|
|
|
|
if not custom_view and jingrow.db.exists("CRM View Settings", default_view_filters):
|
|
list_view_settings = jingrow.get_pg("CRM View Settings", default_view_filters)
|
|
columns = jingrow.parse_json(list_view_settings.columns)
|
|
rows = jingrow.parse_json(list_view_settings.rows)
|
|
is_default = False
|
|
elif not custom_view or (is_default and hasattr(_list, "default_list_data")):
|
|
rows = default_rows
|
|
columns = _list.default_list_data().get("columns")
|
|
|
|
# check if rows has all keys from columns if not add them
|
|
for column in columns:
|
|
if column.get("key") not in rows:
|
|
rows.append(column.get("key"))
|
|
column["label"] = _(column.get("label"))
|
|
|
|
if column.get("key") == "_liked_by" and column.get("width") == "10rem":
|
|
column["width"] = "50px"
|
|
|
|
# remove column if column.hidden is True
|
|
column_meta = meta.get_field(column.get("key"))
|
|
if column_meta and column_meta.get("hidden"):
|
|
columns.remove(column)
|
|
|
|
# check if rows has group_by_field if not add it
|
|
if group_by_field and group_by_field not in rows:
|
|
rows.append(group_by_field)
|
|
|
|
data = (
|
|
jingrow.get_list(
|
|
pagetype,
|
|
fields=rows,
|
|
filters=filters,
|
|
order_by=order_by,
|
|
page_length=page_length,
|
|
)
|
|
or []
|
|
)
|
|
data = parse_list_data(data, pagetype)
|
|
|
|
if view_type == "kanban":
|
|
if not rows:
|
|
rows = default_rows
|
|
|
|
if not kanban_columns and column_field:
|
|
field_meta = jingrow.get_meta(pagetype).get_field(column_field)
|
|
if field_meta.fieldtype == "Link":
|
|
kanban_columns = jingrow.get_all(
|
|
field_meta.options,
|
|
fields=["name"],
|
|
order_by="modified asc",
|
|
)
|
|
elif field_meta.fieldtype == "Select":
|
|
kanban_columns = [{"name": option} for option in field_meta.options.split("\n")]
|
|
|
|
if not title_field:
|
|
title_field = "name"
|
|
if hasattr(_list, "default_kanban_settings"):
|
|
title_field = _list.default_kanban_settings().get("title_field")
|
|
|
|
if title_field not in rows:
|
|
rows.append(title_field)
|
|
|
|
if not kanban_fields:
|
|
kanban_fields = ["name"]
|
|
if hasattr(_list, "default_kanban_settings"):
|
|
kanban_fields = json.loads(_list.default_kanban_settings().get("kanban_fields"))
|
|
|
|
for field in kanban_fields:
|
|
if field not in rows:
|
|
rows.append(field)
|
|
|
|
for kc in kanban_columns:
|
|
column_filters = {column_field: kc.get("name")}
|
|
order = kc.get("order")
|
|
if (column_field in filters and filters.get(column_field) != kc.get("name")) or kc.get("delete"):
|
|
column_data = []
|
|
else:
|
|
column_filters.update(filters.copy())
|
|
page_length = 20
|
|
|
|
if kc.get("page_length"):
|
|
page_length = kc.get("page_length")
|
|
|
|
if order:
|
|
column_data = get_records_based_on_order(
|
|
pagetype, rows, column_filters, page_length, order
|
|
)
|
|
else:
|
|
column_data = jingrow.get_list(
|
|
pagetype,
|
|
fields=rows,
|
|
filters=convert_filter_to_tuple(pagetype, column_filters),
|
|
order_by=order_by,
|
|
page_length=page_length,
|
|
)
|
|
|
|
new_filters = filters.copy()
|
|
new_filters.update({column_field: kc.get("name")})
|
|
|
|
all_count = jingrow.get_list(
|
|
pagetype,
|
|
filters=convert_filter_to_tuple(pagetype, new_filters),
|
|
fields="count(*) as total_count",
|
|
)[0].total_count
|
|
|
|
kc["all_count"] = all_count
|
|
kc["count"] = len(column_data)
|
|
|
|
for d in column_data:
|
|
getCounts(d, pagetype)
|
|
|
|
if order:
|
|
column_data = sorted(
|
|
column_data,
|
|
key=lambda x: order.index(x.get("name")) if x.get("name") in order else len(order),
|
|
)
|
|
|
|
data.append({"column": kc, "fields": kanban_fields, "data": column_data})
|
|
|
|
fields = jingrow.get_meta(pagetype).fields
|
|
fields = [field for field in fields if field.fieldtype not in no_value_fields]
|
|
fields = [
|
|
{
|
|
"label": _(field.label),
|
|
"fieldtype": field.fieldtype,
|
|
"fieldname": field.fieldname,
|
|
"options": field.options,
|
|
}
|
|
for field in fields
|
|
if field.label and field.fieldname
|
|
]
|
|
|
|
std_fields = [
|
|
{"label": "Name", "fieldtype": "Data", "fieldname": "name"},
|
|
{"label": "Created On", "fieldtype": "Datetime", "fieldname": "creation"},
|
|
{"label": "Last Modified", "fieldtype": "Datetime", "fieldname": "modified"},
|
|
{
|
|
"label": "Modified By",
|
|
"fieldtype": "Link",
|
|
"fieldname": "modified_by",
|
|
"options": "User",
|
|
},
|
|
{"label": "Assigned To", "fieldtype": "Text", "fieldname": "_assign"},
|
|
{"label": "Owner", "fieldtype": "Link", "fieldname": "owner", "options": "User"},
|
|
{"label": "Like", "fieldtype": "Data", "fieldname": "_liked_by"},
|
|
]
|
|
|
|
for field in std_fields:
|
|
if field.get("fieldname") not in rows:
|
|
rows.append(field.get("fieldname"))
|
|
if field not in fields:
|
|
field["label"] = _(field["label"])
|
|
fields.append(field)
|
|
|
|
if not is_default and custom_view_name:
|
|
is_default = jingrow.db.get_value("CRM View Settings", custom_view_name, "load_default_columns")
|
|
|
|
if group_by_field and view_type == "group_by":
|
|
|
|
def get_options(type, options):
|
|
if type == "Select":
|
|
return [option for option in options.split("\n")]
|
|
else:
|
|
has_empty_values = any([not d.get(group_by_field) for d in data])
|
|
options = list(set([d.get(group_by_field) for d in data]))
|
|
options = [u for u in options if u]
|
|
if has_empty_values:
|
|
options.append("")
|
|
|
|
if order_by and group_by_field in order_by:
|
|
order_by_fields = order_by.split(",")
|
|
order_by_fields = [
|
|
(field.split(" ")[0], field.split(" ")[1]) for field in order_by_fields
|
|
]
|
|
if (group_by_field, "asc") in order_by_fields:
|
|
options.sort()
|
|
elif (group_by_field, "desc") in order_by_fields:
|
|
options.sort(reverse=True)
|
|
else:
|
|
options.sort()
|
|
return options
|
|
|
|
for field in fields:
|
|
if field.get("fieldname") == group_by_field:
|
|
group_by_field = {
|
|
"label": field.get("label"),
|
|
"fieldname": field.get("fieldname"),
|
|
"fieldtype": field.get("fieldtype"),
|
|
"options": get_options(field.get("fieldtype"), field.get("options")),
|
|
}
|
|
|
|
return {
|
|
"data": data,
|
|
"columns": columns,
|
|
"rows": rows,
|
|
"fields": fields,
|
|
"column_field": column_field,
|
|
"title_field": title_field,
|
|
"kanban_columns": kanban_columns,
|
|
"kanban_fields": kanban_fields,
|
|
"group_by_field": group_by_field,
|
|
"page_length": page_length,
|
|
"page_length_count": page_length_count,
|
|
"is_default": is_default,
|
|
"views": get_views(pagetype),
|
|
"total_count": jingrow.get_list(pagetype, filters=filters, fields="count(*) as total_count")[
|
|
0
|
|
].total_count,
|
|
"row_count": len(data),
|
|
"form_script": get_form_script(pagetype),
|
|
"list_script": get_form_script(pagetype, "List"),
|
|
"view_type": view_type,
|
|
}
|
|
|
|
|
|
def parse_list_data(data, pagetype):
|
|
_list = get_controller(pagetype)
|
|
if hasattr(_list, "parse_list_data"):
|
|
data = _list.parse_list_data(data)
|
|
return data
|
|
|
|
|
|
def convert_filter_to_tuple(pagetype, filters):
|
|
if isinstance(filters, dict):
|
|
filters_items = filters.items()
|
|
filters = []
|
|
for key, value in filters_items:
|
|
filters.append(make_filter_tuple(pagetype, key, value))
|
|
return filters
|
|
|
|
|
|
def get_records_based_on_order(pagetype, rows, filters, page_length, order):
|
|
records = []
|
|
filters = convert_filter_to_tuple(pagetype, filters)
|
|
in_filters = filters.copy()
|
|
in_filters.append([pagetype, "name", "in", order[:page_length]])
|
|
records = jingrow.get_list(
|
|
pagetype,
|
|
fields=rows,
|
|
filters=in_filters,
|
|
order_by="creation desc",
|
|
page_length=page_length,
|
|
)
|
|
|
|
if len(records) < page_length:
|
|
not_in_filters = filters.copy()
|
|
not_in_filters.append([pagetype, "name", "not in", order])
|
|
remaining_records = jingrow.get_list(
|
|
pagetype,
|
|
fields=rows,
|
|
filters=not_in_filters,
|
|
order_by="creation desc",
|
|
page_length=page_length - len(records),
|
|
)
|
|
for record in remaining_records:
|
|
records.append(record)
|
|
|
|
return records
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def get_fields_meta(pagetype, restricted_fieldtypes=None, as_array=False, only_required=False):
|
|
not_allowed_fieldtypes = [
|
|
"Tab Break",
|
|
"Section Break",
|
|
"Column Break",
|
|
]
|
|
|
|
if restricted_fieldtypes:
|
|
restricted_fieldtypes = jingrow.parse_json(restricted_fieldtypes)
|
|
not_allowed_fieldtypes += restricted_fieldtypes
|
|
|
|
fields = jingrow.get_meta(pagetype).fields
|
|
fields = [field for field in fields if field.fieldtype not in not_allowed_fieldtypes]
|
|
|
|
standard_fields = [
|
|
{"fieldname": "name", "fieldtype": "Link", "label": "ID", "options": pagetype},
|
|
{"fieldname": "owner", "fieldtype": "Link", "label": "Created By", "options": "User"},
|
|
{
|
|
"fieldname": "modified_by",
|
|
"fieldtype": "Link",
|
|
"label": "Last Updated By",
|
|
"options": "User",
|
|
},
|
|
{"fieldname": "_user_tags", "fieldtype": "Data", "label": "Tags"},
|
|
{"fieldname": "_liked_by", "fieldtype": "Data", "label": "Like"},
|
|
{"fieldname": "_comments", "fieldtype": "Text", "label": "Comments"},
|
|
{"fieldname": "_assign", "fieldtype": "Text", "label": "Assigned To"},
|
|
{"fieldname": "creation", "fieldtype": "Datetime", "label": "Created On"},
|
|
{"fieldname": "modified", "fieldtype": "Datetime", "label": "Last Updated On"},
|
|
]
|
|
|
|
for field in standard_fields:
|
|
if not restricted_fieldtypes or field.get("fieldtype") not in restricted_fieldtypes:
|
|
fields.append(field)
|
|
|
|
if only_required:
|
|
fields = [field for field in fields if field.get("reqd")]
|
|
|
|
if as_array:
|
|
return fields
|
|
|
|
fields_meta = {}
|
|
for field in fields:
|
|
fields_meta[field.get("fieldname")] = field
|
|
if field.get("fieldtype") == "Table":
|
|
_fields = jingrow.get_meta(field.get("options")).fields
|
|
fields_meta[field.get("fieldname")] = {"df": field, "fields": _fields}
|
|
|
|
return fields_meta
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def remove_assignments(pagetype, name, assignees, ignore_permissions=False):
|
|
assignees = jingrow.parse_json(assignees)
|
|
|
|
if not assignees:
|
|
return
|
|
|
|
for assign_to in assignees:
|
|
set_status(
|
|
pagetype,
|
|
name,
|
|
todo=None,
|
|
assign_to=assign_to,
|
|
status="Cancelled",
|
|
ignore_permissions=ignore_permissions,
|
|
)
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def get_assigned_users(pagetype, name, default_assigned_to=None):
|
|
assigned_users = jingrow.get_all(
|
|
"ToDo",
|
|
fields=["allocated_to"],
|
|
filters={
|
|
"reference_type": pagetype,
|
|
"reference_name": name,
|
|
"status": ("!=", "Cancelled"),
|
|
},
|
|
pluck="allocated_to",
|
|
)
|
|
|
|
users = list(set(assigned_users))
|
|
|
|
# if users is empty, add default_assigned_to
|
|
if not users and default_assigned_to:
|
|
users = [default_assigned_to]
|
|
return users
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def get_fields(pagetype: str, allow_all_fieldtypes: bool = False):
|
|
not_allowed_fieldtypes = [*list(jingrow.model.no_value_fields), "Read Only"]
|
|
if allow_all_fieldtypes:
|
|
not_allowed_fieldtypes = []
|
|
fields = jingrow.get_meta(pagetype).fields
|
|
|
|
_fields = []
|
|
|
|
for field in fields:
|
|
if field.fieldtype not in not_allowed_fieldtypes and field.fieldname:
|
|
_fields.append(field)
|
|
|
|
return _fields
|
|
|
|
|
|
def getCounts(d, pagetype):
|
|
d["_email_count"] = (
|
|
jingrow.db.count(
|
|
"Communication",
|
|
filters={
|
|
"reference_pagetype": pagetype,
|
|
"reference_name": d.get("name"),
|
|
"communication_type": "Communication",
|
|
},
|
|
)
|
|
or 0
|
|
)
|
|
d["_email_count"] = d["_email_count"] + jingrow.db.count(
|
|
"Communication",
|
|
filters={
|
|
"reference_pagetype": pagetype,
|
|
"reference_name": d.get("name"),
|
|
"communication_type": "Automated Message",
|
|
},
|
|
)
|
|
d["_comment_count"] = jingrow.db.count(
|
|
"Comment",
|
|
filters={"reference_pagetype": pagetype, "reference_name": d.get("name"), "comment_type": "Comment"},
|
|
)
|
|
d["_task_count"] = jingrow.db.count(
|
|
"CRM Task", filters={"reference_pagetype": pagetype, "reference_docname": d.get("name")}
|
|
)
|
|
d["_note_count"] = jingrow.db.count(
|
|
"FCRM Note", filters={"reference_pagetype": pagetype, "reference_docname": d.get("name")}
|
|
)
|
|
return d
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def get_linked_docs_of_document(pagetype, docname):
|
|
try:
|
|
pg = jingrow.get_pg(pagetype, docname)
|
|
except jingrow.DoesNotExistError:
|
|
return []
|
|
|
|
linked_docs = get_linked_docs(pg)
|
|
dynamic_linked_docs = get_dynamic_linked_docs(pg)
|
|
|
|
linked_docs.extend(dynamic_linked_docs)
|
|
linked_docs = list({pg["reference_docname"]: pg for pg in linked_docs}.values())
|
|
|
|
docs_data = []
|
|
for pg in linked_docs:
|
|
if not pg.get("reference_pagetype") or not pg.get("reference_docname"):
|
|
continue
|
|
|
|
try:
|
|
data = jingrow.get_pg(pg["reference_pagetype"], pg["reference_docname"])
|
|
except (jingrow.DoesNotExistError, jingrow.ValidationError):
|
|
continue
|
|
|
|
title = data.get("title")
|
|
if data.pagetype == "CRM Call Log":
|
|
title = f"Call from {data.get('from')} to {data.get('to')}"
|
|
|
|
if data.pagetype == "CRM Deal":
|
|
title = data.get("organization")
|
|
|
|
if data.pagetype == "CRM Notification":
|
|
title = data.get("message")
|
|
|
|
docs_data.append(
|
|
{
|
|
"pg": data.pagetype,
|
|
"title": title or data.get("name"),
|
|
"reference_docname": pg["reference_docname"],
|
|
"reference_pagetype": pg["reference_pagetype"],
|
|
}
|
|
)
|
|
return docs_data
|
|
|
|
|
|
def remove_pg_link(pagetype, docname):
|
|
if not pagetype or not docname:
|
|
return
|
|
|
|
try:
|
|
linked_pg_data = jingrow.get_pg(pagetype, docname)
|
|
if pagetype == "CRM Notification":
|
|
delete_notification_type = {
|
|
"notification_type_pagetype": "",
|
|
"notification_type_pg": "",
|
|
}
|
|
delete_references = {
|
|
"reference_pagetype": "",
|
|
"reference_name": "",
|
|
}
|
|
if linked_pg_data.get("notification_type_pagetype") == linked_pg_data.get("reference_pagetype"):
|
|
delete_references.update(delete_notification_type)
|
|
|
|
linked_pg_data.update(delete_references)
|
|
else:
|
|
linked_pg_data.update(
|
|
{
|
|
"reference_pagetype": "",
|
|
"reference_docname": "",
|
|
}
|
|
)
|
|
linked_pg_data.save(ignore_permissions=True)
|
|
except (jingrow.DoesNotExistError, jingrow.ValidationError):
|
|
pass
|
|
|
|
|
|
def remove_contact_link(pagetype, docname):
|
|
if not pagetype or not docname:
|
|
return
|
|
|
|
try:
|
|
linked_pg_data = jingrow.get_pg(pagetype, docname)
|
|
linked_pg_data.update(
|
|
{
|
|
"contact": None,
|
|
"contacts": [],
|
|
}
|
|
)
|
|
linked_pg_data.save(ignore_permissions=True)
|
|
except (jingrow.DoesNotExistError, jingrow.ValidationError):
|
|
pass
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def remove_linked_pg_reference(items, remove_contact=None, delete=False):
|
|
if isinstance(items, str):
|
|
items = jingrow.parse_json(items)
|
|
|
|
for item in items:
|
|
if not item.get("pagetype") or not item.get("docname"):
|
|
continue
|
|
|
|
try:
|
|
if remove_contact:
|
|
remove_contact_link(item["pagetype"], item["docname"])
|
|
else:
|
|
remove_pg_link(item["pagetype"], item["docname"])
|
|
if delete:
|
|
jingrow.delete_pg(item["pagetype"], item["docname"])
|
|
except (jingrow.DoesNotExistError, jingrow.ValidationError):
|
|
# Skip if document doesn't exist or has validation errors
|
|
continue
|
|
|
|
return "success"
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def delete_bulk_docs(pagetype, items, delete_linked=False):
|
|
from jingrow.desk.reportview import delete_bulk
|
|
|
|
if not pagetype:
|
|
jingrow.throw("Pagetype is required")
|
|
|
|
if not items:
|
|
jingrow.throw("Items are required")
|
|
|
|
items = jingrow.parse_json(items)
|
|
if not isinstance(items, list):
|
|
jingrow.throw("Items must be a list")
|
|
|
|
for pg in items:
|
|
try:
|
|
if not jingrow.db.exists(pagetype, pg):
|
|
jingrow.log_error(f"Document {pagetype} {pg} does not exist", "Bulk Delete Error")
|
|
continue
|
|
|
|
linked_docs = get_linked_docs_of_document(pagetype, pg)
|
|
for linked_pg in linked_docs:
|
|
if not linked_pg.get("reference_pagetype") or not linked_pg.get("reference_docname"):
|
|
continue
|
|
|
|
remove_linked_pg_reference(
|
|
[
|
|
{
|
|
"pagetype": linked_pg["reference_pagetype"],
|
|
"docname": linked_pg["reference_docname"],
|
|
}
|
|
],
|
|
remove_contact=pagetype == "Contact",
|
|
delete=delete_linked,
|
|
)
|
|
except Exception as e:
|
|
jingrow.log_error(
|
|
f"Error processing linked docs for {pagetype} {pg}: {str(e)}", "Bulk Delete Error"
|
|
)
|
|
|
|
if len(items) > 10:
|
|
jingrow.enqueue("jingrow.desk.reportview.delete_bulk", pagetype=pagetype, items=items)
|
|
else:
|
|
delete_bulk(pagetype, items)
|
|
return "success"
|