jcloud/jcloud/metrics.py
2025-04-12 17:39:38 +08:00

82 lines
2.4 KiB
Python

# Copyright (c) 2024, JINGROW
# For license information, please see license.txt
import jingrow
from jingrow.utils import cint
from prometheus_client import (
CollectorRegistry,
Gauge,
generate_latest,
)
from werkzeug.wrappers import Response
class MetricsRenderer:
def __init__(self, path, status_code=None):
self.path = path
self.registry = CollectorRegistry(auto_describe=True)
def get_status(self, metric, pagetype, status_field="status", filters=None):
if filters is None:
filters = {}
c = Gauge(metric, "", [status_field], registry=self.registry)
rows = jingrow.get_all(
pagetype,
fields=[status_field, "count(*) as count"],
filters=filters,
group_by=status_field,
order_by=f"{status_field} asc",
ignore_ifnull=True,
)
for row in rows:
c.labels(row[status_field]).set(row.count)
def metrics(self):
suspended_builds = Gauge(
"jcloud_builds_suspended", "Are docker builds suspended", registry=self.registry
)
suspended_builds.set(
cint(jingrow.db.get_value("Jcloud Settings", None, "suspend_builds"))
)
self.get_status(
"jcloud_deploy_candidate_total",
"Deploy Candidate",
filters={"status": ("!=", "Success")},
)
self.get_status("jcloud_site_total", "Site", filters={"status": ("!=", "Archived")})
self.get_status("jcloud_bench_total", "Bench", filters={"status": ("!=", "Archived")})
self.get_status("jcloud_server_total", "Server")
self.get_status("jcloud_database_server_total", "Database Server")
self.get_status("jcloud_virtual_machine_total", "Virtual Machine")
self.get_status(
"jcloud_site_backup_total", "Site Backup", filters={"status": ("!=", "Success")}
)
self.get_status(
"jcloud_site_update_total", "Site Update", filters={"status": ("!=", "Success")}
)
self.get_status("jcloud_site_migration_total", "Site Migration")
self.get_status("jcloud_site_upgrade_total", "Version Upgrade")
self.get_status("jcloud_jcloud_job_total", "Jcloud Job")
self.get_status(
"jcloud_ansible_play_total", "Ansible Play", filters={"status": ("!=", "Success")}
)
self.get_status(
"jcloud_agent_job_total", "Agent Job", filters={"status": ("!=", "Success")}
)
return generate_latest(self.registry).decode("utf-8")
def can_render(self):
if self.path in ("metrics",):
return True
def render(self):
response = Response()
response.mimetype = "text"
response.data = self.metrics()
return response