# jcloud/jcloud/runner.py
# 2025-04-12 17:39:38 +08:00
#
# 260 lines
# 8.0 KiB
# Python

import json
import jingrow
import wrapt
from ansible import constants, context
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.executor.task_executor import TaskExecutor
from ansible.inventory.manager import InventoryManager
from ansible.module_utils.common.collections import ImmutableDict
from ansible.parsing.dataloader import DataLoader
from ansible.playbook import Playbook
from ansible.plugins.action.async_status import ActionModule
from ansible.plugins.callback import CallbackBase
from ansible.utils.display import Display
from ansible.vars.manager import VariableManager
from jingrow.utils import cstr
from jingrow.utils import now_datetime as now
from pymysql.err import InterfaceError
def reconnect_on_failure():
    """Return a decorator that retries a call once after reconnecting to the database.

    The decorated callable is invoked normally. If it raises
    ``pymysql.err.InterfaceError``, ``jingrow.db.connect()`` is called and the
    callable is invoked a second time with the same arguments. Whatever the
    second attempt returns or raises is passed straight through to the caller.
    """

    @wrapt.decorator
    def retry_after_reconnect(wrapped, instance, args, kwargs):
        # ``instance`` is part of the wrapt decorator signature but is not used
        # here: ``wrapped`` is called with the original arguments only.
        try:
            outcome = wrapped(*args, **kwargs)
        except InterfaceError:
            jingrow.db.connect()
            outcome = wrapped(*args, **kwargs)
        return outcome

    return retry_after_reconnect
