# Copyright (c) 2020, JINGROW # For license information, please see license.txt from __future__ import annotations import contextlib import functools import json import re import socket import ssl import time from datetime import datetime, timedelta from functools import wraps from pathlib import Path from typing import TypedDict, TypeVar from urllib.parse import urljoin from urllib.request import urlopen import jingrow import jingrow.utils import pytz import requests import wrapt from babel.dates import format_timedelta from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.x509.oid import ExtensionOID from jingrow.utils import get_datetime, get_system_timezone from jingrow.utils.caching import site_cache from jcloude.utils.email_validator import validate_email class SupervisorProcess(TypedDict): program: str name: str status: str uptime: float | None uptime_string: str | None message: str | None group: str | None pid: int | None def log_error(title, **kwargs): if jingrow.flags.in_test: try: raise except RuntimeError as e: if e.args[0] == "No active exception to reraise": pass else: raise reference_pagetype = kwargs.get("reference_pagetype") reference_name = kwargs.get("reference_name") # Prevent double logging as `message` if reference_pagetype and reference_name: del kwargs["reference_pagetype"] del kwargs["reference_name"] if pg := kwargs.get("pg"): reference_pagetype = pg.pagetype reference_name = pg.name del kwargs["pg"] with contextlib.suppress(Exception): kwargs["user"] = jingrow.session.user kwargs["team"] = jingrow.local.team() message = "" if serialized := json.dumps( kwargs, indent=4, sort_keys=True, default=str, skipkeys=True, ): message += f"Data:\n{serialized}\n" if traceback := jingrow.get_traceback(with_context=True): message += f"Exception:\n{traceback}\n" with contextlib.suppress(Exception): jingrow.log_error( title=title, message=message, reference_pagetype=reference_pagetype, reference_name=reference_name, ) def get_current_team(get_pg=False): if jingrow.session.user == "Guest": jingrow.throw("Not Permitted", jingrow.AuthenticationError) if not hasattr(jingrow.local, "request"): # if this is not a request, send the current user as default team # always use parent_team for background jobs return ( jingrow.get_pg( "Team", {"user": jingrow.session.user, "enabled": 1, "parent_team": ("is", "not set")}, ) if get_pg else jingrow.get_value( "Team", {"user": jingrow.session.user, "enabled": 1, "parent_team": ("is", "not set")}, "name", ) ) system_user = jingrow.session.data.user_type == "System User" # get team passed via request header x_jcloude_team = jingrow.get_request_header("X-Jcloude-Team") # In case if X-Jcloude-Team is not passed, check if `team_name` is available in jingrow.local # `team_name` getting injected by jcloude.saas.api.whitelist_saas_api decorator team = x_jcloude_team if x_jcloude_team else getattr(jingrow.local, "team_name", "") if not team and has_role("Jcloude Admin") and jingrow.db.exists("Team", {"user": jingrow.session.user}): # if user has_role of Jcloude Admin then just return current user as default team return ( jingrow.get_pg("Team", {"user": jingrow.session.user, "enabled": 1}) if get_pg else jingrow.get_value("Team", {"user": jingrow.session.user, "enabled": 1}, "name") ) # if team is not passed via header, get the default team for user team = team if team else get_default_team_for_user(jingrow.session.user) if not system_user and not is_user_part_of_team(jingrow.session.user, team): # if user is not part of the team, get the default team for user team = get_default_team_for_user(jingrow.session.user) if not team: jingrow.throw( f"User {jingrow.session.user} is not part of any team", jingrow.AuthenticationError, ) if not jingrow.db.exists("Team", {"name": team, "enabled": 1}): jingrow.throw("Invalid Team", jingrow.AuthenticationError) if get_pg: return jingrow.get_pg("Team", team) return team def _get_current_team(): if not getattr(jingrow.local, "_current_team", None): jingrow.local._current_team = get_current_team(get_pg=True) return jingrow.local._current_team def _system_user(): return jingrow.get_cached_value("User", jingrow.session.user, "user_type") == "System User" def has_role(role, user=None): if not user: user = jingrow.session.user return jingrow.db.exists("Has Role", {"parenttype": "User", "parent": user, "role": role}) @functools.lru_cache(maxsize=1024) def get_app_tag(repository, repository_owner, hash): return jingrow.db.get_value( "App Tag", {"repository": repository, "repository_owner": repository_owner, "hash": hash}, "tag", ) def get_default_team_for_user(user): """Returns the Team if user has one, or returns the Team in which they belong""" if jingrow.db.exists("Team", {"user": user, "enabled": 1}): return jingrow.db.get_value("Team", {"user": user, "enabled": 1}, "name") teams = jingrow.db.get_values( "Team Member", filters={"parenttype": "Team", "user": user}, fieldname="parent", pluck="parent", ) for team in teams: # if user is part of multiple teams, send the first enabled one if jingrow.db.exists("Team", {"name": team, "enabled": 1}): return team return None def get_valid_teams_for_user(user): teams = jingrow.db.get_all("Team Member", filters={"user": user}, pluck="parent") return jingrow.db.get_all("Team", filters={"name": ("in", teams), "enabled": 1}, fields=["name", "user"]) def is_user_part_of_team(user, team): """Returns True if user is part of the team""" return jingrow.db.exists("Team Member", {"parenttype": "Team", "parent": team, "user": user}) def get_country_info(): if jingrow.flags.in_test: return {} ip = jingrow.local.request_ip ip_api_key = jingrow.conf.get("ip-api-key") def _get_country_info(): fields = [ "status", "message", "continent", "continentCode", "country", "countryCode", "region", "regionName", "city", "district", "zip", "lat", "lon", "timezone", "offset", "currency", "isp", "org", "as", "asname", "reverse", "mobile", "proxy", "hosting", "query", ] res = requests.get( "https://pro.ip-api.com/json/{ip}?key={key}&fields={fields}".format( ip=ip, key=ip_api_key, fields=",".join(fields) ) ) try: data = res.json() if data.get("status") != "fail": return data except Exception: pass return {} return jingrow.cache().hget("ip_country_map", ip, generator=_get_country_info) def get_last_pg(*args, **kwargs): """Wrapper around jingrow.get_last_pg but does not throw""" try: return jingrow.get_last_pg(*args, **kwargs) except Exception: return None def cache(seconds: int, maxsize: int = 128, typed: bool = False): def wrapper_cache(func): func = functools.lru_cache(maxsize=maxsize, typed=typed)(func) func.delta = timedelta(seconds=seconds) func.expiration = datetime.utcnow() + func.delta @functools.wraps(func) def wrapped_func(*args, **kwargs): if datetime.utcnow() >= func.expiration: func.cache_clear() func.expiration = datetime.utcnow() + func.delta return func(*args, **kwargs) return wrapped_func return wrapper_cache def chunk(iterable, size): """Creates list of elements split into groups of n.""" for i in range(0, len(iterable), size): yield iterable[i : i + size] @cache(seconds=1800) def get_minified_script(): migration_script = "../apps/jcloude/jcloude/scripts/migrate.py" with open(migration_script) as f: return f.read() @cache(seconds=1800) def get_minified_script_2(): migration_script = "../apps/jcloude/jcloude/scripts/migrate_2.py" with open(migration_script) as f: return f.read() def get_jingrow_backups(url, email, password): return RemoteFrappeSite(url, email, password).get_backups() def is_allowed_access_performance_tuning(): team = get_current_team(get_pg=True) return team.enable_performance_tuning class RemoteFrappeSite: def __init__(self, url, usr, pwd): if not url.startswith("http"): # http will be redirected to https in requests url = f"http://{url}" self.user_site = url.strip() self.user_login = usr.strip() self.password_login = pwd self._remote_backup_links = {} self._validate_jingrow_site() self._validate_user_permissions() @property def user_sid(self): return self._user_sid @property def site(self): return self._site @property def backup_links(self): return self._remote_backup_links def _validate_jingrow_site(self): """Validates if Jingrow Site and sets RemoteBackupRetrieval.site""" res = requests.get(f"{self.user_site}/api/method/jingrow.ping", timeout=(5, 10)) if not res.ok: jingrow.throw("Invalid Jingrow Site") if res.json().get("message") == "pong": # Get final redirect URL url = res.url.split("/api/method")[0] self._site = url def _validate_user_permissions(self): """Validates user permssions on Jingrow Site and sets RemoteBackupRetrieval.user_sid""" response = requests.post( f"{self.site}/api/method/login", data={"usr": self.user_login, "pwd": self.password_login}, timeout=(5, 10), ) if not response.ok: if response.status_code == 401: jingrow.throw("Invalid Credentials") else: response.raise_for_status() self._user_sid = response.cookies.get("sid") def _handle_backups_retrieval_failure(self, response): log_error( "Backups Retrieval Error - Magic Migration", response=response.text, remote_site=self.site, ) if response.status_code == 403: error_msg = "Insufficient Permissions" else: side = "Client" if 400 <= response.status_code < 500 else "Server" error_msg = ( f"{side} Error occurred: {response.status_code} {response.raw.reason}" f" received from {self.site}" ) jingrow.throw(error_msg) def get_backups(self): self._create_fetch_backups_request() self._processed_backups_from_response() self._validate_missing_backups() return self.backup_links def _create_fetch_backups_request(self): headers = {"Accept": "application/json", "Content-Type": "application/json"} suffix = f"?sid={self.user_sid}" if self.user_sid else "" res = requests.get( f"{self.site}/api/method/jingrow.utils.backups.fetch_latest_backups{suffix}", headers=headers, timeout=(5, 10), ) if not res.ok: self._handle_backups_retrieval_failure(res) self._fetch_latest_backups_response = res.json().get("message", {}) def _validate_missing_backups(self): missing_files = [] for file_type, file_path in self.backup_links.items(): if not file_path: missing_files.append(file_type) if missing_files: missing_config = "site config and " if not self.backup_links.get("config") else "" missing_backups = ( f"Missing {missing_config}backup files: {', '.join([x.title() for x in missing_files])}" ) jingrow.throw(missing_backups) def __process_jingrow_url(self, path): if not path: return None backup_path = path.split("/private")[1] return urljoin(self.site, f"{backup_path}?sid={self.user_sid}") def _processed_backups_from_response(self): for file_type, file_path in self._fetch_latest_backups_response.items(): self._remote_backup_links[file_type] = self.__process_jingrow_url(file_path) @site_cache(ttl=5 * 60) def get_client_blacklisted_keys() -> list: """Returns list of blacklisted Site Config Keys accessible to Jcloude /dashboard users.""" return list( set( [ x.key for x in jingrow.get_all("Site Config Key Blacklist", fields=["`key`"]) + jingrow.get_all("Site Config Key", fields=["`key`"], filters={"internal": True}) ] ) ) def sanitize_config(config: dict) -> dict: client_blacklisted_keys = get_client_blacklisted_keys() sanitized_config = config.copy() for key in config: if key in client_blacklisted_keys: sanitized_config.pop(key) return sanitized_config def developer_mode_only(): if not jingrow.conf.developer_mode: jingrow.throw("You don't know what you're doing. Go away!", jingrow.ValidationError) def human_readable(num: int | float) -> str: """Assumes int data to describe size is in Bytes""" for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024: return f"{num:3.1f} {unit}B" num /= 1024 return f"{num:.1f} YiB" def is_json(string): if isinstance(string, str): string = string.strip() return string.startswith("{") and string.endswith("}") if isinstance(string, (dict, list)): return True return None def is_list(string): if isinstance(string, list): return True if isinstance(string, str): string = string.strip() return string.startswith("[") and string.endswith("]") return False def guess_type(value): type_dict = { int: "Number", float: "Number", bool: "Boolean", dict: "JSON", list: "JSON", str: "String", } value_type = type(value) if value_type in type_dict: return type_dict[value_type] if is_json(value): return "JSON" return "String" def convert(string): if isinstance(string, (dict, list)): return json.dumps(string) return string def unique(seq, unique_by=None): """Remove duplicates from a list based on an expression Usage: unique([{'x': 1, 'y': 2}, {'x': 1, 'y': 2}], lambda d: d['x']) # output: [{'x': 1, 'y': 2}] """ unique_by = unique_by or (lambda x: x) out = [] seen = set() for d in seq: unique_key = unique_by(d) if unique_key not in seen: out.append(d) seen.add(unique_key) return out def group_children_in_result(result, child_field_map): """Usage: result = [ {'name': 'test1', 'full_name': 'Faris Ansari', role: 'System Manager'}, {'name': 'test1', 'full_name': 'Faris Ansari', role: 'Jcloude Admin'}, {'name': 'test2', 'full_name': 'Aditya Hase', role: 'Jcloude Admin'}, {'name': 'test2', 'full_name': 'Aditya Hase', role: 'Jcloude Member'}, ] out = group_children_in_result(result, {'role': 'roles'}) print(out) [ {'name': 'test1', 'full_name': 'Faris Ansari', roles: ['System Manager', 'Jcloude Admin']}, {'name': 'test2', 'full_name': 'Aditya Hase', roles: ['Jcloude Admin', 'Jcloude Member']}, ] """ out = {} for d in result: out[d.name] = out.get(d.name) or d for child_field, target in child_field_map.items(): out[d.name][target] = out[d.name].get(target) or [] out[d.name][target].append(d.get(child_field)) out[d.name].pop(child_field, "") return out.values() def convert_user_timezone_to_utc(datetime): timezone = pytz.timezone(get_system_timezone()) datetime_obj = get_datetime(datetime) return timezone.localize(datetime_obj).astimezone(pytz.utc).isoformat() class ttl_cache: """ Does not invalidate cache depending on function args. Ideally it's for functions with 0 arity. Example: # or use it as a decorator cached_func = ttl_cache()(func) # to invalidate cache cached_func.cache.invalidate() """ def __init__(self, ttl: int = 60): self.ttl = ttl self.result = None self.start = 0 def invalidate(self): self.result = None self.start = 0 def __call__(self, func): self.result = None self.start = time.time() def wrapper_func(*args, **kwargs): if self.result is not None and (time.time() - self.start) < self.ttl: return self.result self.start = time.time() self.result = func(*args, **kwargs) return self.result wrapper_func.cache = self return wrapper_func def poly_get_pagetype(doctypes, name): """Get the pagetype value from the given name of a pg from a list of doctypes""" for pagetype in doctypes: if jingrow.db.exists(pagetype, name): return pagetype return doctypes[-1] def reconnect_on_failure(): @wrapt.decorator def wrapper(wrapped, instance, args, kwargs): try: return wrapped(*args, **kwargs) except Exception as e: if jingrow.db.is_interface_error(e): jingrow.db.connect() return wrapped(*args, **kwargs) raise return wrapper def parse_supervisor_status(output: str): # Note: this function is verbose due to supervisor status being kinda # unstructured, and I'm not entirely sure of all possible input formats. # # example lines: # ``` # jingrow-bench-web:jingrow-bench-jingrow-web RUNNING pid 1327, uptime 23:13:00 # jingrow-bench-workers:jingrow-bench-jingrow-worker-4 RUNNING pid 3794915, uptime 68 days, 6:10:37 # sshd FATAL Exited too quickly (process log may have details) # ``` pid_rex = re.compile(r"^pid\s+\d+") lines = output.split("\n") parsed = [] for line in lines: if "DeprecationWarning:" in line or "pkg_resources is deprecated" in line: continue entry = { "program": "", "status": "", } splits = strip_split(line, maxsplit=1) if len(splits) != 2: continue program, info = splits # example: "code-server" entry["program"] = program entry["name"] = program prog_splits = program.split(":") if len(prog_splits) == 2: # example: "jingrow-bench-web:jingrow-bench-jingrow-web" entry["group"] = prog_splits[0] entry["name"] = prog_splits[1] info_splits = strip_split(info, maxsplit=1) if len(info_splits) != 2: continue # example: "STOPPED Not started" entry["status"] = info_splits[0].title() if not pid_rex.match(info_splits[1]): entry["message"] = info_splits[1] else: # example: "RUNNING pid 9, uptime 150 days, 2:55:52" pid, uptime, uptime_string = parse_pid_uptime(info_splits[1]) entry["pid"] = pid entry["uptime"] = uptime entry["uptime_string"] = uptime_string parsed.append(entry) return parsed def parse_pid_uptime(s: str): pid: int | None = None uptime: float | None = None splits = strip_split(s, ",", maxsplit=1) if len(splits) != 2: return pid, uptime # example: "pid 9" pid_split = splits[0] # example: "uptime 150 days, 2:55:52" uptime_split = splits[1] pid_split, uptime_split = splits pid_splits = strip_split(pid_split, maxsplit=1) if len(pid_splits) == 2 and pid_splits[0] == "pid": pid = int(pid_splits[1]) uptime_string = "" uptime_splits = strip_split(uptime_split, maxsplit=1) if len(uptime_splits) == 2 and uptime_splits[0] == "uptime": uptime_string = uptime_splits[1] uptime = parse_uptime(uptime_string) return pid, uptime, uptime_string def parse_uptime(s: str) -> float | None: # example `s`: "uptime 68 days, 6:10:37" days = 0.0 hours = 0.0 minutes = 0.0 seconds = 0.0 t_string = "" splits = strip_split(s, sep=",", maxsplit=1) # Uptime has date info too if len(splits) == 2 and (splits[0].endswith("days") or splits[0].endswith("day")): t_string = splits[1] d_string = splits[0].split(" ")[0] days = float(d_string) # Uptime less than a day elif len(splits) == 1: t_string = splits[0] else: return None # Time string format hh:mm:ss t_splits = t_string.split(":") if len(t_splits) == 3: hours = float(t_splits[0]) minutes = float(t_splits[1]) seconds = float(t_splits[2]) return timedelta( days=days, hours=hours, minutes=minutes, seconds=seconds, ).total_seconds() def strip_split(string: str, sep: str = " ", maxsplit: int = -1) -> list[str]: splits: list[str] = [] for part in string.split(sep, maxsplit): if p_stripped := part.strip(): splits.append(p_stripped) return splits def get_filepath(root: str, filename: str, max_depth: int = 1): """ Returns the absolute path of a `filename` under `root`. If it is not found, returns None. Example: get_filepath("apps/hrms", "hooks.py", 2) Depth of search under file tree can be set using `max_depth`. """ path = _get_filepath( Path(root), filename, max_depth, ) if path is None: return path return path.absolute().as_posix() def _get_filepath(root: Path, filename: str, max_depth: int) -> Path | None: if root.name == filename: return root if max_depth == 0 or not root.is_dir(): return None for new_root in root.iterdir(): if possible_path := _get_filepath( new_root, filename, max_depth - 1, ): return possible_path return None def fmt_timedelta(td: timedelta | int): locale = jingrow.local.lang.replace("-", "_") if jingrow.local.lang else None return format_timedelta(td, locale=locale) V = TypeVar("V") def flatten(value_lists: "list[list[V]]") -> "list[V]": return [value for values in value_lists for value in values] def is_valid_hostname(hostname): if len(hostname) > 255: return False allowed = re.compile(r"(?!-)[A-Z\d-]{1,63}(? str: """ Mask email address with 'x' Example: > mask_email("tanmoysarkar@gmail.com", 50) > tanxxxxxxkar@gmxxxxcom > mask_email("tanmoysarkar@gmail.com", 30) > tanmxxxarkar@gmaxx.com """ if "@" not in email: return "Invalid email address" local_part, domain = email.split("@") local_mask_length = int(len(local_part) * (percentage / 100)) domain_mask_length = int(len(domain) * (percentage / 100)) def mask_middle(s: str, mask_len: int) -> str: if mask_len == 0: return s start_idx = (len(s) - mask_len) // 2 end_idx = start_idx + mask_len return s[:start_idx] + "x" * mask_len + s[end_idx:] masked_local_part = mask_middle(local_part, local_mask_length) masked_domain = mask_middle(domain, domain_mask_length) return masked_local_part + "@" + masked_domain def get_mariadb_root_password(site): from jingrow.utils.password import get_decrypted_password database_server, managed_database_service = jingrow.get_cached_value( "Bench", site.bench, ["database_server", "managed_database_service"] ) if managed_database_service: pagetype = "Managed Database Service" name = managed_database_service field = "root_user_password" else: pagetype = "Database Server" name = database_server field = "mariadb_root_password" return get_decrypted_password(pagetype, name, field) def is_valid_email_address(email) -> bool: if jingrow.cache.exists(f"email_validity:{email}"): return bool(jingrow.utils.data.cint(jingrow.cache.get_value(f"email_validity:{email}"))) try: is_valid = bool(validate_email(email=email, check_mx=True, verify=True, smtp_timeout=10)) jingrow.cache.set_value(f"email_validity:{email}", int(is_valid), expires_in_sec=3600) if not is_valid: log_error("Invalid email address on signup", data=email) return bool(is_valid) except Exception as e: log_error("Email validation error on signup", data=e) jingrow.cache.set_value(f"email_validity:{email}", 0, expires_in_sec=3600) return False def get_full_chain_cert_of_domain(domain: str) -> str: cert_chain = [] # Get initial certificate context = ssl.create_default_context() with socket.create_connection((domain, 443)) as sock: # noqa: SIM117 with context.wrap_socket(sock, server_hostname=domain) as ssl_socket: cert_pem = ssl.DER_cert_to_PEM_cert(ssl_socket.getpeercert(True)) # type: ignore cert = x509.load_pem_x509_certificate(cert_pem.encode(), default_backend()) cert_chain.append(cert_pem) # Walk up the chain via certificate authority information access (AIA) while True: try: aia = cert.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_INFORMATION_ACCESS) for access in aia.value: if access.access_method._name == "caIssuers": uri = access.access_location._value with urlopen(uri) as response: der_cert = response.read() pem_cert = ssl.DER_cert_to_PEM_cert(der_cert) cert = x509.load_pem_x509_certificate(pem_cert.encode(), default_backend()) cert_chain.append(pem_cert) break except: # noqa: E722 break cert_chain_str = "" for cert in cert_chain: cert_chain_str += cert + "\n" return cert_chain_str def timer(f): @wraps(f) def wrap(*args, **kwargs): start_timestamp = time.time() result = f(*args, **kwargs) end_timestamp = time.time() duration = end_timestamp - start_timestamp if not hasattr(jingrow.local, "timers"): jingrow.local.timers = {} jingrow.local.timers[f.__name__] = jingrow.utils.rounded(duration, precision=3) return result return wrap def validate_subdomain(subdomain: str): site_regex = r"^[a-z0-9][a-z0-9-]*[a-z0-9]$" if not subdomain: jingrow.throw("Subdomain is required to create a site.") if not re.match(site_regex, subdomain): jingrow.throw("Subdomain contains invalid characters. Use lowercase characters, numbers and hyphens") if len(subdomain) > 32: jingrow.throw("Subdomain too long. Use 32 or less characters") if len(subdomain) < 5: jingrow.throw("Subdomain too short. Use 5 or more characters") @site_cache(ttl=120) def servers_using_alternative_port_for_communication() -> list: servers = jingrow.db.get_value( "Jcloude Settings", None, "servers_using_alternative_http_port_for_communication" ) if not servers: return [] sl: list[str] = servers.split("\n") return [x.strip() for x in sl if x.strip()] def get_nearest_cluster(): import math cluster_locations = { "Mumbai": {"latitude": 19.0760, "longitude": 72.8777}, "Zurich": {"latitude": 47.3769, "longitude": 8.5417}, "Frankfurt": {"latitude": 50.1109, "longitude": 8.6821}, "Singapore": {"latitude": 1.3521, "longitude": 103.8198}, "London": {"latitude": 51.5074, "longitude": -0.1278}, "Virginia": {"latitude": 38.8048, "longitude": -77.0469}, "Jakarta": {"latitude": -6.2088, "longitude": 106.8456}, "Bahrain": {"latitude": 26.0667, "longitude": 50.5577}, "UAE": {"latitude": 24.4539, "longitude": 54.3773}, "KSA": {"latitude": 24.7136, "longitude": 46.6753}, "Cape Town": {"latitude": -33.9249, "longitude": 18.4241}, "Johannesburg": {"latitude": -26.2041, "longitude": 28.0473}, } def haversine_distance(lat1, lon1, lat2, lon2): R = 6371 # Radius of Earth in kilometers lat1_rad = math.radians(lat1) lon1_rad = math.radians(lon1) lat2_rad = math.radians(lat2) lon2_rad = math.radians(lon2) longitude_diff = lon2_rad - lon1_rad latitude_diff = lat2_rad - lat1_rad a = ( math.sin(latitude_diff / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(longitude_diff / 2) ** 2 ) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) return R * c user_geo_data = get_country_info() if not user_geo_data: return None user_latitude = user_geo_data.get("lat", 0.0) user_longitude = user_geo_data.get("lon", 0.0) min_distance = float("inf") nearest_cluster = None for cluster_name, cluster_coords in cluster_locations.items(): cluster_latitude = cluster_coords["latitude"] cluster_longitude = cluster_coords["longitude"] distance = haversine_distance(user_latitude, user_longitude, cluster_latitude, cluster_longitude) if distance < min_distance: min_distance = distance nearest_cluster = cluster_name return nearest_cluster