refactor: class based Sync source

This commit is contained in:
Hussain Nagaria 2025-10-19 18:34:35 +05:30
parent 91b8a48c0e
commit 176c7c73bb
5 changed files with 127 additions and 56 deletions

View File

@ -6,6 +6,9 @@
"engine": "InnoDB",
"field_order": [
"type",
"column_break_dhay",
"source",
"section_break_fhot",
"lead_data"
],
"fields": [
@ -25,13 +28,28 @@
"label": "Lead Data",
"options": "JSON",
"read_only": 1
},
{
"fieldname": "column_break_dhay",
"fieldtype": "Column Break"
},
{
"fieldname": "source",
"fieldtype": "Link",
"label": "Source",
"options": "Lead Sync Source",
"read_only": 1
},
{
"fieldname": "section_break_fhot",
"fieldtype": "Section Break"
}
],
"grid_page_length": 50,
"in_create": 1,
"index_web_pages_for_search": 1,
"links": [],
"modified": "2025-10-19 17:41:28.640446",
"modified": "2025-10-19 18:30:15.381205",
"modified_by": "Administrator",
"module": "Lead Syncing",
"name": "Failed Lead Sync Log",

View File

@ -15,6 +15,7 @@ class FailedLeadSyncLog(Document):
from frappe.types import DF
lead_data: DF.Code | None
source: DF.Link | None
type: DF.Literal["Duplicate", "Failure"]
# end: auto-generated types

View File

