k8s源码分析: kube-scheduler


| 阅读 |,阅读约 3 分钟
| 复制链接:

Overview

前言

最近加入云原生社区组织的k8s源码研习社,开始学习k8s底层源码,并整理成笔记。欢迎感兴趣的同学一起加入,共同学习进步。群里和社区里有各种大佬,随时可以帮你答疑解惑。k8s源码研习社

概述

kube-scheduler是k8s系统核心组件之一,主要负责整个集群pod资源对象的调度,根据调度算法,将未调度的pod资源对象调度到最优的工作节点上,从而更加合理、更加充分的利用集群资源。之前整理过一篇关于k8s入门篇的介绍,如果对于k8s的各个组件功能不了解,可以先看下这篇文章。源码内部还调用了informer相关代码,如果对informer机制不了解,可以看下之前整理的k8s源码分析- Informer机制

架构设计

kube-scheduler在为pod资源对象选择合适节点时,有两种最优解:

  • 全局最优解:每个调度周期都会遍历所有节点,找出最优的节点。适合小型集群,比如100节点
  • 局部最优解:每个调度周期只会遍历部分节点,找出局部最优的节点。适合中大型集群,比如5000节点

调度包括两个环节:

  • 预选调度
  • 优选调度

高清图片

scheduler总体流程

组件启动流程源码分析

kube-scheduler的启动流程包括如下几步:

  • 内置调度算法的注册
  • cobra命令行参数解析
  • 实例化Scheduler对象
  • 运行EventBoardercaster事件管理器
  • 运行http或https服务
  • 领导者选举实例化
  • 运行调度器

0. 函数入口

k8s中各个组件的启动,使用的是cobra框架。函数入口主要是初始化cobra的Command,然后执行cobra的Execute

源码位置:cmd/kube-scheduler/scheduler.go

 1func main() {
 2	rand.Seed(time.Now().UnixNano())
 3	// 初始化cobra的command
 4	command := app.NewSchedulerCommand()
 5	...
 6  // 启动cobra
 7	if err := command.Execute(); err != nil {
 8		os.Exit(1)
 9	}
10}

2. cobra命令行参数解析

源码位置:cmd/kube-scheduler/app/server.go

 1func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
 2  // 初始化默认配置
 3	opts, err := options.NewOptions()
 4  ...
 5	cmd := &cobra.Command{
 6		...
 7		Run: func(cmd *cobra.Command, args []string) {
 8      // 主要是调用了runCommand方法
 9			if err := runCommand(cmd, opts, registryOptions...); err != nil {
10				...
11			}
12		},
13	}
14	...
15	return cmd
16}
17
18// runCommand
19func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
20	// 创建Context
21	ctx, cancel := context.WithCancel(context.Background())
22	defer cancel()
23	// Setup函数创建scheduler对象
24	cc, sched, err := Setup(ctx, opts, registryOptions...)
25	...
26  // 执行主调度逻辑
27	return Run(ctx, cc, sched)
28}

3. 实例化Scheduler对象

 1func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
 2  // 校验参数的合法性
 3	if errs := opts.Validate(); len(errs) > 0 {
 4		return nil, nil, utilerrors.NewAggregate(errs)
 5	}
 6	// 初始化config
 7	c, err := opts.Config()
 8	...
 9	// Get the completed config
10	cc := c.Complete()
11	...
12	recorderFactory := getRecorderFactory(&cc)
13	// 创建scheduler对象
14	sched, err := scheduler.New(cc.Client,
15		cc.InformerFactory,
16		cc.PodInformer,
17		recorderFactory,
18		ctx.Done(),
19		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
20		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
21		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
22		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
23		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
24		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
25		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
26	)
27	...
28	return &cc, sched, nil
29}
30
31// Scheduler对象
32type Scheduler struct {
33	// It is expected that changes made via SchedulerCache will be observed
34	// by NodeLister and Algorithm.
35	SchedulerCache internalcache.Cache
36
37	Algorithm core.ScheduleAlgorithm
38
39	// NextPod should be a function that blocks until the next pod
40	// is available. We don't use a channel for this, because scheduling
41	// a pod may take some amount of time and we don't want pods to get
42	// stale while they sit in a channel.
43	NextPod func() *framework.QueuedPodInfo
44
45	// Error is called if there is an error. It is passed the pod in
46	// question, and the error
47	Error func(*framework.QueuedPodInfo, error)
48
49	// Close this to shut down the scheduler.
50	StopEverything <-chan struct{}
51
52	// SchedulingQueue holds pods to be scheduled
53	SchedulingQueue internalqueue.SchedulingQueue
54
55	// Profiles are the scheduling profiles.
56	Profiles profile.Map
57
58	scheduledPodsHasSynced func() bool
59
60	client clientset.Interface
61}

4. 运行EventBoardercaster事件管理器

5. 运行http或https服务

6. 领导者选举实例化

7. 运行调度器