Ansible部署核心代码解析


| 阅读 |,阅读约 2 分钟
| 复制链接:

Overview

Ansible部署核心代码解析

playbook_executor.py

 1def run(self):
 2    # 遍历所有的 playbook
 3    for playbook_path in self._playbooks:
 4        # 执行单个 playbook 逻辑
 5        pb = Playbook.load(...)
 6        # 触发 playbook start 的回调函数
 7        self._tmq.send_callback('v2_playbook_on_start', pb)
 8        # 遍历单个 playbook 下所有的 play(entries对象)
 9        plays = pb.get_plays()
10        for play in plays:
11            # 获取主机列表
12            batches = self._get_serialized_batches(play)
13            for batch in batches:
14                # 执行单个 play 逻辑, 调用 task_queue_manager.run()
15                result = self._tqm.run(play=play)
16        # 触发 统计 的回调函数
17        self._tqm.send_callback('v2_playbook_on_stats', self._tqm._stats)

task_queue_manager.py

 1
 2# 执行 play 的核心方法
 3def run(self, play):
 4    # 加载插件
 5    self.load_callbacks()
 6    # 触发 play 开始执行的回调函数
 7    self.send_callback('v2_playbook_on_play_start', new_play)
 8    iterator = PlayIterator(...)
 9    strategy = strategy_loader.get(...)
10    # 调用 linear.py 的 run 方法
11    play_return = strategy.run(iterator, play_context)
12    return play_return
13# 调用回调函数的方法
14def send_callback(self, method_name, *args, **kwargs):
15    # 遍历所有注册的回调函数
16    for callback_plugin in [self._stdout_callback] + self._callback_plugins:
17        # 遍历所有的方法名
18        for method in methods:
19            # 调用回调函数
20            method(*new_args, **kwargs)

linear.py

 1def run(self, iterator, play_context):
 2    # 循环执行部署
 3    while work_to_do and not self._tqm._terminated:
 4        # 获取主机信息
 5        hosts_left = self.get_hosts_left(iterator)
 6        # 获取任务信息
 7        host_tasks = self._get_next_task_lockstep(hosts_left, iterator)
 8        for (host, task) in host_tasks:
 9            # 触发 task 开始执行的回调函数
10            self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
11            # 将要执行的任务加入队列中排队
12            self._queue_task(host, task, task_vars, play_context)
13        # 等待收取执行结果
14        if self._pending_results > 0:
15            results += self._wait_on_pending_results(iterator)

strategy/init.py

 1def _wait_on_pending_results(self, iterator):
 2    while self._pending_results > 0 and not self._tqm._terminated:
 3        # 等待处理结果
 4        results = self._process_pending_results(iterator)
 5        return ret_results
 6
 7def _process_pending_results(...):
 8
 9    while True:
10        # 获取部署结果, _results是一个deque对象
11        task_result = self._results.popleft()
12        # 触发执行结果的回调函数
13        if '_ansible_retry' in task_result._result:
14            self._tqm.send_callback('v2_runner_retry', task_result)
15        elif '_ansible_item_result' in task_result._result:
16            self._tqm.send_callback('v2_runner_item_on_failed', task_result)
17            self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
18            self._tqm.send_callback('v2_runner_item_on_ok', task_result)
19
20        # 部署出错
21        if task_result.is_failed():
22            self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=ignore_errors)
23
24        # 主机不可达
25        elif task_result.is_unreachable():
26            self._tqm.send_callback('v2_runner_on_unreachable', task_result)
27        # 跳过
28        elif task_result.is_skipped():
29            self._tqm.send_callback('v2_runner_on_skipped', task_result)
30
31        # 执行成功
32        self._tqm.send_callback('v2_runner_on_ok', task_result)
33
34    return ret_results