Ansible部署核心代码解析
| 阅读 | 共 526 字,阅读约
Overview
Ansible部署核心代码解析
playbook_executor.py
1def run(self):
2 # 遍历所有的 playbook
3 for playbook_path in self._playbooks:
4 # 执行单个 playbook 逻辑
5 pb = Playbook.load(...)
6 # 触发 playbook start 的回调函数
7 self._tmq.send_callback('v2_playbook_on_start', pb)
8 # 遍历单个 playbook 下所有的 play(entries对象)
9 plays = pb.get_plays()
10 for play in plays:
11 # 获取主机列表
12 batches = self._get_serialized_batches(play)
13 for batch in batches:
14 # 执行单个 play 逻辑, 调用 task_queue_manager.run()
15 result = self._tqm.run(play=play)
16 # 触发 统计 的回调函数
17 self._tqm.send_callback('v2_playbook_on_stats', self._tqm._stats)
task_queue_manager.py
1
2# 执行 play 的核心方法
3def run(self, play):
4 # 加载插件
5 self.load_callbacks()
6 # 触发 play 开始执行的回调函数
7 self.send_callback('v2_playbook_on_play_start', new_play)
8 iterator = PlayIterator(...)
9 strategy = strategy_loader.get(...)
10 # 调用 linear.py 的 run 方法
11 play_return = strategy.run(iterator, play_context)
12 return play_return
13# 调用回调函数的方法
14def send_callback(self, method_name, *args, **kwargs):
15 # 遍历所有注册的回调函数
16 for callback_plugin in [self._stdout_callback] + self._callback_plugins:
17 # 遍历所有的方法名
18 for method in methods:
19 # 调用回调函数
20 method(*new_args, **kwargs)
linear.py
1def run(self, iterator, play_context):
2 # 循环执行部署
3 while work_to_do and not self._tqm._terminated:
4 # 获取主机信息
5 hosts_left = self.get_hosts_left(iterator)
6 # 获取任务信息
7 host_tasks = self._get_next_task_lockstep(hosts_left, iterator)
8 for (host, task) in host_tasks:
9 # 触发 task 开始执行的回调函数
10 self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
11 # 将要执行的任务加入队列中排队
12 self._queue_task(host, task, task_vars, play_context)
13 # 等待收取执行结果
14 if self._pending_results > 0:
15 results += self._wait_on_pending_results(iterator)
strategy/init.py
1def _wait_on_pending_results(self, iterator):
2 while self._pending_results > 0 and not self._tqm._terminated:
3 # 等待处理结果
4 results = self._process_pending_results(iterator)
5 return ret_results
6
7def _process_pending_results(...):
8
9 while True:
10 # 获取部署结果, _results是一个deque对象
11 task_result = self._results.popleft()
12 # 触发执行结果的回调函数
13 if '_ansible_retry' in task_result._result:
14 self._tqm.send_callback('v2_runner_retry', task_result)
15 elif '_ansible_item_result' in task_result._result:
16 self._tqm.send_callback('v2_runner_item_on_failed', task_result)
17 self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
18 self._tqm.send_callback('v2_runner_item_on_ok', task_result)
19
20 # 部署出错
21 if task_result.is_failed():
22 self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=ignore_errors)
23
24 # 主机不可达
25 elif task_result.is_unreachable():
26 self._tqm.send_callback('v2_runner_on_unreachable', task_result)
27 # 跳过
28 elif task_result.is_skipped():
29 self._tqm.send_callback('v2_runner_on_skipped', task_result)
30
31 # 执行成功
32 self._tqm.send_callback('v2_runner_on_ok', task_result)
33
34 return ret_results