import json

import jingrow
from bs4 import BeautifulSoup
from jingrow import _
from jingrow.desk.form.load import get_docinfo
from jingrow.query_builder import JoinType

from crm.fcrm.pagetype.crm_call_log.crm_call_log import parse_call_log

# Activity types that are derived from document version (field change) records.
_VERSION_ACTIVITY_TYPES = ("changed", "added", "removed")

# Fields whose changes are never reported in the activity feed.
_DEAL_AVOID_FIELDS = (
    "lead",
    "response_by",
    "sla_creation",
    "sla",
    "first_response_time",
    "first_responded_on",
)
_LEAD_AVOID_FIELDS = (
    "converted",
    "response_by",
    "sla_creation",
    "sla",
    "first_response_time",
    "first_responded_on",
)

# Column lists shared by the queries below.
_FILE_FIELDS = [
    "name",
    "file_name",
    "file_type",
    "file_url",
    "file_size",
    "is_private",
    "modified",
    "creation",
    "owner",
]
_CALL_LOG_FIELDS = [
    "name",
    "caller",
    "receiver",
    "from",
    "to",
    "duration",
    "start_time",
    "end_time",
    "status",
    "type",
    "recording_url",
    "creation",
    "note",
]
_NOTE_FIELDS = ["name", "title", "content", "owner", "modified"]
_TASK_FIELDS = [
    "name",
    "title",
    "description",
    "assigned_to",
    "due_date",
    "priority",
    "status",
    "modified",
]
_CALL_LINKED_PAGETYPES = ("FCRM Note", "CRM Task")


@jingrow.whitelist()
def get_activities(name):
    """Return the activity feed for the CRM Deal or CRM Lead called ``name``.

    A deal with that name takes precedence over a lead with the same name.

    Returns:
        The ``(activities, calls, notes, tasks, attachments)`` tuple produced
        by :func:`get_deal_activities` or :func:`get_lead_activities`.

    Raises:
        jingrow.DoesNotExistError: via ``jingrow.throw`` when neither a deal
            nor a lead with that name exists.
    """
    if jingrow.db.exists("CRM Deal", name):
        return get_deal_activities(name)
    if jingrow.db.exists("CRM Lead", name):
        return get_lead_activities(name)
    jingrow.throw(_("Document not found"), jingrow.DoesNotExistError)


def get_deal_activities(name):
    """Collect activities and linked records for the CRM Deal ``name``.

    When the deal references a lead, the lead's activities, calls, notes,
    tasks and attachments are fetched first and the deal's own records are
    added on top of them.

    Returns:
        tuple: ``(activities, calls, notes, tasks, attachments)`` where
        ``activities`` is sorted by ``creation`` (newest first) and then
        passed through :func:`handle_multiple_versions`.
    """
    get_docinfo("", "CRM Deal", name)
    # Capture the deal's docinfo now: get_lead_activities() below calls
    # get_docinfo() again for the lead and reads the same response key.
    docinfo = jingrow.response["docinfo"]
    deal_fields = _get_field_details("CRM Deal")

    pg = jingrow.db.get_values("CRM Deal", name, ["creation", "owner", "lead"])[0]
    lead = pg[2]

    activities = []
    calls = []
    notes = []
    tasks = []
    attachments = []
    creation_text = "created this deal"

    if lead:
        activities, calls, notes, tasks, attachments = get_lead_activities(lead)
        creation_text = "converted the lead to this deal"

    activities.append(
        {
            "activity_type": "creation",
            "creation": pg[0],
            "owner": pg[1],
            "data": creation_text,
            "is_lead": False,
        }
    )
    activities.extend(
        _docinfo_activities(docinfo, deal_fields, _DEAL_AVOID_FIELDS, is_lead=False)
    )

    deal_calls, deal_notes, deal_tasks = _get_linked_items(name)
    calls = calls + deal_calls
    notes = notes + deal_notes
    tasks = tasks + deal_tasks
    attachments = attachments + get_attachments("CRM Deal", name)

    activities.sort(key=lambda x: x["creation"], reverse=True)
    activities = handle_multiple_versions(activities)

    return activities, calls, notes, tasks, attachments


def get_lead_activities(name):
    """Collect activities and linked records for the CRM Lead ``name``.

    Returns:
        tuple: ``(activities, calls, notes, tasks, attachments)`` where
        ``activities`` is sorted by ``creation`` (newest first) and then
        passed through :func:`handle_multiple_versions`.
    """
    get_docinfo("", "CRM Lead", name)
    docinfo = jingrow.response["docinfo"]
    lead_fields = _get_field_details("CRM Lead")

    pg = jingrow.db.get_values("CRM Lead", name, ["creation", "owner"])[0]

    activities = [
        {
            "activity_type": "creation",
            "creation": pg[0],
            "owner": pg[1],
            "data": "created this lead",
            "is_lead": True,
        }
    ]
    activities.extend(
        _docinfo_activities(docinfo, lead_fields, _LEAD_AVOID_FIELDS, is_lead=True)
    )

    calls, notes, tasks = _get_linked_items(name)
    attachments = get_attachments("CRM Lead", name)

    activities.sort(key=lambda x: x["creation"], reverse=True)
    activities = handle_multiple_versions(activities)

    return activities, calls, notes, tasks, attachments