class AnsibleCallback(CallbackBase):
    """Callback plugin that records playbook progress in Jingrow documents.

    Ansible invokes the ``v2_*`` hooks while a playbook runs. Each hook forwards
    to ``update_play`` / ``update_task``, which write the new state to the
    matching "Ansible Play" / "Ansible Task" document and commit.

    Attributes (assigned by ``Ansible.run`` before execution starts):
        play: Name of the "Ansible Play" document being updated.
        tasks: ``{role name: {task name: "Ansible Task" document name}}``.
        task_list: "Ansible Task" document names in creation order; used to
            report progress as an index into this list.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Placeholders so the attributes always exist; the real values are
        # assigned by the runner just before the playbook is executed.
        self.play = None
        self.tasks = {}
        self.task_list = []

    @reconnect_on_failure()
    def process_task_success(self, result):
        """Copy SSH public keys reported by a successful ``user`` task to the server.

        Args:
            result: Ansible task result; ``result._task.action`` selects the
                handling and ``result._result`` carries the module output.
        """
        if result._task.action != "user":
            return

        data = jingrow._dict(result._result)
        server_type, server_name = jingrow.db.get_value(
            "Ansible Play", self.play, ["server_type", "server"]
        )
        server = jingrow.get_pg(server_type, server_name)
        if data.name == "root":
            server.root_public_key = data.ssh_public_key
        elif data.name == "jingrow":
            server.jingrow_public_key = data.ssh_public_key
        # Saved for every ``user`` task, including users other than the two above.
        server.save()

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Mark the task as successful, then apply success-specific handling."""
        self.update_task("Success", result)
        self.process_task_success(result)

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Mark the task as failed."""
        self.update_task("Failure", result)

    def v2_runner_on_skipped(self, result):
        """Mark the task as skipped."""
        self.update_task("Skipped", result)

    def v2_runner_on_unreachable(self, result):
        """Mark the task as unreachable."""
        self.update_task("Unreachable", result)

    def v2_playbook_on_task_start(self, task, is_conditional):
        """Mark the task as running and stamp its start time."""
        self.update_task("Running", None, task)

    def v2_playbook_on_start(self, playbook):
        """Mark the play as running and stamp its start time."""
        self.update_play("Running")

    def v2_playbook_on_stats(self, stats):
        """Record the final statistics and outcome of the play."""
        self.update_play(None, stats)

    @reconnect_on_failure()
    def update_play(self, status=None, stats=None):
        """Write the play's start state or its final summary, then commit.

        Args:
            status: Status to store when the play starts (used when ``stats``
                is not given).
            stats: Ansible statistics object passed at the end of the run. When
                given, the summary of the first processed host is stored and
                the status is derived from it.
        """
        play = jingrow.get_pg("Ansible Play", self.play)
        if stats:
            # Assume we're running on one host
            host = next(iter(stats.processed))
            play.update(stats.summarize(host))
            play.status = "Failure" if (play.failures or play.unreachable) else "Success"
            play.end = now()
            play.duration = play.end - play.start
        else:
            play.status = status
            play.start = now()
        play.save()
        jingrow.db.commit()

    @reconnect_on_failure()
    def update_task(self, status, result=None, task=None):
        """Write a task's start or completion state, publish progress and commit.

        Exactly one of ``result`` / ``task`` is expected: ``task`` when the task
        starts, ``result`` when it finishes. Tasks that do not belong to a role
        are ignored.

        Args:
            status: Status string to store on the "Ansible Task" document.
            result: Ansible task result for a finished task.
            task: Ansible task object for a task that is about to start.
        """
        # Decide "finished vs. starting" from which argument was supplied, not
        # from the truthiness of the parsed payload: an empty result mapping
        # must still be recorded as a completion.
        finished = result is not None

        role = (result._task if finished else task)._role
        if not role:
            return

        if finished:
            task_name, result = self.parse_result(result)
        else:
            task_name = self.tasks[role.get_name()][task.name]

        task = jingrow.get_pg("Ansible Task", task_name)
        task.status = status
        if finished:
            task.output = result.stdout
            task.error = result.stderr
            task.exception = result.msg
            # Reduce clutter by removing keys already shown elsewhere
            for key in ("stdout", "stdout_lines", "stderr", "stderr_lines", "msg"):
                result.pop(key, None)
            task.result = json.dumps(result, indent=4)
            task.end = now()
            task.duration = task.end - task.start
        else:
            task.start = now()
        task.save()
        self.publish_play_progress(task.name)
        jingrow.db.commit()

    def publish_play_progress(self, task):
        """Publish a realtime ``ansible_play_progress`` event for the play.

        Args:
            task: "Ansible Task" document name; its position in ``task_list``
                is reported as the progress value.
        """
        jingrow.publish_realtime(
            "ansible_play_progress",
            {"progress": self.task_list.index(task), "total": len(self.task_list), "play": self.play},
            pagetype="Ansible Play",
            docname=self.play,
            user=jingrow.session.user,
        )

    def parse_result(self, result):
        """Return ``(task document name, result data)`` for an Ansible task result."""
        task = result._task.name
        role = result._task._role.get_name()
        return self.tasks[role][task], jingrow._dict(result._result)

    @reconnect_on_failure()
    def on_async_start(self, role, task, job_id):
        """Store the async job id on the task's document and commit.

        Args:
            role: Role name the task belongs to.
            task: Task name within the role.
            job_id: Ansible async job id reported for the task.
        """
        task_name = self.tasks[role][task]
        task = jingrow.get_pg("Ansible Task", task_name)
        task.job_id = job_id
        task.save()
        jingrow.db.commit()

    @reconnect_on_failure()
    def on_async_poll(self, result):
        """Store the latest async poll result on the matching task and commit.

        The task is looked up by ``ansible_job_id`` within the current play.
        Poll results without a job id, or whose job id matches no recorded
        task, are ignored: this hook runs inside the patched action-module
        call, so a bookkeeping miss must not raise into Ansible.

        Args:
            result: Result dictionary returned by the async status action.
        """
        job_id = result.get("ansible_job_id")
        if not job_id:
            return

        task_name = jingrow.get_value("Ansible Task", {"play": self.play, "job_id": job_id}, "name")
        if not task_name:
            return

        task = jingrow.get_pg("Ansible Task", task_name)
        task.result = json.dumps(result, indent=4)
        task.duration = now() - task.start
        task.save()
        jingrow.db.commit()
class Ansible:
    """Run a jcloud playbook against a single server and track it in Jingrow.

    Creating an instance parses the playbook and inserts one "Ansible Play"
    document plus one "Ansible Task" document per role task. ``run`` then
    executes the playbook with an ``AnsibleCallback`` that keeps those
    documents up to date.

    Args:
        server: Server document; ``ip``, ``pagetype`` and ``name`` are read.
        playbook: Playbook file name inside the ``jcloud`` app's ``playbooks``
            directory.
        user: Remote user for the SSH connection.
        variables: Mapping passed to the playbook as extra variables.
        port: SSH port of the server.
    """

    # Attribute set on our wrapper functions to remember the pristine Ansible
    # callable they replace, so repeated patching never wraps a wrapper.
    _PRISTINE_ATTR = "_jcloud_original"

    def __init__(self, server, playbook, user="root", variables=None, port=22):
        self.patch()
        self.server = server
        self.playbook = playbook
        self.playbook_path = jingrow.get_app_path("jcloud", "playbooks", self.playbook)
        self.host = f"{server.ip}:{port}"
        self.variables = variables or {}

        constants.HOST_KEY_CHECKING = False
        context.CLIARGS = ImmutableDict(
            become_method="sudo",
            check=False,
            connection="ssh",
            # This is the only way to pass variables that preserves newlines
            # NOTE(review): values are wrapped in single quotes without escaping;
            # presumably a value containing "'" would not parse as intended - confirm.
            extra_vars=[f"{cstr(key)}='{cstr(value)}'" for key, value in self.variables.items()],
            remote_user=user,
            start_at_task=None,
            syntax=False,
            verbosity=1,
        )

        self.loader = DataLoader()
        self.passwords = {}
        # Trailing comma: the sources string is a host list, not a file path.
        self.sources = f"{self.host},"
        self.inventory = InventoryManager(loader=self.loader, sources=self.sources)
        self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory)

        self.callback = AnsibleCallback()
        self.display = Display()
        self.display.verbosity = 1

        self.create_ansible_play()

    def patch(self):
        """Install async-tracking wrappers on Ansible, bound to this instance.

        ``ActionModule.run`` (async status action) is wrapped so every poll
        result reaches ``callback.on_async_poll``; ``TaskExecutor._poll_async_result``
        is wrapped so the job id reaches ``callback.on_async_start`` first.

        The method is idempotent: if a wrapper from an earlier instance is still
        installed, the pristine callable is recovered from it instead of being
        wrapped again, and the hooks are rebound to this instance.
        """

        def modified_action_module_run(*args, **kwargs):
            result = self.action_module_run(*args, **kwargs)
            self.callback.on_async_poll(result)
            return result

        def modified_poll_async_result(executor, result, templar, task_vars=None):
            job_id = result["ansible_job_id"]
            task = executor._task
            self.callback.on_async_start(task._role.get_name(), task.name, job_id)
            return self._poll_async_result(executor, result, templar, task_vars=task_vars)

        marker = self._PRISTINE_ATTR

        # Always resolve down to the unpatched callables so ``unpatch`` restores
        # real Ansible behaviour even if a previous run leaked its wrappers.
        current_run = ActionModule.run
        self.action_module_run = getattr(current_run, marker, current_run)
        current_poll = TaskExecutor._poll_async_result
        self._poll_async_result = getattr(current_poll, marker, current_poll)

        setattr(modified_action_module_run, marker, self.action_module_run)
        setattr(modified_poll_async_result, marker, self._poll_async_result)

        ActionModule.run = modified_action_module_run
        TaskExecutor._poll_async_result = modified_poll_async_result

    def unpatch(self):
        """Restore the Ansible callables captured by ``patch``."""
        TaskExecutor._poll_async_result = self._poll_async_result
        ActionModule.run = self.action_module_run

    def run(self):
        """Execute the playbook and return the resulting "Ansible Play" document.

        The Ansible patches are always removed afterwards, including when the
        executor raises.
        """
        # Make sure the hooks point at this instance, even if another instance
        # was created (and patched) after this one.
        self.patch()
        try:
            self.executor = PlaybookExecutor(
                playbooks=[self.playbook_path],
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                passwords=self.passwords,
            )
            # Use AnsibleCallback so we can receive updates for tasks execution
            self.executor._tqm._stdout_callback = self.callback
            self.callback.play = self.play
            self.callback.tasks = self.tasks
            self.callback.task_list = self.task_list
            self.executor.run()
        finally:
            self.unpatch()
        return jingrow.get_pg("Ansible Play", self.play)

    def create_ansible_play(self):
        """Insert the "Ansible Play" document and one "Ansible Task" per role task.

        Populates ``self.play`` (play document name), ``self.tasks``
        (``{role name: {task name: task document name}}``) and
        ``self.task_list`` (task document names in insertion order).
        """
        # Parse the playbook and create Ansible Tasks so we can show how many tasks are pending
        playbook = Playbook.load(
            self.playbook_path, variable_manager=self.variable_manager, loader=self.loader
        )
        # Assume we only have one play per playbook
        play = playbook.get_plays()[0]
        play_pg = jingrow.get_pg(
            {
                "pagetype": "Ansible Play",
                "server_type": self.server.pagetype,
                "server": self.server.name,
                "variables": json.dumps(self.variables, indent=4),
                "playbook": self.playbook,
                "play": play.get_name(),
            }
        ).insert()
        self.play = play_pg.name

        self.tasks = {}
        self.task_list = []
        for role in play.get_roles():
            role_name = role.get_name()
            for block in role.get_task_blocks():
                for task in block.block:
                    task_pg = jingrow.get_pg(
                        {
                            "pagetype": "Ansible Task",
                            "play": self.play,
                            "role": role_name,
                            "task": task.name,
                        }
                    ).insert()
                    self.tasks.setdefault(role_name, {})[task.name] = task_pg.name
                    self.task_list.append(task_pg.name)