1
0
forked from test/crm
jcrm/crm/integrations/twilio/twilio_handler.py
2025-10-23 22:53:56 +08:00

268 lines
8.7 KiB
Python

import jingrow
from jingrow import _
from jingrow.utils.password import get_decrypted_password
from twilio.jwt.access_token import AccessToken
from twilio.jwt.access_token.grants import VoiceGrant
from twilio.rest import Client as TwilioClient
from twilio.twiml.voice_response import Dial, VoiceResponse
from .utils import get_public_url, merge_dicts
class Twilio:
"""Twilio connector over TwilioClient."""
def __init__(self, settings):
"""
:param settings: `CRM Twilio Settings` pagetype
"""
self.settings = settings
self.account_sid = settings.account_sid
self.application_sid = settings.twiml_sid
self.api_key = settings.api_key
self.api_secret = settings.get_password("api_secret")
self.twilio_client = self.get_twilio_client()
@classmethod
def connect(self):
"""Make a twilio connection."""
settings = jingrow.get_pg("CRM Twilio Settings")
if not (settings and settings.enabled):
return
return Twilio(settings=settings)
def get_phone_numbers(self):
"""Get account's twilio phone numbers."""
numbers = self.twilio_client.incoming_phone_numbers.list()
return [n.phone_number for n in numbers]
def generate_voice_access_token(self, identity: str, ttl=60 * 60):
"""Generates a token required to make voice calls from the browser."""
# identity is used by twilio to identify the user uniqueness at browser(or any endpoints).
identity = self.safe_identity(identity)
# Create access token with credentials
token = AccessToken(self.account_sid, self.api_key, self.api_secret, identity=identity, ttl=ttl)
# Create a Voice grant and add to token
voice_grant = VoiceGrant(
outgoing_application_sid=self.application_sid,
incoming_allow=True, # Allow incoming calls
)
token.add_grant(voice_grant)
return token.to_jwt()
@classmethod
def safe_identity(cls, identity: str):
"""Create a safe identity by replacing unsupported special charaters `@` with (at)).
Twilio Client JS fails to make a call connection if identity has special characters like @, [, / etc)
https://www.twilio.com/docs/voice/client/errors (#31105)
"""
return identity.replace("@", "(at)")
@classmethod
def emailid_from_identity(cls, identity: str):
"""Convert safe identity string into emailID."""
return identity.replace("(at)", "@")
def get_recording_status_callback_url(self):
url_path = "/api/action/crm.integrations.twilio.api.update_recording_info"
return get_public_url(url_path)
def get_update_call_status_callback_url(self):
url_path = "/api/action/crm.integrations.twilio.api.update_call_status_info"
return get_public_url(url_path)
def generate_twilio_dial_response(self, from_number: str, to_number: str):
"""Generates voice call instructions to forward the call to agents Phone."""
resp = VoiceResponse()
dial = Dial(
caller_id=from_number,
record=self.settings.record_calls,
recording_status_callback=self.get_recording_status_callback_url(),
recording_status_callback_event="completed",
)
dial.number(
to_number,
status_callback_event="initiated ringing answered completed",
status_callback=self.get_update_call_status_callback_url(),
status_callback_method="POST",
)
resp.append(dial)
return resp
def get_call_info(self, call_sid):
return self.twilio_client.calls(call_sid).fetch()
def generate_twilio_client_response(self, client, ring_tone="at"):
"""Generates voice call instructions to forward the call to agents computer."""
resp = VoiceResponse()
dial = Dial(
ring_tone=ring_tone,
record=self.settings.record_calls,
recording_status_callback=self.get_recording_status_callback_url(),
recording_status_callback_event="completed",
)
dial.client(
client,
status_callback_event="initiated ringing answered completed",
status_callback=self.get_update_call_status_callback_url(),
status_callback_method="POST",
)
resp.append(dial)
return resp
@classmethod
def get_twilio_client(self):
twilio_settings = jingrow.get_pg("CRM Twilio Settings")
if not twilio_settings.enabled:
jingrow.throw(_("Please enable twilio settings before making a call."))
auth_token = get_decrypted_password("CRM Twilio Settings", "CRM Twilio Settings", "auth_token")
client = TwilioClient(twilio_settings.account_sid, auth_token)
return client
class IncomingCall:
def __init__(self, from_number, to_number, meta=None):
self.from_number = from_number
self.to_number = to_number
self.meta = meta
def process(self):
"""Process the incoming call
* Figure out who is going to pick the call (call attender)
* Check call attender settings and forward the call to Phone
"""
twilio = Twilio.connect()
owners = get_twilio_number_owners(self.to_number)
attender = get_the_call_attender(owners, self.from_number)
if not attender:
resp = VoiceResponse()
resp.say(_("Agent is unavailable to take the call, please call after some time."))
return resp
if attender["call_receiving_device"] == "Phone":
return twilio.generate_twilio_dial_response(self.from_number, attender["mobile_no"])
else:
return twilio.generate_twilio_client_response(twilio.safe_identity(attender["name"]))
def get_twilio_number_owners(phone_number):
"""Get list of users who is using the phone_number.
>>> get_twilio_number_owners("+11234567890")
{
'owner1': {'name': '..', 'mobile_no': '..', 'call_receiving_device': '...'},
'owner2': {....}
}
"""
# remove special characters from phone number and get only digits also remove white spaces
# keep + sign in the number at start of the number
phone_number = "".join([c for c in phone_number if c.isdigit() or c == "+"])
user_voice_settings = jingrow.get_all(
"CRM Telephony Agent",
filters={"twilio_number": phone_number},
fields=["name", "call_receiving_device"],
)
user_wise_voice_settings = {user["name"]: user for user in user_voice_settings}
user_general_settings = jingrow.get_all(
"User", filters=[["name", "IN", user_wise_voice_settings.keys()]], fields=["name", "mobile_no"]
)
user_wise_general_settings = {user["name"]: user for user in user_general_settings}
return merge_dicts(user_wise_general_settings, user_wise_voice_settings)
def get_active_loggedin_users(users):
"""Filter the current loggedin users from the given users list"""
rows = jingrow.db.sql(
"""
SELECT `user`
FROM `tabSessions`
WHERE `user` IN %(users)s
""",
{"users": users},
)
return [row[0] for row in set(rows)]
def get_the_call_attender(owners, caller=None):
"""Get attender details from list of owners"""
if not owners:
return
current_loggedin_users = get_active_loggedin_users(list(owners.keys()))
if len(current_loggedin_users) > 1 and caller:
deal_owner = jingrow.db.get_value("CRM Deal", {"mobile_no": caller}, "deal_owner")
if not deal_owner:
deal_owner = jingrow.db.get_value(
"CRM Lead", {"mobile_no": caller, "converted": False}, "lead_owner"
)
for user in current_loggedin_users:
if user == deal_owner:
current_loggedin_users = [user]
for name, details in owners.items():
if (details["call_receiving_device"] == "Phone" and details["mobile_no"]) or (
details["call_receiving_device"] == "Computer" and name in current_loggedin_users
):
return details
class TwilioCallDetails:
def __init__(self, call_info, call_from=None, call_to=None):
self.call_info = call_info
self.account_sid = call_info.get("AccountSid")
self.application_sid = call_info.get("ApplicationSid")
self.call_sid = call_info.get("CallSid")
self.call_status = self.get_call_status(call_info.get("CallStatus"))
self._call_from = call_from or call_info.get("From")
self._call_to = call_to or call_info.get("To")
def get_direction(self):
if self.call_info.get("Caller").lower().startswith("client"):
return "Outgoing"
return "Incoming"
def get_from_number(self):
return self._call_from or self.call_info.get("From")
def get_to_number(self):
return self._call_to or self.call_info.get("To")
@classmethod
def get_call_status(cls, twilio_status):
"""Convert Twilio given status into system status."""
twilio_status = twilio_status or ""
return " ".join(twilio_status.split("-")).title()
def to_dict(self):
"""Convert call details into dict."""
direction = self.get_direction()
from_number = self.get_from_number()
to_number = self.get_to_number()
caller = ""
receiver = ""
if direction == "Outgoing":
caller = self.call_info.get("Caller")
identity = caller.replace("client:", "").strip()
caller = Twilio.emailid_from_identity(identity) if identity else ""
else:
owners = get_twilio_number_owners(to_number)
attender = get_the_call_attender(owners, from_number)
receiver = attender["name"] if attender else ""
return {
"type": direction,
"status": self.call_status,
"id": self.call_sid,
"from": from_number,
"to": to_number,
"receiver": receiver,
"caller": caller,
}