import json

import jingrow
import wrapt
from ansible import constants, context
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.executor.task_executor import TaskExecutor
from ansible.inventory.manager import InventoryManager
from ansible.module_utils.common.collections import ImmutableDict
from ansible.parsing.dataloader import DataLoader
from ansible.playbook import Playbook
from ansible.plugins.action.async_status import ActionModule
from ansible.plugins.callback import CallbackBase
from ansible.utils.display import Display
from ansible.vars.manager import VariableManager
from jingrow.utils import cstr
from jingrow.utils import now_datetime as now
from pymysql.err import InterfaceError

# Attribute stored on the wrappers installed by ``Ansible.patch`` that points
# back at the pristine, un-patched Ansible function they replaced.
_ORIGINAL_ATTR = "_jcloud_original"


def reconnect_on_failure():
    """Return a decorator that retries the wrapped call once after a DB reconnect.

    When the wrapped callable raises ``pymysql.err.InterfaceError``,
    ``jingrow.db.connect()`` is called and the callable is invoked a second
    time with the same arguments. An error raised by the second attempt
    propagates to the caller.
    """

    @wrapt.decorator
    def wrapper(wrapped, instance, args, kwargs):
        try:
            return wrapped(*args, **kwargs)
        except InterfaceError:
            jingrow.db.connect()
            return wrapped(*args, **kwargs)

    return wrapper


