260 lines
8.0 KiB
Python
260 lines
8.0 KiB
Python
import json
|
|
|
|
import jingrow
|
|
import wrapt
|
|
from ansible import constants, context
|
|
from ansible.executor.playbook_executor import PlaybookExecutor
|
|
from ansible.executor.task_executor import TaskExecutor
|
|
from ansible.inventory.manager import InventoryManager
|
|
from ansible.module_utils.common.collections import ImmutableDict
|
|
from ansible.parsing.dataloader import DataLoader
|
|
from ansible.playbook import Playbook
|
|
from ansible.plugins.action.async_status import ActionModule
|
|
from ansible.plugins.callback import CallbackBase
|
|
from ansible.utils.display import Display
|
|
from ansible.vars.manager import VariableManager
|
|
from jingrow.utils import cstr
|
|
from jingrow.utils import now_datetime as now
|
|
from pymysql.err import InterfaceError
|
|
|
|
|
|
def reconnect_on_failure():
    """Build a decorator that retries a call once after reconnecting to the DB.

    The decorated callable is invoked normally. If it raises
    ``pymysql.err.InterfaceError``, ``jingrow.db.connect()`` is called and the
    callable is invoked a second time with the same arguments. Whatever the
    second attempt returns or raises is passed on to the caller.
    """

    @wrapt.decorator
    def retry_once(wrapped, instance, args, kwargs):
        try:
            outcome = wrapped(*args, **kwargs)
        except InterfaceError:
            # The connection is unusable; open a new one and try again.
            jingrow.db.connect()
            outcome = wrapped(*args, **kwargs)
        return outcome

    return retry_once