@ -12,63 +12,105 @@ def get_fb_graph_api_url(endpoint: str) -> str:
return f"{FB_GRAPH_API_BASE}/{FB_GRAPH_API_VERSION}/{endpoint}"
def sync_leads_from_facebook(access_token: str, lead_form_id: str) -> None:
url = get_fb_graph_api_url(f"/{lead_form_id}/leads")
last_synced_at = frappe.db.get_value(
"Lead Sync Source", {"facebook_lead_form": lead_form_id}, "last_synced_at"
)
if last_synced_at:
timestamp = frappe.utils.data.get_timestamp(last_synced_at)
filtering = f"filtering=[{{'field':'time_created','operator':'GREATER_THAN','value':{timestamp}}}]"
url = f"{url}?{filtering}"
class FacebookSyncSource:
def __init__(
self,
access_token: str,
form_id: str,
source_name: str | None = None,
):
self.access_token = access_token
self.form_id = form_id
self.source_name = source_name
leads = make_get_request(
url,
params={
"access_token": access_token,
"fields": "id,created_time,field_data",
"limit": 15000,
},
).get("data", [])
def get_api_url(self, endpoint: str) -> str:
return get_fb_graph_api_url(endpoint)
form_questions = frappe.db.get_all(
"Facebook Lead Form Question", filters={"parent": lead_form_id}, fields=["key", "mapped_to_crm_field"]
)
def sync(self):
leads = self.fetch_leads()
question_to_field_map = self.get_form_questions_mapping()
# Map form questions to CRM Lead fields
question_to_field_map = {
q["key"]: q["mapped_to_crm_field"] for q in form_questions if q["mapped_to_crm_field"]
}
for lead in leads:
lead_data = {item["name"]: item["values"][0] for item in lead["field_data"]}
crm_lead_data = {
question_to_field_map.get(k): v for k, v in lead_data.items() if k in question_to_field_map
}
crm_lead_data["source"] = "Facebook"
crm_lead_data["facebook_lead_id"] = lead["id"]
for lead in leads:
lead_data = {item["name"]: item["values"][0] for item in lead["field_data"]}
crm_lead_data = {
question_to_field_map.get(k): v for k, v in lead_data.items() if k in question_to_field_map
}
crm_lead_data["source"] = "Facebook"
crm_lead_data["facebook_lead_id"] = lead["id"]
try:
frappe.get_doc(
{
"doctype": "CRM Lead",
**crm_lead_data,
}
).insert(ignore_permissions=True)
except frappe.UniqueValidationError:
# Skip duplicate leads based on facebook_lead_id
# TODO: de-duplication based on field values
self.create_failure_log(lead, "Duplicate")
except Exception:
self.create_failure_log(lead)
try:
frappe.get_doc(
{
"doctype": "CRM Lead",
**crm_lead_data,
}
).insert(ignore_permissions=True)
except frappe.UniqueValidationError:
# Skip duplicate leads based on facebook_lead_id
# TODO: de-duplication based on field values
frappe.get_doc(
{"doctype": "Failed Lead Sync Log", "type": "Duplicate", "lead_data": frappe.as_json(lead)}
).insert(ignore_permissions=True)
except Exception:
frappe.get_doc(
{"doctype": "Failed Lead Sync Log", "type": "Failure", "lead_data": frappe.as_json(lead)}
).insert(ignore_permissions=True)
self.update_last_synced_at()
frappe.db.set_value(
"Lead Sync Source", {"facebook_lead_form": lead_form_id}, "last_synced_at", frappe.utils.now()
)
def fetch_leads(self):
url = self.get_api_url(f"/{self.form_id}/leads")
if self.last_synced_at:
timestamp = frappe.utils.data.get_timestamp(self.last_synced_at)
filtering = (
f"filtering=[{{'field':'time_created','operator':'GREATER_THAN','value':{timestamp}}}]"
)
url = f"{url}?{filtering}"
return make_get_request(
url,
params={
"access_token": self.access_token,
"fields": "id,created_time,field_data",
"limit": 100000, # TODO: pagination
},
).get("data", [])
def get_form_questions_mapping(self):
form_questions = frappe.db.get_all(
"Facebook Lead Form Question",
filters={"parent": self.form_id},
fields=["key", "mapped_to_crm_field"],
)
return {q["key"]: q["mapped_to_crm_field"] for q in form_questions if q["mapped_to_crm_field"]}
@property
def last_synced_at(self):
return frappe.db.get_value(
"Lead Sync Source", self.source_name or {"facebook_lead_form": self.form_id}, "last_synced_at"
)
def create_failure_log(self, lead_data: dict | None = None, type: str = "Failure"):
return frappe.get_doc(
{
"doctype": "Failed Lead Sync Log",
"type": type,
"lead_data": frappe.as_json(lead_data),
"source": self.get_source_name()
}
).insert(ignore_permissions=True)
def update_last_synced_at(self):
frappe.db.set_value(
"Lead Sync Source",
self.source_name or {"facebook_lead_form": self.form_id},
"last_synced_at",
frappe.utils.now(),
)
def get_source_name(self):
if self.source_name:
return self.source_name
return frappe.db.get_value("Lead Sync Source", {"facebook_lead_form": self.form_id}, "name")
@frappe.whitelist()
@ -156,3 +198,8 @@ def get_pages_with_forms() -> list[dict]:
forms = frappe.db.get_all("Facebook Lead Form", filters={"page": page["id"]}, fields=["id", "name"])
page["forms"] = forms
return pages
def validate_duplicate(lead: dict, field_mapping: dict):
# if a lead exists with
pass

View File

@ -54,7 +54,8 @@
"fieldname": "facebook_lead_form",
"fieldtype": "Link",
"label": "Facebook Lead Form",
"options": "Facebook Lead Form"
"options": "Facebook Lead Form",
"unique": 1
},
{
"default": "1",
@ -84,7 +85,7 @@
"grid_page_length": 50,
"index_web_pages_for_search": 1,
"links": [],
"modified": "2025-10-19 15:07:26.256720",
"modified": "2025-10-19 18:23:24.725350",
"modified_by": "Administrator",
"module": "Lead Syncing",
"name": "Lead Sync Source",

View File

@ -5,8 +5,8 @@ import frappe
from frappe.model.document import Document
from crm.lead_syncing.doctype.lead_sync_source.facebook import (
FacebookSyncSource,
fetch_and_store_pages_from_facebook,
sync_leads_from_facebook,
)
@ -60,4 +60,8 @@ class LeadSyncSource(Document):
if self.type == "Facebook" and self.access_token:
if not self.facebook_lead_form:
frappe.throw(frappe._("Please select a lead gen form before syncing!"))
sync_leads_from_facebook(self.get_password("access_token"), self.facebook_lead_form)
FacebookSyncSource(
self.get_password("access_token"),
self.facebook_lead_form
).sync()