330 lines
8.7 KiB
Python
330 lines
8.7 KiB
Python
# Copyright (c) 2020, JINGROW
|
|
# For license information, please see license.txt
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from base64 import b64decode
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
import jingrow
|
|
import jwt
|
|
import requests
|
|
|
|
from jcloude.utils import get_current_team, log_error
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
from jcloude.jcloude.pagetype.github_webhook_log.github_webhook_log import GitHubWebhookLog
|
|
|
|
|
|
@jingrow.whitelist(allow_guest=True, xss_safe=True)
|
|
def hook(*args, **kwargs):
|
|
user = jingrow.session.user
|
|
# set user to Administrator, to not have to do ignore_permissions everywhere
|
|
jingrow.set_user("Administrator")
|
|
headers = jingrow.request.headers
|
|
pg: "GitHubWebhookLog" = jingrow.get_pg(
|
|
{
|
|
"pagetype": "GitHub Webhook Log",
|
|
"name": headers.get("X-Github-Delivery"),
|
|
"event": headers.get("X-Github-Event"),
|
|
"signature": headers.get("X-Hub-Signature").split("=")[1],
|
|
"payload": jingrow.request.get_data().decode(),
|
|
}
|
|
)
|
|
|
|
try:
|
|
pg.insert()
|
|
jingrow.db.commit()
|
|
except Exception as e:
|
|
jingrow.set_user(user)
|
|
log_error("GitHub Webhook Insert Error", args=args, kwargs=kwargs)
|
|
raise Exception from e
|
|
|
|
try:
|
|
pg.handle_events()
|
|
except Exception as e:
|
|
jingrow.set_user(user)
|
|
log_error("GitHub Webhook Error", pg=pg)
|
|
raise Exception from e
|
|
|
|
|
|
def get_jwt_token():
|
|
key = jingrow.db.get_single_value("Jcloude Settings", "github_app_private_key")
|
|
app_id = jingrow.db.get_single_value("Jcloude Settings", "github_app_id")
|
|
now = datetime.now()
|
|
expiry = now + timedelta(minutes=9)
|
|
payload = {"iat": int(now.timestamp()), "exp": int(expiry.timestamp()), "iss": app_id}
|
|
return jwt.encode(payload, key.encode(), algorithm="RS256")
|
|
|
|
|
|
def get_access_token(installation_id: str | None = None):
|
|
if not installation_id:
|
|
return jingrow.db.get_value(
|
|
"Jcloude Settings",
|
|
None,
|
|
"github_access_token",
|
|
)
|
|
|
|
token = get_jwt_token()
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Accept": "application/vnd.github.machine-man-preview+json",
|
|
}
|
|
response = requests.post(
|
|
f"http://git.jingrow.com/api/v1/app/installations/{installation_id}/access_tokens",
|
|
headers=headers,
|
|
).json()
|
|
return response.get("token")
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def clear_token_and_get_installation_url():
|
|
clear_current_team_access_token()
|
|
public_link = jingrow.db.get_single_value("Jcloude Settings", "github_app_public_link")
|
|
return f"{public_link}/installations/new"
|
|
|
|
|
|
def clear_current_team_access_token():
|
|
team = get_current_team()
|
|
jingrow.db.set_value("Team", team, "github_access_token", "") # clear access token
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def options():
|
|
team = get_current_team()
|
|
token = jingrow.db.get_value("Team", team, "github_access_token")
|
|
public_link = jingrow.db.get_single_value("Jcloude Settings", "github_app_public_link")
|
|
|
|
return {
|
|
"authorized": bool(token),
|
|
"installation_url": f"{public_link}/installations/new",
|
|
"installations": installations(token) if token else [],
|
|
}
|
|
|
|
|
|
def installations(token):
|
|
headers = {
|
|
"Authorization": f"token {token}",
|
|
"Accept": "application/vnd.github.machine-man-preview+json",
|
|
}
|
|
response = requests.get("http://git.jingrow.com/api/v1/user/installations", headers=headers)
|
|
data = response.json()
|
|
installations = []
|
|
if response.ok:
|
|
for installation in data["installations"]:
|
|
installations.append(
|
|
{
|
|
"id": installation["id"],
|
|
"login": installation["account"]["login"],
|
|
"url": installation["html_url"],
|
|
"image": installation["account"]["avatar_url"],
|
|
"repos": repositories(installation["id"], token),
|
|
}
|
|
)
|
|
else:
|
|
jingrow.throw(data.get("message") or "An error Occurred")
|
|
|
|
return installations
|
|
|
|
|
|
def repositories(installation, token):
|
|
headers = {
|
|
"Authorization": f"token {token}",
|
|
"Accept": "application/vnd.github.machine-man-preview+json",
|
|
}
|
|
repositories = []
|
|
current_page, is_last_page = 1, False
|
|
while not is_last_page:
|
|
response = requests.get(
|
|
f"http://git.jingrow.com/api/v1/user/installations/{installation}/repositories",
|
|
params={"per_page": 100, "page": current_page},
|
|
headers=headers,
|
|
)
|
|
if len(response.json().get("repositories", [])) < 100:
|
|
is_last_page = True
|
|
|
|
for repository in response.json().get("repositories", []):
|
|
repositories.append(
|
|
{
|
|
"id": repository["id"],
|
|
"name": repository["name"],
|
|
"private": repository["private"],
|
|
"url": repository["html_url"],
|
|
"default_branch": repository["default_branch"],
|
|
}
|
|
)
|
|
current_page += 1
|
|
|
|
return repositories
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def repository(owner, name, installation=None):
|
|
token = ""
|
|
if not installation:
|
|
token = jingrow.db.get_value("Jcloude Settings", "github_access_token")
|
|
else:
|
|
token = get_access_token(installation)
|
|
headers = {
|
|
"Authorization": f"token {token}",
|
|
}
|
|
repo = requests.get(f"http://git.jingrow.com/api/v1/repos/{owner}/{name}", headers=headers).json()
|
|
|
|
current_page, is_last_page = 1, False
|
|
branches = []
|
|
while not is_last_page:
|
|
response = requests.get(
|
|
f"http://git.jingrow.com/api/v1/repos/{owner}/{name}/branches",
|
|
params={"per_page": 100, "page": current_page},
|
|
headers=headers,
|
|
)
|
|
if response.ok:
|
|
branches.extend(response.json())
|
|
else:
|
|
break
|
|
|
|
if len(response.json()) < 100:
|
|
is_last_page = True
|
|
|
|
current_page += 1
|
|
|
|
repo["branches"] = branches
|
|
|
|
return repo
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def app(owner, repository, branch, installation=None):
|
|
headers = get_auth_headers(installation)
|
|
response = requests.get(
|
|
f"http://git.jingrow.com/api/v1/repos/{owner}/{repository}/branches/{branch}",
|
|
headers=headers,
|
|
)
|
|
|
|
if not response.ok:
|
|
jingrow.throw(f"Could not fetch branch ({branch}) info for repo {owner}/{repository}")
|
|
|
|
branch_info = response.json()
|
|
sha = branch_info["commit"]["commit"]["tree"]["sha"]
|
|
contents = requests.get(
|
|
f"http://git.jingrow.com/api/v1/repos/{owner}/{repository}/git/trees/{sha}",
|
|
params={"recursive": True},
|
|
headers=headers,
|
|
).json()
|
|
|
|
tree = _generate_files_tree(contents["tree"])
|
|
py_setup_files = ["setup.py", "setup.cfg", "pyproject.toml"]
|
|
|
|
if not any(x in tree for x in py_setup_files):
|
|
setup_filenames = jingrow.bold(" or ".join(py_setup_files))
|
|
reason = f"Files {setup_filenames} do not exist in app directory."
|
|
jingrow.throw(f"Not a valid Jingrow App! {reason}")
|
|
|
|
app_name, title = _get_app_name_and_title_from_hooks(
|
|
owner,
|
|
repository,
|
|
branch_info,
|
|
headers,
|
|
tree,
|
|
)
|
|
|
|
return {"name": app_name, "title": title}
|
|
|
|
|
|
@jingrow.whitelist()
|
|
def branches(owner, name, installation=None):
|
|
if installation:
|
|
token = get_access_token(installation)
|
|
else:
|
|
token = jingrow.get_value("Jcloude Settings", None, "github_access_token")
|
|
|
|
if token:
|
|
headers = {
|
|
"Authorization": f"token {token}",
|
|
}
|
|
else:
|
|
headers = {}
|
|
|
|
response = requests.get(
|
|
f"http://git.jingrow.com/api/v1/repos/{owner}/{name}/branches",
|
|
params={"per_page": 100},
|
|
headers=headers,
|
|
)
|
|
|
|
if response.ok:
|
|
return response.json()
|
|
jingrow.throw("Error fetching branch list from GitHub: " + response.text)
|
|
return None
|
|
|
|
|
|
def get_auth_headers(installation_id: str | None = None) -> "dict[str, str]":
|
|
if token := get_access_token(installation_id):
|
|
return {"Authorization": f"token {token}"}
|
|
return {}
|
|
|
|
|
|
def _get_app_name_and_title_from_hooks(
|
|
owner,
|
|
repository,
|
|
branch_info,
|
|
headers,
|
|
tree,
|
|
) -> "tuple[str, str]":
|
|
reason_for_invalidation = f"Files {jingrow.bold('hooks.py or patches.txt')} not found."
|
|
for directory, files in tree.items():
|
|
if not files:
|
|
continue
|
|
|
|
if ("hooks.py" not in files) or ("patches.txt" not in files):
|
|
reason_for_invalidation = (
|
|
f"Files {jingrow.bold('hooks.py or patches.txt')} does not exist"
|
|
f" inside {directory}/{directory} directory."
|
|
)
|
|
continue
|
|
|
|
hooks = requests.get(
|
|
f"http://git.jingrow.com/api/v1/repos/{owner}/{repository}/contents/{directory}/hooks.py",
|
|
params={"ref": branch_info["name"]},
|
|
headers=headers,
|
|
).json()
|
|
if "content" not in hooks:
|
|
reason_for_invalidation = f"File {jingrow.bold('hooks.py')} could not be fetched."
|
|
continue
|
|
|
|
content = b64decode(hooks["content"]).decode()
|
|
match = re.search(r"""app_title = ["'](.*)["']""", content)
|
|
|
|
if match:
|
|
return directory, match.group(1)
|
|
|
|
reason_for_invalidation = (
|
|
f"File {jingrow.bold('hooks.py')} does not have {jingrow.bold('app_title')} defined."
|
|
)
|
|
break
|
|
|
|
jingrow.throw(f"Not a valid Jingrow App! {reason_for_invalidation}")
|
|
return None
|
|
|
|
|
|
def _generate_files_tree(files):
|
|
children = {}
|
|
for file in files:
|
|
path = Path(file["path"])
|
|
children.setdefault(str(path.parent), []).append(
|
|
jingrow._dict({"name": str(path.name), "path": file["path"]})
|
|
)
|
|
return _construct_tree({}, children["."], children)
|
|
|
|
|
|
def _construct_tree(tree, children, children_map):
|
|
for file in children:
|
|
if file.path in children_map:
|
|
tree[file.name] = _construct_tree({}, children_map[file.path], children_map)
|
|
else:
|
|
tree[file.name] = None
|
|
return tree
|