1
0
forked from test/crm
jcrm/crm/api/whatsapp.py

347 lines
9.5 KiB
Python

import json
import jingrow
from jingrow import _
from crm.api.pg import get_assigned_users
from crm.fcrm.pagetype.crm_notification.crm_notification import notify_user
def validate(pg, method):
if pg.type == "Incoming" and pg.get("from"):
name, pagetype = get_lead_or_deal_from_number(pg.get("from"))
if name != None:
pg.reference_pagetype = pagetype
pg.reference_name = name
if pg.type == "Outgoing" and pg.get("to"):
name, pagetype = get_lead_or_deal_from_number(pg.get("to"))
if name != None:
pg.reference_pagetype = pagetype
pg.reference_name = name
def on_update(pg, method):
jingrow.publish_realtime(
"whatsapp_message",
{
"reference_pagetype": pg.reference_pagetype,
"reference_name": pg.reference_name,
},
)
notify_agent(pg)
def notify_agent(pg):
if pg.type == "Incoming":
pagetype = pg.reference_pagetype
if pagetype and pagetype.startswith("CRM "):
pagetype = pagetype[4:].lower()
notification_text = f"""
<div class="mb-2 leading-5 text-ink-gray-5">
<span class="font-medium text-ink-gray-9">{ _('You') }</span>
<span>{ _('received a whatsapp message in {0}').format(pagetype) }</span>
<span class="font-medium text-ink-gray-9">{ pg.reference_name }</span>
</div>
"""
assigned_users = get_assigned_users(pg.reference_pagetype, pg.reference_name)
for user in assigned_users:
notify_user(
{
"owner": pg.owner,
"assigned_to": user,
"notification_type": "WhatsApp",
"message": pg.message,
"notification_text": notification_text,
"reference_pagetype": "WhatsApp Message",
"reference_docname": pg.name,
"redirect_to_pagetype": pg.reference_pagetype,
"redirect_to_docname": pg.reference_name,
}
)
def get_lead_or_deal_from_number(number):
"""Get lead/deal from the given number."""
def find_record(pagetype, mobile_no, where=""):
mobile_no = parse_mobile_no(mobile_no)
query = f"""
SELECT name, mobile_no
FROM `tab{pagetype}`
WHERE CONCAT('+', REGEXP_REPLACE(mobile_no, '[^0-9]', '')) = {mobile_no}
"""
data = jingrow.db.sql(query + where, as_dict=True)
return data[0].name if data else None
pagetype = "CRM Deal"
pg = find_record(pagetype, number) or None
if not pg:
pagetype = "CRM Lead"
pg = find_record(pagetype, number, "AND converted is not True")
if not pg:
pg = find_record(pagetype, number)
return pg, pagetype
def parse_mobile_no(mobile_no: str):
"""Parse mobile number to remove spaces, brackets, etc.
>>> parse_mobile_no("+91 (766) 667 6666")
... "+917666676666"
"""
return "".join([c for c in mobile_no if c.isdigit() or c == "+"])
@jingrow.whitelist()
def is_whatsapp_enabled():
if not jingrow.db.exists("PageType", "WhatsApp Settings"):
return False
return jingrow.get_cached_value("WhatsApp Settings", "WhatsApp Settings", "enabled")
@jingrow.whitelist()
def is_whatsapp_installed():
if not jingrow.db.exists("PageType", "WhatsApp Settings"):
return False
return True
@jingrow.whitelist()
def get_whatsapp_messages(reference_pagetype, reference_name):
# twilio integration app is not compatible with crm app
# crm has its own twilio integration in built
if "twilio_integration" in jingrow.get_installed_apps():
return []
if not jingrow.db.exists("PageType", "WhatsApp Message"):
return []
messages = []
if reference_pagetype == "CRM Deal":
lead = jingrow.db.get_value(reference_pagetype, reference_name, "lead")
if lead:
messages = jingrow.get_all(
"WhatsApp Message",
filters={
"reference_pagetype": "CRM Lead",
"reference_name": lead,
},
fields=[
"name",
"type",
"to",
"from",
"content_type",
"message_type",
"attach",
"template",
"use_template",
"message_id",
"is_reply",
"reply_to_message_id",
"creation",
"message",
"status",
"reference_pagetype",
"reference_name",
"template_parameters",
"template_header_parameters",
],
)
messages += jingrow.get_all(
"WhatsApp Message",
filters={
"reference_pagetype": reference_pagetype,
"reference_name": reference_name,
},
fields=[
"name",
"type",
"to",
"from",
"content_type",
"message_type",
"attach",
"template",
"use_template",
"message_id",
"is_reply",
"reply_to_message_id",
"creation",
"message",
"status",
"reference_pagetype",
"reference_name",
"template_parameters",
"template_header_parameters",
],
)
# Filter messages to get only Template messages
template_messages = [message for message in messages if message["message_type"] == "Template"]
# Iterate through template messages
for template_message in template_messages:
# Find the template that this message is using
template = jingrow.get_pg("WhatsApp Templates", template_message["template"])
# If the template is found, add the template details to the template message
if template:
template_message["template_name"] = template.template_name
if template_message["template_parameters"]:
parameters = json.loads(template_message["template_parameters"])
template.template = parse_template_parameters(template.template, parameters)
template_message["template"] = template.template
if template_message["template_header_parameters"]:
header_parameters = json.loads(template_message["template_header_parameters"])
template.header = parse_template_parameters(template.header, header_parameters)
template_message["header"] = template.header
template_message["footer"] = template.footer
# Filter messages to get only reaction messages
reaction_messages = [message for message in messages if message["content_type"] == "reaction"]
# Iterate through reaction messages
for reaction_message in reaction_messages:
# Find the message that this reaction is reacting to
reacted_message = next(
(m for m in messages if m["message_id"] == reaction_message["reply_to_message_id"]),
None,
)
# If the reacted message is found, add the reaction to it
if reacted_message:
reacted_message["reaction"] = reaction_message["message"]
for message in messages:
from_name = get_from_name(message) if message["from"] else _("You")
message["from_name"] = from_name
# Filter messages to get only replies
reply_messages = [message for message in messages if message["is_reply"]]
# Iterate through reply messages
for reply_message in reply_messages:
# Find the message that this message is replying to
replied_message = next(
(m for m in messages if m["message_id"] == reply_message["reply_to_message_id"]),
None,
)
# If the replied message is found, add the reply details to the reply message
from_name = get_from_name(reply_message) if replied_message["from"] else _("You")
if replied_message:
message = replied_message["message"]
if replied_message["message_type"] == "Template":
message = replied_message["template"]
reply_message["reply_message"] = message
reply_message["header"] = replied_message.get("header") or ""
reply_message["footer"] = replied_message.get("footer") or ""
reply_message["reply_to"] = replied_message["name"]
reply_message["reply_to_type"] = replied_message["type"]
reply_message["reply_to_from"] = from_name
return [message for message in messages if message["content_type"] != "reaction"]
@jingrow.whitelist()
def create_whatsapp_message(
reference_pagetype,
reference_name,
message,
to,
attach,
reply_to,
content_type="text",
):
pg = jingrow.new_pg("WhatsApp Message")
if reply_to:
reply_pg = jingrow.get_pg("WhatsApp Message", reply_to)
pg.update(
{
"is_reply": True,
"reply_to_message_id": reply_pg.message_id,
}
)
pg.update(
{
"reference_pagetype": reference_pagetype,
"reference_name": reference_name,
"message": message or attach,
"to": to,
"attach": attach,
"content_type": content_type,
}
)
pg.insert(ignore_permissions=True)
return pg.name
@jingrow.whitelist()
def send_whatsapp_template(reference_pagetype, reference_name, template, to):
pg = jingrow.new_pg("WhatsApp Message")
pg.update(
{
"reference_pagetype": reference_pagetype,
"reference_name": reference_name,
"message_type": "Template",
"message": "Template message",
"content_type": "text",
"use_template": True,
"template": template,
"to": to,
}
)
pg.insert(ignore_permissions=True)
return pg.name
@jingrow.whitelist()
def react_on_whatsapp_message(emoji, reply_to_name):
reply_to_pg = jingrow.get_pg("WhatsApp Message", reply_to_name)
to = reply_to_pg.type == "Incoming" and reply_to_pg.get("from") or reply_to_pg.to
pg = jingrow.new_pg("WhatsApp Message")
pg.update(
{
"reference_pagetype": reply_to_pg.reference_pagetype,
"reference_name": reply_to_pg.reference_name,
"message": emoji,
"to": to,
"reply_to_message_id": reply_to_pg.message_id,
"content_type": "reaction",
}
)
pg.insert(ignore_permissions=True)
return pg.name
def parse_template_parameters(string, parameters):
for i, parameter in enumerate(parameters, start=1):
placeholder = "{{" + str(i) + "}}"
string = string.replace(placeholder, parameter)
return string
def get_from_name(message):
pg = jingrow.get_pg(message["reference_pagetype"], message["reference_name"])
from_name = ""
if message["reference_pagetype"] == "CRM Deal":
if pg.get("contacts"):
for c in pg.get("contacts"):
if c.is_primary:
from_name = c.full_name or c.mobile_no
break
else:
from_name = pg.get("lead_name")
else:
from_name = " ".join(filter(None, [pg.get("first_name"), pg.get("last_name")]))
return from_name