|
|
|
|
|
|
class AnsibleCallback(CallbackBase):
    """Ansible callback that records play and task progress in the database.

    The owner must set the following attributes before the playbook runs
    (``Ansible.run`` does this):

    - ``play``: name of the "Ansible Play" document being executed.
    - ``tasks``: mapping ``{role name: {task name: "Ansible Task" name}}``.
    - ``task_list``: ordered list of "Ansible Task" names, used to compute
      progress.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @reconnect_on_failure()
    def process_task_success(self, result):
        """Copy SSH public keys reported by a successful ``user`` task to the server.

        For a task whose action is ``"user"``, the server document referenced
        by the current play is loaded and saved; its ``root_public_key`` or
        ``jingrow_public_key`` is updated when the reported user name is
        ``root`` or ``jingrow`` respectively.
        """
        result, action = jingrow._dict(result._result), result._task.action
        if action == "user":
            server_type, server = jingrow.db.get_value(
                "Ansible Play", self.play, ["server_type", "server"]
            )
            server = jingrow.get_pg(server_type, server)
            if result.name == "root":
                server.root_public_key = result.ssh_public_key
            elif result.name == "jingrow":
                server.jingrow_public_key = result.ssh_public_key
            server.save()

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Mark the task as successful and post-process its result."""
        self.update_task("Success", result)
        self.process_task_success(result)

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Mark the task as failed."""
        self.update_task("Failure", result)

    def v2_runner_on_skipped(self, result):
        """Mark the task as skipped."""
        self.update_task("Skipped", result)

    def v2_runner_on_unreachable(self, result):
        """Mark the task as unreachable."""
        self.update_task("Unreachable", result)

    def v2_playbook_on_task_start(self, task, is_conditional):
        """Mark the task as running and record its start time."""
        self.update_task("Running", None, task)

    def v2_playbook_on_start(self, playbook):
        """Mark the play as running and record its start time."""
        self.update_play("Running")

    def v2_playbook_on_stats(self, stats):
        """Record the final summary and status of the play."""
        self.update_play(None, stats)

    @reconnect_on_failure()
    def update_play(self, status=None, stats=None):
        """Update the "Ansible Play" document and commit.

        With ``stats`` the play is finalised: the per-host summary is copied
        onto the document, the status becomes "Failure" if there were failures
        or unreachable hosts and "Success" otherwise, and end time and duration
        are set. Without ``stats`` the given ``status`` and a fresh start time
        are stored.
        """
        play = jingrow.get_pg("Ansible Play", self.play)
        if stats:
            # Assume we're running on one host
            host = next(iter(stats.processed))
            play.update(stats.summarize(host))
            if play.failures or play.unreachable:
                play.status = "Failure"
            else:
                play.status = "Success"
            play.end = now()
            play.duration = play.end - play.start
        else:
            play.status = status
            play.start = now()

        play.save()
        jingrow.db.commit()

    @reconnect_on_failure()
    def update_task(self, status, result=None, task=None):
        """Update the "Ansible Task" document for a task event and commit.

        Pass ``result`` for a finished task (output, error, exception, result
        JSON, end time and duration are stored) or ``task`` for a task that is
        about to start (only the start time is stored). Tasks that do not
        belong to a role are ignored.
        """
        # Kept separate from `result` so that an empty payload dict is still
        # treated as a finished task rather than as a task start.
        payload = None
        if result:
            if not result._task._role:
                return
            task_name, payload = self.parse_result(result)
        else:
            if not task._role:
                return
            task_name = self.tasks[task._role.get_name()][task.name]
        task = jingrow.get_pg("Ansible Task", task_name)
        task.status = status
        if payload is not None:
            task.output = payload.stdout
            task.error = payload.stderr
            task.exception = payload.msg
            # Reduce clutter by removing keys already shown elsewhere
            for key in ("stdout", "stdout_lines", "stderr", "stderr_lines", "msg"):
                payload.pop(key, None)
            task.result = json.dumps(payload, indent=4)
            task.end = now()
            task.duration = task.end - task.start
        else:
            task.start = now()
        task.save()
        self.publish_play_progress(task.name)
        jingrow.db.commit()

    def publish_play_progress(self, task):
        """Publish a realtime "ansible_play_progress" event for the given task name.

        Progress is the position of ``task`` in ``self.task_list``.
        """
        jingrow.publish_realtime(
            "ansible_play_progress",
            {"progress": self.task_list.index(task), "total": len(self.task_list), "play": self.play},
            pagetype="Ansible Play",
            docname=self.play,
            user=jingrow.session.user,
        )

    def parse_result(self, result):
        """Return ``("Ansible Task" name, result payload)`` for a task result."""
        task = result._task.name
        role = result._task._role.get_name()
        return self.tasks[role][task], jingrow._dict(result._result)

    @reconnect_on_failure()
    def on_async_start(self, role, task, job_id):
        """Store the async job id on the matching "Ansible Task" and commit."""
        task_name = self.tasks[role][task]
        task = jingrow.get_pg("Ansible Task", task_name)
        task.job_id = job_id
        task.save()
        jingrow.db.commit()

    @reconnect_on_failure()
    def on_async_poll(self, result):
        """Store the latest poll result of an async job on its "Ansible Task".

        The task is looked up by the current play and ``result["ansible_job_id"]``;
        its result JSON and running duration are updated and committed.
        """
        job_id = result["ansible_job_id"]
        task_name = jingrow.get_value("Ansible Task", {"play": self.play, "job_id": job_id}, "name")
        task = jingrow.get_pg("Ansible Task", task_name)
        task.result = json.dumps(result, indent=4)
        task.duration = now() - task.start
        task.save()
        jingrow.db.commit()
|
|
|
|
|
|
class Ansible:
    """Run one playbook of the ``jcloud`` app against a single server.

    Creating an instance parses the playbook, inserts an "Ansible Play"
    document plus one "Ansible Task" document per role task, and monkeypatches
    Ansible's async handling so async job progress reaches the callback.
    ``run()`` executes the playbook and always removes the monkeypatches.

    Args:
        server: server document; ``ip``, ``pagetype`` and ``name`` are read.
        playbook: playbook file name inside the app's ``playbooks`` directory.
        user: remote user for the SSH connection.
        variables: mapping passed to the playbook as extra vars.
        port: SSH port of the server.
    """

    def __init__(self, server, playbook, user="root", variables=None, port=22):
        self.server = server
        self.playbook = playbook
        self.playbook_path = jingrow.get_app_path("jcloud", "playbooks", self.playbook)
        self.host = f"{server.ip}:{port}"
        self.variables = variables or {}

        constants.HOST_KEY_CHECKING = False
        context.CLIARGS = ImmutableDict(
            become_method="sudo",
            check=False,
            connection="ssh",
            # This is the only way to pass variables that preserves newlines
            extra_vars=[f"{cstr(key)}='{cstr(value)}'" for key, value in self.variables.items()],
            remote_user=user,
            start_at_task=None,
            syntax=False,
            verbosity=1,
        )

        self.loader = DataLoader()
        self.passwords = {}

        # A trailing comma makes Ansible treat the source as a host list
        # rather than an inventory file path.
        self.sources = f"{self.host},"
        self.inventory = InventoryManager(loader=self.loader, sources=self.sources)
        self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory)

        self.callback = AnsibleCallback()
        self.display = Display()
        self.display.verbosity = 1
        self.create_ansible_play()

        # Patch last: the patches are process-global and only needed while the
        # playbook runs, so a failure above must not leave them installed.
        self.patch()

    def patch(self):
        """Install wrappers that forward async job events to the callback.

        ``ActionModule.run`` (the ``async_status`` action) is wrapped to report
        every poll result, and ``TaskExecutor._poll_async_result`` is wrapped
        to report the job id when polling starts. An attribute that is already
        wrapped by this module is left alone, so wrappers never stack.
        """

        def modified_action_module_run(*args, **kwargs):
            result = self.action_module_run(*args, **kwargs)
            self.callback.on_async_poll(result)
            return result

        def modified_poll_async_result(executor, result, templar, task_vars=None):
            job_id = result["ansible_job_id"]
            task = executor._task
            self.callback.on_async_start(task._role.get_name(), task.name, job_id)
            return self._poll_async_result(executor, result, templar, task_vars=task_vars)

        # A function defined in this module carries this module's name in
        # `__module__`; that is how an already-installed wrapper is detected.
        if ActionModule.run.__module__ != __name__:
            self.action_module_run = ActionModule.run
            ActionModule.run = modified_action_module_run

        # Inspect the attribute that is actually patched, not TaskExecutor.run,
        # which is never replaced and therefore never reports this module.
        if TaskExecutor._poll_async_result.__module__ != __name__:
            self._poll_async_result = TaskExecutor._poll_async_result
            TaskExecutor._poll_async_result = modified_poll_async_result

    def unpatch(self):
        """Restore the originals saved by ``patch()``.

        Only attributes this instance actually replaced are restored, so the
        call is safe when ``patch()`` skipped one or both wrappers.
        """
        original_poll = getattr(self, "_poll_async_result", None)
        if original_poll is not None:
            TaskExecutor._poll_async_result = original_poll

        original_run = getattr(self, "action_module_run", None)
        if original_run is not None:
            ActionModule.run = original_run

    def run(self):
        """Execute the playbook and return the resulting "Ansible Play" document.

        The monkeypatches installed by ``patch()`` are removed even when
        setting up or running the executor raises.
        """
        try:
            self.executor = PlaybookExecutor(
                playbooks=[self.playbook_path],
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                passwords=self.passwords,
            )
            # Use AnsibleCallback so we can receive updates for tasks execution
            self.executor._tqm._stdout_callback = self.callback
            self.callback.play = self.play
            self.callback.tasks = self.tasks
            self.callback.task_list = self.task_list
            self.executor.run()
        finally:
            self.unpatch()
        return jingrow.get_pg("Ansible Play", self.play)

    def create_ansible_play(self):
        """Insert the "Ansible Play" document and one "Ansible Task" per role task.

        Sets ``self.play`` (play document name), ``self.tasks``
        (``{role name: {task name: task document name}}``) and
        ``self.task_list`` (task document names in playbook order).
        """
        # Parse the playbook and create Ansible Tasks so we can show how many tasks are pending
        playbook = Playbook.load(
            self.playbook_path, variable_manager=self.variable_manager, loader=self.loader
        )
        # Assume we only have one play per playbook
        play = playbook.get_plays()[0]
        play_pg = jingrow.get_pg(
            {
                "pagetype": "Ansible Play",
                "server_type": self.server.pagetype,
                "server": self.server.name,
                "variables": json.dumps(self.variables, indent=4),
                "playbook": self.playbook,
                "play": play.get_name(),
            }
        ).insert()
        self.play = play_pg.name
        self.tasks = {}
        self.task_list = []
        for role in play.get_roles():
            role_name = role.get_name()
            for block in role.get_task_blocks():
                for task in block.block:
                    task_pg = jingrow.get_pg(
                        {
                            "pagetype": "Ansible Task",
                            "play": self.play,
                            "role": role_name,
                            "task": task.name,
                        }
                    ).insert()
                    self.tasks.setdefault(role_name, {})[task.name] = task_pg.name
                    self.task_list.append(task_pg.name)
|