class AnsibleCallback(CallbackBase):
    """Ansible callback that mirrors play/task progress into Jingrow documents.

    ``Ansible.run`` assigns these attributes before the playbook executes:

    * ``play`` - name of the "Ansible Play" document.
    * ``tasks`` - mapping ``{role name: {task name: "Ansible Task" document name}}``.
    * ``task_list`` - "Ansible Task" document names in creation order.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @reconnect_on_failure()
    def process_task_success(self, result):
        """Store the SSH public key reported by a successful ``user`` task on the server."""
        result, action = jingrow._dict(result._result), result._task.action
        if action == "user":
            server_type, server = jingrow.db.get_value(
                "Ansible Play", self.play, ["server_type", "server"]
            )
            server = jingrow.get_pg(server_type, server)
            if result.name == "root":
                server.root_public_key = result.ssh_public_key
            elif result.name == "jingrow":
                server.jingrow_public_key = result.ssh_public_key
            server.save()

    def v2_runner_on_ok(self, result, *args, **kwargs):
        self.update_task("Success", result)
        self.process_task_success(result)

    def v2_runner_on_failed(self, result, *args, **kwargs):
        self.update_task("Failure", result)

    def v2_runner_on_skipped(self, result):
        self.update_task("Skipped", result)

    def v2_runner_on_unreachable(self, result):
        self.update_task("Unreachable", result)

    def v2_playbook_on_task_start(self, task, is_conditional):
        self.update_task("Running", None, task)

    def v2_playbook_on_start(self, playbook):
        self.update_play("Running")

    def v2_playbook_on_stats(self, stats):
        self.update_play(None, stats)

    @reconnect_on_failure()
    def update_play(self, status=None, stats=None):
        """Update the "Ansible Play" document and commit.

        With ``stats`` the play is finalised: the first processed host's
        summary is copied onto the document, the status becomes "Failure" or
        "Success", and ``end``/``duration`` are set. Without ``stats`` the
        play is marked with ``status`` and ``start`` is set to the current time.
        """
        play = jingrow.get_pg("Ansible Play", self.play)
        if stats:
            # Assume we're running on one host
            host = next(iter(stats.processed))
            play.update(stats.summarize(host))
            if play.failures or play.unreachable:
                play.status = "Failure"
            else:
                play.status = "Success"
            play.end = now()
            play.duration = play.end - play.start
        else:
            play.status = status
            play.start = now()
        play.save()
        jingrow.db.commit()

    @reconnect_on_failure()
    def update_task(self, status, result=None, task=None):
        """Update the "Ansible Task" document for a task start or a task result.

        Tasks that do not belong to a role are ignored. With a result, the
        output, error, exception, remaining result JSON, ``end`` and
        ``duration`` are recorded; otherwise only ``start`` is set. The
        change is committed and a realtime progress event is published.
        """
        if result:
            if not result._task._role:
                return
            task_name, result = self.parse_result(result)
        else:
            if not task._role:
                return
            task_name = self.tasks[task._role.get_name()][task.name]
        task = jingrow.get_pg("Ansible Task", task_name)
        task.status = status
        if result:
            task.output = result.stdout
            task.error = result.stderr
            task.exception = result.msg
            # Reduce clutter by removing keys already shown elsewhere
            for key in ("stdout", "stdout_lines", "stderr", "stderr_lines", "msg"):
                result.pop(key, None)
            task.result = json.dumps(result, indent=4)
            task.end = now()
            task.duration = task.end - task.start
        else:
            task.start = now()
        task.save()
        self.publish_play_progress(task.name)
        jingrow.db.commit()

    def publish_play_progress(self, task):
        """Publish an ``ansible_play_progress`` realtime event for ``task``.

        ``task`` is an "Ansible Task" document name; progress is its index in
        ``self.task_list``.
        """
        jingrow.publish_realtime(
            "ansible_play_progress",
            {
                "progress": self.task_list.index(task),
                "total": len(self.task_list),
                "play": self.play,
            },
            pagetype="Ansible Play",
            docname=self.play,
            user=jingrow.session.user,
        )

    def parse_result(self, result):
        """Return ``(task document name, result payload as jingrow._dict)`` for a task result."""
        task = result._task.name
        role = result._task._role.get_name()
        return self.tasks[role][task], jingrow._dict(result._result)

    @reconnect_on_failure()
    def on_async_start(self, role, task, job_id):
        """Record the async ``job_id`` on the matching "Ansible Task" document and commit."""
        task_name = self.tasks[role][task]
        task = jingrow.get_pg("Ansible Task", task_name)
        task.job_id = job_id
        task.save()
        jingrow.db.commit()

    @reconnect_on_failure()
    def on_async_poll(self, result):
        """Store the latest async poll result on the task identified by its job id and commit."""
        job_id = result["ansible_job_id"]
        task_name = jingrow.get_value(
            "Ansible Task", {"play": self.play, "job_id": job_id}, "name"
        )
        task = jingrow.get_pg("Ansible Task", task_name)
        task.result = json.dumps(result, indent=4)
        task.duration = now() - task.start
        task.save()
        jingrow.db.commit()


class Ansible:
    """Run a single playbook against one server and track it as an "Ansible Play".

    Constructing an instance installs the async hooks (see ``patch``), sets
    the global Ansible CLI arguments and creates the "Ansible Play" and
    "Ansible Task" documents. ``run`` executes the playbook and removes the
    hooks again.
    """

    def __init__(self, server, playbook, user="root", variables=None, port=22):
        self.patch()
        self.server = server
        self.playbook = playbook
        self.playbook_path = jingrow.get_app_path("jcloud", "playbooks", self.playbook)
        self.host = f"{server.ip}:{port}"
        self.variables = variables or {}

        constants.HOST_KEY_CHECKING = False
        context.CLIARGS = ImmutableDict(
            become_method="sudo",
            check=False,
            connection="ssh",
            # This is the only way to pass variables that preserves newlines
            extra_vars=[f"{cstr(key)}='{cstr(value)}'" for key, value in self.variables.items()],
            remote_user=user,
            start_at_task=None,
            syntax=False,
            verbosity=1,
        )

        self.loader = DataLoader()
        self.passwords = {}
        # The trailing comma makes this an inline host list with a single host
        self.sources = f"{self.host},"
        self.inventory = InventoryManager(loader=self.loader, sources=self.sources)
        self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory)

        self.callback = AnsibleCallback()
        self.display = Display()
        self.display.verbosity = 1
        self.create_ansible_play()

    def patch(self):
        """Install wrappers that forward async job events to ``self.callback``.

        ``ActionModule.run`` (async_status) and
        ``TaskExecutor._poll_async_result`` are replaced by wrappers bound to
        this instance. Each wrapper remembers the pristine function it
        replaced, so calling ``patch`` again (from this or another instance)
        unwraps to that pristine function instead of stacking wrappers, and
        ``unpatch`` always has something valid to restore.
        """

        def modified_action_module_run(*args, **kwargs):
            result = self.action_module_run(*args, **kwargs)
            self.callback.on_async_poll(result)
            return result

        def modified_poll_async_result(executor, result, templar, task_vars=None):
            job_id = result["ansible_job_id"]
            task = executor._task
            self.callback.on_async_start(task._role.get_name(), task.name, job_id)
            return self._poll_async_result(executor, result, templar, task_vars=task_vars)

        # If a wrapper from an earlier patch() is still installed, recover the
        # pristine function from it rather than wrapping the wrapper.
        self.action_module_run = getattr(ActionModule.run, _ORIGINAL_ATTR, ActionModule.run)
        self._poll_async_result = getattr(
            TaskExecutor._poll_async_result, _ORIGINAL_ATTR, TaskExecutor._poll_async_result
        )

        setattr(modified_action_module_run, _ORIGINAL_ATTR, self.action_module_run)
        setattr(modified_poll_async_result, _ORIGINAL_ATTR, self._poll_async_result)

        ActionModule.run = modified_action_module_run
        TaskExecutor._poll_async_result = modified_poll_async_result

    def unpatch(self):
        """Restore the Ansible functions saved by ``patch``."""
        TaskExecutor._poll_async_result = self._poll_async_result
        ActionModule.run = self.action_module_run

    def run(self):
        """Execute the playbook and return the resulting "Ansible Play" document.

        The async hooks are removed afterwards even when execution raises.
        """
        # Make sure the installed hooks report to this instance's callback,
        # even if another instance patched after this one was constructed.
        self.patch()
        try:
            self.executor = PlaybookExecutor(
                playbooks=[self.playbook_path],
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                passwords=self.passwords,
            )
            # Use AnsibleCallback so we can receive updates for tasks execution
            self.executor._tqm._stdout_callback = self.callback
            self.callback.play = self.play
            self.callback.tasks = self.tasks
            self.callback.task_list = self.task_list
            self.executor.run()
        finally:
            self.unpatch()
        return jingrow.get_pg("Ansible Play", self.play)

    def create_ansible_play(self):
        """Create the "Ansible Play" document and one "Ansible Task" per role task.

        Populates ``self.play`` (play document name), ``self.tasks``
        (``{role name: {task name: task document name}}``) and
        ``self.task_list`` (task document names in creation order).
        """
        # Parse the playbook and create Ansible Tasks so we can show how many tasks are pending
        playbook = Playbook.load(
            self.playbook_path, variable_manager=self.variable_manager, loader=self.loader
        )
        # Assume we only have one play per playbook
        play = playbook.get_plays()[0]
        play_pg = jingrow.get_pg(
            {
                "pagetype": "Ansible Play",
                "server_type": self.server.pagetype,
                "server": self.server.name,
                "variables": json.dumps(self.variables, indent=4),
                "playbook": self.playbook,
                "play": play.get_name(),
            }
        ).insert()
        self.play = play_pg.name
        self.tasks = {}
        self.task_list = []
        for role in play.get_roles():
            for block in role.get_task_blocks():
                for task in block.block:
                    task_pg = jingrow.get_pg(
                        {
                            "pagetype": "Ansible Task",
                            "play": self.play,
                            "role": role.get_name(),
                            "task": task.name,
                        }
                    ).insert()
                    self.tasks.setdefault(role.get_name(), {})[task.name] = task_pg.name
                    self.task_list.append(task_pg.name)