167 lines
5.0 KiB
Python
167 lines
5.0 KiB
Python
import json
|
|
|
|
import jingrow
|
|
from jingrow import _
|
|
from werkzeug.wrappers import Response
|
|
|
|
from crm.integrations.api import get_contact_by_phone_number
|
|
|
|
from .twilio_handler import IncomingCall, Twilio, TwilioCallDetails
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def is_enabled():
|
|
return jingrow.db.get_single_value("CRM Twilio Settings", "enabled")
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def generate_access_token():
|
|
"""Returns access token that is required to authenticate Twilio Client SDK."""
|
|
twilio = Twilio.connect()
|
|
if not twilio:
|
|
return {}
|
|
|
|
from_number = jingrow.db.get_value("CRM Telephony Agent", jingrow.session.user, "twilio_number")
|
|
if not from_number:
|
|
return {
|
|
"ok": False,
|
|
"error": "caller_phone_identity_missing",
|
|
"detail": "Phone number is not mapped to the caller",
|
|
}
|
|
|
|
token = twilio.generate_voice_access_token(identity=jingrow.session.user)
|
|
return {"token": jingrow.safe_decode(token)}
|
|
|
|
|
|
@jingrow.whitelist(allow_guest=True)
|
|
def voice(**kwargs):
|
|
"""This is a webhook called by twilio to get instructions when the voice call request comes to twilio server."""
|
|
|
|
def _get_caller_number(caller):
|
|
identity = caller.replace("client:", "").strip()
|
|
user = Twilio.emailid_from_identity(identity)
|
|
return jingrow.db.get_value("CRM Telephony Agent", user, "twilio_number")
|
|
|
|
args = jingrow._dict(kwargs)
|
|
twilio = Twilio.connect()
|
|
if not twilio:
|
|
return
|
|
|
|
assert args.AccountSid == twilio.account_sid
|
|
assert args.ApplicationSid == twilio.application_sid
|
|
|
|
# Generate TwiML instructions to make a call
|
|
from_number = _get_caller_number(args.Caller)
|
|
resp = twilio.generate_twilio_dial_response(from_number, args.To)
|
|
|
|
call_details = TwilioCallDetails(args, call_from=from_number)
|
|
create_call_log(call_details)
|
|
return Response(resp.to_xml(), mimetype="text/xml")
|
|
|
|
|
|
@jingrow.whitelist(allow_guest=True)
|
|
def twilio_incoming_call_handler(**kwargs):
|
|
args = jingrow._dict(kwargs)
|
|
call_details = TwilioCallDetails(args)
|
|
create_call_log(call_details)
|
|
|
|
resp = IncomingCall(args.From, args.To).process()
|
|
return Response(resp.to_xml(), mimetype="text/xml")
|
|
|
|
|
|
def create_call_log(call_details: TwilioCallDetails):
|
|
details = call_details.to_dict()
|
|
|
|
call_log = jingrow.get_pg({**details, "pagetype": "CRM Call Log", "telephony_medium": "Twilio"})
|
|
|
|
# link call log with lead/deal
|
|
contact_number = details.get("from") if details.get("type") == "Incoming" else details.get("to")
|
|
link(contact_number, call_log)
|
|
|
|
call_log.save(ignore_permissions=True)
|
|
jingrow.db.commit()
|
|
return call_log
|
|
|
|
|
|
def link(contact_number, call_log):
|
|
contact = get_contact_by_phone_number(contact_number)
|
|
if contact.get("name"):
|
|
pagetype = "Contact"
|
|
docname = contact.get("name")
|
|
if contact.get("lead"):
|
|
pagetype = "CRM Lead"
|
|
docname = contact.get("lead")
|
|
elif contact.get("deal"):
|
|
pagetype = "CRM Deal"
|
|
docname = contact.get("deal")
|
|
call_log.link_with_reference_pg(pagetype, docname)
|
|
|
|
|
|
def update_call_log(call_sid, status=None):
|
|
"""Update call log status."""
|
|
twilio = Twilio.connect()
|
|
if not (twilio and jingrow.db.exists("CRM Call Log", call_sid)):
|
|
return
|
|
|
|
try:
|
|
call_details = twilio.get_call_info(call_sid)
|
|
call_log = jingrow.get_pg("CRM Call Log", call_sid)
|
|
call_log.status = TwilioCallDetails.get_call_status(status or call_details.status)
|
|
call_log.duration = call_details.duration
|
|
call_log.start_time = get_datetime_from_timestamp(call_details.start_time)
|
|
call_log.end_time = get_datetime_from_timestamp(call_details.end_time)
|
|
call_log.save(ignore_permissions=True)
|
|
jingrow.db.commit()
|
|
return call_log
|
|
except Exception:
|
|
jingrow.log_error(title="Error while updating call record")
|
|
jingrow.db.commit()
|
|
|
|
|
|
@jingrow.whitelist(allow_guest=True)
|
|
def update_recording_info(**kwargs):
|
|
try:
|
|
args = jingrow._dict(kwargs)
|
|
recording_url = args.RecordingUrl
|
|
call_sid = args.CallSid
|
|
update_call_log(call_sid)
|
|
jingrow.db.set_value("CRM Call Log", call_sid, "recording_url", recording_url)
|
|
except Exception:
|
|
jingrow.log_error(title=_("Failed to capture Twilio recording"))
|
|
|
|
|
|
@jingrow.whitelist(allow_guest=True)
|
|
def update_call_status_info(**kwargs):
|
|
try:
|
|
args = jingrow._dict(kwargs)
|
|
parent_call_sid = args.ParentCallSid
|
|
update_call_log(parent_call_sid, status=args.CallStatus)
|
|
|
|
call_info = {
|
|
"ParentCallSid": args.ParentCallSid,
|
|
"CallSid": args.CallSid,
|
|
"CallStatus": args.CallStatus,
|
|
"CallDuration": args.CallDuration,
|
|
"From": args.From,
|
|
"To": args.To,
|
|
}
|
|
|
|
client = Twilio.get_twilio_client()
|
|
client.calls(args.ParentCallSid).user_defined_messages.create(content=json.dumps(call_info))
|
|
except Exception:
|
|
jingrow.log_error(title=_("Failed to update Twilio call status"))
|
|
|
|
|
|
def get_datetime_from_timestamp(timestamp):
|
|
from datetime import datetime
|
|
from zoneinfo import ZoneInfo
|
|
|
|
if not timestamp:
|
|
return None
|
|
|
|
datetime_utc_tz_str = timestamp.strftime("%Y-%m-%d %H:%M:%S%z")
|
|
datetime_utc_tz = datetime.strptime(datetime_utc_tz_str, "%Y-%m-%d %H:%M:%S%z")
|
|
system_timezone = jingrow.utils.get_system_timezone()
|
|
converted_datetime = datetime_utc_tz.astimezone(ZoneInfo(system_timezone))
|
|
return jingrow.utils.format_datetime(converted_datetime, "yyyy-MM-dd HH:mm:ss")
|