1
0
forked from test/crm
jcrm/crm/api/activities.py

500 lines
12 KiB
Python

import json
import jingrow
from bs4 import BeautifulSoup
from jingrow import _
from jingrow.desk.form.load import get_docinfo
from jingrow.query_builder import JoinType
from crm.fcrm.pagetype.crm_call_log.crm_call_log import parse_call_log
@jingrow.whitelist()
def get_activities(name):
if jingrow.db.exists("CRM Deal", name):
return get_deal_activities(name)
elif jingrow.db.exists("CRM Lead", name):
return get_lead_activities(name)
else:
jingrow.throw(_("Document not found"), jingrow.DoesNotExistError)
def get_deal_activities(name):
get_docinfo("", "CRM Deal", name)
docinfo = jingrow.response["docinfo"]
deal_meta = jingrow.get_meta("CRM Deal")
deal_fields = {
field.fieldname: {"label": field.label, "options": field.options} for field in deal_meta.fields
}
avoid_fields = [
"lead",
"response_by",
"sla_creation",
"sla",
"first_response_time",
"first_responded_on",
]
pg = jingrow.db.get_values("CRM Deal", name, ["creation", "owner", "lead"])[0]
lead = pg[2]
activities = []
calls = []
notes = []
tasks = []
attachments = []
creation_text = "created this deal"
if lead:
activities, calls, notes, tasks, attachments = get_lead_activities(lead)
creation_text = "converted the lead to this deal"
activities.append(
{
"activity_type": "creation",
"creation": pg[0],
"owner": pg[1],
"data": creation_text,
"is_lead": False,
}
)
docinfo.versions.reverse()
for version in docinfo.versions:
data = json.loads(version.data)
if not data.get("changed"):
continue
if change := data.get("changed")[0]:
field = deal_fields.get(change[0], None)
if not field or change[0] in avoid_fields or (not change[1] and not change[2]):
continue
field_label = field.get("label") or change[0]
field_option = field.get("options") or None
activity_type = "changed"
data = {
"field": change[0],
"field_label": field_label,
"old_value": change[1],
"value": change[2],
}
if not change[1] and change[2]:
activity_type = "added"
data = {
"field": change[0],
"field_label": field_label,
"value": change[2],
}
elif change[1] and not change[2]:
activity_type = "removed"
data = {
"field": change[0],
"field_label": field_label,
"value": change[1],
}
activity = {
"activity_type": activity_type,
"creation": version.creation,
"owner": version.owner,
"data": data,
"is_lead": False,
"options": field_option,
}
activities.append(activity)
for comment in docinfo.comments:
activity = {
"name": comment.name,
"activity_type": "comment",
"creation": comment.creation,
"owner": comment.owner,
"content": comment.content,
"attachments": get_attachments("Comment", comment.name),
"is_lead": False,
}
activities.append(activity)
for communication in docinfo.communications + docinfo.automated_messages:
activity = {
"activity_type": "communication",
"communication_type": communication.communication_type,
"communication_date": communication.communication_date or communication.creation,
"creation": communication.creation,
"data": {
"subject": communication.subject,
"content": communication.content,
"sender_full_name": communication.sender_full_name,
"sender": communication.sender,
"recipients": communication.recipients,
"cc": communication.cc,
"bcc": communication.bcc,
"attachments": get_attachments("Communication", communication.name),
"read_by_recipient": communication.read_by_recipient,
"delivery_status": communication.delivery_status,
},
"is_lead": False,
}
activities.append(activity)
for attachment_log in docinfo.attachment_logs:
activity = {
"name": attachment_log.name,
"activity_type": "attachment_log",
"creation": attachment_log.creation,
"owner": attachment_log.owner,
"data": parse_attachment_log(attachment_log.content, attachment_log.comment_type),
"is_lead": False,
}
activities.append(activity)
calls = calls + get_linked_calls(name).get("calls", [])
notes = notes + get_linked_notes(name) + get_linked_calls(name).get("notes", [])
tasks = tasks + get_linked_tasks(name) + get_linked_calls(name).get("tasks", [])
attachments = attachments + get_attachments("CRM Deal", name)
activities.sort(key=lambda x: x["creation"], reverse=True)
activities = handle_multiple_versions(activities)
return activities, calls, notes, tasks, attachments
def get_lead_activities(name):
get_docinfo("", "CRM Lead", name)
docinfo = jingrow.response["docinfo"]
lead_meta = jingrow.get_meta("CRM Lead")
lead_fields = {
field.fieldname: {"label": field.label, "options": field.options} for field in lead_meta.fields
}
avoid_fields = [
"converted",
"response_by",
"sla_creation",
"sla",
"first_response_time",
"first_responded_on",
]
pg = jingrow.db.get_values("CRM Lead", name, ["creation", "owner"])[0]
activities = [
{
"activity_type": "creation",
"creation": pg[0],
"owner": pg[1],
"data": "created this lead",
"is_lead": True,
}
]
docinfo.versions.reverse()
for version in docinfo.versions:
data = json.loads(version.data)
if not data.get("changed"):
continue
if change := data.get("changed")[0]:
field = lead_fields.get(change[0], None)
if not field or change[0] in avoid_fields or (not change[1] and not change[2]):
continue
field_label = field.get("label") or change[0]
field_option = field.get("options") or None
activity_type = "changed"
data = {
"field": change[0],
"field_label": field_label,
"old_value": change[1],
"value": change[2],
}
if not change[1] and change[2]:
activity_type = "added"
data = {
"field": change[0],
"field_label": field_label,
"value": change[2],
}
elif change[1] and not change[2]:
activity_type = "removed"
data = {
"field": change[0],
"field_label": field_label,
"value": change[1],
}
activity = {
"activity_type": activity_type,
"creation": version.creation,
"owner": version.owner,
"data": data,
"is_lead": True,
"options": field_option,
}
activities.append(activity)
for comment in docinfo.comments:
activity = {
"name": comment.name,
"activity_type": "comment",
"creation": comment.creation,
"owner": comment.owner,
"content": comment.content,
"attachments": get_attachments("Comment", comment.name),
"is_lead": True,
}
activities.append(activity)
for communication in docinfo.communications + docinfo.automated_messages:
activity = {
"activity_type": "communication",
"communication_type": communication.communication_type,
"communication_date": communication.communication_date or communication.creation,
"creation": communication.creation,
"data": {
"subject": communication.subject,
"content": communication.content,
"sender_full_name": communication.sender_full_name,
"sender": communication.sender,
"recipients": communication.recipients,
"cc": communication.cc,
"bcc": communication.bcc,
"attachments": get_attachments("Communication", communication.name),
"read_by_recipient": communication.read_by_recipient,
"delivery_status": communication.delivery_status,
},
"is_lead": True,
}
activities.append(activity)
for attachment_log in docinfo.attachment_logs:
activity = {
"name": attachment_log.name,
"activity_type": "attachment_log",
"creation": attachment_log.creation,
"owner": attachment_log.owner,
"data": parse_attachment_log(attachment_log.content, attachment_log.comment_type),
"is_lead": True,
}
activities.append(activity)
calls = get_linked_calls(name).get("calls", [])
notes = get_linked_notes(name) + get_linked_calls(name).get("notes", [])
tasks = get_linked_tasks(name) + get_linked_calls(name).get("tasks", [])
attachments = get_attachments("CRM Lead", name)
activities.sort(key=lambda x: x["creation"], reverse=True)
activities = handle_multiple_versions(activities)
return activities, calls, notes, tasks, attachments
def get_attachments(pagetype, name):
return (
jingrow.db.get_all(
"File",
filters={"attached_to_pagetype": pagetype, "attached_to_name": name},
fields=[
"name",
"file_name",
"file_type",
"file_url",
"file_size",
"is_private",
"modified",
"creation",
"owner",
],
)
or []
)
def handle_multiple_versions(versions):
activities = []
grouped_versions = []
old_version = None
for version in versions:
is_version = version["activity_type"] in ["changed", "added", "removed"]
if not is_version:
activities.append(version)
if not old_version:
old_version = version
if is_version:
grouped_versions.append(version)
continue
if is_version and old_version.get("owner") and version["owner"] == old_version["owner"]:
grouped_versions.append(version)
else:
if grouped_versions:
activities.append(parse_grouped_versions(grouped_versions))
grouped_versions = []
if is_version:
grouped_versions.append(version)
old_version = version
if version == versions[-1] and grouped_versions:
activities.append(parse_grouped_versions(grouped_versions))
return activities
def parse_grouped_versions(versions):
version = versions[0]
if len(versions) == 1:
return version
other_versions = versions[1:]
version["other_versions"] = other_versions
return version
def get_linked_calls(name):
calls = jingrow.db.get_all(
"CRM Call Log",
filters={"reference_docname": name},
fields=[
"name",
"caller",
"receiver",
"from",
"to",
"duration",
"start_time",
"end_time",
"status",
"type",
"recording_url",
"creation",
"note",
],
)
linked_calls = jingrow.db.get_all(
"Dynamic Link", filters={"link_name": name, "parenttype": "CRM Call Log"}, pluck="parent"
)
notes = []
tasks = []
if linked_calls:
CallLog = jingrow.qb.PageType("CRM Call Log")
Link = jingrow.qb.PageType("Dynamic Link")
query = (
jingrow.qb.from_(CallLog)
.select(
CallLog.name,
CallLog.caller,
CallLog.receiver,
CallLog["from"],
CallLog.to,
CallLog.duration,
CallLog.start_time,
CallLog.end_time,
CallLog.status,
CallLog.type,
CallLog.recording_url,
CallLog.creation,
CallLog.note,
Link.link_pagetype,
Link.link_name,
)
.join(Link, JoinType.inner)
.on(Link.parent == CallLog.name)
.where(CallLog.name.isin(linked_calls))
)
_calls = query.run(as_dict=True)
for call in _calls:
if call.get("link_pagetype") == "FCRM Note":
notes.append(call.link_name)
elif call.get("link_pagetype") == "CRM Task":
tasks.append(call.link_name)
_calls = [call for call in _calls if call.get("link_pagetype") not in ["FCRM Note", "CRM Task"]]
if _calls:
calls = calls + _calls
if notes:
notes = jingrow.db.get_all(
"FCRM Note",
filters={"name": ("in", notes)},
fields=["name", "title", "content", "owner", "modified"],
)
if tasks:
tasks = jingrow.db.get_all(
"CRM Task",
filters={"name": ("in", tasks)},
fields=[
"name",
"title",
"description",
"assigned_to",
"due_date",
"priority",
"status",
"modified",
],
)
calls = [parse_call_log(call) for call in calls] if calls else []
return {"calls": calls, "notes": notes, "tasks": tasks}
def get_linked_notes(name):
notes = jingrow.db.get_all(
"FCRM Note",
filters={"reference_docname": name},
fields=["name", "title", "content", "owner", "modified"],
)
return notes or []
def get_linked_tasks(name):
tasks = jingrow.db.get_all(
"CRM Task",
filters={"reference_docname": name},
fields=[
"name",
"title",
"description",
"assigned_to",
"due_date",
"priority",
"status",
"modified",
],
)
return tasks or []
def parse_attachment_log(html, type):
soup = BeautifulSoup(html, "html.parser")
a_tag = soup.find("a")
type = "added" if type == "Attachment" else "removed"
if not a_tag:
return {
"type": type,
"file_name": html.replace("Removed ", ""),
"file_url": "",
"is_private": False,
}
is_private = False
if "private/files" in a_tag["href"]:
is_private = True
return {
"type": type,
"file_name": a_tag.text,
"file_url": a_tag["href"],
"is_private": is_private,
}