def _get_field_details(pagetype):
    """Map each fieldname of ``pagetype`` to its label and options."""
    meta = jingrow.get_meta(pagetype)
    return {
        field.fieldname: {"label": field.label, "options": field.options}
        for field in meta.fields
    }


def _get_linked_items(name):
    """Return ``(calls, notes, tasks)`` linked to the document ``name``.

    ``get_linked_calls`` is queried once and its three result lists are
    reused, instead of re-running its queries for every list.
    """
    linked = get_linked_calls(name)
    calls = linked.get("calls", [])
    notes = get_linked_notes(name) + linked.get("notes", [])
    tasks = get_linked_tasks(name) + linked.get("tasks", [])
    return calls, notes, tasks


def _docinfo_activities(docinfo, fields, avoid_fields, is_lead):
    """Build activities from a docinfo's versions, comments, communications
    and attachment logs, in that order.

    ``is_lead`` is stamped on every activity so the caller can tell lead
    records from deal records once both are merged into one list.
    """
    activities = []

    # The in-place reversal is kept from the original implementation; it
    # determines the relative order of versions that share the same
    # ``creation`` value once the caller applies its stable sort.
    docinfo.versions.reverse()
    for version in docinfo.versions:
        activity = _version_activity(version, fields, avoid_fields, is_lead)
        if activity:
            activities.append(activity)

    activities.extend(
        {
            "name": comment.name,
            "activity_type": "comment",
            "creation": comment.creation,
            "owner": comment.owner,
            "content": comment.content,
            "attachments": get_attachments("Comment", comment.name),
            "is_lead": is_lead,
        }
        for comment in docinfo.comments
    )

    activities.extend(
        {
            "activity_type": "communication",
            "communication_type": communication.communication_type,
            "communication_date": communication.communication_date or communication.creation,
            "creation": communication.creation,
            "data": {
                "subject": communication.subject,
                "content": communication.content,
                "sender_full_name": communication.sender_full_name,
                "sender": communication.sender,
                "recipients": communication.recipients,
                "cc": communication.cc,
                "bcc": communication.bcc,
                "attachments": get_attachments("Communication", communication.name),
                "read_by_recipient": communication.read_by_recipient,
                "delivery_status": communication.delivery_status,
            },
            "is_lead": is_lead,
        }
        for communication in docinfo.communications + docinfo.automated_messages
    )

    activities.extend(
        {
            "name": attachment_log.name,
            "activity_type": "attachment_log",
            "creation": attachment_log.creation,
            "owner": attachment_log.owner,
            "data": parse_attachment_log(attachment_log.content, attachment_log.comment_type),
            "is_lead": is_lead,
        }
        for attachment_log in docinfo.attachment_logs
    )

    return activities


def _version_activity(version, fields, avoid_fields, is_lead):
    """Turn one version record into a changed/added/removed activity.

    Returns ``None`` when the version carries no reportable change: no
    ``changed`` entries, an unknown or avoided field, or both the old and
    the new value empty.
    """
    changes = json.loads(version.data).get("changed")
    if not changes:
        return None

    # NOTE: only the first changed field of a version is reported; this
    # mirrors the original behaviour.
    change = changes[0]
    if not change:
        return None

    fieldname, old_value, new_value = change[0], change[1], change[2]
    field = fields.get(fieldname)
    if not field or fieldname in avoid_fields or (not old_value and not new_value):
        return None

    field_label = field.get("label") or fieldname
    if old_value and new_value:
        activity_type = "changed"
        payload = {
            "field": fieldname,
            "field_label": field_label,
            "old_value": old_value,
            "value": new_value,
        }
    elif new_value:
        activity_type = "added"
        payload = {"field": fieldname, "field_label": field_label, "value": new_value}
    else:
        activity_type = "removed"
        payload = {"field": fieldname, "field_label": field_label, "value": old_value}

    return {
        "activity_type": activity_type,
        "creation": version.creation,
        "owner": version.owner,
        "data": payload,
        "is_lead": is_lead,
        "options": field.get("options") or None,
    }


def get_attachments(pagetype, name):
    """Return the File records attached to document ``name`` of ``pagetype``."""
    return (
        jingrow.db.get_all(
            "File",
            filters={"attached_to_pagetype": pagetype, "attached_to_name": name},
            fields=_FILE_FIELDS,
        )
        or []
    )


