jcloud/jcloud/api/webhook.py
2025-04-12 17:39:38 +08:00

84 lines
2.2 KiB
Python

# Copyright (c) 2022, JINGROW
# For license information, please see license.txt
from __future__ import annotations
import json
import jingrow
from jcloud.jcloud.pagetype.jcloud_role.jcloud_role import check_role_permissions
@jingrow.whitelist(allow_guest=True)
def available_events():
return jingrow.get_all(
"Jcloud Webhook Event",
fields=["name", "description"],
filters={"enabled": 1},
order_by="creation desc",
)
@jingrow.whitelist()
def add(endpoint: str, secret: str, events: list[str]):
check_role_permissions("Jcloud Webhook")
pg = jingrow.new_pg("Jcloud Webhook")
pg.endpoint = endpoint
pg.secret = secret
pg.team = jingrow.local.team().name
for event in events:
pg.append("events", {"event": event})
pg.save()
@jingrow.whitelist()
def update(name: str, endpoint: str, secret: str, events: list[str]):
check_role_permissions("Jcloud Webhook")
pg = jingrow.get_pg("Jcloud Webhook", name)
pg.endpoint = endpoint
if secret:
pg.secret = secret
# reset event list
pg.events = []
# add new events
for event in events:
pg.append("events", {"event": event})
pg.save()
@jingrow.whitelist()
def attempts(webhook: str):
check_role_permissions("Jcloud Webhook Log")
pg = jingrow.get_pg("Jcloud Webhook", webhook)
pg.has_permission("read")
JcloudWebhookAttempt = jingrow.qb.PageType("Jcloud Webhook Attempt")
JcloudWebhookLog = jingrow.qb.PageType("Jcloud Webhook Log")
query = (
jingrow.qb.from_(JcloudWebhookAttempt)
.select(
JcloudWebhookAttempt.name,
JcloudWebhookAttempt.endpoint,
JcloudWebhookLog.event,
JcloudWebhookAttempt.status,
JcloudWebhookAttempt.response_status_code,
JcloudWebhookAttempt.timestamp,
)
.left_join(JcloudWebhookLog)
.on(JcloudWebhookAttempt.parent == JcloudWebhookLog.name)
.where(JcloudWebhookAttempt.webhook == pg.name)
.orderby(JcloudWebhookAttempt.timestamp, order=jingrow.qb.desc)
)
return query.run(as_dict=1)
@jingrow.whitelist()
def attempt(name: str):
check_role_permissions("Jcloud Webhook Attempt")
pg = jingrow.get_pg("Jcloud Webhook Attempt", name)
pg.has_permission("read")
data = pg.as_dict()
data.request_payload = json.loads(jingrow.get_value("Jcloud Webhook Log", pg.parent, "request_payload"))
return data