Added input validation

Added document existence checks
Added reference field validation
Changed None to empty strings for reference fields
This commit is contained in:
naaa760 2025-08-08 14:12:36 +05:30
parent 499888b4a7
commit 51823d1b88

View File

@ -751,7 +751,12 @@ def getCounts(d, doctype):
@frappe.whitelist() @frappe.whitelist()
def get_linked_docs_of_document(doctype, docname): def get_linked_docs_of_document(doctype, docname):
try:
doc = frappe.get_doc(doctype, docname) doc = frappe.get_doc(doctype, docname)
except frappe.DoesNotExistError:
return []
linked_docs = get_linked_docs(doc) linked_docs = get_linked_docs(doc)
dynamic_linked_docs = get_dynamic_linked_docs(doc) dynamic_linked_docs = get_dynamic_linked_docs(doc)
@ -760,7 +765,16 @@ def get_linked_docs_of_document(doctype, docname):
docs_data = [] docs_data = []
for doc in linked_docs: for doc in linked_docs:
if not doc.get("reference_doctype") or not doc.get("reference_docname"):
continue
try:
data = frappe.get_doc(doc["reference_doctype"], doc["reference_docname"]) data = frappe.get_doc(doc["reference_doctype"], doc["reference_docname"])
except (frappe.DoesNotExistError, frappe.ValidationError):
continue
title = data.get("title") title = data.get("title")
if data.doctype == "CRM Call Log": if data.doctype == "CRM Call Log":
title = f"Call from {data.get('from')} to {data.get('to')}" title = f"Call from {data.get('from')} to {data.get('to')}"
@ -780,17 +794,31 @@ def get_linked_docs_of_document(doctype, docname):
def remove_doc_link(doctype, docname): def remove_doc_link(doctype, docname):
if not doctype or not docname:
return
try:
linked_doc_data = frappe.get_doc(doctype, docname) linked_doc_data = frappe.get_doc(doctype, docname)
linked_doc_data.update( linked_doc_data.update(
{ {
"reference_doctype": None, "reference_doctype": "",
"reference_docname": None, "reference_docname": "",
} }
) )
linked_doc_data.save(ignore_permissions=True) linked_doc_data.save(ignore_permissions=True)
except (frappe.DoesNotExistError, frappe.ValidationError):
pass
def remove_contact_link(doctype, docname): def remove_contact_link(doctype, docname):
if not doctype or not docname:
return
try:
linked_doc_data = frappe.get_doc(doctype, docname) linked_doc_data = frappe.get_doc(doctype, docname)
linked_doc_data.update( linked_doc_data.update(
{ {
@ -799,6 +827,9 @@ def remove_contact_link(doctype, docname):
} }
) )
linked_doc_data.save(ignore_permissions=True) linked_doc_data.save(ignore_permissions=True)
except (frappe.DoesNotExistError, frappe.ValidationError):
pass
@frappe.whitelist() @frappe.whitelist()
@ -807,6 +838,11 @@ def remove_linked_doc_reference(items, remove_contact=None, delete=False):
items = frappe.parse_json(items) items = frappe.parse_json(items)
for item in items: for item in items:
if not item.get("doctype") or not item.get("docname"):
continue
try:
if remove_contact: if remove_contact:
remove_contact_link(item["doctype"], item["docname"]) remove_contact_link(item["doctype"], item["docname"])
else: else:
@ -814,6 +850,9 @@ def remove_linked_doc_reference(items, remove_contact=None, delete=False):
if delete: if delete:
frappe.delete_doc(item["doctype"], item["docname"]) frappe.delete_doc(item["doctype"], item["docname"])
except (frappe.DoesNotExistError, frappe.ValidationError):
# Skip if document doesn't exist or has validation errors
continue
return "success" return "success"
@ -822,10 +861,30 @@ def remove_linked_doc_reference(items, remove_contact=None, delete=False):
def delete_bulk_docs(doctype, items, delete_linked=False): def delete_bulk_docs(doctype, items, delete_linked=False):
from frappe.desk.reportview import delete_bulk from frappe.desk.reportview import delete_bulk
if not doctype:
frappe.throw("Doctype is required")
if not items:
frappe.throw("Items are required")
items = frappe.parse_json(items) items = frappe.parse_json(items)
if not isinstance(items, list):
frappe.throw("Items must be a list")
for doc in items: for doc in items:
try:
if not frappe.db.exists(doctype, doc):
frappe.log_error(f"Document {doctype} {doc} does not exist", "Bulk Delete Error")
continue
linked_docs = get_linked_docs_of_document(doctype, doc) linked_docs = get_linked_docs_of_document(doctype, doc)
for linked_doc in linked_docs: for linked_doc in linked_docs:
if not linked_doc.get("reference_doctype") or not linked_doc.get("reference_docname"):
continue
remove_linked_doc_reference( remove_linked_doc_reference(
[ [
{ {
@ -836,6 +895,9 @@ def delete_bulk_docs(doctype, items, delete_linked=False):
remove_contact=doctype == "Contact", remove_contact=doctype == "Contact",
delete=delete_linked, delete=delete_linked,
) )
except Exception as e:
frappe.log_error(f"Error processing linked docs for {doctype} {doc}: {str(e)}", "Bulk Delete Error")
if len(items) > 10: if len(items) > 10:
frappe.enqueue("frappe.desk.reportview.delete_bulk", doctype=doctype, items=items) frappe.enqueue("frappe.desk.reportview.delete_bulk", doctype=doctype, items=items)