def handle_multiple_versions(versions):
    """Collapse runs of consecutive version activities by the same owner.

    A version activity joins the current group when the activity right
    before it has a truthy ``owner`` equal to its own. Each group is folded
    into a single activity by :func:`parse_grouped_versions`.

    Non-version activities are emitted as soon as they are seen, i.e. ahead
    of a group that is still pending at that point (unchanged from the
    original implementation).

    Args:
        versions: list of activity dicts, each with an ``activity_type`` key.

    Returns:
        list: the activities with version runs grouped.
    """
    activities = []
    grouped_versions = []
    old_version = None

    for version in versions:
        is_version = version["activity_type"] in _VERSION_ACTIVITY_TYPES
        if not is_version:
            activities.append(version)

        continues_group = (
            is_version
            and old_version is not None
            and old_version.get("owner")
            and version["owner"] == old_version["owner"]
        )
        if continues_group:
            grouped_versions.append(version)
        else:
            if grouped_versions:
                activities.append(parse_grouped_versions(grouped_versions))
                grouped_versions = []
            if is_version:
                grouped_versions.append(version)
        old_version = version

    # Flush the trailing group once, after the loop. The previous in-loop
    # `version == versions[-1]` check dropped a lone version activity and
    # could emit a group twice when an earlier dict equalled the last one.
    if grouped_versions:
        activities.append(parse_grouped_versions(grouped_versions))

    return activities


def parse_grouped_versions(versions):
    """Fold a non-empty group of version activities into its first element.

    The first activity is returned; when the group has more than one entry
    the remaining ones are stored on it under ``other_versions`` (the first
    dict is modified in place).
    """
    version, *other_versions = versions
    if other_versions:
        version["other_versions"] = other_versions
    return version


def get_linked_calls(name):
    """Return call logs related to ``name`` plus notes/tasks linked to them.

    Calls are found both through ``reference_docname`` and through Dynamic
    Link rows whose ``link_name`` is ``name``.

    Returns:
        dict: ``{"calls": [...], "notes": [...], "tasks": [...]}``. Calls are
        passed through ``parse_call_log``.
    """
    calls = jingrow.db.get_all(
        "CRM Call Log",
        filters={"reference_docname": name},
        fields=_CALL_LOG_FIELDS,
    )

    linked_calls = jingrow.db.get_all(
        "Dynamic Link",
        filters={"link_name": name, "parenttype": "CRM Call Log"},
        pluck="parent",
    )

    notes = []
    tasks = []

    if linked_calls:
        CallLog = jingrow.qb.PageType("CRM Call Log")
        Link = jingrow.qb.PageType("Dynamic Link")
        query = (
            jingrow.qb.from_(CallLog)
            .select(
                # Item access is used for every column because "from" is a
                # Python keyword and cannot be reached as an attribute.
                *(CallLog[fieldname] for fieldname in _CALL_LOG_FIELDS),
                Link.link_pagetype,
                Link.link_name,
            )
            .join(Link, JoinType.inner)
            .on(Link.parent == CallLog.name)
            .where(CallLog.name.isin(linked_calls))
        )
        rows = query.run(as_dict=True)

        # The join yields one row per link: rows pointing at a note or task
        # only contribute that record's name, the rest are kept as calls.
        linked_call_rows = []
        for row in rows:
            link_pagetype = row.get("link_pagetype")
            if link_pagetype == "FCRM Note":
                notes.append(row.link_name)
            elif link_pagetype == "CRM Task":
                tasks.append(row.link_name)
            else:
                linked_call_rows.append(row)

        if linked_call_rows:
            calls = calls + linked_call_rows

        if notes:
            notes = jingrow.db.get_all(
                "FCRM Note",
                filters={"name": ("in", notes)},
                fields=_NOTE_FIELDS,
            )

        if tasks:
            tasks = jingrow.db.get_all(
                "CRM Task",
                filters={"name": ("in", tasks)},
                fields=_TASK_FIELDS,
            )

    calls = [parse_call_log(call) for call in calls] if calls else []

    return {"calls": calls, "notes": notes, "tasks": tasks}


def get_linked_notes(name):
    """Return FCRM Note records whose ``reference_docname`` is ``name``."""
    notes = jingrow.db.get_all(
        "FCRM Note",
        filters={"reference_docname": name},
        fields=_NOTE_FIELDS,
    )
    return notes or []


def get_linked_tasks(name):
    """Return CRM Task records whose ``reference_docname`` is ``name``."""
    tasks = jingrow.db.get_all(
        "CRM Task",
        filters={"reference_docname": name},
        fields=_TASK_FIELDS,
    )
    return tasks or []


def parse_attachment_log(html, type):
    """Extract file details from the HTML content of an attachment log.

    Args:
        html: content of the attachment log comment.
        type: comment type of the log; ``"Attachment"`` is reported as
            ``"added"``, anything else as ``"removed"``.

    Returns:
        dict: ``type``, ``file_name``, ``file_url`` and ``is_private``. When
        the content has no ``<a>`` tag the file name is the content with
        ``"Removed "`` stripped and the URL is empty.
    """
    # `type` shadows the builtin but is part of the public signature, so the
    # parameter is left alone and a separate local is used below.
    html = html or ""
    log_type = "added" if type == "Attachment" else "removed"

    a_tag = BeautifulSoup(html, "html.parser").find("a")
    if a_tag is None:
        return {
            "type": log_type,
            "file_name": html.replace("Removed ", ""),
            "file_url": "",
            "is_private": False,
        }

    # Tolerate an anchor without href instead of raising KeyError.
    file_url = a_tag.get("href") or ""
    return {
        "type": log_type,
        "file_name": a_tag.text,
        "file_url": file_url,
        "is_private": "private/files" in file_url,
    }