From bed55db16f76729e20a5bba0d65ebb4b24e68fa1 Mon Sep 17 00:00:00 2001
From: jingrow
Date: Fri, 27 Jun 2025 15:34:18 +0800
Subject: [PATCH] Update API interface
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 agent/bench.py                         | 2624 ++++++++++++------------
 agent/callbacks.py                     |   16 +-
 agent/monitor.py                       |  228 +-
 agent/site.py                          | 1964 +++++++++---------
 agent/templates/prometheus/domains.yml |   16 +-
 agent/templates/prometheus/sites.yml   |   26 +-
 6 files changed, 2437 insertions(+), 2437 deletions(-)

diff --git a/agent/bench.py b/agent/bench.py
index bdbfb25..da52f56 100644
--- a/agent/bench.py
+++ b/agent/bench.py
@@ -1,1312 +1,1312 @@
-from __future__ import annotations
-
-import hashlib
-import json
-import os
-import shutil
-import string
-import tempfile
-import traceback
-from contextlib import suppress
-from datetime import datetime, timedelta
-from functools import partial
-from glob import glob
-from pathlib import Path, PurePath
-from random import choices
-from textwrap import indent
-from typing import TYPE_CHECKING, TypedDict
-
-import requests
-
-from agent.app import App
-from agent.base import AgentException, Base
-from agent.exceptions import InvalidSiteConfigException, SiteNotExistsException
-from agent.job import job, step
-from agent.site import Site
-from agent.utils import download_file, end_execution, get_execution_result, get_size
-
-if TYPE_CHECKING:
-    from agent.server import Server
-
-    class BenchUpdateApp(TypedDict):
-        app: string
-        url: string
-        hash: string
-
-    class ShouldRunUpdatePhase(TypedDict):
-        setup_requirements_node: bool
-        setup_requirements_python: bool
-        rebuild_frontend: bool
-        migrate_sites: bool
-
-
-class Bench(Base):
-    def __init__(self, name: str, server: Server, mounts=None):
-        super().__init__()
-
-        self.name = name
-        self.server = server
-        self.directory = os.path.join(self.server.benches_directory, name)
-        self.sites_directory = os.path.join(self.directory, "sites")
-        self.config_directory = os.path.join(self.directory, "config")
-        self.logs_directory = os.path.join(self.directory, "logs")
-        self.apps_file = os.path.join(self.directory, "sites", "apps.txt")
-        self.bench_config_file = os.path.join(self.directory, "config.json")
-        self.config_file = os.path.join(self.directory, "sites", "common_site_config.json")
-        self.host = self.config.get("db_host", "localhost")
-        self.docker_image = self.bench_config.get("docker_image")
-        self.mounts = mounts
-        if not (
-            os.path.isdir(self.directory)
-            and os.path.exists(self.sites_directory)
-            and os.path.exists(self.config_file)
-            and os.path.exists(self.bench_config_file)
-        ):
-            raise Exception
-
-    @step("Deploy Bench")
-    def deploy(self):
-        return self.start()
-
-    def dump(self):
-        return {
-            "name": self.name,
-            "apps": {name: app.dump() for name, app in self.apps.items()},
-            "config": self.config,
-            "sites": {name: site.dump() for name, site in self.sites.items()},
-        }
-
-    def _delete_older_usage_files(self, max_retention_time):
-        log_files = glob(
-            os.path.join(
-                self.directory,
-                "logs",
-                f"{self.server.name}-usage-*.json.log",
-            )
-        )
-
-        for file in log_files:
-            if os.stat(file).st_mtime < max_retention_time:
-                print(f"Deleting {file} as it's older than {max_retention_time}")
-                os.remove(file)
-
-    def fetch_sites_info(self, since=None):
-        max_retention_time = (datetime.utcnow() - timedelta(days=7)).timestamp()
-        self._delete_older_usage_files(max_retention_time)
-
-        if not since:
-            since = max_retention_time
-
-        info = {}
-        usage_data = []
-        log_files = glob(
-            os.path.join(
-                self.server.directory,
-                "logs",
-                f"{self.server.name}-usage-*.json.log",
-            )
-        )
-
-        for file in log_files:
-            # Only load files that are newer than the since timestamp
-            if os.stat(file).st_mtime <= since:
-                continue
-
-            with open(file) as f:
-                try:
-                    usage_data.extend(json.load(f))
-                except json.decoder.JSONDecodeError:
-                    print(f"Error loading JSON from {file}")
-
-        usage_data.sort(
-            key=lambda x: datetime.fromisoformat(x["timestamp"]),
-            reverse=True,
-        )
-
-        for site in self.sites.values():
-            try:
-                timezone_data = {d["timestamp"]: d["timezone"] for d in usage_data if d["site"] == site.name}
-                timezone = timezone_data[max(timezone_data)]
-            except Exception:
-                timezone = None
-
-            if not (usage_data and timezone):
-                timezone = site.timezone
-
-            info[site.name] = {
-                "config": site.config,
-                "usage": [
-                    {
-                        "database": d["database"],
-                        "public": d["public"],
-                        "private": d["private"],
-                        "backups": d["backups"],
-                        "timestamp": d["timestamp"],
-                    }
-                    for d in usage_data
-                    if d["site"] == site.name
-                ],
-                "timezone": timezone,
-            }
-
-        return info
-
-    def fetch_sites_analytics(self):
-        analytics = {}
-        for site in self.sites.values():
-            try:
-                analytics[site.name] = site.fetch_site_analytics()
-            except Exception:
-                import traceback
-
-                traceback.print_exc()
-        return analytics
-
-    def execute(self, command, input=None, non_zero_throw=True):
-        return super().execute(
-            command,
-            directory=self.directory,
-            input=input,
-            non_zero_throw=non_zero_throw,
-        )
-
-    def docker_execute(self, command, input=None, subdir=None, non_zero_throw=True):
-        interactive = "-i" if input else ""
-        workdir = "/home/jingrow/jingrow-bench"
-        if subdir:
-            workdir = os.path.join(workdir, subdir)
-
-        if self.bench_config.get("single_container"):
-            command = f"docker exec -w {workdir} {interactive} {self.name} {command}"
-        else:
-            service = f"{self.name}_worker_default"
-            task = self.execute(f"docker service ps -f desired-state=Running -q --no-trunc {service}")[
-                "output"
-            ].split()[0]
-            command = f"docker exec -w {workdir} {interactive} {service}.1.{task} {command}"
-        return self.execute(command, input=input, non_zero_throw=non_zero_throw)
-
-    @step("New Site")
-    def bench_new_site(self, name, mariadb_root_password, admin_password):
-        site_database, temp_user, temp_password = self.create_mariadb_user(name, mariadb_root_password)
-        try:
-            return self.docker_execute(
-                f"bench new-site --no-mariadb-socket "
-                f"--mariadb-root-username {temp_user} "
-                f"--mariadb-root-password {temp_password} "
-                f"--admin-password {admin_password} "
-                f"--db-name {site_database} {name}"
-            )
-        finally:
-            self.drop_mariadb_user(name, mariadb_root_password, site_database)
-
-    @job("Create User", priority="high")
-    def create_user(
-        self,
-        site: str,
-        email: str,
-        first_name: str,
-        last_name: str,
-        password: str | None = None,
-    ):
-        _site = Site(site, self)
-        _site.create_user(email, first_name, last_name, password)
-
-    @job("Complete Setup Wizard")
-    def complete_setup_wizard(self, site: str, data: dict):
-        _site = Site(site, self)
-        return _site.complete_setup_wizard(data)
-
-    @job("Rename Site", priority="high")
-    def rename_site_job(
-        self,
-        site: str,
-        new_name: str,
-        create_user: dict | None = None,
-        config: dict | None = None,
-    ):
-        site = get_site_from_name(site, new_name, self)
-        site.enable_maintenance_mode()
-        site.wait_till_ready()
-        if config:
-            if site.config.get("host_name") == f"https://{site.name}":
-                config.update({"host_name": f"https://{new_name}"})
-            site.update_config(config)
-        else:
-            if site.config.get("host_name") == f"https://{site.name}":
-                site.update_config({"host_name": f"https://{new_name}"})
-        site.rename(new_name)
-        self.setup_nginx()
-        self.server.reload_nginx()
-        site.disable_maintenance_mode()
-        site.enable_scheduler()
-        if create_user and create_user.get("email"):
-            site.create_user(
-                create_user.get("email"),
-                create_user.get("first_name"),
-                create_user.get("last_name"),
-                create_user.get("password"),
-            )
-
-    def get_database_name(self, site):
-        site_directory = os.path.join(self.sites_directory, "sites", site)
-        return "_" + hashlib.sha1(site_directory.encode()).hexdigest()[:16]
-
-    def get_random_string(self, length):
-        return "".join(choices(string.ascii_letters + string.digits, k=length))
-
-    def create_mariadb_user(self, site, mariadb_root_password, database=None):
-        database = database or self.get_database_name(site)
-        user = f"{database}_limited"
-        password = self.get_random_string(16)
-        queries = [
-            f"CREATE OR REPLACE USER '{user}'@'%' IDENTIFIED BY '{password}'",
-            f"CREATE OR REPLACE DATABASE {user}",
-            f"GRANT ALL ON {user}.* TO '{user}'@'%'",
-            f"GRANT RELOAD, CREATE USER ON *.* TO '{user}'@'%'",
-            f"GRANT ALL ON {database}.* TO '{user}'@'%' WITH GRANT OPTION",
-            "FLUSH PRIVILEGES",
-        ]
-        for query in queries:
-            command = f'mysql -h {self.host} -uroot -p{mariadb_root_password} -e "{query}"'
-            self.execute(command)
-        return database, user, password
-
-    def drop_mariadb_user(self, site, mariadb_root_password, database=None):
-        database = database or self.get_database_name(site)
-        user = f"{database}_limited"
-        queries = [
-            f"DROP DATABASE IF EXISTS {user}",
-            f"DROP USER IF EXISTS '{user}'@'%'",
-            "FLUSH PRIVILEGES",
-        ]
-        for query in queries:
-            command = f'mysql -h {self.host} -uroot -p{mariadb_root_password} -e "{query}"'
-            self.execute(command)
-
-    def fetch_monitor_data(self):
-        lines = []
-        try:
-            monitor_log_file = os.path.join(self.directory, "logs", "monitor.json.log")
-            time = datetime.utcnow().isoformat()
-            logs_directory = os.path.join(
-                self.server.directory,
-                "logs",
-            )
-            target_file = os.path.join(
-                logs_directory,
-                f"{self.name}-{time}-monitor.json.log",
-            )
-            if os.path.exists(monitor_log_file):
-                shutil.move(monitor_log_file, target_file)
-
-            with open(target_file) as f:
-                for line in f.readlines():
-                    try:
-                        lines.append(json.loads(line))
-                    except Exception:
-                        traceback.print_exc()
-
-            now = datetime.now().timestamp()
-            for file in os.listdir(logs_directory):
-                path = os.path.join(logs_directory, file)
-                if file.endswith("-monitor.json.log") and (now - os.stat(path).st_mtime) > (7 * 86400):
-                    os.remove(path)
-        except FileNotFoundError:
-            pass
-        except Exception:
-            traceback.print_exc()
-        return lines
-
-    def _parse_pids(self, lines):
-        pids = []
-        lines = lines.strip().split("\n")
-
-        for line in lines:
-            parts = line.strip().split()
-            name, pid = parts[0], parts[1]
-            pids.append((name, pid))
-
-        return pids
-
-    def get_worker_pids(self):
-        """Get all the processes running gunicorn for now"""
-        return self._parse_pids(self.execute(f"docker top {self.name} | grep gunicorn")["output"])
-
-    def take_snapshot(self, pid_info: list[tuple[str, str]]):
-        snapshots = {}
-        pyspy_bin = os.path.join(self.server.directory, "env/bin/py-spy")
-
-        for name, pid in pid_info:
-            try:
-                snapshots[f"{name}:{pid}"] = json.loads(
-                    self.execute(f"sudo {pyspy_bin} dump --pid {pid} --json")["output"]
-                )
-            except AgentException as e:
-                snapshots[f"{name}:{pid}"] = str(e)
-
-        return snapshots
-
-    def status(self):
-        status = {
-            "sites": {site: {"scheduler": True, "web": True} for site in self.sites},
-            "timestamp": str(datetime.now()),
-        }
-
-        for site in _inactive_scheduler_sites(self):
-            status["sites"][site]["scheduler"] = False
-
-        for site in _inactive_web_sites(self):
-            status["sites"][site]["web"] = False
-
-        return status
-
-    @job("New Site", priority="high")
-    def new_site(
-        self,
-        name,
-        config,
-        apps,
-        mariadb_root_password,
-        admin_password,
-        create_user: dict | None = None,
-    ):
-        self.bench_new_site(name, mariadb_root_password, admin_password)
-        site = Site(name, self)
-        site.install_apps(apps)
-        site.update_config(config)
-        site.enable_scheduler()
-        if create_user and create_user.get("email"):
-            site.create_user(
-                create_user.get("email"),
-                create_user.get("first_name"),
-                create_user.get("last_name"),
-                create_user.get("password"),
-            )
-        self.setup_nginx()
-        self.server.reload_nginx()
-
-    @job("New Site from Backup", priority="high")
-    def new_site_from_backup(
-        self,
-        name,
-        default_config,
-        apps,
-        mariadb_root_password,
-        admin_password,
-        site_config,
-        database,
-        public,
-        private,
-        skip_failing_patches,
-    ):
-        files = self.download_files(name, database, public, private)
-        self.bench_new_site(name, mariadb_root_password, admin_password)
-        site = Site(name, self)
-        site.update_config(default_config)
-        try:
-            site.restore(
-                mariadb_root_password,
-                admin_password,
-                files["database"],
-                files["public"],
-                files["private"],
-            )
-            if site_config:
-                site_config = json.loads(site_config)
-                site.update_config(site_config)
-        finally:
-            self.delete_downloaded_files(files["directory"])
-        site.uninstall_unavailable_apps(apps)
-        site.migrate(skip_failing_patches=skip_failing_patches)
-        site.set_admin_password(admin_password)
-        site.enable_scheduler()
-        self.setup_nginx()
-        self.server.reload_nginx()
-
-        return site.bench_execute("list-apps")
-
-    @step("Archive Site")
-    def bench_archive_site(self, name, mariadb_root_password, force):
-        site_database, temp_user, temp_password = self.create_mariadb_user(
-            name, mariadb_root_password, self.valid_sites[name].database
-        )
-        force_flag = "--force" if force else ""
-        try:
-            return self.docker_execute(
-                f"bench drop-site --no-backup {force_flag} "
-                f"--root-login {temp_user} --root-password {temp_password} "
-                f"--archived-sites-path archived {name}"
-            )
-        finally:
-            self.drop_mariadb_user(name, mariadb_root_password, site_database)
-
-    @step("Download Backup Files")
-    def download_files(self, name, database_url, public_url, private_url):
-        download_directory = os.path.join(self.sites_directory, "downloads")
-        if not os.path.exists(download_directory):
-            os.mkdir(download_directory)
-        directory = tempfile.mkdtemp(prefix="agent-upload-", suffix=f"-{name}", dir=download_directory)
-        database_file = download_file(database_url, prefix=directory)
-        private_file = download_file(private_url, prefix=directory) if private_url else ""
-        public_file = download_file(public_url, prefix=directory) if public_url else ""
-        return {
-            "directory": directory,
-            "database": database_file,
-            "private": private_file,
-            "public": public_file,
-        }
-
-    @step("Delete Downloaded Backup Files")
-    def delete_downloaded_files(self, backup_files_directory):
-        shutil.rmtree(backup_files_directory)
-
-    @job("Archive Site")
-    def archive_site(self, name, mariadb_root_password, force):
-        site_directory = os.path.join(self.sites_directory, name)
-        if os.path.exists(site_directory):
-            self.bench_archive_site(name, mariadb_root_password, force)
-        self.setup_nginx()
-        self.server._reload_nginx()
-
-    @step("Bench Setup NGINX")
-    def setup_nginx(self):
-        from filelock import FileLock
-
-        with FileLock(os.path.join(self.directory, "nginx.config.lock")):
-            self.generate_nginx_config()
-            return self.server._reload_nginx()
-
-    @step("Bench Setup NGINX Target")
-    def setup_nginx_target(self):
-        from filelock import FileLock
-
-        with FileLock(os.path.join(self.directory, "nginx.config.lock")):
-            self.generate_nginx_config()
-            return self.server._reload_nginx()
-
-    def _set_sites_host(self, sites: list[Site]):
-        for site in sites:
-            for wildcard_domain in self.server.wildcards:
-                if site.name.endswith("." + wildcard_domain):
-                    site.host = "*." + wildcard_domain
-
-    def generate_nginx_config(self):
-        sites = [s for s in self.valid_sites.values()]
-        domains = _get_domains(sites)
-
-        if standalone := self.server.config.get("standalone"):
-            self._set_sites_host(sites)
-
-        codeserver = _get_codeserver_config(self.directory)
-
-        config = {
-            "bench_name": self.name,
-            "bench_name_slug": self.name.replace("-", "_"),
-            "domain": self.server.config.get("domain"),
-            "sites": sites,
-            "domains": domains,
-            "http_timeout": self.bench_config["http_timeout"],
-            "web_port": self.bench_config["web_port"],
-            "socketio_port": self.bench_config["socketio_port"],
-            "sites_directory": self.sites_directory,
-            "standalone": standalone,
-            "error_pages_directory": self.server.error_pages_directory,
-            "nginx_directory": self.server.nginx_directory,
-            "tls_protocols": self.server.config.get("tls_protocols"),
-            "code_server": codeserver,
-        }
-        nginx_config = os.path.join(self.directory, "nginx.conf")
-
-        self.server._render_template("bench/nginx.conf.jinja2", config, nginx_config)
-
-    @step("Bench Disable Production")
-    def disable_production(self):
-        try:
-            return self.stop()
-        except AgentException as e:
-            if "No such container" in e.data["output"]:
-                pass
-            else:
-                raise
-
-    @job("Bench Restart")
-    def restart_job(self, web_only=False):
-        return self.restart(web_only=web_only)
-
-    @step("Bench Restart")
-    def restart(self, web_only=False):
-        return self.docker_execute(f"bench restart {'--web' if web_only else ''}")
-
-    @job("Rebuild Bench Assets")
-    def rebuild_job(self):
-        return self.rebuild()
-
-    @step("Rebuild Bench Assets")
-    def rebuild(self, apps: list[str] | None = None, is_inplace: bool = False):
-        if not apps:
-            return self.docker_execute("bench build")
-
-        if len(apps) == 1 and not is_inplace:
-            return self.docker_execute(f"bench build --app {apps[0]}")
-
-        return self.docker_execute(f"bench build --apps {','.join(apps)}")
-
-    @property
-    def apps(self):
-        with open(self.apps_file, "r") as f:
-            apps_list = f.read().split("\n")
-
-        apps = {}
-        for directory in apps_list:
-            with suppress(Exception):
-                apps[directory] = App(directory, self)
-        return apps
-
-    @step("Update Bench Configuration")
-    def update_config(self, common_site_config, bench_config):
-        self._update_config(common_site_config, bench_config)
-
-    def _update_config(
-        self,
-        common_site_config: dict | None = None,
-        bench_config: dict | None = None,
-    ):
-        if common_site_config:
-            new_common_site_config = self.get_config(for_update=True)
-            new_common_site_config.update(common_site_config)
-            self.set_config(new_common_site_config)
-
-        if bench_config:
-            new_bench_config = self.bench_config
-            new_bench_config.update(bench_config)
-            self.set_bench_config(new_bench_config)
-
-    @job("Update Bench Configuration", priority="high")
-    def update_config_job(self, common_site_config, bench_config):
-        old_config = self.bench_config
-        self.update_config(common_site_config, bench_config)
-        self.setup_nginx()
-        if self.bench_config.get("single_container"):
-            self.update_supervisor()
-            self.update_runtime_limits()
-            if (old_config["web_port"] != bench_config["web_port"]) or (
-                old_config["socketio_port"] != bench_config["socketio_port"]
-            ):
-                self.deploy()
-        else:
-            self.generate_docker_compose_file()
-            self.deploy()
-
-    @step("Update Supervisor Configuration")
-    def update_supervisor(self):
-        self.generate_supervisor_config()
-        self.docker_execute("supervisorctl reread")
-        self.docker_execute("supervisorctl update")
-
-    def generate_supervisor_config(self):
-        supervisor_config = os.path.join(self.directory, "config", "supervisor.conf")
-        self.server._render_template(
-            "bench/supervisor.conf",
-            {
-                "background_workers": self.bench_config["background_workers"],
-                "gunicorn_workers": self.bench_config["gunicorn_workers"],
-                "http_timeout": self.bench_config["http_timeout"],
-                "name": self.name,
-                "statsd_host": self.bench_config["statsd_host"],
-                "is_ssh_enabled": self.bench_config.get("is_ssh_enabled", False),
-                "merge_all_rq_queues": self.bench_config.get("merge_all_rq_queues", False),
-                "merge_default_and_short_rq_queues": self.bench_config.get(
-                    "merge_default_and_short_rq_queues", False
-                ),
-                "use_rq_workerpool": self.bench_config.get("use_rq_workerpool", False),
-                "environment_variables": self.bench_config.get("environment_variables"),
-                "gunicorn_threads_per_worker": self.bench_config.get("gunicorn_threads_per_worker"),
-                "is_code_server_enabled": self.bench_config.get("is_code_server_enabled", False),
-            },
-            supervisor_config,
-        )
-
-    @step("Generate Docker Compose File")
-    def generate_docker_compose_file(self):
-        config = self.bench_config
-        config.update({"directory": self.directory})
-        docker_compose = os.path.join(self.directory, "docker-compose.yml")
-        self.server._render_template("bench/docker-compose.yml.jinja2", config, docker_compose)
-
-    @job("Setup Code Server")
-    def setup_code_server(self, name, password):
-        self.create_code_server_config(name)
-        self._start_code_server(password, setup=True)
-        self.generate_nginx_config()
-        self.server._reload_nginx()
-
-    @step("Create Code Server Config")
-    def create_code_server_config(self, name):
-        code_server_path = os.path.join(self.directory, "codeserver")
-        if not os.path.exists(code_server_path):
-            os.mkdir(code_server_path)
-
-        filename = os.path.join(code_server_path, name)
-        with open(filename, "w") as file:
-            file.write(str(self.bench_config.get("codeserver_port")))
-
-    @step("Start Code Server")
-    def _start_code_server(self, password, setup=False):
-        if setup:
-            self.docker_execute("supervisorctl start code-server:")
-
-        self.docker_execute(
-            f"sed -i 's/^password:.*/password: {password}/' /home/jingrow/.config/code-server/config.yaml"
-        )
-        self.docker_execute("supervisorctl restart code-server:")
-
-    @step("Stop Code Server")
-    def _stop_code_server(self):
-        self.docker_execute("supervisorctl stop code-server:")
-
-    @job("Start Code Server")
-    def start_code_server(self, password):
-        self._start_code_server(password)
-
-    @job("Stop Code Server")
-    def stop_code_server(self):
-        self._stop_code_server()
-
-    @job("Archive Code Server")
-    def archive_code_server(self):
-        if os.path.exists(self.directory):
-            self.remove_code_server()
-        self.setup_nginx()
-        self.server._reload_nginx()
-
-    @step("Remove Code Server")
-    def remove_code_server(self):
-        code_server_path = os.path.join(self.directory, "codeserver")
-        shutil.rmtree(code_server_path)
-        self.docker_execute("supervisorctl stop code-server:")
-
-    def prepare_mounts_on_host(self, bench_directory):
-        mounts_cmd = ""
-
-        if not self.mounts:
-            return mounts_cmd
-
-        def _create_mounts(host_path):
-            if not os.path.exists(host_path):
-                os.mkdir(host_path)
-
-        for mp in self.mounts:
-            host_path = mp["source"]
-            destination_path = mp["destination"]
-
-            if not mp["is_absolute_path"]:
-                """
-                self.server.benches_directory = /home/jingrow/benches (Host)
-                bench_directory = "/home/jingrow/jingrow-bench" (container)
-                """
-                host_path = os.path.join(self.server.benches_directory, mp["source"])
-                destination_path = os.path.join(bench_directory, mp["destination"])
-
-            _create_mounts(host_path)
-
-            mounts_cmd += f" -v {host_path}:{destination_path} "
-
-        return mounts_cmd
-
-    def start(self):
-        if self.bench_config.get("single_container"):
-            try:
-                self.execute(f"docker stop {self.name}")
-                self.execute(f"docker rm {self.name}")
-            except Exception:
-                pass
-
-            ssh_port = self.bench_config.get("ssh_port", self.bench_config["web_port"] + 4000)
-            ssh_ip = self.bench_config.get("private_ip", "127.0.0.1")
-
-            rq_port = self.bench_config.get("rq_port")
-            rq_port_mapping = f"-p 127.0.0.1:{rq_port}:11000 "
-
-            bench_directory = "/home/jingrow/jingrow-bench"
-            mounts = self.prepare_mounts_on_host(bench_directory)
-
-            command = (
-                "docker run -d --init -u jingrow "
-                f"--restart always --hostname {self.name} "
-                f"-p 127.0.0.1:{self.bench_config['web_port']}:8000 "
-                f"-p 127.0.0.1:{self.bench_config['socketio_port']}:9000 "
-                f"-p 127.0.0.1:{self.bench_config['codeserver_port']}:8088 "
-                f"{rq_port_mapping if rq_port else ''}"
-                f"-p {ssh_ip}:{ssh_port}:2200 "
-                f"-v {self.sites_directory}:{bench_directory}/sites "
-                f"-v {self.logs_directory}:{bench_directory}/logs "
-                f"-v {self.config_directory}:{bench_directory}/config "
-                f"{mounts} "
-                f"--name {self.name} {self.bench_config['docker_image']}"
-            )
-        else:
-            command = (
-                "docker stack deploy "
-                "--resolve-image=never --with-registry-auth "
-                f"--compose-file docker-compose.yml {self.name} "
-            )
-        return self.execute(command)
-
-    def stop(self):
-        if self.bench_config.get("single_container"):
-            self.execute(f"docker stop {self.name}")
-            return self.execute(f"docker rm {self.name}")
-        return self.execute(f"docker stack rm {self.name}")
-
-    @step("Stop Bench")
-    def _stop(self):
-        return self.execute(f"docker stop {self.name}")
-
-    @step("Start Bench")
-    def _start(self):
-        return self.execute(f"docker start {self.name}")
-
-    @job("Force Update Bench Limits")
-    def force_update_limits(self, memory_high, memory_max, memory_swap, vcpu):
-        self._stop()
-        self._update_runtime_limits(memory_high, memory_max, memory_swap, vcpu)
-        self._start()
-
-    def update_runtime_limits(self):
-        memory_high = self.bench_config.get("memory_high")
-        memory_max = self.bench_config.get("memory_max")
-        memory_swap = self.bench_config.get("memory_swap")
-        vcpu = self.bench_config.get("vcpu")
-        if not any([memory_high, memory_max, memory_swap, vcpu]):
-            return
-        self._update_runtime_limits(memory_high, memory_max, memory_swap, vcpu)
-
-    @step("Update Bench Memory Limits")
-    def _update_runtime_limits(self, memory_high, memory_max, memory_swap, vcpu):
-        cmd = f"docker update {self.name}"
-        if memory_high:
-            cmd += f" --memory-reservation={memory_high}M"
-        if memory_max:
-            cmd += f" --memory={memory_max}M"
-        if memory_swap:
-            cmd += f" --memory-swap={memory_swap}M"
-        if vcpu:
-            cmd += f" --cpus={vcpu}"
-        return self.execute(cmd)
-
-    @property
-    def job_record(self):
-        return self.server.job_record
-
-    def readable_jde_err(self, title: str, jde: json.decoder.JSONDecodeError) -> str:
-        output = f"{title}:\n{jde.doc}\n{jde}\n"
-        import re
-
-        output = re.sub(r'("db_name":.* ")(\w*)(")', r"\1********\3", output)
-        return re.sub(r'("db_password":.* ")(\w*)(")', r"\1********\3", output)
-
-    @property
-    def sites(self):
-        return self._sites()
-
-    @property
-    def valid_sites(self):
-        return self._sites(validate_configs=True)
-
-    def _sites(self, validate_configs=False) -> dict[str, Site]:
-        sites = {}
-        for directory in os.listdir(self.sites_directory):
-            try:
-                sites[directory] = Site(directory, self)
-            except json.decoder.JSONDecodeError as jde:
-                output = self.readable_jde_err(f"Error parsing JSON in {directory}", jde)
-                try:
-                    self.execute(
-                        f"echo '{output}';exit {int(validate_configs)}",
-                    )  # exit 1 to make sure the job fails and shows output
-                except AgentException as e:
-                    raise InvalidSiteConfigException(e.data, directory) from e
-            except Exception:
-                pass
-        return sites
-
-    def get_site(self, site):
-        try:
-            return self.valid_sites[site]
-        except KeyError as e:
-            raise SiteNotExistsException(site, self.name) from e
-        except InvalidSiteConfigException as e:
-            if e.site == site:
-                raise
-
-    @property
-    def step_record(self):
-        return self.server.step_record
-
-    @step_record.setter
-    def step_record(self, value):
-        self.server.step_record = value
-
-    def get_usage(self):
-        return {
-            "storage": get_size(self.directory),
-            "database": sum([site.get_database_size() for site in self.sites.values()]),
-        }
-
-    @property
-    def bench_config(self) -> dict:
-        with open(self.bench_config_file, "r") as f:
-            return json.load(f)
-
-    def set_bench_config(self, value, indent=1):
-        """
-        To avoid partial writes, we need to first write the config to a temporary file,
-        then rename it to the original file.
-        """
-        with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
-            json.dump(value, temp_file, indent=indent, sort_keys=True)
-            temp_file.flush()
-            os.fsync(temp_file.fileno())
-            temp_file.close()
-
-        os.rename(temp_file.name, self.bench_config_file)
-
-    @job("Patch App")
-    def patch_app(
-        self,
-        app: str,
-        patch: str,
-        filename: str,
-        build_assets: bool,
-        revert: bool,
-    ):
-        patch_container_path = self.prepare_app_patch(app, patch, filename)
-        self.git_apply(app, revert, patch_container_path)
-
-        if build_assets:
-            self.rebuild()
-
-        self.restart()
-
-    def prepare_app_patch(self, app: str, patch: str, filename: str) -> str:
-        """
-        Function returns path inside the container, the sites is
-        mounted in the container at a different path from that of
-        the bench outside it.
- """ - relative = ["sites", "patches", app] - patch_dir = Path(os.path.join(self.directory, *relative)) - patch_dir.mkdir(parents=True, exist_ok=True) - - bench_container_dir = "/home/jingrow/jingrow-bench" - patch_container_dir = os.path.join(bench_container_dir, *relative, filename) - - patch_path = patch_dir / filename - if patch_path.is_file(): - return patch_container_dir - - with patch_path.open("w") as f: - f.write(patch) - - return patch_container_dir - - @step("Git Apply") - def git_apply(self, app: str, revert: bool, patch_container_path: str): - command = "git apply " - if revert: - command += "--reverse " - command += patch_container_path - - app_path = os.path.join("apps", app) - self.docker_execute(command, subdir=app_path) - - @job("Call Bench Supervisorctl") - def call_supervisorctl(self, command: str, programs: list[str]): - self.run_supervisorctl_command(command, programs) - - @step("Run Supervisorctl Command") - def run_supervisorctl_command(self, command: str, programs: list[str]): - target = "all" - if len(programs) > 0: - target = " ".join(programs) - self.docker_execute(f"supervisorctl {command} {target}") - - @job("Update Bench In Place") - def update_inplace( - self, - sites: list[str], - image: str, - apps: list[BenchUpdateApp], - ): - if not (diff_dict := self.pull_app_changes(apps).get("diff")): - return - - should_run = get_should_run_update_phase(diff_dict) - - node = should_run["setup_requirements_node"] - python = should_run["setup_requirements_python"] - if node or python: - self.setup_requirements(node, python) - - if should_run["migrate_sites"]: - self.migrate_sites(sites) - - if should_run["rebuild_frontend"]: - self.rebuild(apps=[app["app"] for app in apps], is_inplace=True) - - # commit container changes - self.commit_container_changes(image) - - # restart site - self.restart(web_only=False) - - @step("Pull App Changes") - def pull_app_changes(self, apps: list[BenchUpdateApp]): - res = get_execution_result() - - diff: dict[str, list[str]] = {} - outputs: list[str] = [] - for app in apps: - if not (files := self._pull_app_change(app)): - continue - - app_name = app["app"] - diff[app_name] = files - - output = "\n".join( - [ - app_name, - indent("\n".join(files), " "), - ] - ) - outputs.append(output) - - res = end_execution(res, "\n\n".join(outputs)) - res["diff"] = diff - return res - - def _pull_app_change(self, app: BenchUpdateApp) -> list[str]: - remote = "inplace" - app_path = os.path.join("apps", app["app"]) - exec = partial(self.docker_execute, subdir=app_path) - - self.set_git_remote(app["app"], app["url"], remote) - - app_path: str = os.path.join("apps", app["app"]) - new_hash: str = app["hash"] - old_hash: str = exec("git rev-parse HEAD")["output"] - - if old_hash == new_hash: - # Remove remote, url might be private - exec(f"git remote remove {remote}") - return [] - - # Fetch new hash and get changed files - exec(f"git fetch --depth 1 {remote} {new_hash}") - diff: str = exec(f"git diff --name-only {old_hash} {new_hash}")["output"] - - # Ensure repo is not dirty and checkout next_hash - exec(f"git reset --hard {old_hash}") - exec("git clean -fd") - exec(f"git checkout {new_hash}") - - # Remove remote, url might be private - exec(f"git remote remove {remote}") - return [s for s in diff.split("\n") if s] - - def set_git_remote( - self, - app: str, - url: str, - remote: str, - ): - app_path = os.path.join("apps", app) - res = self.docker_execute( - f"git remote get-url {remote}", - subdir=app_path, - non_zero_throw=False, - ) - - if 
res["output"] == url: - return - - if res["returncode"] == 0: - self.docker_execute( - f"git remote remove {remote}", - subdir=app_path, - ) - - self.docker_execute( - f"git remote add {remote} {url}", - subdir=app_path, - ) - - @step("Setup Requirements") - def setup_requirements(self, node: bool = True, python: bool = True): - flag = "" - - if node and not python: - flag = " --node" - - if not node and python: - flag = " --python" - - return self.docker_execute("bench setup requirements" + flag) - - @step("Migrate Sites") - def migrate_sites( - self, - sites: list[str], - skip_search_index: bool = False, - skip_failing_patches: bool = False, - ): - res = get_execution_result() - outputs: list[str] = [] - - for site_name in sites: - migrate_res = self.migrate_site( - self.sites[site_name], - skip_search_index, - skip_failing_patches, - ) - output = "\n".join( - [ - site_name, - indent(migrate_res["output"], " "), - ] - ) - outputs.append(output) - - return end_execution( - res, - "\n\n".join(outputs), - ) - - def migrate_site( - self, - site: Site, - skip_search_index: bool = False, - skip_failing_patches: bool = False, - ): - site._enable_maintenance_mode() - res = site._migrate( - skip_search_index, - skip_failing_patches, - ) - site._disable_maintenance_mode() - return res - - @step("Commit Container Changes") - def commit_container_changes(self, image: str): - container_id = self.execute(f'docker ps -aqf "name={self.name}"')["output"] - res = self.execute(f"docker commit {container_id} {image}") - self._update_config(bench_config={"docker_image": image}) - return res - - @job("Recover Update In Place") - def recover_update_inplace( - self, - site_names: list[str], - image: str, - ): - self._update_config(bench_config={"docker_image": image}) - sites = [Site(name, self) for name in site_names] - - # Enable maintenance mode on sites if possible - self.enable_maintenance_mode(sites) - - """ - Will stop and remove failed inplace updated container and - start last running container pointed to by image. 
- """ - self.deploy() - - self.setup_nginx() - self.recover_sites(sites) - - @step("Enable Maintenance Mode") - def enable_maintenance_mode(self, sites: list[Site]): - for site in sites: - with suppress(Exception): - site._enable_maintenance_mode() - - @step("Recover Sites") - def recover_sites(self, sites: list[Site]): - for site in sites: - site._restore_touched_tables() - with suppress(Exception): - site.generate_theme_files() - site._disable_maintenance_mode() - - -def get_should_run_update_phase( - diff_dict: dict[str, list[str]], -) -> ShouldRunUpdatePhase: - diff = [] - for dl in diff_dict.values(): - diff.extend(dl) - - setup_node = False - setup_python = False - rebuild = False - migrate = False - - for file in diff: - if all([setup_node, setup_python, rebuild, migrate]): - break - - if not setup_node: - setup_node = should_setup_requirements_node(file) - - if not setup_python: - setup_python = should_setup_requirements_py(file) - - if not rebuild: - rebuild = should_rebuild_frontend(file) - - if not migrate: - migrate = should_migrate_sites(file) - - return dict( - setup_requirements_node=setup_node, - setup_requirements_python=setup_python, - rebuild_frontend=rebuild, - migrate_sites=migrate, - ) - - -def should_setup_requirements_node(file: str) -> bool: - return _should_run_phase( - file, - [ - "package.json", - "package-lock.json", - "yarn.lock", - ".lockb", - "pnpm-lock.yaml", - ], - ) - - -def should_setup_requirements_py(file: str) -> bool: - return _should_run_phase( - file, - ["pyproject.toml", "setup.py", "requirements.txt"], - ) - - -def should_rebuild_frontend(file: str) -> bool: - return _should_run_phase( - file, - [ - ".js", - ".ts", - ".html", - ".vue", - ".jsx", - ".tsx", - ".css", - ".scss", - ".sass", - ], - ["www", "public", "frontend", "dashboard"], - ) - - -def should_migrate_sites(file: str) -> bool: - return _should_run_phase( - file, - ["hooks.py", ".json"], - ["patches"], - ) - - -def _should_run_phase( - file: str, - ends: list[str] | None = None, - subs: list[str] | None = None, - globs: list[str] | None = None, -) -> bool: - ends = ends or [] - subs = subs or [] - globs = globs or [] - - if any([file.endswith(e) for e in ends]): - return True - - if any([s in file for s in subs]): - return True - - return any([PurePath(file).match(s) for s in globs]) - - -def get_site_from_name(name: str, new_name: str, bench: Bench): - try: - return Site(name, bench) - except OSError: - pass - - try: - return Site(new_name, bench) - except OSError: - raise Exception(f"Neither {name} nor {new_name} exists") from None - - -def _touch_currentsite_file(bench: Bench): - file = os.path.join(bench.sites_directory, "currentsite.txt") - open(file, "w").close() - - -def _inactive_scheduler_sites(bench: Bench): - inactive = [] - _touch_currentsite_file(bench) - try: - doctor = bench.docker_execute("bench doctor")["output"].split("\n") - except AgentException as e: - doctor = e.data["output"] - - for line in doctor: - if "inactive" in line: - site = line.split(" ")[-1] - inactive.append(site) - return inactive - - -def _inactive_web_sites(bench: Bench): - inactive = [] - session = requests.Session() - for site in bench.sites: - url = f"https://{site}/api/method/ping" - try: - result = session.get(url) - except Exception as e: - result = None - print("Ping Failed", url, e) - if not result or result.status_code != 200: - inactive.append(site) - return inactive - - -def _get_domains(sites: list[Site]): - domains: dict[str, str] = {} - for site in sites: - for domain in 
site.config.get("domains", []): - domains[domain] = site.name - return domains - - -def _get_codeserver_config(bench_directory: str): - codeserver_directory = os.path.join(bench_directory, "codeserver") - - if not os.path.exists(codeserver_directory): - return {} - - codeservers = os.listdir(codeserver_directory) - if not codeservers: - return {} - - with open(os.path.join(codeserver_directory, codeservers[0])) as file: - port = file.read() - - return {"name": codeservers[0], "port": port} +from __future__ import annotations + +import hashlib +import json +import os +import shutil +import string +import tempfile +import traceback +from contextlib import suppress +from datetime import datetime, timedelta +from functools import partial +from glob import glob +from pathlib import Path, PurePath +from random import choices +from textwrap import indent +from typing import TYPE_CHECKING, TypedDict + +import requests + +from agent.app import App +from agent.base import AgentException, Base +from agent.exceptions import InvalidSiteConfigException, SiteNotExistsException +from agent.job import job, step +from agent.site import Site +from agent.utils import download_file, end_execution, get_execution_result, get_size + +if TYPE_CHECKING: + from agent.server import Server + + class BenchUpdateApp(TypedDict): + app: string + url: string + hash: string + + class ShouldRunUpdatePhase(TypedDict): + setup_requirements_node: bool + setup_requirements_python: bool + rebuild_frontend: bool + migrate_sites: bool + + +class Bench(Base): + def __init__(self, name: str, server: Server, mounts=None): + super().__init__() + + self.name = name + self.server = server + self.directory = os.path.join(self.server.benches_directory, name) + self.sites_directory = os.path.join(self.directory, "sites") + self.config_directory = os.path.join(self.directory, "config") + self.logs_directory = os.path.join(self.directory, "logs") + self.apps_file = os.path.join(self.directory, "sites", "apps.txt") + self.bench_config_file = os.path.join(self.directory, "config.json") + self.config_file = os.path.join(self.directory, "sites", "common_site_config.json") + self.host = self.config.get("db_host", "localhost") + self.docker_image = self.bench_config.get("docker_image") + self.mounts = mounts + if not ( + os.path.isdir(self.directory) + and os.path.exists(self.sites_directory) + and os.path.exists(self.config_file) + and os.path.exists(self.bench_config_file) + ): + raise Exception + + @step("Deploy Bench") + def deploy(self): + return self.start() + + def dump(self): + return { + "name": self.name, + "apps": {name: app.dump() for name, app in self.apps.items()}, + "config": self.config, + "sites": {name: site.dump() for name, site in self.sites.items()}, + } + + def _delete_older_usage_files(self, max_retention_time): + log_files = glob( + os.path.join( + self.directory, + "logs", + f"{self.server.name}-usage-*.json.log", + ) + ) + + for file in log_files: + if os.stat(file).st_mtime < max_retention_time: + print(f"Deleting {file} as it's older than {max_retention_time}") + os.remove(file) + + def fetch_sites_info(self, since=None): + max_retention_time = (datetime.utcnow() - timedelta(days=7)).timestamp() + self._delete_older_usage_files(max_retention_time) + + if not since: + since = max_retention_time + + info = {} + usage_data = [] + log_files = glob( + os.path.join( + self.server.directory, + "logs", + f"{self.server.name}-usage-*.json.log", + ) + ) + + for file in log_files: + # Only load files that are newer than the since 
+            if os.stat(file).st_mtime <= since:
+                continue
+
+            with open(file) as f:
+                try:
+                    usage_data.extend(json.load(f))
+                except json.decoder.JSONDecodeError:
+                    print(f"Error loading JSON from {file}")
+
+        usage_data.sort(
+            key=lambda x: datetime.fromisoformat(x["timestamp"]),
+            reverse=True,
+        )
+
+        for site in self.sites.values():
+            try:
+                timezone_data = {d["timestamp"]: d["timezone"] for d in usage_data if d["site"] == site.name}
+                timezone = timezone_data[max(timezone_data)]
+            except Exception:
+                timezone = None
+
+            if not (usage_data and timezone):
+                timezone = site.timezone
+
+            info[site.name] = {
+                "config": site.config,
+                "usage": [
+                    {
+                        "database": d["database"],
+                        "public": d["public"],
+                        "private": d["private"],
+                        "backups": d["backups"],
+                        "timestamp": d["timestamp"],
+                    }
+                    for d in usage_data
+                    if d["site"] == site.name
+                ],
+                "timezone": timezone,
+            }
+
+        return info
+
+    def fetch_sites_analytics(self):
+        analytics = {}
+        for site in self.sites.values():
+            try:
+                analytics[site.name] = site.fetch_site_analytics()
+            except Exception:
+                import traceback
+
+                traceback.print_exc()
+        return analytics
+
+    def execute(self, command, input=None, non_zero_throw=True):
+        return super().execute(
+            command,
+            directory=self.directory,
+            input=input,
+            non_zero_throw=non_zero_throw,
+        )
+
+    def docker_execute(self, command, input=None, subdir=None, non_zero_throw=True):
+        interactive = "-i" if input else ""
+        workdir = "/home/jingrow/jingrow-bench"
+        if subdir:
+            workdir = os.path.join(workdir, subdir)
+
+        if self.bench_config.get("single_container"):
+            command = f"docker exec -w {workdir} {interactive} {self.name} {command}"
+        else:
+            service = f"{self.name}_worker_default"
+            task = self.execute(f"docker service ps -f desired-state=Running -q --no-trunc {service}")[
+                "output"
+            ].split()[0]
+            command = f"docker exec -w {workdir} {interactive} {service}.1.{task} {command}"
+        return self.execute(command, input=input, non_zero_throw=non_zero_throw)
+
+    @step("New Site")
+    def bench_new_site(self, name, mariadb_root_password, admin_password):
+        site_database, temp_user, temp_password = self.create_mariadb_user(name, mariadb_root_password)
+        try:
+            return self.docker_execute(
+                f"bench new-site --no-mariadb-socket "
+                f"--mariadb-root-username {temp_user} "
+                f"--mariadb-root-password {temp_password} "
+                f"--admin-password {admin_password} "
+                f"--db-name {site_database} {name}"
+            )
+        finally:
+            self.drop_mariadb_user(name, mariadb_root_password, site_database)
+
+    @job("Create User", priority="high")
+    def create_user(
+        self,
+        site: str,
+        email: str,
+        first_name: str,
+        last_name: str,
+        password: str | None = None,
+    ):
+        _site = Site(site, self)
+        _site.create_user(email, first_name, last_name, password)
+
+    @job("Complete Setup Wizard")
+    def complete_setup_wizard(self, site: str, data: dict):
+        _site = Site(site, self)
+        return _site.complete_setup_wizard(data)
+
+    @job("Rename Site", priority="high")
+    def rename_site_job(
+        self,
+        site: str,
+        new_name: str,
+        create_user: dict | None = None,
+        config: dict | None = None,
+    ):
+        site = get_site_from_name(site, new_name, self)
+        site.enable_maintenance_mode()
+        site.wait_till_ready()
+        if config:
+            if site.config.get("host_name") == f"https://{site.name}":
+                config.update({"host_name": f"https://{new_name}"})
+            site.update_config(config)
+        else:
+            if site.config.get("host_name") == f"https://{site.name}":
+                site.update_config({"host_name": f"https://{new_name}"})
+        site.rename(new_name)
+        self.setup_nginx()
+        self.server.reload_nginx()
+        site.disable_maintenance_mode()
+        site.enable_scheduler()
+        if create_user and create_user.get("email"):
+            site.create_user(
+                create_user.get("email"),
+                create_user.get("first_name"),
+                create_user.get("last_name"),
+                create_user.get("password"),
+            )
+
+    def get_database_name(self, site):
+        site_directory = os.path.join(self.sites_directory, "sites", site)
+        return "_" + hashlib.sha1(site_directory.encode()).hexdigest()[:16]
+
+    def get_random_string(self, length):
+        return "".join(choices(string.ascii_letters + string.digits, k=length))
+
+    def create_mariadb_user(self, site, mariadb_root_password, database=None):
+        database = database or self.get_database_name(site)
+        user = f"{database}_limited"
+        password = self.get_random_string(16)
+        queries = [
+            f"CREATE OR REPLACE USER '{user}'@'%' IDENTIFIED BY '{password}'",
+            f"CREATE OR REPLACE DATABASE {user}",
+            f"GRANT ALL ON {user}.* TO '{user}'@'%'",
+            f"GRANT RELOAD, CREATE USER ON *.* TO '{user}'@'%'",
+            f"GRANT ALL ON {database}.* TO '{user}'@'%' WITH GRANT OPTION",
+            "FLUSH PRIVILEGES",
+        ]
+        for query in queries:
+            command = f'mysql -h {self.host} -uroot -p{mariadb_root_password} -e "{query}"'
+            self.execute(command)
+        return database, user, password
+
+    def drop_mariadb_user(self, site, mariadb_root_password, database=None):
+        database = database or self.get_database_name(site)
+        user = f"{database}_limited"
+        queries = [
+            f"DROP DATABASE IF EXISTS {user}",
+            f"DROP USER IF EXISTS '{user}'@'%'",
+            "FLUSH PRIVILEGES",
+        ]
+        for query in queries:
+            command = f'mysql -h {self.host} -uroot -p{mariadb_root_password} -e "{query}"'
+            self.execute(command)
+
+    def fetch_monitor_data(self):
+        lines = []
+        try:
+            monitor_log_file = os.path.join(self.directory, "logs", "monitor.json.log")
+            time = datetime.utcnow().isoformat()
+            logs_directory = os.path.join(
+                self.server.directory,
+                "logs",
+            )
+            target_file = os.path.join(
+                logs_directory,
+                f"{self.name}-{time}-monitor.json.log",
+            )
+            if os.path.exists(monitor_log_file):
+                shutil.move(monitor_log_file, target_file)
+
+            with open(target_file) as f:
+                for line in f.readlines():
+                    try:
+                        lines.append(json.loads(line))
+                    except Exception:
+                        traceback.print_exc()
+
+            now = datetime.now().timestamp()
+            for file in os.listdir(logs_directory):
+                path = os.path.join(logs_directory, file)
+                if file.endswith("-monitor.json.log") and (now - os.stat(path).st_mtime) > (7 * 86400):
+                    os.remove(path)
+        except FileNotFoundError:
+            pass
+        except Exception:
+            traceback.print_exc()
+        return lines
+
+    def _parse_pids(self, lines):
+        pids = []
+        lines = lines.strip().split("\n")
+
+        for line in lines:
+            parts = line.strip().split()
+            name, pid = parts[0], parts[1]
+            pids.append((name, pid))
+
+        return pids
+
+    def get_worker_pids(self):
+        """Get all the processes running gunicorn for now"""
+        return self._parse_pids(self.execute(f"docker top {self.name} | grep gunicorn")["output"])
+
+    def take_snapshot(self, pid_info: list[tuple[str, str]]):
+        snapshots = {}
+        pyspy_bin = os.path.join(self.server.directory, "env/bin/py-spy")
+
+        for name, pid in pid_info:
+            try:
+                snapshots[f"{name}:{pid}"] = json.loads(
+                    self.execute(f"sudo {pyspy_bin} dump --pid {pid} --json")["output"]
+                )
+            except AgentException as e:
+                snapshots[f"{name}:{pid}"] = str(e)
+
+        return snapshots
+
+    def status(self):
+        status = {
+            "sites": {site: {"scheduler": True, "web": True} for site in self.sites},
+            "timestamp": str(datetime.now()),
+        }
+
+        for site in _inactive_scheduler_sites(self):
status["sites"][site]["scheduler"] = False + + for site in _inactive_web_sites(self): + status["sites"][site]["web"] = False + + return status + + @job("New Site", priority="high") + def new_site( + self, + name, + config, + apps, + mariadb_root_password, + admin_password, + create_user: dict | None = None, + ): + self.bench_new_site(name, mariadb_root_password, admin_password) + site = Site(name, self) + site.install_apps(apps) + site.update_config(config) + site.enable_scheduler() + if create_user and create_user.get("email"): + site.create_user( + create_user.get("email"), + create_user.get("first_name"), + create_user.get("last_name"), + create_user.get("password"), + ) + self.setup_nginx() + self.server.reload_nginx() + + @job("New Site from Backup", priority="high") + def new_site_from_backup( + self, + name, + default_config, + apps, + mariadb_root_password, + admin_password, + site_config, + database, + public, + private, + skip_failing_patches, + ): + files = self.download_files(name, database, public, private) + self.bench_new_site(name, mariadb_root_password, admin_password) + site = Site(name, self) + site.update_config(default_config) + try: + site.restore( + mariadb_root_password, + admin_password, + files["database"], + files["public"], + files["private"], + ) + if site_config: + site_config = json.loads(site_config) + site.update_config(site_config) + finally: + self.delete_downloaded_files(files["directory"]) + site.uninstall_unavailable_apps(apps) + site.migrate(skip_failing_patches=skip_failing_patches) + site.set_admin_password(admin_password) + site.enable_scheduler() + self.setup_nginx() + self.server.reload_nginx() + + return site.bench_execute("list-apps") + + @step("Archive Site") + def bench_archive_site(self, name, mariadb_root_password, force): + site_database, temp_user, temp_password = self.create_mariadb_user( + name, mariadb_root_password, self.valid_sites[name].database + ) + force_flag = "--force" if force else "" + try: + return self.docker_execute( + f"bench drop-site --no-backup {force_flag} " + f"--root-login {temp_user} --root-password {temp_password} " + f"--archived-sites-path archived {name}" + ) + finally: + self.drop_mariadb_user(name, mariadb_root_password, site_database) + + @step("Download Backup Files") + def download_files(self, name, database_url, public_url, private_url): + download_directory = os.path.join(self.sites_directory, "downloads") + if not os.path.exists(download_directory): + os.mkdir(download_directory) + directory = tempfile.mkdtemp(prefix="agent-upload-", suffix=f"-{name}", dir=download_directory) + database_file = download_file(database_url, prefix=directory) + private_file = download_file(private_url, prefix=directory) if private_url else "" + public_file = download_file(public_url, prefix=directory) if public_url else "" + return { + "directory": directory, + "database": database_file, + "private": private_file, + "public": public_file, + } + + @step("Delete Downloaded Backup Files") + def delete_downloaded_files(self, backup_files_directory): + shutil.rmtree(backup_files_directory) + + @job("Archive Site") + def archive_site(self, name, mariadb_root_password, force): + site_directory = os.path.join(self.sites_directory, name) + if os.path.exists(site_directory): + self.bench_archive_site(name, mariadb_root_password, force) + self.setup_nginx() + self.server._reload_nginx() + + @step("Bench Setup NGINX") + def setup_nginx(self): + from filelock import FileLock + + with FileLock(os.path.join(self.directory, 
"nginx.config.lock")): + self.generate_nginx_config() + return self.server._reload_nginx() + + @step("Bench Setup NGINX Target") + def setup_nginx_target(self): + from filelock import FileLock + + with FileLock(os.path.join(self.directory, "nginx.config.lock")): + self.generate_nginx_config() + return self.server._reload_nginx() + + def _set_sites_host(self, sites: list[Site]): + for site in sites: + for wildcard_domain in self.server.wildcards: + if site.name.endswith("." + wildcard_domain): + site.host = "*." + wildcard_domain + + def generate_nginx_config(self): + sites = [s for s in self.valid_sites.values()] + domains = _get_domains(sites) + + if standalone := self.server.config.get("standalone"): + self._set_sites_host(sites) + + codeserver = _get_codeserver_config(self.directory) + + config = { + "bench_name": self.name, + "bench_name_slug": self.name.replace("-", "_"), + "domain": self.server.config.get("domain"), + "sites": sites, + "domains": domains, + "http_timeout": self.bench_config["http_timeout"], + "web_port": self.bench_config["web_port"], + "socketio_port": self.bench_config["socketio_port"], + "sites_directory": self.sites_directory, + "standalone": standalone, + "error_pages_directory": self.server.error_pages_directory, + "nginx_directory": self.server.nginx_directory, + "tls_protocols": self.server.config.get("tls_protocols"), + "code_server": codeserver, + } + nginx_config = os.path.join(self.directory, "nginx.conf") + + self.server._render_template("bench/nginx.conf.jinja2", config, nginx_config) + + @step("Bench Disable Production") + def disable_production(self): + try: + return self.stop() + except AgentException as e: + if "No such container" in e.data["output"]: + pass + else: + raise + + @job("Bench Restart") + def restart_job(self, web_only=False): + return self.restart(web_only=web_only) + + @step("Bench Restart") + def restart(self, web_only=False): + return self.docker_execute(f"bench restart {'--web' if web_only else ''}") + + @job("Rebuild Bench Assets") + def rebuild_job(self): + return self.rebuild() + + @step("Rebuild Bench Assets") + def rebuild(self, apps: list[str] | None = None, is_inplace: bool = False): + if not apps: + return self.docker_execute("bench build") + + if len(apps) == 1 and not is_inplace: + return self.docker_execute(f"bench build --app {apps[0]}") + + return self.docker_execute(f"bench build --apps {','.join(apps)}") + + @property + def apps(self): + with open(self.apps_file, "r") as f: + apps_list = f.read().split("\n") + + apps = {} + for directory in apps_list: + with suppress(Exception): + apps[directory] = App(directory, self) + return apps + + @step("Update Bench Configuration") + def update_config(self, common_site_config, bench_config): + self._update_config(common_site_config, bench_config) + + def _update_config( + self, + common_site_config: dict | None = None, + bench_config: dict | None = None, + ): + if common_site_config: + new_common_site_config = self.get_config(for_update=True) + new_common_site_config.update(common_site_config) + self.set_config(new_common_site_config) + + if bench_config: + new_bench_config = self.bench_config + new_bench_config.update(bench_config) + self.set_bench_config(new_bench_config) + + @job("Update Bench Configuration", priority="high") + def update_config_job(self, common_site_config, bench_config): + old_config = self.bench_config + self.update_config(common_site_config, bench_config) + self.setup_nginx() + if self.bench_config.get("single_container"): + self.update_supervisor() + 
+            self.update_runtime_limits()
+            if (old_config["web_port"] != bench_config["web_port"]) or (
+                old_config["socketio_port"] != bench_config["socketio_port"]
+            ):
+                self.deploy()
+        else:
+            self.generate_docker_compose_file()
+            self.deploy()
+
+    @step("Update Supervisor Configuration")
+    def update_supervisor(self):
+        self.generate_supervisor_config()
+        self.docker_execute("supervisorctl reread")
+        self.docker_execute("supervisorctl update")
+
+    def generate_supervisor_config(self):
+        supervisor_config = os.path.join(self.directory, "config", "supervisor.conf")
+        self.server._render_template(
+            "bench/supervisor.conf",
+            {
+                "background_workers": self.bench_config["background_workers"],
+                "gunicorn_workers": self.bench_config["gunicorn_workers"],
+                "http_timeout": self.bench_config["http_timeout"],
+                "name": self.name,
+                "statsd_host": self.bench_config["statsd_host"],
+                "is_ssh_enabled": self.bench_config.get("is_ssh_enabled", False),
+                "merge_all_rq_queues": self.bench_config.get("merge_all_rq_queues", False),
+                "merge_default_and_short_rq_queues": self.bench_config.get(
+                    "merge_default_and_short_rq_queues", False
+                ),
+                "use_rq_workerpool": self.bench_config.get("use_rq_workerpool", False),
+                "environment_variables": self.bench_config.get("environment_variables"),
+                "gunicorn_threads_per_worker": self.bench_config.get("gunicorn_threads_per_worker"),
+                "is_code_server_enabled": self.bench_config.get("is_code_server_enabled", False),
+            },
+            supervisor_config,
+        )
+
+    @step("Generate Docker Compose File")
+    def generate_docker_compose_file(self):
+        config = self.bench_config
+        config.update({"directory": self.directory})
+        docker_compose = os.path.join(self.directory, "docker-compose.yml")
+        self.server._render_template("bench/docker-compose.yml.jinja2", config, docker_compose)
+
+    @job("Setup Code Server")
+    def setup_code_server(self, name, password):
+        self.create_code_server_config(name)
+        self._start_code_server(password, setup=True)
+        self.generate_nginx_config()
+        self.server._reload_nginx()
+
+    @step("Create Code Server Config")
+    def create_code_server_config(self, name):
+        code_server_path = os.path.join(self.directory, "codeserver")
+        if not os.path.exists(code_server_path):
+            os.mkdir(code_server_path)
+
+        filename = os.path.join(code_server_path, name)
+        with open(filename, "w") as file:
+            file.write(str(self.bench_config.get("codeserver_port")))
+
+    @step("Start Code Server")
+    def _start_code_server(self, password, setup=False):
+        if setup:
+            self.docker_execute("supervisorctl start code-server:")
+
+        self.docker_execute(
+            f"sed -i 's/^password:.*/password: {password}/' /home/jingrow/.config/code-server/config.yaml"
+        )
+        self.docker_execute("supervisorctl restart code-server:")
+
+    @step("Stop Code Server")
+    def _stop_code_server(self):
+        self.docker_execute("supervisorctl stop code-server:")
+
+    @job("Start Code Server")
+    def start_code_server(self, password):
+        self._start_code_server(password)
+
+    @job("Stop Code Server")
+    def stop_code_server(self):
+        self._stop_code_server()
+
+    @job("Archive Code Server")
+    def archive_code_server(self):
+        if os.path.exists(self.directory):
+            self.remove_code_server()
+        self.setup_nginx()
+        self.server._reload_nginx()
+
+    @step("Remove Code Server")
+    def remove_code_server(self):
+        code_server_path = os.path.join(self.directory, "codeserver")
+        shutil.rmtree(code_server_path)
+        self.docker_execute("supervisorctl stop code-server:")
+
+    def prepare_mounts_on_host(self, bench_directory):
+        mounts_cmd = ""
+
+        if not self.mounts:
return mounts_cmd + + def _create_mounts(host_path): + if not os.path.exists(host_path): + os.mkdir(host_path) + + for mp in self.mounts: + host_path = mp["source"] + destination_path = mp["destination"] + + if not mp["is_absolute_path"]: + """ + self.server.benches_directory = /home/jingrow/benches (Host) + bench_directory = "/home/jingrow/jingrow-bench" (container) + """ + host_path = os.path.join(self.server.benches_directory, mp["source"]) + destination_path = os.path.join(bench_directory, mp["destination"]) + + _create_mounts(host_path) + + mounts_cmd += f" -v {host_path}:{destination_path} " + + return mounts_cmd + + def start(self): + if self.bench_config.get("single_container"): + try: + self.execute(f"docker stop {self.name}") + self.execute(f"docker rm {self.name}") + except Exception: + pass + + ssh_port = self.bench_config.get("ssh_port", self.bench_config["web_port"] + 4000) + ssh_ip = self.bench_config.get("private_ip", "127.0.0.1") + + rq_port = self.bench_config.get("rq_port") + rq_port_mapping = f"-p 127.0.0.1:{rq_port}:11000 " + + bench_directory = "/home/jingrow/jingrow-bench" + mounts = self.prepare_mounts_on_host(bench_directory) + + command = ( + "docker run -d --init -u jingrow " + f"--restart always --hostname {self.name} " + f"-p 127.0.0.1:{self.bench_config['web_port']}:8000 " + f"-p 127.0.0.1:{self.bench_config['socketio_port']}:9000 " + f"-p 127.0.0.1:{self.bench_config['codeserver_port']}:8088 " + f"{rq_port_mapping if rq_port else ''}" + f"-p {ssh_ip}:{ssh_port}:2200 " + f"-v {self.sites_directory}:{bench_directory}/sites " + f"-v {self.logs_directory}:{bench_directory}/logs " + f"-v {self.config_directory}:{bench_directory}/config " + f"{mounts} " + f"--name {self.name} {self.bench_config['docker_image']}" + ) + else: + command = ( + "docker stack deploy " + "--resolve-image=never --with-registry-auth " + f"--compose-file docker-compose.yml {self.name} " + ) + return self.execute(command) + + def stop(self): + if self.bench_config.get("single_container"): + self.execute(f"docker stop {self.name}") + return self.execute(f"docker rm {self.name}") + return self.execute(f"docker stack rm {self.name}") + + @step("Stop Bench") + def _stop(self): + return self.execute(f"docker stop {self.name}") + + @step("Start Bench") + def _start(self): + return self.execute(f"docker start {self.name}") + + @job("Force Update Bench Limits") + def force_update_limits(self, memory_high, memory_max, memory_swap, vcpu): + self._stop() + self._update_runtime_limits(memory_high, memory_max, memory_swap, vcpu) + self._start() + + def update_runtime_limits(self): + memory_high = self.bench_config.get("memory_high") + memory_max = self.bench_config.get("memory_max") + memory_swap = self.bench_config.get("memory_swap") + vcpu = self.bench_config.get("vcpu") + if not any([memory_high, memory_max, memory_swap, vcpu]): + return + self._update_runtime_limits(memory_high, memory_max, memory_swap, vcpu) + + @step("Update Bench Memory Limits") + def _update_runtime_limits(self, memory_high, memory_max, memory_swap, vcpu): + cmd = f"docker update {self.name}" + if memory_high: + cmd += f" --memory-reservation={memory_high}M" + if memory_max: + cmd += f" --memory={memory_max}M" + if memory_swap: + cmd += f" --memory-swap={memory_swap}M" + if vcpu: + cmd += f" --cpus={vcpu}" + return self.execute(cmd) + + @property + def job_record(self): + return self.server.job_record + + def readable_jde_err(self, title: str, jde: json.decoder.JSONDecodeError) -> str: + output = f"{title}:\n{jde.doc}\n{jde}\n" + 
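+ # Mask database credentials below so they do not leak into job output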
import re
+
+ output = re.sub(r'("db_name":.* ")(\w*)(")', r"\1********\3", output)
+ return re.sub(r'("db_password":.* ")(\w*)(")', r"\1********\3", output)
+
+ @property
+ def sites(self):
+ return self._sites()
+
+ @property
+ def valid_sites(self):
+ return self._sites(validate_configs=True)
+
+ def _sites(self, validate_configs=False) -> dict[str, Site]:
+ sites = {}
+ for directory in os.listdir(self.sites_directory):
+ try:
+ sites[directory] = Site(directory, self)
+ except json.decoder.JSONDecodeError as jde:
+ output = self.readable_jde_err(f"Error parsing JSON in {directory}", jde)
+ try:
+ self.execute(
+ f"echo '{output}';exit {int(validate_configs)}",
+ ) # exit 1 to make sure the job fails and shows output
+ except AgentException as e:
+ raise InvalidSiteConfigException(e.data, directory) from e
+ except Exception:
+ pass
+ return sites
+
+ def get_site(self, site):
+ try:
+ return self.valid_sites[site]
+ except KeyError as e:
+ raise SiteNotExistsException(site, self.name) from e
+ except InvalidSiteConfigException as e:
+ if e.site == site:
+ raise
+
+ @property
+ def step_record(self):
+ return self.server.step_record
+
+ @step_record.setter
+ def step_record(self, value):
+ self.server.step_record = value
+
+ def get_usage(self):
+ return {
+ "storage": get_size(self.directory),
+ "database": sum([site.get_database_size() for site in self.sites.values()]),
+ }
+
+ @property
+ def bench_config(self) -> dict:
+ with open(self.bench_config_file, "r") as f:
+ return json.load(f)
+
+ def set_bench_config(self, value, indent=1):
+ """
+ To avoid partial writes, we first write the config to a temporary file,
+ then rename it over the original file.
+ """
+ with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
+ json.dump(value, temp_file, indent=indent, sort_keys=True)
+ temp_file.flush()
+ os.fsync(temp_file.fileno())
+ temp_file.close()
+
+ os.rename(temp_file.name, self.bench_config_file)
+
+ @job("Patch App")
+ def patch_app(
+ self,
+ app: str,
+ patch: str,
+ filename: str,
+ build_assets: bool,
+ revert: bool,
+ ):
+ patch_container_path = self.prepare_app_patch(app, patch, filename)
+ self.git_apply(app, revert, patch_container_path)
+
+ if build_assets:
+ self.rebuild()
+
+ self.restart()
+
+ def prepare_app_patch(self, app: str, patch: str, filename: str) -> str:
+ """
+ Returns the patch path inside the container; the sites directory
+ is mounted in the container at a different path from that of the
+ bench outside it.
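+
+ e.g. sites/patches/<app>/<filename> under the bench on the host
+ shows up at /home/jingrow/jingrow-bench/sites/patches/<app>/<filename>
+ inside the container.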
+ """ + relative = ["sites", "patches", app] + patch_dir = Path(os.path.join(self.directory, *relative)) + patch_dir.mkdir(parents=True, exist_ok=True) + + bench_container_dir = "/home/jingrow/jingrow-bench" + patch_container_dir = os.path.join(bench_container_dir, *relative, filename) + + patch_path = patch_dir / filename + if patch_path.is_file(): + return patch_container_dir + + with patch_path.open("w") as f: + f.write(patch) + + return patch_container_dir + + @step("Git Apply") + def git_apply(self, app: str, revert: bool, patch_container_path: str): + command = "git apply " + if revert: + command += "--reverse " + command += patch_container_path + + app_path = os.path.join("apps", app) + self.docker_execute(command, subdir=app_path) + + @job("Call Bench Supervisorctl") + def call_supervisorctl(self, command: str, programs: list[str]): + self.run_supervisorctl_command(command, programs) + + @step("Run Supervisorctl Command") + def run_supervisorctl_command(self, command: str, programs: list[str]): + target = "all" + if len(programs) > 0: + target = " ".join(programs) + self.docker_execute(f"supervisorctl {command} {target}") + + @job("Update Bench In Place") + def update_inplace( + self, + sites: list[str], + image: str, + apps: list[BenchUpdateApp], + ): + if not (diff_dict := self.pull_app_changes(apps).get("diff")): + return + + should_run = get_should_run_update_phase(diff_dict) + + node = should_run["setup_requirements_node"] + python = should_run["setup_requirements_python"] + if node or python: + self.setup_requirements(node, python) + + if should_run["migrate_sites"]: + self.migrate_sites(sites) + + if should_run["rebuild_frontend"]: + self.rebuild(apps=[app["app"] for app in apps], is_inplace=True) + + # commit container changes + self.commit_container_changes(image) + + # restart site + self.restart(web_only=False) + + @step("Pull App Changes") + def pull_app_changes(self, apps: list[BenchUpdateApp]): + res = get_execution_result() + + diff: dict[str, list[str]] = {} + outputs: list[str] = [] + for app in apps: + if not (files := self._pull_app_change(app)): + continue + + app_name = app["app"] + diff[app_name] = files + + output = "\n".join( + [ + app_name, + indent("\n".join(files), " "), + ] + ) + outputs.append(output) + + res = end_execution(res, "\n\n".join(outputs)) + res["diff"] = diff + return res + + def _pull_app_change(self, app: BenchUpdateApp) -> list[str]: + remote = "inplace" + app_path = os.path.join("apps", app["app"]) + exec = partial(self.docker_execute, subdir=app_path) + + self.set_git_remote(app["app"], app["url"], remote) + + app_path: str = os.path.join("apps", app["app"]) + new_hash: str = app["hash"] + old_hash: str = exec("git rev-parse HEAD")["output"] + + if old_hash == new_hash: + # Remove remote, url might be private + exec(f"git remote remove {remote}") + return [] + + # Fetch new hash and get changed files + exec(f"git fetch --depth 1 {remote} {new_hash}") + diff: str = exec(f"git diff --name-only {old_hash} {new_hash}")["output"] + + # Ensure repo is not dirty and checkout next_hash + exec(f"git reset --hard {old_hash}") + exec("git clean -fd") + exec(f"git checkout {new_hash}") + + # Remove remote, url might be private + exec(f"git remote remove {remote}") + return [s for s in diff.split("\n") if s] + + def set_git_remote( + self, + app: str, + url: str, + remote: str, + ): + app_path = os.path.join("apps", app) + res = self.docker_execute( + f"git remote get-url {remote}", + subdir=app_path, + non_zero_throw=False, + ) + + if 
res["output"] == url: + return + + if res["returncode"] == 0: + self.docker_execute( + f"git remote remove {remote}", + subdir=app_path, + ) + + self.docker_execute( + f"git remote add {remote} {url}", + subdir=app_path, + ) + + @step("Setup Requirements") + def setup_requirements(self, node: bool = True, python: bool = True): + flag = "" + + if node and not python: + flag = " --node" + + if not node and python: + flag = " --python" + + return self.docker_execute("bench setup requirements" + flag) + + @step("Migrate Sites") + def migrate_sites( + self, + sites: list[str], + skip_search_index: bool = False, + skip_failing_patches: bool = False, + ): + res = get_execution_result() + outputs: list[str] = [] + + for site_name in sites: + migrate_res = self.migrate_site( + self.sites[site_name], + skip_search_index, + skip_failing_patches, + ) + output = "\n".join( + [ + site_name, + indent(migrate_res["output"], " "), + ] + ) + outputs.append(output) + + return end_execution( + res, + "\n\n".join(outputs), + ) + + def migrate_site( + self, + site: Site, + skip_search_index: bool = False, + skip_failing_patches: bool = False, + ): + site._enable_maintenance_mode() + res = site._migrate( + skip_search_index, + skip_failing_patches, + ) + site._disable_maintenance_mode() + return res + + @step("Commit Container Changes") + def commit_container_changes(self, image: str): + container_id = self.execute(f'docker ps -aqf "name={self.name}"')["output"] + res = self.execute(f"docker commit {container_id} {image}") + self._update_config(bench_config={"docker_image": image}) + return res + + @job("Recover Update In Place") + def recover_update_inplace( + self, + site_names: list[str], + image: str, + ): + self._update_config(bench_config={"docker_image": image}) + sites = [Site(name, self) for name in site_names] + + # Enable maintenance mode on sites if possible + self.enable_maintenance_mode(sites) + + """ + Will stop and remove failed inplace updated container and + start last running container pointed to by image. 
+ """ + self.deploy() + + self.setup_nginx() + self.recover_sites(sites) + + @step("Enable Maintenance Mode") + def enable_maintenance_mode(self, sites: list[Site]): + for site in sites: + with suppress(Exception): + site._enable_maintenance_mode() + + @step("Recover Sites") + def recover_sites(self, sites: list[Site]): + for site in sites: + site._restore_touched_tables() + with suppress(Exception): + site.generate_theme_files() + site._disable_maintenance_mode() + + +def get_should_run_update_phase( + diff_dict: dict[str, list[str]], +) -> ShouldRunUpdatePhase: + diff = [] + for dl in diff_dict.values(): + diff.extend(dl) + + setup_node = False + setup_python = False + rebuild = False + migrate = False + + for file in diff: + if all([setup_node, setup_python, rebuild, migrate]): + break + + if not setup_node: + setup_node = should_setup_requirements_node(file) + + if not setup_python: + setup_python = should_setup_requirements_py(file) + + if not rebuild: + rebuild = should_rebuild_frontend(file) + + if not migrate: + migrate = should_migrate_sites(file) + + return dict( + setup_requirements_node=setup_node, + setup_requirements_python=setup_python, + rebuild_frontend=rebuild, + migrate_sites=migrate, + ) + + +def should_setup_requirements_node(file: str) -> bool: + return _should_run_phase( + file, + [ + "package.json", + "package-lock.json", + "yarn.lock", + ".lockb", + "pnpm-lock.yaml", + ], + ) + + +def should_setup_requirements_py(file: str) -> bool: + return _should_run_phase( + file, + ["pyproject.toml", "setup.py", "requirements.txt"], + ) + + +def should_rebuild_frontend(file: str) -> bool: + return _should_run_phase( + file, + [ + ".js", + ".ts", + ".html", + ".vue", + ".jsx", + ".tsx", + ".css", + ".scss", + ".sass", + ], + ["www", "public", "frontend", "dashboard"], + ) + + +def should_migrate_sites(file: str) -> bool: + return _should_run_phase( + file, + ["hooks.py", ".json"], + ["patches"], + ) + + +def _should_run_phase( + file: str, + ends: list[str] | None = None, + subs: list[str] | None = None, + globs: list[str] | None = None, +) -> bool: + ends = ends or [] + subs = subs or [] + globs = globs or [] + + if any([file.endswith(e) for e in ends]): + return True + + if any([s in file for s in subs]): + return True + + return any([PurePath(file).match(s) for s in globs]) + + +def get_site_from_name(name: str, new_name: str, bench: Bench): + try: + return Site(name, bench) + except OSError: + pass + + try: + return Site(new_name, bench) + except OSError: + raise Exception(f"Neither {name} nor {new_name} exists") from None + + +def _touch_currentsite_file(bench: Bench): + file = os.path.join(bench.sites_directory, "currentsite.txt") + open(file, "w").close() + + +def _inactive_scheduler_sites(bench: Bench): + inactive = [] + _touch_currentsite_file(bench) + try: + doctor = bench.docker_execute("bench doctor")["output"].split("\n") + except AgentException as e: + doctor = e.data["output"] + + for line in doctor: + if "inactive" in line: + site = line.split(" ")[-1] + inactive.append(site) + return inactive + + +def _inactive_web_sites(bench: Bench): + inactive = [] + session = requests.Session() + for site in bench.sites: + url = f"https://{site}/api/action/ping" + try: + result = session.get(url) + except Exception as e: + result = None + print("Ping Failed", url, e) + if not result or result.status_code != 200: + inactive.append(site) + return inactive + + +def _get_domains(sites: list[Site]): + domains: dict[str, str] = {} + for site in sites: + for domain in 
site.config.get("domains", []): + domains[domain] = site.name + return domains + + +def _get_codeserver_config(bench_directory: str): + codeserver_directory = os.path.join(bench_directory, "codeserver") + + if not os.path.exists(codeserver_directory): + return {} + + codeservers = os.listdir(codeserver_directory) + if not codeservers: + return {} + + with open(os.path.join(codeserver_directory, codeservers[0])) as file: + port = file.read() + + return {"name": codeservers[0], "port": port} diff --git a/agent/callbacks.py b/agent/callbacks.py index 693e0d8..e51964e 100644 --- a/agent/callbacks.py +++ b/agent/callbacks.py @@ -1,8 +1,8 @@ -import requests - - -def callback(job, connection, result, *args, **kwargs): - from agent.server import Server - - jcloud_url = Server().jcloud_url - requests.post(url=f"{jcloud_url}/api/method/jcloud.api.callbacks.callback", data={"job_id": job.id}) +import requests + + +def callback(job, connection, result, *args, **kwargs): + from agent.server import Server + + jcloud_url = Server().jcloud_url + requests.post(url=f"{jcloud_url}/api/action/jcloud.api.callbacks.callback", data={"job_id": job.id}) diff --git a/agent/monitor.py b/agent/monitor.py index b5559d6..c8e2f5d 100644 --- a/agent/monitor.py +++ b/agent/monitor.py @@ -1,114 +1,114 @@ -from __future__ import annotations - -import os -import tempfile - -import requests - -from agent.server import Server - - -class Monitor(Server): - def __init__(self, directory=None): - super().__init__(directory=directory) - self.prometheus_directory = "/home/jingrow/prometheus" - self.alertmanager_directory = "/home/jingrow/alertmanager" - - def update_rules(self, rules): - rules_file = os.path.join(self.prometheus_directory, "rules", "agent.yml") - self._render_template( - "prometheus/rules.yml", - {"rules": rules}, - rules_file, - { - "variable_start_string": "###", - "variable_end_string": "###", - }, - ) - - promtool = os.path.join(self.prometheus_directory, "promtool") - self.execute(f"{promtool} check rules {rules_file}") - - self.execute("sudo systemctl reload prometheus") - - def update_routes(self, routes): - config_file = os.path.join(self.alertmanager_directory, "alertmanager.yml") - self._render_template( - "alertmanager/routes.yml", - {"routes": routes}, - config_file, - { - "variable_start_string": "###", - "variable_end_string": "###", - }, - ) - amtool = os.path.join(self.alertmanager_directory, "amtool") - self.execute(f"{amtool} check-config {config_file}") - - self.execute("sudo systemctl reload alertmanager") - - def discover_targets(self): - targets = self.fetch_targets() - for cluster in targets["clusters"]: - self.generate_prometheus_cluster_config(cluster) - - self.generate_prometheus_tls_config(targets["tls"]) - self.generate_prometheus_sites_config(targets["benches"]) - self.generate_prometheus_domains_config(targets["domains"]) - - def fetch_targets(self): - jcloud_url = self.config.get("jcloud_url") - jcloud_token = self.config.get("jcloud_token") - return requests.post( - f"{jcloud_url}/api/method/jcloud.api.monitoring.targets", - data={"token": jcloud_token}, - ).json()["message"] - - def generate_prometheus_sites_config(self, benches): - prometheus_sites_config = os.path.join(self.prometheus_directory, "file_sd", "sites.yml") - temp_sites_config = tempfile.mkstemp(prefix="agent-prometheus-sites-", suffix=".yml")[1] - self._render_template( - "prometheus/sites.yml", - {"benches": benches}, - temp_sites_config, - {"block_start_string": "##", "block_end_string": "##"}, - ) - 
os.rename(temp_sites_config, prometheus_sites_config) - - def generate_prometheus_tls_config(self, servers): - prometheus_tls_config = os.path.join(self.prometheus_directory, "file_sd", "tls.yml") - temp_tls_config = tempfile.mkstemp(prefix="agent-prometheus-tls-", suffix=".yml")[1] - self._render_template( - "prometheus/tls.yml", - {"servers": servers}, - temp_tls_config, - {"block_start_string": "##", "block_end_string": "##"}, - ) - os.rename(temp_tls_config, prometheus_tls_config) - - def generate_prometheus_domains_config(self, domains): - prometheus_domains_config = os.path.join(self.prometheus_directory, "file_sd", "domains.yml") - temp_domains_config = tempfile.mkstemp(prefix="agent-prometheus-domains-", suffix=".yml")[1] - self._render_template( - "prometheus/domains.yml", - {"domains": domains}, - temp_domains_config, - {"block_start_string": "##", "block_end_string": "##"}, - ) - os.rename(temp_domains_config, prometheus_domains_config) - - def generate_prometheus_cluster_config(self, cluster): - prometheus_cluster_config = os.path.join( - self.prometheus_directory, - "file_sd", - f"cluster.{cluster['name']}.yml", - ) - - temp_cluster_config = tempfile.mkstemp(prefix="agent-prometheus-cluster-", suffix=".yml")[1] - self._render_template( - "prometheus/servers.yml", - {"cluster": cluster}, - temp_cluster_config, - {"block_start_string": "##", "block_end_string": "##"}, - ) - os.rename(temp_cluster_config, prometheus_cluster_config) +from __future__ import annotations + +import os +import tempfile + +import requests + +from agent.server import Server + + +class Monitor(Server): + def __init__(self, directory=None): + super().__init__(directory=directory) + self.prometheus_directory = "/home/jingrow/prometheus" + self.alertmanager_directory = "/home/jingrow/alertmanager" + + def update_rules(self, rules): + rules_file = os.path.join(self.prometheus_directory, "rules", "agent.yml") + self._render_template( + "prometheus/rules.yml", + {"rules": rules}, + rules_file, + { + "variable_start_string": "###", + "variable_end_string": "###", + }, + ) + + promtool = os.path.join(self.prometheus_directory, "promtool") + self.execute(f"{promtool} check rules {rules_file}") + + self.execute("sudo systemctl reload prometheus") + + def update_routes(self, routes): + config_file = os.path.join(self.alertmanager_directory, "alertmanager.yml") + self._render_template( + "alertmanager/routes.yml", + {"routes": routes}, + config_file, + { + "variable_start_string": "###", + "variable_end_string": "###", + }, + ) + amtool = os.path.join(self.alertmanager_directory, "amtool") + self.execute(f"{amtool} check-config {config_file}") + + self.execute("sudo systemctl reload alertmanager") + + def discover_targets(self): + targets = self.fetch_targets() + for cluster in targets["clusters"]: + self.generate_prometheus_cluster_config(cluster) + + self.generate_prometheus_tls_config(targets["tls"]) + self.generate_prometheus_sites_config(targets["benches"]) + self.generate_prometheus_domains_config(targets["domains"]) + + def fetch_targets(self): + jcloud_url = self.config.get("jcloud_url") + jcloud_token = self.config.get("jcloud_token") + return requests.post( + f"{jcloud_url}/api/action/jcloud.api.monitoring.targets", + data={"token": jcloud_token}, + ).json()["message"] + + def generate_prometheus_sites_config(self, benches): + prometheus_sites_config = os.path.join(self.prometheus_directory, "file_sd", "sites.yml") + temp_sites_config = tempfile.mkstemp(prefix="agent-prometheus-sites-", suffix=".yml")[1] + 
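+ # Render into the temp file, then swap it into place atomically with os.rename below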
self._render_template( + "prometheus/sites.yml", + {"benches": benches}, + temp_sites_config, + {"block_start_string": "##", "block_end_string": "##"}, + ) + os.rename(temp_sites_config, prometheus_sites_config) + + def generate_prometheus_tls_config(self, servers): + prometheus_tls_config = os.path.join(self.prometheus_directory, "file_sd", "tls.yml") + temp_tls_config = tempfile.mkstemp(prefix="agent-prometheus-tls-", suffix=".yml")[1] + self._render_template( + "prometheus/tls.yml", + {"servers": servers}, + temp_tls_config, + {"block_start_string": "##", "block_end_string": "##"}, + ) + os.rename(temp_tls_config, prometheus_tls_config) + + def generate_prometheus_domains_config(self, domains): + prometheus_domains_config = os.path.join(self.prometheus_directory, "file_sd", "domains.yml") + temp_domains_config = tempfile.mkstemp(prefix="agent-prometheus-domains-", suffix=".yml")[1] + self._render_template( + "prometheus/domains.yml", + {"domains": domains}, + temp_domains_config, + {"block_start_string": "##", "block_end_string": "##"}, + ) + os.rename(temp_domains_config, prometheus_domains_config) + + def generate_prometheus_cluster_config(self, cluster): + prometheus_cluster_config = os.path.join( + self.prometheus_directory, + "file_sd", + f"cluster.{cluster['name']}.yml", + ) + + temp_cluster_config = tempfile.mkstemp(prefix="agent-prometheus-cluster-", suffix=".yml")[1] + self._render_template( + "prometheus/servers.yml", + {"cluster": cluster}, + temp_cluster_config, + {"block_start_string": "##", "block_end_string": "##"}, + ) + os.rename(temp_cluster_config, prometheus_cluster_config) diff --git a/agent/site.py b/agent/site.py index fe0e57b..35d93e9 100644 --- a/agent/site.py +++ b/agent/site.py @@ -1,982 +1,982 @@ -from __future__ import annotations - -import json -import os -import re -import shutil -import time -from datetime import datetime -from shlex import quote -from typing import TYPE_CHECKING - -import requests - -from agent.base import AgentException, Base -from agent.database import Database -from agent.job import job, step -from agent.utils import b2mb, compute_file_hash, get_size - -if TYPE_CHECKING: - from agent.bench import Bench - - -class Site(Base): - def __init__(self, name: str, bench: Bench): - super().__init__() - - self.name = name - self.bench = bench - self.directory = os.path.join(self.bench.sites_directory, name) - self.backup_directory = os.path.join(self.directory, ".migrate") - self.logs_directory = os.path.join(self.directory, "logs") - self.config_file = os.path.join(self.directory, "site_config.json") - self.touched_tables_file = os.path.join(self.directory, "touched_tables.json") - self.previous_tables_file = os.path.join(self.directory, "previous_tables.json") - self.analytics_file = os.path.join( - self.directory, - "analytics.json", - ) - - if not os.path.isdir(self.directory): - raise OSError(f"Path {self.directory} is not a directory") - - if not os.path.exists(self.config_file): - raise OSError(f"Path {self.config_file} does not exist") - - self.database = self.config["db_name"] - self.user = self.config["db_name"] - self.password = self.config["db_password"] - self.host = self.config.get("db_host", self.bench.host) - - def bench_execute(self, command, input=None): - return self.bench.docker_execute(f"bench --site {self.name} {command}", input=input) - - def dump(self): - return {"name": self.name} - - @step("Rename Site") - def rename(self, new_name): - os.rename(self.directory, os.path.join(self.bench.sites_directory, new_name)) - 
self.name = new_name - - @job("Run After Migrate Steps") - def run_after_migrate_steps_job(self, admin_password): - """ - Run after migrate steps - - Used to run after-migrate steps for when migrations break. - """ - self.set_admin_password(admin_password) - self.bench.setup_nginx() - self.bench.server.reload_nginx() - self.disable_maintenance_mode() - self.enable_scheduler() - - @step("Install Apps") - def install_apps(self, apps): - data = {"apps": {}} - output = [] - for app in apps: - data["apps"][app] = {} - log = data["apps"][app] - if app != "jingrow": - log["install"] = self.bench_execute(f"install-app {app}") - output.append(log["install"]["output"]) - data["output"] = "\n".join(output) - return data - - @step("Install App on Site") - def install_app(self, app): - try: - return self.bench_execute(f"install-app {app} --force") - except AgentException as e: - if "Error: no such option: --force" in e.data["output"]: - return self.bench_execute(f"install-app {app}") # not available in < v14 - raise - - @step("Uninstall App from Site") - def uninstall_app(self, app): - return self.bench_execute(f"uninstall-app {app} --yes --force") - - @step("Restore Site") - def restore( - self, - mariadb_root_password, - admin_password, - database_file, - public_file, - private_file, - ): - sites_directory = self.bench.sites_directory - database_file = database_file.replace(sites_directory, "/home/jingrow/jingrow-bench/sites") - public_file = public_file.replace(sites_directory, "/home/jingrow/jingrow-bench/sites") - private_file = private_file.replace(sites_directory, "/home/jingrow/jingrow-bench/sites") - - public_file_option = f"--with-public-files {public_file}" if public_file else "" - private_file_option = f"--with-private-files {private_file} " if private_file else "" - - _, temp_user, temp_password = self.bench.create_mariadb_user( - self.name, mariadb_root_password, self.database - ) - try: - return self.bench_execute( - "--force restore " - f"--mariadb-root-username {temp_user} " - f"--mariadb-root-password {temp_password} " - f"--admin-password {admin_password} " - f"{public_file_option} " - f"{private_file_option} " - f"{database_file}" - ) - finally: - self.bench.drop_mariadb_user(self.name, mariadb_root_password, self.database) - - @step("Checksum of Downloaded Backup Files") - def calculate_checksum_of_backup_files(self, database_file, public_file, private_file): - database_file_sha256 = compute_file_hash(database_file, algorithm="sha256", raise_exception=False) - - data = f"""Database File -> File Name - {os.path.basename(database_file)} -> SHA256 Checksum - {database_file_sha256}\n""" - if public_file: - public_file_sha256 = compute_file_hash(public_file, algorithm="sha256", raise_exception=False) - data += f"""\nPublic File -> File Name - {os.path.basename(public_file)} -> SHA256 Checksum - {public_file_sha256}\n""" - if private_file: - private_file_sha256 = compute_file_hash(private_file, algorithm="sha256", raise_exception=False) - data += f"""\nPrivate File -> File Name - {os.path.basename(private_file)} -> SHA256 Checksum - {private_file_sha256}\n""" - - return {"output": data} - - @job("Restore Site") - def restore_job( - self, - apps, - mariadb_root_password, - admin_password, - database, - public, - private, - skip_failing_patches, - ): - files = self.bench.download_files(self.name, database, public, private) - try: - self.restore( - mariadb_root_password, - admin_password, - files["database"], - files["public"], - files["private"], - ) - except Exception: - 
self.calculate_checksum_of_backup_files(files["database"], files["public"], files["private"]) - raise - finally: - self.bench.delete_downloaded_files(files["directory"]) - self.uninstall_unavailable_apps(apps) - self.migrate(skip_failing_patches=skip_failing_patches) - self.set_admin_password(admin_password) - self.enable_scheduler() - - self.bench.setup_nginx() - self.bench.server.reload_nginx() - - return self.bench_execute("list-apps") - - @job("Migrate Site") - def migrate_job(self, skip_failing_patches=False, activate=True): - self.migrate(skip_failing_patches=skip_failing_patches) - if activate: - self.disable_maintenance_mode() - - @step("Reinstall Site") - def reinstall( - self, - mariadb_root_password, - admin_password, - ): - _, temp_user, temp_password = self.bench.create_mariadb_user( - self.name, mariadb_root_password, self.database - ) - try: - return self.bench_execute( - f"reinstall --yes " - f"--mariadb-root-username {temp_user} " - f"--mariadb-root-password {temp_password} " - f"--admin-password {admin_password}" - ) - finally: - self.bench.drop_mariadb_user(self.name, mariadb_root_password, self.database) - - @job("Reinstall Site") - def reinstall_job( - self, - mariadb_root_password, - admin_password, - ): - return self.reinstall(mariadb_root_password, admin_password) - - @job("Install App on Site") - def install_app_job(self, app): - self.install_app(app) - - @job("Uninstall App on Site") - def uninstall_app_job(self, app): - self.uninstall_app(app) - - @step("Update Site Configuration") - def update_config(self, value, remove=None): - """Pass Site Config value to update or replace existing site config. - - Args: - value (dict): Site Config - remove (list, optional): Keys sent in the form of a list will be - popped from the existing site config. Defaults to None. 
- """ - new_config = self.get_config(for_update=True) - new_config.update(value) - - if remove: - for key in remove: - new_config.pop(key, None) - - self.set_config(new_config) - - @job("Add Domain", priority="high") - def add_domain(self, domain): - domains = set(self.config.get("domains", [])) - domains.add(domain) - self.update_config({"domains": list(domains)}) - self.bench.setup_nginx() - self.bench.server.reload_nginx() - - @job("Remove Domain", priority="high") - def remove_domain(self, domain): - domains = set(self.config.get("domains", [])) - domains.discard(domain) - self.update_config({"domains": list(domains)}) - self.bench.setup_nginx() - self.bench.server.reload_nginx() - - def create_database_access_credentials(self, mode, mariadb_root_password): - database = self.database - user = f"{self.user}_{mode}" - password = self.bench.get_random_string(16) - privileges = { - "read_only": "SELECT", - "read_write": "ALL", - }.get(mode, "SELECT") - queries = [ - f"CREATE OR REPLACE USER '{user}'@'%' IDENTIFIED BY '{password}'", - f"GRANT {privileges} ON {database}.* TO '{user}'@'%'", - "FLUSH PRIVILEGES", - ] - for query in queries: - command = f'mysql -h {self.host} -uroot -p{mariadb_root_password} -e "{query}"' - self.execute(command) - return {"database": database, "user": user, "password": password} - - def revoke_database_access_credentials(self, user, mariadb_root_password): - if user == self.user: - # Do not revoke access for the main user - return {} - self.db_instance("root", mariadb_root_password).remove_user(user) - return {} - - @job("Create Database User", priority="high") - def create_database_user_job(self, user, password, mariadb_root_password): - return self.create_database_user(user, password, mariadb_root_password) - - @step("Create Database User") - def create_database_user(self, user, password, mariadb_root_password): - if user == self.user: - # Do not perform any operation for the main user - return {} - self.db_instance("root", mariadb_root_password).create_user(user, password) - return { - "database": self.database, - } - - @job("Remove Database User", priority="high") - def remove_database_user_job(self, user, mariadb_root_password): - return self.remove_database_user(user, mariadb_root_password) - - @step("Remove Database User") - def remove_database_user(self, user, mariadb_root_password): - if user == self.user: - # Do not perform any operation for the main user - return {} - self.db_instance("root", mariadb_root_password).remove_user(user) - return {} - - @job("Modify Database User Permissions", priority="high") - def modify_database_user_permissions_job(self, user, mode, permissions, mariadb_root_password): - return self.modify_database_user_permissions(user, mode, permissions, mariadb_root_password) - - @step("Modify Database User Permissions") - def modify_database_user_permissions(self, user, mode, permissions, mariadb_root_password): - if user == self.user: - # Do not perform any operation for the main user - return {} - self.db_instance("root", mariadb_root_password).modify_user_permissions(user, mode, permissions) - return {} - - @job("Setup Jerp", priority="high") - def setup_jerp(self, user, config): - self.create_user( - user["email"], - user["first_name"], - user["last_name"], - ) - self.update_jerp_config(config) - return {"sid": self.sid(user["email"])} - - @job("Restore Site Tables", priority="high") - def restore_site_tables_job(self, activate): - self.restore_site_tables() - if activate: - self.disable_maintenance_mode() - - @step("Restore 
Site Tables") - def restore_site_tables(self): - data = {"tables": {}} - for backup_file in os.listdir(self.backup_directory): - backup_file_path = os.path.join(self.backup_directory, backup_file) - output = self.execute( - "set -o pipefail && " - f"gunzip -c '{backup_file_path}' | " - f"mysql -h {self.host} -u {self.user} -p{self.password} " - f"{self.database}", - executable="/bin/bash", - ) - data["tables"][backup_file] = output - return data - - @step("Update Jerp Configuration") - def update_jerp_config(self, value): - config_file = os.path.join(self.directory, "journeys_config.json") - with open(config_file, "r") as f: - config = json.load(f) - - config.update(value) - - with open(config_file, "w") as f: - json.dump(config, f, indent=1, sort_keys=True) - - @step("Create User") - def create_user(self, email, first_name, last_name, password=None): - first_name = quote(first_name) - last_name = quote(last_name) - if password: - password = quote(password) - command = f"add-system-manager {email} --first-name {first_name} --last-name {last_name}" - if password: - command += f" --password {password}" - return self.bench_execute(command) - - @step("Complete Setup Wizard") - def complete_setup_wizard(self, data): - payload = {"args": data} - payload = quote(json.dumps(payload)) - command = f"execute jingrow.desk.page.setup_wizard.setup_wizard.setup_complete --kwargs {payload}" - return self.bench_execute(command) - - @job("Update Site Configuration", priority="high") - def update_config_job(self, value, remove): - self.update_config(value, remove) - - @job("Reset Site Usage", priority="high") - def reset_site_usage_job(self): - return self.reset_site_usage() - - @step("Reset Site Usage") - def reset_site_usage(self): - pattern = f"{self.database}|rate-limit-counter-[0-9]*" - keys_command = f"redis-cli --raw -p 13000 KEYS '{pattern}'" - keys = self.bench.docker_execute(keys_command) - data = {"keys": keys, "get": [], "delete": []} - for key in keys["output"].splitlines(): - get = self.bench.docker_execute(f"redis-cli -p 13000 GET '{key}'") - delete = self.bench.docker_execute(f"redis-cli -p 13000 DEL '{key}'") - data["get"].append(get) - data["delete"].append(delete) - return data - - @job("Update Saas Plan") - def update_saas_plan(self, plan): - self.update_plan(plan) - - @step("Update Saas Plan") - def update_plan(self, plan): - self.bench_execute(f"update-site-plan {plan}") - - @step("Backup Site") - def backup(self, with_files=False): - with_files = "--with-files" if with_files else "" - self.bench_execute(f"backup {with_files}") - return self.fetch_latest_backup(with_files=with_files) - - @step("Upload Site Backup to S3") - def upload_offsite_backup(self, backup_files, offsite): - import boto3 - - offsite_files = {} - bucket, auth, prefix = ( - offsite["bucket"], - offsite["auth"], - offsite["path"], - ) - region = auth.get("REGION") - - if region: - s3 = boto3.client( - "s3", - aws_access_key_id=auth["ACCESS_KEY"], - aws_secret_access_key=auth["SECRET_KEY"], - region_name=region, - ) - else: - s3 = boto3.client( - "s3", - aws_access_key_id=auth["ACCESS_KEY"], - aws_secret_access_key=auth["SECRET_KEY"], - ) - - for backup_file in backup_files.values(): - file_name = backup_file["file"].split(os.sep)[-1] - offsite_path = os.path.join(prefix, file_name) - offsite_files[file_name] = offsite_path - - with open(backup_file["path"], "rb") as data: - s3.upload_fileobj(data, bucket, offsite_path) - - return offsite_files - - @step("Enable Maintenance Mode") - def enable_maintenance_mode(self): - 
return self._enable_maintenance_mode() - - def _enable_maintenance_mode(self): - return self.bench_execute("set-maintenance-mode on") - - @step("Set Administrator Password") - def set_admin_password(self, password): - return self.bench_execute(f"set-admin-password {password}") - - @step("Wait for Enqueued Jobs") - def wait_till_ready(self): - WAIT_TIMEOUT = 600 - data = {"tries": []} - start = time.time() - is_ready = False - while (time.time() - start) < WAIT_TIMEOUT: - try: - output = self.bench_execute("ready-for-migration") - data["tries"].append(output) - is_ready = True - break - except Exception as e: - data["tries"].append(e.data) - time.sleep(1) - - if not is_ready: - raise Exception( - f"Site not ready for migration after {WAIT_TIMEOUT}s." - f" Site might have lot of jobs in queue. Try again later." - ) - - return data - - @step("Clear Backup Directory") - def clear_backup_directory(self): - if os.path.exists(self.backup_directory): - shutil.rmtree(self.backup_directory) - os.mkdir(self.backup_directory) - - @step("Backup Site Tables") - def tablewise_backup(self): - tables = self.tables - with open(self.previous_tables_file, "w") as ptf: - json.dump(tables, ptf, indent=4, sort_keys=True) - - data = {"tables": {}} - for table in tables: - backup_file = os.path.join(self.backup_directory, f"{table}.sql.gz") - output = self.execute( - "set -o pipefail && " - "mysqldump --single-transaction --quick --lock-tables=false " - f"-h {self.host} -u {self.user} -p{self.password} " - f"{self.database} '{table}' " - f" | gzip > '{backup_file}'", - executable="/bin/bash", - ) - data["tables"][table] = output - return data - - @step("Run App Specific Scripts") - def run_app_scripts(self, scripts: dict[str, str]): - for app_name in scripts: - script = scripts[app_name] - self.bench_execute("console", input=script) - - @step("Migrate Site") - def migrate(self, skip_search_index=False, skip_failing_patches=False): - return self._migrate( - skip_search_index, - skip_failing_patches, - ) - - def _migrate( - self, - skip_search_index: bool = False, - skip_failing_patches: bool = False, - ): - cmd = "migrate" - if skip_search_index: - cmd += " --skip-search-index" - if skip_failing_patches: - cmd += " --skip-failing" - return self.bench_execute(cmd) - - @step("Log Touched Tables") - def log_touched_tables(self): - try: - # It will either return the touched tables - # or try to return the previous tables - return self.tables_to_restore - except Exception: - # If both file is not there, assume no tables are touched - return [] - - @step("Build Search Index") - def build_search_index(self): - return self.bench_execute("build-search-index") - - @job("Clear Cache") - def clear_cache_job(self): - self.clear_cache() - self.clear_website_cache() - - @step("Clear Cache") - def clear_cache(self): - return self.bench_execute("clear-cache") - - @step("Clear Website Cache") - def clear_website_cache(self): - return self.bench_execute("clear-website-cache") - - @step("Uninstall Unavailable Apps") - def uninstall_unavailable_apps(self, apps_to_keep): - installed_apps = json.loads(self.bench_execute("execute jingrow.get_installed_apps")["output"]) - for app in installed_apps: - if app not in apps_to_keep: - self.bench_execute(f"remove-from-installed-apps '{app}'") - self.bench_execute("clear-cache") - - @step("Disable Maintenance Mode") - def disable_maintenance_mode(self): - self._disable_maintenance_mode() - - def _disable_maintenance_mode(self): - return self.bench_execute("set-maintenance-mode off") - - 
@step("Restore Touched Tables") - def restore_touched_tables(self): - return self._restore_touched_tables() - - def _restore_touched_tables(self): - data = {"restored": {}} - for table in self.tables_to_restore: - backup_file = os.path.join(self.backup_directory, f"{table}.sql.gz") - if os.path.exists(backup_file): - output = self.execute( - "set -o pipefail && " - f"gunzip -c '{backup_file}' | " - f"mysql -h {self.host} -u {self.user} -p{self.password} " - f"{self.database}", - executable="/bin/bash", - ) - data["restored"][table] = output - - dropped_tables = self.drop_new_tables() - data.update(dropped_tables) - return data - - def drop_new_tables(self): - new_tables = set(self.tables) - set(self.previous_tables) - data = {"dropped": {}} - for table in new_tables: - output = self.execute( - f"mysql -h {self.host} -u {self.user} -p{self.password} " - f"{self.database} -e 'DROP TABLE `{table}`'" - ) - data["dropped"][table] = output - return data - - @step("Pause Scheduler") - def pause_scheduler(self): - return self.bench_execute("scheduler pause") - - @step("Enable Scheduler") - def enable_scheduler(self): - return self.bench_execute("scheduler enable") - - @step("Resume Scheduler") - def resume_scheduler(self): - return self.bench_execute("scheduler resume") - - def fetch_site_status(self): - data = { - "scheduler": True, - "web": True, - "timestamp": str(datetime.now()), - } - try: - ping_url = f"https://{self.name}/api/method/ping" - data["web"] = requests.get(ping_url).status_code == 200 - except Exception: - data["web"] = False - - doctor = self.bench_execute("doctor") - if "inactive" in doctor["output"]: - data["scheduler"] = False - - return data - - def get_timezone(self): - return self.timezone - - def fetch_site_info(self): - return { - "config": self.config, - "timezone": self.get_timezone(), - "usage": self.get_usage(), - } - - def fetch_site_analytics(self): - if not os.path.exists(self.analytics_file): - return {} - with open(self.analytics_file) as af: - return json.load(af) - - def sid(self, user="Administrator"): - code = f"""from jingrow.auth import CookieManager, LoginManager -try: - from jingrow.utils import set_request -except ImportError: - from jingrow.tests import set_request - -user = '{user}' -set_request(path="/") -jingrow.local.cookie_manager = CookieManager() -jingrow.local.login_manager = LoginManager() -jingrow.local.request_ip = "127.0.0.1" -jingrow.local.login_manager.login_as(user) -print(">>>" + jingrow.session.sid + "<<<") -""" - - sid = None - if (output := self.bench_execute("console", input=code)["output"]) and ( - res := re.search(r">>>(.*)<<<", output) - ): - sid = res.group(1) - if ( - (not sid or sid == user or sid == "Guest") - and (output := self.bench_execute(f"browse --user {user}")["output"]) - and (res := re.search(r"\?sid=([a-z0-9]*)", output)) - ): - sid = res.group(1) - return sid - - @property - def timezone(self): - query = ( - f"select defvalue from {self.database}.tabDefaultValue where" - " defkey = 'time_zone' and parent = '__default'" - ) - try: - timezone = self.execute( - f"mysql -h {self.host} -u{self.database} -p{self.password} " - f'--connect-timeout 3 -sN -e "{query}"' - )["output"].strip() - except Exception: - timezone = "" - return timezone - - @property - def tables(self): - return self.execute( - "mysql --disable-column-names -B -e 'SHOW TABLES' " - f"-h {self.host} -u {self.user} -p{self.password} {self.database}" - )["output"].split("\n") - - @property - def touched_tables(self): - with open(self.touched_tables_file, 
"r") as f: - return json.load(f) - - @property - def previous_tables(self): - with open(self.previous_tables_file, "r") as f: - return json.load(f) - - @property - def tables_to_restore(self): - try: - return self.touched_tables - except Exception: - return self.previous_tables - - @job("Backup Site", priority="low") - def backup_job(self, with_files=False, offsite=None): - backup_files = self.backup(with_files) - uploaded_files = ( - self.upload_offsite_backup(backup_files, offsite) if (offsite and backup_files) else {} - ) - return {"backups": backup_files, "offsite": uploaded_files} - - @job("Optimize Tables") - def optimize_tables_job(self): - return self.optimize_tables() - - @step("Optimize Tables") - def optimize_tables(self): - tables = [row[0] for row in self.get_database_free_tables()] - for table in tables: - query = f"OPTIMIZE TABLE `{table}`" - self.execute( - f"mysql -sN -h {self.host} -u{self.user} -p{self.password} {self.database} -e '{query}'" - ) - - def fetch_latest_backup(self, with_files=True): - databases, publics, privates, site_configs = [], [], [], [] - backup_directory = os.path.join(self.directory, "private", "backups") - - for file in os.listdir(backup_directory): - path = os.path.join(backup_directory, file) - if file.endswith("database.sql.gz") or file.endswith("database-enc.sql.gz"): - databases.append(path) - elif file.endswith("private-files.tar") or file.endswith("private-files-enc.tar"): - privates.append(path) - elif file.endswith("files.tar") or file.endswith("files-enc.tar"): - publics.append(path) - elif file.endswith("site_config_backup.json") or file.endswith("site_config_backup-enc.json"): - site_configs.append(path) - - backups = { - "database": {"path": max(databases, key=os.path.getmtime)}, - "site_config": {"path": max(site_configs, key=os.path.getmtime)}, - } - - if with_files: - backups["private"] = {"path": max(privates, key=os.path.getmtime)} - backups["public"] = {"path": max(publics, key=os.path.getmtime)} - - for backup in backups.values(): - file = os.path.basename(backup["path"]) - backup["file"] = file - backup["size"] = os.stat(backup["path"]).st_size - backup["url"] = f"https://{self.name}/backups/{file}" - - return backups - - def get_usage(self): - """Returns Usage in bytes""" - backup_directory = os.path.join(self.directory, "private", "backups") - public_directory = os.path.join(self.directory, "public") - private_directory = os.path.join(self.directory, "private") - - return { - "database": b2mb(self.get_database_size()), - "database_free_tables": self.get_database_free_tables(), - "database_free": b2mb(self.get_database_free_size()), - "public": b2mb(get_size(public_directory)), - "private": b2mb(get_size(private_directory, ignore_dirs=["backups"])), - "backups": b2mb(get_size(backup_directory)), - } - - def get_analytics(self): - analytics = self.bench_execute("execute jingrow.utils.get_site_info")["output"] - return json.loads(analytics) - - def get_database_size(self): - # only specific to mysql/mariaDB. use a different query for postgres. 
- # or try using jingrow.db.get_database_size if possible - query = ( - "SELECT SUM(`data_length` + `index_length`)" - " FROM information_schema.tables" - f' WHERE `table_schema` = "{self.database}"' - " GROUP BY `table_schema`" - ) - command = f"mysql -sN -h {self.host} -u{self.user} -p{self.password} -e '{query}'" - database_size = self.execute(command).get("output") - - try: - return int(database_size) - except Exception: - return 0 - - def describe_database_table(self, doctype, columns=None): - if not columns: - columns = [] - - command = f"describe-database-table --doctype '{doctype}' " - for column in columns: - command += f"--column {column} " - try: - output = self.bench_execute(command)["output"] - return json.loads(output) - except Exception: - return {} - - @property - def apps(self): - return self.bench_execute("list-apps")["output"] - - @job("Add Database Index") - def add_database_index(self, doctype, columns=None): - if not columns: - return - self._add_database_index(doctype, columns) - - @step("Add Database Index With Bench Command") - def _add_database_index(self, doctype, columns): - command = f"add-database-index --doctype '{doctype}' " - for column in columns: - command += f"--column {column} " - - return self.bench_execute(command) - - def get_database_free_size(self): - query = ( - "SELECT SUM(`data_free`)" - " FROM information_schema.tables" - f' WHERE `table_schema` = "{self.database}"' - " GROUP BY `table_schema`" - ) - command = f"mysql -sN -h {self.host} -u{self.user} -p{self.password} -e '{query}'" - database_size = self.execute(command).get("output") - - try: - return int(database_size) - except Exception: - return 0 - - def get_database_free_tables(self): - try: - query = ( - "SELECT `table_name`," - " round((`data_free` / 1024 / 1024), 2)" - " FROM information_schema.tables" - f' WHERE `table_schema` = "{self.database}"' - " AND ((`data_free` / (`data_length` + `index_length`)) > 0.2" - " OR `data_free` > 100 * 1024 * 1024)" - ) - command = f"mysql -sN -h {self.host} -u{self.user} -p{self.password} -e '{query}'" - output = self.execute(command).get("output") - return [line.split("\t") for line in output.splitlines()] - except Exception: - return [] - - @job("Fetch Database Table Schema") - def fetch_database_table_schema(self, include_table_size: bool = True, include_index_info: bool = True): - database = self.db_instance() - tables = {} - table_schemas = self._fetch_database_table_schema(database, include_index_info=include_index_info) - for table_name in table_schemas: - tables[table_name] = { - "columns": table_schemas[table_name], - } - - if include_table_size: - table_sizes = self._fetch_database_table_sizes(database) - for table_name in table_sizes: - if table_name not in tables: - continue - tables[table_name]["size"] = table_sizes[table_name] - - return tables - - @step("Fetch Database Table Schema") - def _fetch_database_table_schema(self, database: Database, include_index_info: bool = True): - return database.fetch_database_table_schema(include_index_info=include_index_info) - - @step("Fetch Database Table Sizes") - def _fetch_database_table_sizes(self, database: Database): - return database.fetch_database_table_sizes() - - def run_sql_query(self, query: str, commit: bool = False, as_dict: bool = False): - db = self.db_instance() - success, output = db.execute_query(query, commit=commit, as_dict=as_dict) - response = {"success": success, "data": output} - if not success and hasattr(db, "last_executed_query"): - response["failed_query"] = 
db.last_executed_query
-        return response
-
-    @job("Analyze Slow Queries")
-    def analyze_slow_queries_job(self, queries: list[dict], database_root_password: str) -> list[dict]:
-        return self.analyze_slow_queries(queries, database_root_password)
-
-    @step("Analyze Slow Queries")
-    def analyze_slow_queries(self, queries: list[dict], database_root_password: str) -> list[dict]:
-        from agent.database_optimizer import OptimizeDatabaseQueries
-
-        """
-        Args:
-            queries (list[dict]): List of queries to analyze
-            {
-                "example": "",
-                "normalized": "",
-            }
-        """
-        example_queries = [query["example"] for query in queries]
-        optimizer = OptimizeDatabaseQueries(self, example_queries, database_root_password)
-        analysis = optimizer.analyze()
-        analysis_summary = {}  # map[query -> list[index_info_dict]
-        for query, indexes in analysis.items():
-            analysis_summary[query] = [index.to_dict() for index in indexes]
-
-        result = []  # list[{example, normalized, suggested_indexes}]
-        for query in queries:
-            query["suggested_indexes"] = analysis_summary.get(query["example"], [])
-            result.append(query)
-        return {
-            "result": result,
-        }
-
-    def fetch_summarized_database_performance_report(self, mariadb_root_password: str):
-        database = self.db_instance(username="root", password=mariadb_root_password)
-        return database.fetch_summarized_performance_report()
-
-    def fetch_database_process_list(self, mariadb_root_password: str):
-        return self.db_instance(username="root", password=mariadb_root_password).fetch_process_list()
-
-    def kill_database_process(self, pid: str, mariadb_root_password: str):
-        return self.db_instance(username="root", password=mariadb_root_password).kill_process(pid)
-
-    def db_instance(self, username: str | None = None, password: str | None = None) -> Database:
-        if not username:
-            username = self.user
-        if not password:
-            password = self.password
-        return Database(self.host, 3306, username, password, self.database)
-
-    @property
-    def job_record(self):
-        return self.bench.server.job_record
-
-    @property
-    def step_record(self):
-        return self.bench.server.step_record
-
-    @step_record.setter
-    def step_record(self, value):
-        self.bench.server.step_record = value
-
-    def generate_theme_files(self):
-        self.bench_execute(
-            "execute jingrow.website.doctype.website_theme.website_theme.generate_theme_files_if_not_exist"
-        )
+from __future__ import annotations
+
+import json
+import os
+import re
+import shutil
+import time
+from datetime import datetime
+from shlex import quote
+from typing import TYPE_CHECKING
+
+import requests
+
+from agent.base import AgentException, Base
+from agent.database import Database
+from agent.job import job, step
+from agent.utils import b2mb, compute_file_hash, get_size
+
+if TYPE_CHECKING:
+    from agent.bench import Bench
+
+
+class Site(Base):
+    def __init__(self, name: str, bench: Bench):
+        super().__init__()
+
+        self.name = name
+        self.bench = bench
+        self.directory = os.path.join(self.bench.sites_directory, name)
+        self.backup_directory = os.path.join(self.directory, ".migrate")
+        self.logs_directory = os.path.join(self.directory, "logs")
+        self.config_file = os.path.join(self.directory, "site_config.json")
+        self.touched_tables_file = os.path.join(self.directory, "touched_tables.json")
+        self.previous_tables_file = os.path.join(self.directory, "previous_tables.json")
+        self.analytics_file = os.path.join(
+            self.directory,
+            "analytics.json",
+        )
+
+        if not os.path.isdir(self.directory):
+            raise OSError(f"Path {self.directory} is not a directory")
+
+        if not os.path.exists(self.config_file):
+            raise OSError(f"Path {self.config_file} does not exist")
+
+        self.database = self.config["db_name"]
+        self.user = self.config["db_name"]
+        self.password = self.config["db_password"]
+        self.host = self.config.get("db_host", self.bench.host)
+
+    def bench_execute(self, command, input=None):
+        return self.bench.docker_execute(f"bench --site {self.name} {command}", input=input)
+
+    def dump(self):
+        return {"name": self.name}
+
+    @step("Rename Site")
+    def rename(self, new_name):
+        os.rename(self.directory, os.path.join(self.bench.sites_directory, new_name))
+        self.name = new_name
+
+    @job("Run After Migrate Steps")
+    def run_after_migrate_steps_job(self, admin_password):
+        """
+        Run the after-migrate steps.
+
+        Used to re-run the after-migrate steps when a migration breaks.
+        """
+        self.set_admin_password(admin_password)
+        self.bench.setup_nginx()
+        self.bench.server.reload_nginx()
+        self.disable_maintenance_mode()
+        self.enable_scheduler()
+
+    @step("Install Apps")
+    def install_apps(self, apps):
+        data = {"apps": {}}
+        output = []
+        for app in apps:
+            data["apps"][app] = {}
+            log = data["apps"][app]
+            if app != "jingrow":
+                log["install"] = self.bench_execute(f"install-app {app}")
+                output.append(log["install"]["output"])
+        data["output"] = "\n".join(output)
+        return data
+
+    @step("Install App on Site")
+    def install_app(self, app):
+        try:
+            return self.bench_execute(f"install-app {app} --force")
+        except AgentException as e:
+            if "Error: no such option: --force" in e.data["output"]:
+                return self.bench_execute(f"install-app {app}")  # --force not available in < v14
+            raise
+
+    @step("Uninstall App from Site")
+    def uninstall_app(self, app):
+        return self.bench_execute(f"uninstall-app {app} --yes --force")
+
+    @step("Restore Site")
+    def restore(
+        self,
+        mariadb_root_password,
+        admin_password,
+        database_file,
+        public_file,
+        private_file,
+    ):
+        sites_directory = self.bench.sites_directory
+        database_file = database_file.replace(sites_directory, "/home/jingrow/jingrow-bench/sites")
+        public_file = public_file.replace(sites_directory, "/home/jingrow/jingrow-bench/sites")
+        private_file = private_file.replace(sites_directory, "/home/jingrow/jingrow-bench/sites")
+
+        public_file_option = f"--with-public-files {public_file}" if public_file else ""
+        private_file_option = f"--with-private-files {private_file} " if private_file else ""
+
+        _, temp_user, temp_password = self.bench.create_mariadb_user(
+            self.name, mariadb_root_password, self.database
+        )
+        try:
+            return self.bench_execute(
+                "--force restore "
+                f"--mariadb-root-username {temp_user} "
+                f"--mariadb-root-password {temp_password} "
+                f"--admin-password {admin_password} "
+                f"{public_file_option} "
+                f"{private_file_option} "
+                f"{database_file}"
+            )
+        finally:
+            self.bench.drop_mariadb_user(self.name, mariadb_root_password, self.database)
+
+    @step("Checksum of Downloaded Backup Files")
+    def calculate_checksum_of_backup_files(self, database_file, public_file, private_file):
+        database_file_sha256 = compute_file_hash(database_file, algorithm="sha256", raise_exception=False)
+
+        data = f"""Database File
+> File Name - {os.path.basename(database_file)}
+> SHA256 Checksum - {database_file_sha256}\n"""
+        if public_file:
+            public_file_sha256 = compute_file_hash(public_file, algorithm="sha256", raise_exception=False)
+            data += f"""\nPublic File
+> File Name - {os.path.basename(public_file)}
+> SHA256 Checksum - {public_file_sha256}\n"""
+        if private_file:
+            private_file_sha256 = compute_file_hash(private_file, algorithm="sha256", raise_exception=False)
+            data += f"""\nPrivate File
+> File Name - {os.path.basename(private_file)}
+> SHA256 Checksum - {private_file_sha256}\n"""
+
+        return {"output": data}
+
+    @job("Restore Site")
+    def restore_job(
+        self,
+        apps,
+        mariadb_root_password,
+        admin_password,
+        database,
+        public,
+        private,
+        skip_failing_patches,
+    ):
+        files = self.bench.download_files(self.name, database, public, private)
+        try:
+            self.restore(
+                mariadb_root_password,
+                admin_password,
+                files["database"],
+                files["public"],
+                files["private"],
+            )
+        except Exception:
+            self.calculate_checksum_of_backup_files(files["database"], files["public"], files["private"])
+            raise
+        finally:
+            self.bench.delete_downloaded_files(files["directory"])
+        self.uninstall_unavailable_apps(apps)
+        self.migrate(skip_failing_patches=skip_failing_patches)
+        self.set_admin_password(admin_password)
+        self.enable_scheduler()
+
+        self.bench.setup_nginx()
+        self.bench.server.reload_nginx()
+
+        return self.bench_execute("list-apps")
+
+    @job("Migrate Site")
+    def migrate_job(self, skip_failing_patches=False, activate=True):
+        self.migrate(skip_failing_patches=skip_failing_patches)
+        if activate:
+            self.disable_maintenance_mode()
+
+    @step("Reinstall Site")
+    def reinstall(
+        self,
+        mariadb_root_password,
+        admin_password,
+    ):
+        _, temp_user, temp_password = self.bench.create_mariadb_user(
+            self.name, mariadb_root_password, self.database
+        )
+        try:
+            return self.bench_execute(
+                "reinstall --yes "
+                f"--mariadb-root-username {temp_user} "
+                f"--mariadb-root-password {temp_password} "
+                f"--admin-password {admin_password}"
+            )
+        finally:
+            self.bench.drop_mariadb_user(self.name, mariadb_root_password, self.database)
+
+    @job("Reinstall Site")
+    def reinstall_job(
+        self,
+        mariadb_root_password,
+        admin_password,
+    ):
+        return self.reinstall(mariadb_root_password, admin_password)
+
+    @job("Install App on Site")
+    def install_app_job(self, app):
+        self.install_app(app)
+
+    @job("Uninstall App on Site")
+    def uninstall_app_job(self, app):
+        self.uninstall_app(app)
+
+    @step("Update Site Configuration")
+    def update_config(self, value, remove=None):
+        """Update the existing site config with the given values.
+
+        Args:
+            value (dict): Site Config keys to set.
+            remove (list, optional): Keys sent in the form of a list will be
+                popped from the existing site config. Defaults to None.
+        """
+        new_config = self.get_config(for_update=True)
+        new_config.update(value)
+
+        if remove:
+            for key in remove:
+                new_config.pop(key, None)
+
+        self.set_config(new_config)
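+
+    # Illustrative usage sketch (not part of the module): setting a couple of
+    # keys and dropping a stale one in a single call, assuming `site` is an
+    # instance of this class. The config keys shown are hypothetical; the
+    # method merges `value` into the site config and pops each key in `remove`.
+    #
+    #   site.update_config(
+    #       {"maintenance_mode": 0, "developer_mode": 1},
+    #       remove=["pause_scheduler"],
+    #   )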
+
+    @job("Add Domain", priority="high")
+    def add_domain(self, domain):
+        domains = set(self.config.get("domains", []))
+        domains.add(domain)
+        self.update_config({"domains": list(domains)})
+        self.bench.setup_nginx()
+        self.bench.server.reload_nginx()
+
+    @job("Remove Domain", priority="high")
+    def remove_domain(self, domain):
+        domains = set(self.config.get("domains", []))
+        domains.discard(domain)
+        self.update_config({"domains": list(domains)})
+        self.bench.setup_nginx()
+        self.bench.server.reload_nginx()
+
+    def create_database_access_credentials(self, mode, mariadb_root_password):
+        database = self.database
+        user = f"{self.user}_{mode}"
+        password = self.bench.get_random_string(16)
+        privileges = {
+            "read_only": "SELECT",
+            "read_write": "ALL",
+        }.get(mode, "SELECT")
+        queries = [
+            f"CREATE OR REPLACE USER '{user}'@'%' IDENTIFIED BY '{password}'",
+            f"GRANT {privileges} ON {database}.* TO '{user}'@'%'",
+            "FLUSH PRIVILEGES",
+        ]
+        for query in queries:
+            command = f'mysql -h {self.host} -uroot -p{mariadb_root_password} -e "{query}"'
+            self.execute(command)
+        return {"database": database, "user": user, "password": password}
+
+    def revoke_database_access_credentials(self, user, mariadb_root_password):
+        if user == self.user:
+            # Do not revoke access for the main user
+            return {}
+        self.db_instance("root", mariadb_root_password).remove_user(user)
+        return {}
+
+    @job("Create Database User", priority="high")
+    def create_database_user_job(self, user, password, mariadb_root_password):
+        return self.create_database_user(user, password, mariadb_root_password)
+
+    @step("Create Database User")
+    def create_database_user(self, user, password, mariadb_root_password):
+        if user == self.user:
+            # Do not perform any operation for the main user
+            return {}
+        self.db_instance("root", mariadb_root_password).create_user(user, password)
+        return {
+            "database": self.database,
+        }
+
+    @job("Remove Database User", priority="high")
+    def remove_database_user_job(self, user, mariadb_root_password):
+        return self.remove_database_user(user, mariadb_root_password)
+
+    @step("Remove Database User")
+    def remove_database_user(self, user, mariadb_root_password):
+        if user == self.user:
+            # Do not perform any operation for the main user
+            return {}
+        self.db_instance("root", mariadb_root_password).remove_user(user)
+        return {}
+
+    @job("Modify Database User Permissions", priority="high")
+    def modify_database_user_permissions_job(self, user, mode, permissions, mariadb_root_password):
+        return self.modify_database_user_permissions(user, mode, permissions, mariadb_root_password)
+
+    @step("Modify Database User Permissions")
+    def modify_database_user_permissions(self, user, mode, permissions, mariadb_root_password):
+        if user == self.user:
+            # Do not perform any operation for the main user
+            return {}
+        self.db_instance("root", mariadb_root_password).modify_user_permissions(user, mode, permissions)
+        return {}
+
+    @job("Setup Jerp", priority="high")
+    def setup_jerp(self, user, config):
+        self.create_user(
+            user["email"],
+            user["first_name"],
+            user["last_name"],
+        )
+        self.update_jerp_config(config)
+        return {"sid": self.sid(user["email"])}
+
+    @job("Restore Site Tables", priority="high")
+    def restore_site_tables_job(self, activate):
+        self.restore_site_tables()
+        if activate:
+            self.disable_maintenance_mode()
+
+    @step("Restore Site Tables")
+    def restore_site_tables(self):
+        data = {"tables": {}}
+        for backup_file in os.listdir(self.backup_directory):
+            backup_file_path = os.path.join(self.backup_directory, backup_file)
+            output = self.execute(
+                "set -o pipefail && "
+                f"gunzip -c '{backup_file_path}' | "
+                f"mysql -h {self.host} -u {self.user} -p{self.password} "
+                f"{self.database}",
+                executable="/bin/bash",
+            )
+            data["tables"][backup_file] = output
+        return data
+
+    @step("Update Jerp Configuration")
+    def update_jerp_config(self, value):
+        config_file = os.path.join(self.directory, "journeys_config.json")
+        with open(config_file, "r") as f:
+            config = json.load(f)
+
+        config.update(value)
+
+        with open(config_file, "w") as f:
+            json.dump(config, f, indent=1, sort_keys=True)
+
+    @step("Create User")
+    def create_user(self, email, first_name, last_name, password=None):
+        first_name = quote(first_name)
+        last_name = quote(last_name)
+        if password:
+            password = quote(password)
+        command = f"add-system-manager {email} --first-name {first_name} --last-name {last_name}"
+        if password:
+            command += f" --password {password}"
+        return self.bench_execute(command)
+
+    @step("Complete Setup Wizard")
+    def complete_setup_wizard(self, data):
+        payload = {"args": data}
+        payload = quote(json.dumps(payload))
+        command = f"execute jingrow.desk.page.setup_wizard.setup_wizard.setup_complete --kwargs {payload}"
+        return self.bench_execute(command)
+
+    @job("Update Site Configuration", priority="high")
+    def update_config_job(self, value, remove):
+        self.update_config(value, remove)
+
+    @job("Reset Site Usage", priority="high")
+    def reset_site_usage_job(self):
+        return self.reset_site_usage()
+
+    @step("Reset Site Usage")
+    def reset_site_usage(self):
+        pattern = f"{self.database}|rate-limit-counter-[0-9]*"
+        keys_command = f"redis-cli --raw -p 13000 KEYS '{pattern}'"
+        keys = self.bench.docker_execute(keys_command)
+        data = {"keys": keys, "get": [], "delete": []}
+        for key in keys["output"].splitlines():
+            get = self.bench.docker_execute(f"redis-cli -p 13000 GET '{key}'")
+            delete = self.bench.docker_execute(f"redis-cli -p 13000 DEL '{key}'")
+            data["get"].append(get)
+            data["delete"].append(delete)
+        return data
+
+    @job("Update Saas Plan")
+    def update_saas_plan(self, plan):
+        self.update_plan(plan)
+
+    @step("Update Saas Plan")
+    def update_plan(self, plan):
+        self.bench_execute(f"update-site-plan {plan}")
+
+    @step("Backup Site")
+    def backup(self, with_files=False):
+        with_files = "--with-files" if with_files else ""
+        self.bench_execute(f"backup {with_files}")
+        return self.fetch_latest_backup(with_files=with_files)
+
+    @step("Upload Site Backup to S3")
+    def upload_offsite_backup(self, backup_files, offsite):
+        import boto3
+
+        offsite_files = {}
+        bucket, auth, prefix = (
+            offsite["bucket"],
+            offsite["auth"],
+            offsite["path"],
+        )
+        region = auth.get("REGION")
+
+        if region:
+            s3 = boto3.client(
+                "s3",
+                aws_access_key_id=auth["ACCESS_KEY"],
+                aws_secret_access_key=auth["SECRET_KEY"],
+                region_name=region,
+            )
+        else:
+            s3 = boto3.client(
+                "s3",
+                aws_access_key_id=auth["ACCESS_KEY"],
+                aws_secret_access_key=auth["SECRET_KEY"],
+            )
+
+        for backup_file in backup_files.values():
+            file_name = backup_file["file"].split(os.sep)[-1]
+            offsite_path = os.path.join(prefix, file_name)
+            offsite_files[file_name] = offsite_path
+
+            with open(backup_file["path"], "rb") as data:
+                s3.upload_fileobj(data, bucket, offsite_path)
+
+        return offsite_files
+
+    @step("Enable Maintenance Mode")
+    def enable_maintenance_mode(self):
+        return self._enable_maintenance_mode()
+
+    def _enable_maintenance_mode(self):
+        return self.bench_execute("set-maintenance-mode on")
+
+    @step("Set Administrator Password")
+    def set_admin_password(self, password):
+        return self.bench_execute(f"set-admin-password {password}")
+
+    @step("Wait for Enqueued Jobs")
+    def wait_till_ready(self):
+        WAIT_TIMEOUT = 600
+        data = {"tries": []}
+        start = time.time()
+        is_ready = False
+        while (time.time() - start) < WAIT_TIMEOUT:
+            try:
+                output = self.bench_execute("ready-for-migration")
+                data["tries"].append(output)
+                is_ready = True
+                break
+            except Exception as e:
+                data["tries"].append(e.data)
+                time.sleep(1)
+
+        if not is_ready:
+            raise Exception(
+                f"Site not ready for migration after {WAIT_TIMEOUT}s."
+                " The site might have a lot of jobs in the queue. Try again later."
+            )
+
+        return data
+
+    @step("Clear Backup Directory")
+    def clear_backup_directory(self):
+        if os.path.exists(self.backup_directory):
+            shutil.rmtree(self.backup_directory)
+        os.mkdir(self.backup_directory)
+
+    @step("Backup Site Tables")
+    def tablewise_backup(self):
+        tables = self.tables
+        with open(self.previous_tables_file, "w") as ptf:
+            json.dump(tables, ptf, indent=4, sort_keys=True)
+
+        data = {"tables": {}}
+        for table in tables:
+            backup_file = os.path.join(self.backup_directory, f"{table}.sql.gz")
+            output = self.execute(
+                "set -o pipefail && "
+                "mysqldump --single-transaction --quick --lock-tables=false "
+                f"-h {self.host} -u {self.user} -p{self.password} "
+                f"{self.database} '{table}' "
+                f" | gzip > '{backup_file}'",
+                executable="/bin/bash",
+            )
+            data["tables"][table] = output
+        return data
+
+    @step("Run App Specific Scripts")
+    def run_app_scripts(self, scripts: dict[str, str]):
+        for app_name in scripts:
+            script = scripts[app_name]
+            self.bench_execute("console", input=script)
+
+    @step("Migrate Site")
+    def migrate(self, skip_search_index=False, skip_failing_patches=False):
+        return self._migrate(
+            skip_search_index,
+            skip_failing_patches,
+        )
+
+    def _migrate(
+        self,
+        skip_search_index: bool = False,
+        skip_failing_patches: bool = False,
+    ):
+        cmd = "migrate"
+        if skip_search_index:
+            cmd += " --skip-search-index"
+        if skip_failing_patches:
+            cmd += " --skip-failing"
+        return self.bench_execute(cmd)
+
+    @step("Log Touched Tables")
+    def log_touched_tables(self):
+        try:
+            # Return the touched tables if available,
+            # else fall back to the previous tables
+            return self.tables_to_restore
+        except Exception:
+            # If neither file exists, assume no tables were touched
+            return []
+
+    @step("Build Search Index")
+    def build_search_index(self):
+        return self.bench_execute("build-search-index")
+
+    @job("Clear Cache")
+    def clear_cache_job(self):
+        self.clear_cache()
+        self.clear_website_cache()
+
+    @step("Clear Cache")
+    def clear_cache(self):
+        return self.bench_execute("clear-cache")
+
+    @step("Clear Website Cache")
+    def clear_website_cache(self):
+        return self.bench_execute("clear-website-cache")
+
+    @step("Uninstall Unavailable Apps")
+    def uninstall_unavailable_apps(self, apps_to_keep):
+        installed_apps = json.loads(self.bench_execute("execute jingrow.get_installed_apps")["output"])
+        for app in installed_apps:
+            if app not in apps_to_keep:
+                self.bench_execute(f"remove-from-installed-apps '{app}'")
+        self.bench_execute("clear-cache")
+
+    @step("Disable Maintenance Mode")
+    def disable_maintenance_mode(self):
+        self._disable_maintenance_mode()
+
+    def _disable_maintenance_mode(self):
+        return self.bench_execute("set-maintenance-mode off")
+
+    @step("Restore Touched Tables")
+    def restore_touched_tables(self):
+        return self._restore_touched_tables()
+
+    def _restore_touched_tables(self):
+        data = {"restored": {}}
+        for table in self.tables_to_restore:
+            backup_file = os.path.join(self.backup_directory, f"{table}.sql.gz")
+            if os.path.exists(backup_file):
+                output = self.execute(
+                    "set -o pipefail && "
+                    f"gunzip -c '{backup_file}' | "
+                    f"mysql -h {self.host} -u {self.user} -p{self.password} "
+                    f"{self.database}",
+                    executable="/bin/bash",
+                )
+                data["restored"][table] = output
+
+        dropped_tables = self.drop_new_tables()
+        data.update(dropped_tables)
+        return data
+
+    def drop_new_tables(self):
+        new_tables = set(self.tables) - set(self.previous_tables)
+        data = {"dropped": {}}
+        for table in new_tables:
+            output = self.execute(
+                f"mysql -h {self.host} -u {self.user} -p{self.password} "
+                f"{self.database} -e 'DROP TABLE `{table}`'"
+            )
+            data["dropped"][table] = output
+        return data
+
+    @step("Pause Scheduler")
+    def pause_scheduler(self):
+        return self.bench_execute("scheduler pause")
+
+    @step("Enable Scheduler")
+    def enable_scheduler(self):
+        return self.bench_execute("scheduler enable")
+
+    @step("Resume Scheduler")
+    def resume_scheduler(self):
+        return self.bench_execute("scheduler resume")
+
+    def fetch_site_status(self):
+        data = {
+            "scheduler": True,
+            "web": True,
+            "timestamp": str(datetime.now()),
+        }
+        try:
+            ping_url = f"https://{self.name}/api/action/ping"
+            data["web"] = requests.get(ping_url).status_code == 200
+        except Exception:
+            data["web"] = False
+
+        doctor = self.bench_execute("doctor")
+        if "inactive" in doctor["output"]:
+            data["scheduler"] = False
+
+        return data
+
+    def get_timezone(self):
+        return self.timezone
+
+    def fetch_site_info(self):
+        return {
+            "config": self.config,
+            "timezone": self.get_timezone(),
+            "usage": self.get_usage(),
+        }
+
+    def fetch_site_analytics(self):
+        if not os.path.exists(self.analytics_file):
+            return {}
+        with open(self.analytics_file) as af:
+            return json.load(af)
+
+    def sid(self, user="Administrator"):
+        code = f"""from jingrow.auth import CookieManager, LoginManager
+try:
+    from jingrow.utils import set_request
+except ImportError:
+    from jingrow.tests import set_request
+
+user = '{user}'
+set_request(path="/")
+jingrow.local.cookie_manager = CookieManager()
+jingrow.local.login_manager = LoginManager()
+jingrow.local.request_ip = "127.0.0.1"
+jingrow.local.login_manager.login_as(user)
+print(">>>" + jingrow.session.sid + "<<<")
+"""
+
+        sid = None
+        if (output := self.bench_execute("console", input=code)["output"]) and (
+            res := re.search(r">>>(.*)<<<", output)
+        ):
+            sid = res.group(1)
+        if (
+            (not sid or sid == user or sid == "Guest")
+            and (output := self.bench_execute(f"browse --user {user}")["output"])
+            and (res := re.search(r"\?sid=([a-z0-9]*)", output))
+        ):
+            sid = res.group(1)
+        return sid
+
+    @property
+    def timezone(self):
+        query = (
+            f"select defvalue from {self.database}.tabDefaultValue where"
+            " defkey = 'time_zone' and parent = '__default'"
+        )
+        try:
+            timezone = self.execute(
+                f"mysql -h {self.host} -u{self.database} -p{self.password} "
+                f'--connect-timeout 3 -sN -e "{query}"'
+            )["output"].strip()
+        except Exception:
+            timezone = ""
+        return timezone
+
+    @property
+    def tables(self):
+        return self.execute(
+            "mysql --disable-column-names -B -e 'SHOW TABLES' "
+            f"-h {self.host} -u {self.user} -p{self.password} {self.database}"
+        )["output"].split("\n")
+
+    @property
+    def touched_tables(self):
+        with open(self.touched_tables_file, "r") as f:
+            return json.load(f)
+
+    @property
+    def previous_tables(self):
+        with open(self.previous_tables_file, "r") as f:
+            return json.load(f)
+
+    @property
+    def tables_to_restore(self):
+        try:
+            return self.touched_tables
+        except Exception:
+            return self.previous_tables
+
+    @job("Backup Site", priority="low")
+    def backup_job(self, with_files=False, offsite=None):
+        backup_files = self.backup(with_files)
+        uploaded_files = (
+            self.upload_offsite_backup(backup_files, offsite) if (offsite and backup_files) else {}
+        )
+        return {"backups": backup_files, "offsite": uploaded_files}
+
+    @job("Optimize Tables")
+    def optimize_tables_job(self):
+        return self.optimize_tables()
+
+    @step("Optimize Tables")
+    def optimize_tables(self):
+        tables = [row[0] for row in self.get_database_free_tables()]
+        for table in tables:
+            query = f"OPTIMIZE TABLE `{table}`"
+            self.execute(
+                f"mysql -sN -h {self.host} -u{self.user} -p{self.password} {self.database} -e '{query}'"
+            )
+
+    def fetch_latest_backup(self, with_files=True):
+        databases, publics, privates, site_configs = [], [], [], []
+        backup_directory = os.path.join(self.directory, "private", "backups")
+
+        for file in os.listdir(backup_directory):
+            path = os.path.join(backup_directory, file)
+            if file.endswith("database.sql.gz") or file.endswith("database-enc.sql.gz"):
+                databases.append(path)
+            elif file.endswith("private-files.tar") or file.endswith("private-files-enc.tar"):
+                privates.append(path)
+            elif file.endswith("files.tar") or file.endswith("files-enc.tar"):
+                publics.append(path)
+            elif file.endswith("site_config_backup.json") or file.endswith("site_config_backup-enc.json"):
+                site_configs.append(path)
+
+        backups = {
+            "database": {"path": max(databases, key=os.path.getmtime)},
+            "site_config": {"path": max(site_configs, key=os.path.getmtime)},
+        }
+
+        if with_files:
+            backups["private"] = {"path": max(privates, key=os.path.getmtime)}
+            backups["public"] = {"path": max(publics, key=os.path.getmtime)}
+
+        for backup in backups.values():
+            file = os.path.basename(backup["path"])
+            backup["file"] = file
+            backup["size"] = os.stat(backup["path"]).st_size
+            backup["url"] = f"https://{self.name}/backups/{file}"
+
+        return backups
+
+    def get_usage(self):
+        """Return usage figures in MB (converted from bytes)."""
+        backup_directory = os.path.join(self.directory, "private", "backups")
+        public_directory = os.path.join(self.directory, "public")
+        private_directory = os.path.join(self.directory, "private")
+
+        return {
+            "database": b2mb(self.get_database_size()),
+            "database_free_tables": self.get_database_free_tables(),
+            "database_free": b2mb(self.get_database_free_size()),
+            "public": b2mb(get_size(public_directory)),
+            "private": b2mb(get_size(private_directory, ignore_dirs=["backups"])),
+            "backups": b2mb(get_size(backup_directory)),
+        }
+
+    def get_analytics(self):
+        analytics = self.bench_execute("execute jingrow.utils.get_site_info")["output"]
+        return json.loads(analytics)
+
+    def get_database_size(self):
+        # Only applies to MySQL/MariaDB; use a different query for Postgres,
+        # or try using jingrow.db.get_database_size if possible
+        query = (
+            "SELECT SUM(`data_length` + `index_length`)"
+            " FROM information_schema.tables"
+            f' WHERE `table_schema` = "{self.database}"'
+            " GROUP BY `table_schema`"
+        )
+        command = f"mysql -sN -h {self.host} -u{self.user} -p{self.password} -e '{query}'"
+        database_size = self.execute(command).get("output")
+
+        try:
+            return int(database_size)
+        except Exception:
+            return 0
+
+    def describe_database_table(self, doctype, columns=None):
+        if not columns:
+            columns = []
+
+        command = f"describe-database-table --doctype '{doctype}' "
+        for column in columns:
+            command += f"--column {column} "
+        try:
+            output = self.bench_execute(command)["output"]
+            return json.loads(output)
+        except Exception:
+            return {}
+
+    @property
+    def apps(self):
+        return self.bench_execute("list-apps")["output"]
+
+    @job("Add Database Index")
+    def add_database_index(self, doctype, columns=None):
+        if not columns:
+            return
+        self._add_database_index(doctype, columns)
+
+    @step("Add Database Index With Bench Command")
+    def _add_database_index(self, doctype, columns):
+        command = f"add-database-index --doctype '{doctype}' "
+        for column in columns:
+            command += f"--column {column} "
+
+        return self.bench_execute(command)
+
+    def get_database_free_size(self):
+        query = (
+            "SELECT SUM(`data_free`)"
+            " FROM information_schema.tables"
+            f' WHERE `table_schema` = "{self.database}"'
+            " GROUP BY `table_schema`"
+        )
+        command = f"mysql -sN -h {self.host} -u{self.user} -p{self.password} -e '{query}'"
+        database_size = self.execute(command).get("output")
+
+        try:
+            return int(database_size)
+        except Exception:
+            return 0
+
+    def get_database_free_tables(self):
+        try:
+            query = (
+                "SELECT `table_name`,"
+                " round((`data_free` / 1024 / 1024), 2)"
+                " FROM information_schema.tables"
+                f' WHERE `table_schema` = "{self.database}"'
+                " AND ((`data_free` / (`data_length` + `index_length`)) > 0.2"
+                " OR `data_free` > 100 * 1024 * 1024)"
+            )
+            command = f"mysql -sN -h {self.host} -u{self.user} -p{self.password} -e '{query}'"
+            output = self.execute(command).get("output")
+            return [line.split("\t") for line in output.splitlines()]
+        except Exception:
+            return []
+
+    @job("Fetch Database Table Schema")
+    def fetch_database_table_schema(self, include_table_size: bool = True, include_index_info: bool = True):
+        database = self.db_instance()
+        tables = {}
+        table_schemas = self._fetch_database_table_schema(database, include_index_info=include_index_info)
+        for table_name in table_schemas:
+            tables[table_name] = {
+                "columns": table_schemas[table_name],
+            }
+
+        if include_table_size:
+            table_sizes = self._fetch_database_table_sizes(database)
+            for table_name in table_sizes:
+                if table_name not in tables:
+                    continue
+                tables[table_name]["size"] = table_sizes[table_name]
+
+        return tables
+
+    @step("Fetch Database Table Schema")
+    def _fetch_database_table_schema(self, database: Database, include_index_info: bool = True):
+        return database.fetch_database_table_schema(include_index_info=include_index_info)
+
+    @step("Fetch Database Table Sizes")
+    def _fetch_database_table_sizes(self, database: Database):
+        return database.fetch_database_table_sizes()
+
+    def run_sql_query(self, query: str, commit: bool = False, as_dict: bool = False):
+        db = self.db_instance()
+        success, output = db.execute_query(query, commit=commit, as_dict=as_dict)
+        response = {"success": success, "data": output}
+        if not success and hasattr(db, "last_executed_query"):
+            response["failed_query"] = db.last_executed_query
+        return response
+
+    @job("Analyze Slow Queries")
+    def analyze_slow_queries_job(self, queries: list[dict], database_root_password: str) -> dict:
+        return self.analyze_slow_queries(queries, database_root_password)
+
+    @step("Analyze Slow Queries")
+    def analyze_slow_queries(self, queries: list[dict], database_root_password: str) -> dict:
+        """Suggest indexes for slow queries.
+
+        Args:
+            queries (list[dict]): List of queries to analyze
+            {
+                "example": "",
+                "normalized": "",
+            }
+        """
+        from agent.database_optimizer import OptimizeDatabaseQueries
+
+        example_queries = [query["example"] for query in queries]
+        optimizer = OptimizeDatabaseQueries(self, example_queries, database_root_password)
+        analysis = optimizer.analyze()
+        analysis_summary = {}  # map[query -> list[index_info_dict]]
+        for query, indexes in analysis.items():
+            analysis_summary[query] = [index.to_dict() for index in indexes]
+
+        result = []  # list[{example, normalized, suggested_indexes}]
+        for query in queries:
+            query["suggested_indexes"] = analysis_summary.get(query["example"], [])
+            result.append(query)
+        return {
+            "result": result,
+        }
+
+    def fetch_summarized_database_performance_report(self, mariadb_root_password: str):
+        database = self.db_instance(username="root", password=mariadb_root_password)
+        return database.fetch_summarized_performance_report()
+
+    def fetch_database_process_list(self, mariadb_root_password: str):
+        return self.db_instance(username="root", password=mariadb_root_password).fetch_process_list()
+
+    def kill_database_process(self, pid: str, mariadb_root_password: str):
+        return self.db_instance(username="root", password=mariadb_root_password).kill_process(pid)
+
+    def db_instance(self, username: str | None = None, password: str | None = None) -> Database:
+        if not username:
+            username = self.user
+        if not password:
+            password = self.password
+        return Database(self.host, 3306, username, password, self.database)
+
+    @property
+    def job_record(self):
+        return self.bench.server.job_record
+
+    @property
+    def step_record(self):
+        return self.bench.server.step_record
+
+    @step_record.setter
+    def step_record(self, value):
+        self.bench.server.step_record = value
+
+    def generate_theme_files(self):
+        self.bench_execute(
+            "execute jingrow.website.doctype.website_theme.website_theme.generate_theme_files_if_not_exist"
+        )
diff --git a/agent/templates/prometheus/domains.yml b/agent/templates/prometheus/domains.yml
index 9b242a0..bdf8848 100644
--- a/agent/templates/prometheus/domains.yml
+++ b/agent/templates/prometheus/domains.yml
@@ -1,8 +1,8 @@
-- targets:
-
-##- for domain in domains ##
-- targets:
-  - https://{{ domain.name }}/api/method/ping
-  labels:
-    site: "{{ domain.site }}"
-##- endfor ##
+- targets:
+
+##- for domain in domains ##
+- targets:
+  - https://{{ domain.name }}/api/action/ping
+  labels:
+    site: "{{ domain.site }}"
+##- endfor ##
diff --git a/agent/templates/prometheus/sites.yml b/agent/templates/prometheus/sites.yml
index 2b726d5..e61b90f 100644
--- a/agent/templates/prometheus/sites.yml
+++ b/agent/templates/prometheus/sites.yml
@@ -1,13 +1,13 @@
-- targets:
-
-## for bench in benches ##
-- targets:
-  ##- for site in bench.sites ##
-  - https://{{ site }}/api/method/ping
-  ##- endfor ##
-  labels:
-    cluster: "{{ bench.cluster }}"
-    server: "{{ bench.server }}"
-    group: "{{ bench.group }}"
-    bench: "{{ bench.name }}"
-## endfor ##
+- targets:
+
+## for bench in benches ##
+- targets:
+  ##- for site in bench.sites ##
+  - https://{{ site }}/api/action/ping
+  ##- endfor ##
+  labels:
+    cluster: "{{ bench.cluster }}"
+    server: "{{ bench.server }}"
+    group: "{{ bench.group }}"
+    bench: "{{ bench.name }}"
+## endfor ##
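+
+# Illustrative render of the block above for a single bench (all values
+# hypothetical; real values come from the template variables):
+#
+# - targets:
+#     - https://site1.example.com/api/action/ping
+#   labels:
+#     cluster: "default"
+#     server: "n1.example.com"
+#     group: "bench-group-1"
+#     bench: "bench-0001-000001"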