# Copyright (c) 2020, JINGROW # For license information, please see license.txt from __future__ import annotations import re from base64 import b64decode from datetime import datetime, timedelta from pathlib import Path from typing import TYPE_CHECKING import jingrow import jwt import requests from jcloude.utils import get_current_team, log_error if TYPE_CHECKING: from jcloude.jcloude.doctype.github_webhook_log.github_webhook_log import GitHubWebhookLog @jingrow.whitelist(allow_guest=True, xss_safe=True) def hook(*args, **kwargs): user = jingrow.session.user # set user to Administrator, to not have to do ignore_permissions everywhere jingrow.set_user("Administrator") headers = jingrow.request.headers pg: "GitHubWebhookLog" = jingrow.get_pg( { "doctype": "GitHub Webhook Log", "name": headers.get("X-Github-Delivery"), "event": headers.get("X-Github-Event"), "signature": headers.get("X-Hub-Signature").split("=")[1], "payload": jingrow.request.get_data().decode(), } ) try: pg.insert() jingrow.db.commit() except Exception as e: jingrow.set_user(user) log_error("GitHub Webhook Insert Error", args=args, kwargs=kwargs) raise Exception from e try: pg.handle_events() except Exception as e: jingrow.set_user(user) log_error("GitHub Webhook Error", pg=pg) raise Exception from e def get_jwt_token(): key = jingrow.db.get_single_value("Jcloude Settings", "github_app_private_key") app_id = jingrow.db.get_single_value("Jcloude Settings", "github_app_id") now = datetime.now() expiry = now + timedelta(minutes=9) payload = {"iat": int(now.timestamp()), "exp": int(expiry.timestamp()), "iss": app_id} return jwt.encode(payload, key.encode(), algorithm="RS256") def get_access_token(installation_id: str | None = None): if not installation_id: return jingrow.db.get_value( "Jcloude Settings", None, "github_access_token", ) token = get_jwt_token() headers = { "Authorization": f"Bearer {token}", "Accept": "application/vnd.github.machine-man-preview+json", } response = requests.post( f"http://git.jingrow.com/api/v1/app/installations/{installation_id}/access_tokens", headers=headers, ).json() return response.get("token") @jingrow.whitelist() def clear_token_and_get_installation_url(): clear_current_team_access_token() public_link = jingrow.db.get_single_value("Jcloude Settings", "github_app_public_link") return f"{public_link}/installations/new" def clear_current_team_access_token(): team = get_current_team() jingrow.db.set_value("Team", team, "github_access_token", "") # clear access token @jingrow.whitelist() def options(): team = get_current_team() token = jingrow.db.get_value("Team", team, "github_access_token") public_link = jingrow.db.get_single_value("Jcloude Settings", "github_app_public_link") return { "authorized": bool(token), "installation_url": f"{public_link}/installations/new", "installations": installations(token) if token else [], } def installations(token): headers = { "Authorization": f"token {token}", "Accept": "application/vnd.github.machine-man-preview+json", } response = requests.get("http://git.jingrow.com/api/v1/user/installations", headers=headers) data = response.json() installations = [] if response.ok: for installation in data["installations"]: installations.append( { "id": installation["id"], "login": installation["account"]["login"], "url": installation["html_url"], "image": installation["account"]["avatar_url"], "repos": repositories(installation["id"], token), } ) else: jingrow.throw(data.get("message") or "An error Occurred") return installations def repositories(installation, token): headers = { "Authorization": f"token {token}", "Accept": "application/vnd.github.machine-man-preview+json", } repositories = [] current_page, is_last_page = 1, False while not is_last_page: response = requests.get( f"http://git.jingrow.com/api/v1/user/installations/{installation}/repositories", params={"per_page": 100, "page": current_page}, headers=headers, ) if len(response.json().get("repositories", [])) < 100: is_last_page = True for repository in response.json().get("repositories", []): repositories.append( { "id": repository["id"], "name": repository["name"], "private": repository["private"], "url": repository["html_url"], "default_branch": repository["default_branch"], } ) current_page += 1 return repositories @jingrow.whitelist() def repository(owner, name, installation=None): token = "" if not installation: token = jingrow.db.get_value("Jcloude Settings", "github_access_token") else: token = get_access_token(installation) headers = { "Authorization": f"token {token}", } repo = requests.get(f"http://git.jingrow.com/api/v1/repos/{owner}/{name}", headers=headers).json() current_page, is_last_page = 1, False branches = [] while not is_last_page: response = requests.get( f"http://git.jingrow.com/api/v1/repos/{owner}/{name}/branches", params={"per_page": 100, "page": current_page}, headers=headers, ) if response.ok: branches.extend(response.json()) else: break if len(response.json()) < 100: is_last_page = True current_page += 1 repo["branches"] = branches return repo @jingrow.whitelist() def app(owner, repository, branch, installation=None): headers = get_auth_headers(installation) response = requests.get( f"http://git.jingrow.com/api/v1/repos/{owner}/{repository}/branches/{branch}", headers=headers, ) if not response.ok: jingrow.throw(f"Could not fetch branch ({branch}) info for repo {owner}/{repository}") branch_info = response.json() sha = branch_info["commit"]["commit"]["tree"]["sha"] contents = requests.get( f"http://git.jingrow.com/api/v1/repos/{owner}/{repository}/git/trees/{sha}", params={"recursive": True}, headers=headers, ).json() tree = _generate_files_tree(contents["tree"]) py_setup_files = ["setup.py", "setup.cfg", "pyproject.toml"] if not any(x in tree for x in py_setup_files): setup_filenames = jingrow.bold(" or ".join(py_setup_files)) reason = f"Files {setup_filenames} do not exist in app directory." jingrow.throw(f"Not a valid Jingrow App! {reason}") app_name, title = _get_app_name_and_title_from_hooks( owner, repository, branch_info, headers, tree, ) return {"name": app_name, "title": title} @jingrow.whitelist() def branches(owner, name, installation=None): if installation: token = get_access_token(installation) else: token = jingrow.get_value("Jcloude Settings", None, "github_access_token") if token: headers = { "Authorization": f"token {token}", } else: headers = {} response = requests.get( f"http://git.jingrow.com/api/v1/repos/{owner}/{name}/branches", params={"per_page": 100}, headers=headers, ) if response.ok: return response.json() jingrow.throw("Error fetching branch list from GitHub: " + response.text) return None def get_auth_headers(installation_id: str | None = None) -> "dict[str, str]": if token := get_access_token(installation_id): return {"Authorization": f"token {token}"} return {} def _get_app_name_and_title_from_hooks( owner, repository, branch_info, headers, tree, ) -> "tuple[str, str]": reason_for_invalidation = f"Files {jingrow.bold('hooks.py or patches.txt')} not found." for directory, files in tree.items(): if not files: continue if ("hooks.py" not in files) or ("patches.txt" not in files): reason_for_invalidation = ( f"Files {jingrow.bold('hooks.py or patches.txt')} does not exist" f" inside {directory}/{directory} directory." ) continue hooks = requests.get( f"http://git.jingrow.com/api/v1/repos/{owner}/{repository}/contents/{directory}/hooks.py", params={"ref": branch_info["name"]}, headers=headers, ).json() if "content" not in hooks: reason_for_invalidation = f"File {jingrow.bold('hooks.py')} could not be fetched." continue content = b64decode(hooks["content"]).decode() match = re.search(r"""app_title = ["'](.*)["']""", content) if match: return directory, match.group(1) reason_for_invalidation = ( f"File {jingrow.bold('hooks.py')} does not have {jingrow.bold('app_title')} defined." ) break jingrow.throw(f"Not a valid Jingrow App! {reason_for_invalidation}") return None def _generate_files_tree(files): children = {} for file in files: path = Path(file["path"]) children.setdefault(str(path.parent), []).append( jingrow._dict({"name": str(path.name), "path": file["path"]}) ) return _construct_tree({}, children["."], children) def _construct_tree(tree, children, children_map): for file in children: if file.path in children_map: tree[file.name] = _construct_tree({}, children_map[file.path], children_map) else: tree[file.name] = None return tree