import json import jingrow from jingrow import _ from jingrow.custom.pagetype.property_setter.property_setter import make_property_setter from jingrow.desk.form.assign_to import set_status from jingrow.model import no_value_fields from jingrow.model.page import get_controller from jingrow.utils import make_filter_tuple from pypika import Criterion from crm.api.views import get_views from crm.fcrm.pagetype.crm_form_script.crm_form_script import get_form_script from crm.utils import get_dynamic_linked_docs, get_linked_docs @jingrow.whitelist() def sort_options(pagetype: str): fields = jingrow.get_meta(pagetype).fields fields = [field for field in fields if field.fieldtype not in no_value_fields] fields = [ { "label": _(field.label), "value": field.fieldname, "fieldname": field.fieldname, } for field in fields if field.label and field.fieldname ] standard_fields = [ {"label": "Name", "fieldname": "name"}, {"label": "Created On", "fieldname": "creation"}, {"label": "Last Modified", "fieldname": "modified"}, {"label": "Modified By", "fieldname": "modified_by"}, {"label": "Owner", "fieldname": "owner"}, ] for field in standard_fields: field["label"] = _(field["label"]) field["value"] = field["fieldname"] fields.append(field) return fields @jingrow.whitelist() def get_filterable_fields(pagetype: str): allowed_fieldtypes = [ "Check", "Data", "Float", "Int", "Currency", "Dynamic Link", "Link", "Long Text", "Select", "Small Text", "Text Editor", "Text", "Duration", "Date", "Datetime", ] c = get_controller(pagetype) restricted_fields = [] if hasattr(c, "get_non_filterable_fields"): restricted_fields = c.get_non_filterable_fields() res = [] # append PageFields PageField = jingrow.qb.PageType("PageField") pg_fields = get_pagetype_fields_meta(PageField, pagetype, allowed_fieldtypes, restricted_fields) res.extend(pg_fields) # append Custom Fields CustomField = jingrow.qb.PageType("Custom Field") custom_fields = get_pagetype_fields_meta(CustomField, pagetype, allowed_fieldtypes, restricted_fields) res.extend(custom_fields) # append standard fields (getting error when using jingrow.model.std_fields) standard_fields = [ {"fieldname": "name", "fieldtype": "Link", "label": "ID", "options": pagetype}, {"fieldname": "owner", "fieldtype": "Link", "label": "Created By", "options": "User"}, { "fieldname": "modified_by", "fieldtype": "Link", "label": "Last Updated By", "options": "User", }, {"fieldname": "_user_tags", "fieldtype": "Data", "label": "Tags"}, {"fieldname": "_liked_by", "fieldtype": "Data", "label": "Like"}, {"fieldname": "_comments", "fieldtype": "Text", "label": "Comments"}, {"fieldname": "_assign", "fieldtype": "Text", "label": "Assigned To"}, {"fieldname": "creation", "fieldtype": "Datetime", "label": "Created On"}, {"fieldname": "modified", "fieldtype": "Datetime", "label": "Last Updated On"}, ] for field in standard_fields: if field.get("fieldname") not in restricted_fields and field.get("fieldtype") in allowed_fieldtypes: field["name"] = field.get("fieldname") res.append(field) for field in res: field["label"] = _(field.get("label")) field["value"] = field.get("fieldname") return res @jingrow.whitelist() def get_group_by_fields(pagetype: str): allowed_fieldtypes = [ "Check", "Data", "Float", "Int", "Currency", "Dynamic Link", "Link", "Select", "Duration", "Date", "Datetime", ] fields = jingrow.get_meta(pagetype).fields fields = [ field for field in fields if field.fieldtype not in no_value_fields and field.fieldtype in allowed_fieldtypes ] fields = [ { "label": _(field.label), "fieldname": field.fieldname, } for field in fields if field.label and field.fieldname ] standard_fields = [ {"label": "Name", "fieldname": "name"}, {"label": "Created On", "fieldname": "creation"}, {"label": "Last Modified", "fieldname": "modified"}, {"label": "Modified By", "fieldname": "modified_by"}, {"label": "Owner", "fieldname": "owner"}, {"label": "Liked By", "fieldname": "_liked_by"}, {"label": "Assigned To", "fieldname": "_assign"}, {"label": "Comments", "fieldname": "_comments"}, {"label": "Created On", "fieldname": "creation"}, {"label": "Modified On", "fieldname": "modified"}, ] for field in standard_fields: field["label"] = _(field["label"]) fields.append(field) return fields def get_pagetype_fields_meta(PageField, pagetype, allowed_fieldtypes, restricted_fields): parent = "parent" if PageField._table_name == "tabPageField" else "dt" return ( jingrow.qb.from_(PageField) .select( PageField.fieldname, PageField.fieldtype, PageField.label, PageField.name, PageField.options, ) .where(PageField[parent] == pagetype) .where(PageField.hidden == False) # noqa: E712 .where(Criterion.any([PageField.fieldtype == i for i in allowed_fieldtypes])) .where(Criterion.all([PageField.fieldname != i for i in restricted_fields])) .run(as_dict=True) ) @jingrow.whitelist() def get_quick_filters(pagetype: str, cached: bool = True): meta = jingrow.get_meta(pagetype, cached) quick_filters = [] if global_settings := jingrow.db.exists("CRM Global Settings", {"dt": pagetype, "type": "Quick Filters"}): _quick_filters = jingrow.db.get_value("CRM Global Settings", global_settings, "json") _quick_filters = json.loads(_quick_filters) or [] fields = [] for filter in _quick_filters: if filter == "name": fields.append({"label": "Name", "fieldname": "name", "fieldtype": "Data"}) else: field = next((f for f in meta.fields if f.fieldname == filter), None) if field: fields.append(field) else: fields = [field for field in meta.fields if field.in_standard_filter] for field in fields: options = field.get("options") if field.get("fieldtype") == "Select" and options and isinstance(options, str): options = options.split("\n") options = [{"label": option, "value": option} for option in options] if not any([not option.get("value") for option in options]): options.insert(0, {"label": "", "value": ""}) quick_filters.append( { "label": _(field.get("label")), "fieldname": field.get("fieldname"), "fieldtype": field.get("fieldtype"), "options": options, } ) if pagetype == "CRM Lead": quick_filters = [filter for filter in quick_filters if filter.get("fieldname") != "converted"] return quick_filters @jingrow.whitelist() def update_quick_filters(quick_filters: str, old_filters: str, pagetype: str): quick_filters = json.loads(quick_filters) old_filters = json.loads(old_filters) new_filters = [filter for filter in quick_filters if filter not in old_filters] removed_filters = [filter for filter in old_filters if filter not in quick_filters] # update or create global quick filter settings create_update_global_settings(pagetype, quick_filters) # remove old filters for filter in removed_filters: update_in_standard_filter(filter, pagetype, 0) # add new filters for filter in new_filters: update_in_standard_filter(filter, pagetype, 1) def create_update_global_settings(pagetype, quick_filters): if global_settings := jingrow.db.exists("CRM Global Settings", {"dt": pagetype, "type": "Quick Filters"}): jingrow.db.set_value("CRM Global Settings", global_settings, "json", json.dumps(quick_filters)) else: # create CRM Global Settings pg pg = jingrow.new_pg("CRM Global Settings") pg.dt = pagetype pg.type = "Quick Filters" pg.json = json.dumps(quick_filters) pg.insert() def update_in_standard_filter(fieldname, pagetype, value): if property_name := jingrow.db.exists( "Property Setter", {"pg_type": pagetype, "field_name": fieldname, "property": "in_standard_filter"}, ): jingrow.db.set_value("Property Setter", property_name, "value", value) else: make_property_setter( pagetype, fieldname, "in_standard_filter", value, "Check", validate_fields_for_pagetype=False, ) @jingrow.whitelist() def get_data( pagetype: str, filters: dict, order_by: str, page_length=20, page_length_count=20, column_field=None, title_field=None, columns=[], rows=[], kanban_columns=[], kanban_fields=[], view=None, default_filters=None, ): custom_view = False filters = jingrow._dict(filters) rows = jingrow.parse_json(rows or "[]") columns = jingrow.parse_json(columns or "[]") kanban_fields = jingrow.parse_json(kanban_fields or "[]") kanban_columns = jingrow.parse_json(kanban_columns or "[]") custom_view_name = view.get("custom_view_name") if view else None view_type = view.get("view_type") if view else None group_by_field = view.get("group_by_field") if view else None for key in filters: value = filters[key] if isinstance(value, list): if "@me" in value: value[value.index("@me")] = jingrow.session.user elif "%@me%" in value: index = [i for i, v in enumerate(value) if v == "%@me%"] for i in index: value[i] = "%" + jingrow.session.user + "%" elif value == "@me": filters[key] = jingrow.session.user if default_filters: default_filters = jingrow.parse_json(default_filters) filters.update(default_filters) is_default = True data = [] _list = get_controller(pagetype) default_rows = [] if hasattr(_list, "default_list_data"): default_rows = _list.default_list_data().get("rows") meta = jingrow.get_meta(pagetype) if view_type != "kanban": if columns or rows: custom_view = True is_default = False columns = jingrow.parse_json(columns) rows = jingrow.parse_json(rows) if not columns: columns = [ {"label": "Name", "type": "Data", "key": "name", "width": "16rem"}, {"label": "Last Modified", "type": "Datetime", "key": "modified", "width": "8rem"}, ] if not rows: rows = ["name"] default_view_filters = { "dt": pagetype, "type": view_type or "list", "is_standard": 1, "user": jingrow.session.user, } if not custom_view and jingrow.db.exists("CRM View Settings", default_view_filters): list_view_settings = jingrow.get_pg("CRM View Settings", default_view_filters) columns = jingrow.parse_json(list_view_settings.columns) rows = jingrow.parse_json(list_view_settings.rows) is_default = False elif not custom_view or (is_default and hasattr(_list, "default_list_data")): rows = default_rows columns = _list.default_list_data().get("columns") # check if rows has all keys from columns if not add them for column in columns: if column.get("key") not in rows: rows.append(column.get("key")) column["label"] = _(column.get("label")) if column.get("key") == "_liked_by" and column.get("width") == "10rem": column["width"] = "50px" # remove column if column.hidden is True column_meta = meta.get_field(column.get("key")) if column_meta and column_meta.get("hidden"): columns.remove(column) # check if rows has group_by_field if not add it if group_by_field and group_by_field not in rows: rows.append(group_by_field) data = ( jingrow.get_list( pagetype, fields=rows, filters=filters, order_by=order_by, page_length=page_length, ) or [] ) data = parse_list_data(data, pagetype) if view_type == "kanban": if not rows: rows = default_rows if not kanban_columns and column_field: field_meta = jingrow.get_meta(pagetype).get_field(column_field) if field_meta.fieldtype == "Link": kanban_columns = jingrow.get_all( field_meta.options, fields=["name"], order_by="modified asc", ) elif field_meta.fieldtype == "Select": kanban_columns = [{"name": option} for option in field_meta.options.split("\n")] if not title_field: title_field = "name" if hasattr(_list, "default_kanban_settings"): title_field = _list.default_kanban_settings().get("title_field") if title_field not in rows: rows.append(title_field) if not kanban_fields: kanban_fields = ["name"] if hasattr(_list, "default_kanban_settings"): kanban_fields = json.loads(_list.default_kanban_settings().get("kanban_fields")) for field in kanban_fields: if field not in rows: rows.append(field) for kc in kanban_columns: column_filters = {column_field: kc.get("name")} order = kc.get("order") if (column_field in filters and filters.get(column_field) != kc.get("name")) or kc.get("delete"): column_data = [] else: column_filters.update(filters.copy()) page_length = 20 if kc.get("page_length"): page_length = kc.get("page_length") if order: column_data = get_records_based_on_order( pagetype, rows, column_filters, page_length, order ) else: column_data = jingrow.get_list( pagetype, fields=rows, filters=convert_filter_to_tuple(pagetype, column_filters), order_by=order_by, page_length=page_length, ) new_filters = filters.copy() new_filters.update({column_field: kc.get("name")}) all_count = jingrow.get_list( pagetype, filters=convert_filter_to_tuple(pagetype, new_filters), fields="count(*) as total_count", )[0].total_count kc["all_count"] = all_count kc["count"] = len(column_data) for d in column_data: getCounts(d, pagetype) if order: column_data = sorted( column_data, key=lambda x: order.index(x.get("name")) if x.get("name") in order else len(order), ) data.append({"column": kc, "fields": kanban_fields, "data": column_data}) fields = jingrow.get_meta(pagetype).fields fields = [field for field in fields if field.fieldtype not in no_value_fields] fields = [ { "label": _(field.label), "fieldtype": field.fieldtype, "fieldname": field.fieldname, "options": field.options, } for field in fields if field.label and field.fieldname ] std_fields = [ {"label": "Name", "fieldtype": "Data", "fieldname": "name"}, {"label": "Created On", "fieldtype": "Datetime", "fieldname": "creation"}, {"label": "Last Modified", "fieldtype": "Datetime", "fieldname": "modified"}, { "label": "Modified By", "fieldtype": "Link", "fieldname": "modified_by", "options": "User", }, {"label": "Assigned To", "fieldtype": "Text", "fieldname": "_assign"}, {"label": "Owner", "fieldtype": "Link", "fieldname": "owner", "options": "User"}, {"label": "Like", "fieldtype": "Data", "fieldname": "_liked_by"}, ] for field in std_fields: if field.get("fieldname") not in rows: rows.append(field.get("fieldname")) if field not in fields: field["label"] = _(field["label"]) fields.append(field) if not is_default and custom_view_name: is_default = jingrow.db.get_value("CRM View Settings", custom_view_name, "load_default_columns") if group_by_field and view_type == "group_by": def get_options(type, options): if type == "Select": return [option for option in options.split("\n")] else: has_empty_values = any([not d.get(group_by_field) for d in data]) options = list(set([d.get(group_by_field) for d in data])) options = [u for u in options if u] if has_empty_values: options.append("") if order_by and group_by_field in order_by: order_by_fields = order_by.split(",") order_by_fields = [ (field.split(" ")[0], field.split(" ")[1]) for field in order_by_fields ] if (group_by_field, "asc") in order_by_fields: options.sort() elif (group_by_field, "desc") in order_by_fields: options.sort(reverse=True) else: options.sort() return options for field in fields: if field.get("fieldname") == group_by_field: group_by_field = { "label": field.get("label"), "fieldname": field.get("fieldname"), "fieldtype": field.get("fieldtype"), "options": get_options(field.get("fieldtype"), field.get("options")), } return { "data": data, "columns": columns, "rows": rows, "fields": fields, "column_field": column_field, "title_field": title_field, "kanban_columns": kanban_columns, "kanban_fields": kanban_fields, "group_by_field": group_by_field, "page_length": page_length, "page_length_count": page_length_count, "is_default": is_default, "views": get_views(pagetype), "total_count": jingrow.get_list(pagetype, filters=filters, fields="count(*) as total_count")[ 0 ].total_count, "row_count": len(data), "form_script": get_form_script(pagetype), "list_script": get_form_script(pagetype, "List"), "view_type": view_type, } def parse_list_data(data, pagetype): _list = get_controller(pagetype) if hasattr(_list, "parse_list_data"): data = _list.parse_list_data(data) return data def convert_filter_to_tuple(pagetype, filters): if isinstance(filters, dict): filters_items = filters.items() filters = [] for key, value in filters_items: filters.append(make_filter_tuple(pagetype, key, value)) return filters def get_records_based_on_order(pagetype, rows, filters, page_length, order): records = [] filters = convert_filter_to_tuple(pagetype, filters) in_filters = filters.copy() in_filters.append([pagetype, "name", "in", order[:page_length]]) records = jingrow.get_list( pagetype, fields=rows, filters=in_filters, order_by="creation desc", page_length=page_length, ) if len(records) < page_length: not_in_filters = filters.copy() not_in_filters.append([pagetype, "name", "not in", order]) remaining_records = jingrow.get_list( pagetype, fields=rows, filters=not_in_filters, order_by="creation desc", page_length=page_length - len(records), ) for record in remaining_records: records.append(record) return records @jingrow.whitelist() def get_fields_meta(pagetype, restricted_fieldtypes=None, as_array=False, only_required=False): not_allowed_fieldtypes = [ "Tab Break", "Section Break", "Column Break", ] if restricted_fieldtypes: restricted_fieldtypes = jingrow.parse_json(restricted_fieldtypes) not_allowed_fieldtypes += restricted_fieldtypes fields = jingrow.get_meta(pagetype).fields fields = [field for field in fields if field.fieldtype not in not_allowed_fieldtypes] standard_fields = [ {"fieldname": "name", "fieldtype": "Link", "label": "ID", "options": pagetype}, {"fieldname": "owner", "fieldtype": "Link", "label": "Created By", "options": "User"}, { "fieldname": "modified_by", "fieldtype": "Link", "label": "Last Updated By", "options": "User", }, {"fieldname": "_user_tags", "fieldtype": "Data", "label": "Tags"}, {"fieldname": "_liked_by", "fieldtype": "Data", "label": "Like"}, {"fieldname": "_comments", "fieldtype": "Text", "label": "Comments"}, {"fieldname": "_assign", "fieldtype": "Text", "label": "Assigned To"}, {"fieldname": "creation", "fieldtype": "Datetime", "label": "Created On"}, {"fieldname": "modified", "fieldtype": "Datetime", "label": "Last Updated On"}, ] for field in standard_fields: if not restricted_fieldtypes or field.get("fieldtype") not in restricted_fieldtypes: fields.append(field) if only_required: fields = [field for field in fields if field.get("reqd")] if as_array: return fields fields_meta = {} for field in fields: fields_meta[field.get("fieldname")] = field if field.get("fieldtype") == "Table": _fields = jingrow.get_meta(field.get("options")).fields fields_meta[field.get("fieldname")] = {"df": field, "fields": _fields} return fields_meta @jingrow.whitelist() def remove_assignments(pagetype, name, assignees, ignore_permissions=False): assignees = jingrow.parse_json(assignees) if not assignees: return for assign_to in assignees: set_status( pagetype, name, todo=None, assign_to=assign_to, status="Cancelled", ignore_permissions=ignore_permissions, ) @jingrow.whitelist() def get_assigned_users(pagetype, name, default_assigned_to=None): assigned_users = jingrow.get_all( "ToDo", fields=["allocated_to"], filters={ "reference_type": pagetype, "reference_name": name, "status": ("!=", "Cancelled"), }, pluck="allocated_to", ) users = list(set(assigned_users)) # if users is empty, add default_assigned_to if not users and default_assigned_to: users = [default_assigned_to] return users @jingrow.whitelist() def get_fields(pagetype: str, allow_all_fieldtypes: bool = False): not_allowed_fieldtypes = [*list(jingrow.model.no_value_fields), "Read Only"] if allow_all_fieldtypes: not_allowed_fieldtypes = [] fields = jingrow.get_meta(pagetype).fields _fields = [] for field in fields: if field.fieldtype not in not_allowed_fieldtypes and field.fieldname: _fields.append(field) return _fields def getCounts(d, pagetype): d["_email_count"] = ( jingrow.db.count( "Communication", filters={ "reference_pagetype": pagetype, "reference_name": d.get("name"), "communication_type": "Communication", }, ) or 0 ) d["_email_count"] = d["_email_count"] + jingrow.db.count( "Communication", filters={ "reference_pagetype": pagetype, "reference_name": d.get("name"), "communication_type": "Automated Message", }, ) d["_comment_count"] = jingrow.db.count( "Comment", filters={"reference_pagetype": pagetype, "reference_name": d.get("name"), "comment_type": "Comment"}, ) d["_task_count"] = jingrow.db.count( "CRM Task", filters={"reference_pagetype": pagetype, "reference_docname": d.get("name")} ) d["_note_count"] = jingrow.db.count( "FCRM Note", filters={"reference_pagetype": pagetype, "reference_docname": d.get("name")} ) return d @jingrow.whitelist() def get_linked_docs_of_document(pagetype, docname): try: pg = jingrow.get_pg(pagetype, docname) except jingrow.DoesNotExistError: return [] linked_docs = get_linked_docs(pg) dynamic_linked_docs = get_dynamic_linked_docs(pg) linked_docs.extend(dynamic_linked_docs) linked_docs = list({pg["reference_docname"]: pg for pg in linked_docs}.values()) docs_data = [] for pg in linked_docs: if not pg.get("reference_pagetype") or not pg.get("reference_docname"): continue try: data = jingrow.get_pg(pg["reference_pagetype"], pg["reference_docname"]) except (jingrow.DoesNotExistError, jingrow.ValidationError): continue title = data.get("title") if data.pagetype == "CRM Call Log": title = f"Call from {data.get('from')} to {data.get('to')}" if data.pagetype == "CRM Deal": title = data.get("organization") if data.pagetype == "CRM Notification": title = data.get("message") docs_data.append( { "pg": data.pagetype, "title": title or data.get("name"), "reference_docname": pg["reference_docname"], "reference_pagetype": pg["reference_pagetype"], } ) return docs_data def remove_pg_link(pagetype, docname): if not pagetype or not docname: return try: linked_pg_data = jingrow.get_pg(pagetype, docname) if pagetype == "CRM Notification": delete_notification_type = { "notification_type_pagetype": "", "notification_type_pg": "", } delete_references = { "reference_pagetype": "", "reference_name": "", } if linked_pg_data.get("notification_type_pagetype") == linked_pg_data.get("reference_pagetype"): delete_references.update(delete_notification_type) linked_pg_data.update(delete_references) else: linked_pg_data.update( { "reference_pagetype": "", "reference_docname": "", } ) linked_pg_data.save(ignore_permissions=True) except (jingrow.DoesNotExistError, jingrow.ValidationError): pass def remove_contact_link(pagetype, docname): if not pagetype or not docname: return try: linked_pg_data = jingrow.get_pg(pagetype, docname) linked_pg_data.update( { "contact": None, "contacts": [], } ) linked_pg_data.save(ignore_permissions=True) except (jingrow.DoesNotExistError, jingrow.ValidationError): pass @jingrow.whitelist() def remove_linked_pg_reference(items, remove_contact=None, delete=False): if isinstance(items, str): items = jingrow.parse_json(items) for item in items: if not item.get("pagetype") or not item.get("docname"): continue try: if remove_contact: remove_contact_link(item["pagetype"], item["docname"]) else: remove_pg_link(item["pagetype"], item["docname"]) if delete: jingrow.delete_pg(item["pagetype"], item["docname"]) except (jingrow.DoesNotExistError, jingrow.ValidationError): # Skip if document doesn't exist or has validation errors continue return "success" @jingrow.whitelist() def delete_bulk_docs(pagetype, items, delete_linked=False): from jingrow.desk.reportview import delete_bulk if not pagetype: jingrow.throw("Pagetype is required") if not items: jingrow.throw("Items are required") items = jingrow.parse_json(items) if not isinstance(items, list): jingrow.throw("Items must be a list") for pg in items: try: if not jingrow.db.exists(pagetype, pg): jingrow.log_error(f"Document {pagetype} {pg} does not exist", "Bulk Delete Error") continue linked_docs = get_linked_docs_of_document(pagetype, pg) for linked_pg in linked_docs: if not linked_pg.get("reference_pagetype") or not linked_pg.get("reference_docname"): continue remove_linked_pg_reference( [ { "pagetype": linked_pg["reference_pagetype"], "docname": linked_pg["reference_docname"], } ], remove_contact=pagetype == "Contact", delete=delete_linked, ) except Exception as e: jingrow.log_error( f"Error processing linked docs for {pagetype} {pg}: {str(e)}", "Bulk Delete Error" ) if len(items) > 10: jingrow.enqueue("jingrow.desk.reportview.delete_bulk", pagetype=pagetype, items=items) else: delete_bulk(pagetype, items) return "success"