client-go源码分析(二) - Informer机制
| 阅读 | 共 10025 字,阅读约
Overview
client-go源码分析(二) - Informer机制
最近加入云原生社区组织的k8s源码研习社,开始学习k8s底层源码,并整理成笔记。欢迎感兴趣的同学一起加入,共同学习进步。群里和社区里有各种大佬,随时可以帮你答疑解惑。https://github.com/cloudnativeto/sig-k8s-source-code
先放一张调用关系图
由于Informer这部分的源码比较复杂,调用链路也很长,后面的源码分析,都会围绕这一张图展开。
概述
k8s中,组件之间通过http通讯,在不依赖任何中间件的情况下,需要保证消息的可靠性、实时性、顺序性等?k8s是如何做到的呢?— 答案就是Informer。k8s的其他组件都是通过informer与api-server通讯的。
Informer运行原理
各个组件包括:
- Reflector:用于监控(watch)指定的资源,当监控的资源发生变化时,触发相应的变更事件。并将资源对象存放到本地缓存DeltaFIFO中
- DeltaFIFO:对资源对象的的操作类型进行队列的基本操作
- FIFO:先进先出队列,提供资源对象的增删改查等操作
- Dealta:资源对象存储,可以保存资源对象的操作类型。如:添加操作类型、更新操作类型、删除操作类型、同步操作类型
- Indexer:存储资源对象,并自带索引功能的本地存储。
- Reflect从DeltaFIFO中将消费出来的资源对象存储至Indexer
- Indexer中的数据与Etcd完全一致,client-go可以从本地读取,减轻etcd和api-server的压力
Informer使用示例
- 通过kubernetes.NewForConfig创建clientset对象。informer需要通过clientset与apiserver进行交互
- 创建一个用于停止的channel,用于进程退出前通知informer提前退出。因为informer是一个持久运行的Groutine
- informers.NewSharedInformerFactory实例化sharedInformer对象
- 第一个参数是ClientSet
- 第二个参数是多久同步一次
- Informer方法可以获得特定资源的informer对象
- AddEventHandler函数可以为对象添加回调方法,支持三种对象的回调方法
- AddFunc:创建资源对象时触发的回调方法
- UpdateFunc:更新资源对象时触发的回调方法
- DeleteFunc:删除资源对象时触发的回调方法
- Run方法运行当前的informer
1// 通过informer机制,实现k8s资源的监控
2func informer() {
3 // 因为informer是一个持久运行的groutine,channel作用:进程退出前通知informer退出
4 stopChan := make(chan struct{})
5 defer close(stopChan)
6
7 // 创建连接k8s的client对象
8 clientSet, err := kubernetes.NewForConfig(config)
9 if err != nil {
10 log.Printf("init clientset error.")
11 return
12 }
13
14 // 第一步:创建sharedInformer对象,第二个参数为重新同步数据的间隔时间
15 sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
16 // 第二步:每个资源都有informer对象,这里获取pod资源的informer对象
17 podInformer := sharedInformers.Core().V1().Pods().Informer()
18 // 第三步:添加自定义回调函数
19 podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
20 // 添加资源的回调函数,返回的是接口类型,需要强制转换为真正的类型
21 AddFunc: func(obj interface{}) {
22 mObj := obj.(v1.Object)
23 log.Printf("New pod added: %s", mObj.GetName())
24 },
25 // 更新资源的回调函数
26 UpdateFunc: func(oldObj, newObj interface{}) {
27 oObj := oldObj.(v1.Object)
28 nObj := newObj.(v1.Object)
29 log.Printf("%s pod updated to %s", oObj.GetName(), nObj.GetName())
30 },
31 // 删除资源的回调函数
32 DeleteFunc: func(obj interface{}) {
33 mObj := obj.(v1.Object)
34 log.Printf("pod deleted from store: %s", mObj.GetName())
35 },
36 })
37 // 第四步:开始运行informer对象
38 podInformer.Run(stopChan)
39}
资源Informer和SharedInformer
前面demo中,第一步便是创建SharedInformer对象,下面先介绍一下Informer和SharedInformer
资源Informer
- 每一种资源都实现了Informer机制,允许监控不同的资源事件
- 每一个Informer都会实现Informer和Lister方法
1type PodInformer interface {
2 Informer() cache.SharedIndexInformer
3 Lister() v1.PodLister
4}
SharedInformer
若同一个资源的Informer被实例化了多次,每个Informer使用一个Reflector,那么会运行过多相同的ListAndWatch,太多重复的序列化和反序列化操作会导致api-server负载过重
SharedInformer可以使同一类资源Informer共享一个Reflector。内部定义了一个map字段,用于存放所有Infromer的字段。
前面demo中第一步创建SharedInformer,sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
,内部初始化了一个sharedInformerFactory对象,先看下sharedInformerFactory
源码位置:vendor/k8s.io/client-go/informer/factory.go
1type sharedInformerFactory struct {
2 client kubernetes.Interface
3 namespace string
4 tweakListOptions internalinterfaces.TweakListOptionsFunc
5 lock sync.Mutex
6 defaultResync time.Duration
7 customResync map[reflect.Type]time.Duration
8
9 // 按照类型存放共享的informer
10 informers map[reflect.Type]cache.SharedIndexInformer
11
12 // 这个字段用来追踪informers是否被启动了
13 // 可以保证Start()方法安全的重复调用多次(幂等性)
14 startedInformers map[reflect.Type]bool
15}
Start方法
k8s中的Controller-Manager组件,源码中的Run方法调用了SharedInformerFactory的Start方法
源码位置:cmd/kube-controller-manager/app/controllermanager.go
1func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
2 ...
3 controllerContext.InformerFactory.Start(controllerContext.Stop)
4 ...
5}
源码位置:k8s.io/client-go/informers/factory.go
1func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
2 ...
3 // 遍历所有的informers
4 for informerType, informer := range f.informers {
5 if !f.startedInformers[informerType] {
6 // 每一种informer启动一个协程,运行Run方法
7 go informer.Run(stopCh)
8 f.startedInformers[informerType] = true
9 }
10 }
11}
获取Informer
前面demo中,创建好sharedInformer对象后,第二步是调用podInformer := sharedInformers.Core().V1().Pods().Informer()
,获取具体的informer实例,下面开始分析Informer方法
关键逻辑包括:
- sharedProcessor的初始化
- List和Watch方法的注册:注册具体某个资源类型的list和watch方法
- Indexer的初始化:实现类是cache类
以pod为例,源码位置:client-go/informers/core/v1/pod.go
1// 获取pod的informer,内部调用InformerFor,参数需要传入f.defaultInformer
2func (f *podInformer) Informer() cache.SharedIndexInformer {
3 return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
4}
5
6func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
7 // 最后一个参数,初始化indexers
8 return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
9}
10
11func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
12 return cache.NewSharedIndexInformer(
13 // 注册List、Watch方法
14 &cache.ListWatch{
15 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
16 if tweakListOptions != nil {
17 tweakListOptions(&options)
18 }
19 // List方法是该种资源对象的List方法(这里是pod)
20 return client.CoreV1().Pods(namespace).List(context.TODO(), options)
21 },
22 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
23 if tweakListOptions != nil {
24 tweakListOptions(&options)
25 }
26 // Watch方法是该种资源对象的Watch方法(这里是pod)
27 return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
28 },
29 },
30 &corev1.Pod{},
31 resyncPeriod,
32 indexers,
33 )
34}
35
36func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
37 realClock := &clock.RealClock{}
38 sharedIndexInformer := &sharedIndexInformer{
39 // 这里是processor的初始化
40 processor: &sharedProcessor{clock: realClock},
41 // 这里是Indexer的初始化,接口为Indexer,实现类为cache
42 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
43 listerWatcher: lw,
44 objectType: exampleObject,
45 resyncCheckPeriod: defaultEventHandlerResyncPeriod,
46 defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
47 cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
48 clock: realClock,
49 }
50 return sharedIndexInformer
51}
52
53// Index接口,实现类是cache类
54func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
55 return &cache{
56 cacheStorage: NewThreadSafeStore(indexers, Indices{}),
57 keyFunc: keyFunc,
58 }
59}
注册自定义回调函数
得到Informer对象之后,第三步是给该infomer注册自定义回调函数,当k8s的资源发送变更时,可以实现自己的业务逻辑。
下面分析一下podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{...})
的逻辑
1// 开始注册事件处理函数
2func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
3 s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
4}
5
6func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
7 ...
8 // 每一个监听者,都会注册为一个listner实例
9 // 每个listener中持有一个handler对象,后面事件发生时,框架会调用handler方法,也就走到用户注册的代码逻辑了
10 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
11 ...
12 // 将listner添加到sharedProcessor中
13 s.processor.addListener(listener)
14 for _, item := range s.indexer.List() {
15 listener.add(addNotification{newObj: item})
16 }
17}
18
19// 将listner添加到sharedProcessor中
20func (p *sharedProcessor) addListener(listener *processorListener) {
21 ...
22 if p.listenersStarted {
23 // listener后台启动了两个协程,这两个协程很关键,后面会介绍
24 p.wg.Start(listener.run)
25 p.wg.Start(listener.pop)
26 }
27}
Run方法
前面demo中,得到informer实例之后,最后一步就是调用Run方法,下面开始解析Run方法的逻辑, 核心逻辑包括:
- DeltaFIFO的初始化
- Controller的初始化
- 运行process.Run方法
- 运行controller.Run方法
源码位置:k8s.io/client-go/tools/cache/shared_informer.go
1func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
2 // 初始化DeltaFIFO
3 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
4 KnownObjects: s.indexer,
5 EmitDeltaTypeReplaced: true,
6 })
7 // config初始化
8 // 这里重点关注ListerWatcher对象和Process对象,Process关联的是HandleDeltas函数
9 // HandleDeltas是消费增量信息(Delta对象)的核心方法
10 cfg := &Config{
11 Queue: fifo,
12 // ListAndWatch对象
13 ListerWatcher: s.listerWatcher,
14 ObjectType: s.objectType,
15 FullResyncPeriod: s.resyncCheckPeriod,
16 RetryOnError: false,
17 ShouldResync: s.processor.shouldResync,
18 // 注册回调函数 HandleDeltas,资源变更时,存到到本地Indexer
19 Process: s.HandleDeltas,
20 WatchErrorHandler: s.watchErrorHandler,
21 }
22 // 这里主要是controller的初始化
23 func() {
24 s.startedLock.Lock()
25 defer s.startedLock.Unlock()
26 // 初始化Controller对象
27 s.controller = New(cfg)
28 s.controller.(*controller).clock = s.clock
29 s.started = true
30 }()
31 // s.cacheMutationDetector.Run检查缓存对象是否存在
32 wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
33 // 执行sharedProcessor.run方法
34 // 这个方法非常重要
35 wg.StartWithChannel(processorStopCh, s.processor.run)
36 ...
37 // 调用Controller的Run方法
38 s.controller.Run(stopCh)
39}
process的Run方法
1func (p *sharedProcessor) run(stopCh <-chan struct{}) {
2 func() {
3 ...
4 // sharedProcessor的所有Listner,每个后台启动两个协程
5 // 分别指向run和pop方法
6 for _, listener := range p.listeners {
7 p.wg.Start(listener.run)
8 p.wg.Start(listener.pop)
9 }
10 p.listenersStarted = true
11 }()
12 ...
13}
processorListener的run方法
processorListener代表一个消费者对象,该函数周期性执行,主要是从自己的nextCh通道中获取从api-server得到的增量信息,然后调用handler的相关方法,handler方法就是前面用户自定义传进来的方法。
这里我们只需要知道,run方法是消费者方法,负责消费事件。后面会介绍到谁是生产者,谁往processorLister的nextCh通道中放入增量信息(其实就是下面的pop方法)
1func (p *processorListener) run() {
2
3 stopCh := make(chan struct{})
4 wait.Until(func() {
5 // 消费者方法,不断从通道中获取事件
6 for next := range p.nextCh {
7 switch notification := next.(type) {
8 case updateNotification:
9 // 调用handler的方法,
10 p.handler.OnUpdate(notification.oldObj, notification.newObj)
11 case addNotification:
12 p.handler.OnAdd(notification.newObj)
13 case deleteNotification:
14 p.handler.OnDelete(notification.oldObj)
15 default:
16 utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
17 }
18 }
19 // the only way to get here is if the p.nextCh is empty and closed
20 close(stopCh)
21 }, 1*time.Second, stopCh)
22}
23
24// OnUpdate方法,内部就是调用demo中注册的UpdateFunc方法
25// 其他方法类似
26func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
27 if r.UpdateFunc != nil {
28 r.UpdateFunc(oldObj, newObj)
29 }
30}
processorListener的pop方法
这个方法的实现非常复杂,但是总体的目的很简单:就是往nextCh中生产消息,然后前面介绍的run方法就可以消费到
这里主要用到了类似缓冲区的方法,并不是一次就获取一个事件
1func (p *processorListener) pop() {
2 ...
3 var nextCh chan<- interface{}
4 var notification interface{}
5 for {
6 select {
7 // 函数第一次进来,这个notification一定是空的,这个case会被阻塞住
8 // 当第二个case调用完成后,notification被赋值为notificationToAdd,才会进入到这里
9 case nextCh <- notification:
10 // Notification dispatched
11 var ok bool
12 notification, ok = p.pendingNotifications.ReadOne()
13 if !ok { // Nothing to pop
14 nextCh = nil // Disable this select case
15 }
16 // 第一次调用时,会进入这里,首先从addCh中获取数据(后面会介绍谁往addCh中放数据)
17 case notificationToAdd, ok := <-p.addCh:
18 if !ok {
19 return
20 }
21 if notification == nil { // No notification to pop (and pendingNotifications is empty)
22 notification = notificationToAdd
23 // channel是引用类型,将p.nextCh指向nextCh,对nextCh的操作就是操作p.nextCh
24 // 这里也解答了前面的run方法里面提到的疑问,谁是nextCh的生产者,往nextCh放入数据
25 nextCh = p.nextCh
26 } else { // There is already a notification waiting to be dispatched
27 p.pendingNotifications.WriteOne(notificationToAdd)
28 }
29 }
30 }
31}
pop函数中我们看到,主要是消费p.addCh,后面详细介绍p.addCh的生产者是谁。这里简单提一下,watch函数监测到api-server的事件变化时,触发HandlerDelta函数,这个函数会更新Indexer,同时调用distribute方法,将该事件通知给所有的listener,内部实现就是:往每个listener的addCh这个通道中放入数据。
Controller的Run方法
Run方法内部关键逻辑包括:
- 初始化Reflector对象
- 调用Reflector的Run方法
- 调用List获取全部资源数据
- 调用Watch实时监控资源变更情况,并放入队列
- 调用controller的processLoop方法
- 消费队列中的数据
源码位置:k8s.io/client-go/tools/cache/controller.go
1func (c *controller) Run(stopCh <-chan struct{}) {
2 // 调用NewReflector初始化一个Reflector
3 // 必须传入ListWatcher数据接口对象
4 r := NewReflector(
5 c.config.ListerWatcher,
6 c.config.ObjectType,
7 c.config.Queue,
8 c.config.FullResyncPeriod,
9 )
10 ...
11 // 调用Reflector的Run方法,启动监控,并处理监控事件
12 wg.StartWithChannel(stopCh, r.Run)
13
14 // processLoop负责从DeltaFIFO取出数据并消费
15 wait.Until(c.processLoop, time.Second, stopCh)
16}
Reflector
这里先暂停源码分析,简单介绍一下Reflector,Reflector用于监控指定的k8s资源,并触发相应的变更事件。
- NewReflector:创建Reflector对象,需传入ListerWatcher数据接口对象
- Run:启动监控,并处理事件
Run中的核心函数是ListAndWatch,流程包括:
- 获取资源列表数据
- 监控资源对象:使用http协议的分块传输编码
Reflector类
1type Reflector struct {
2 // 名称
3 name string
4 expectedTypeName string
5 // 期望放入缓存store的资源类型
6 expectedType reflect.Type
7 // The GVK of the object we expect to place in the store if unstructured.
8 expectedGVK *schema.GroupVersionKind
9 // 存放同步监听到的资源,这里是DeltaFIFO类
10 store Store
11 // 用来执行List和Watch的对象
12 listerWatcher ListerWatcher
13 backoffManager wait.BackoffManager
14 // resync周期
15 resyncPeriod time.Duration
16 ShouldResync func() bool
17 clock clock.Clock
18 paginatedResult bool
19 // 最新一次看到的资源版本号
20 lastSyncResourceVersion string
21 isLastSyncResourceVersionUnavailable bool
22 lastSyncResourceVersionMutex sync.RWMutex
23 WatchListPageSize int64
24 watchErrorHandler WatchErrorHandler
25}
核心方法:Run
核心逻辑包括:
- 调用List方法获取资源对象下所有的数据
- 将资源数据转换为资源对象列表
- 将资源信息存储到DeltaFIFO中,全量替换本地缓存
- 调用Watch方法监听资源
- 调用watchHandler函数,处理watch到的各种事件
1// Run函数
2func (r *Reflector) Run(stopCh <-chan struct{}) {
3 ...
4 wait.BackoffUntil(func() {
5 // 核心函数:ListAndWatch
6 if err := r.ListAndWatch(stopCh); err != nil {
7 r.watchErrorHandler(r, err)
8 }
9 }, r.backoffManager, true, stopCh)
10 ...
11}
ListAndWatch方法
1// ListAndWatch函数
2func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
3 ...
4 if err := func() error {
5 ...
6 go func() {
7 ...
8 pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
9 // 调用List方法获取资源对象下所有的数据
10 return r.listerWatcher.List(opts)
11 }))
12 ...
13 }()
14 ...
15 // 获取资源版本号
16 resourceVersion = listMetaInterface.GetResourceVersion()
17 initTrace.Step("Resource version extracted")
18 // 将资源数据转换为资源对象列表
19 items, err := meta.ExtractList(list)
20 // 将资源信息存储到DeltaFIFO中,全量替换本地缓存
21 // 内部调用了replace方法
22 if err := r.syncWith(items, resourceVersion); err != nil {
23 return fmt.Errorf("unable to sync list result: %v", err)
24 }
25 // 设置最新的资源版本号
26 r.setLastSyncResourceVersion(resourceVersion)
27 return nil
28 }(); err != nil {
29 return err
30 }
31
32 go func() {
33 ...
34 for {
35 ...
36 // 同步资源
37 if r.ShouldResync == nil || r.ShouldResync() {
38 // 调用DeltaFIFO的Resync方法
39 if err := r.store.Resync(); err != nil {
40 ...
41 }
42 }
43 resyncCh, cleanup = r.resyncChan()
44 }
45 }()
46
47 for {
48 ...
49 // 监听资源
50 w, err := r.listerWatcher.Watch(options)
51 // 处理handler事件,用户注册的Add,Delete,Update函数
52 if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh);
53 ...
54 }
55}
56
57// syncWith函数
58func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
59 found := make([]interface{}, 0, len(items))
60 for _, item := range items {
61 found = append(found, item)
62 }
63 // 调用cache.Replace
64 return r.store.Replace(found, resourceVersion)
65}
66
67// cache.Replace
68func (c *cache) Replace(list []interface{}, resourceVersion string) error {
69 items := make(map[string]interface{}, len(list))
70 for _, item := range list {
71 key, err := c.keyFunc(item)
72 if err != nil {
73 return KeyError{item, err}
74 }
75 items[key] = item
76 }
77 c.cacheStorage.Replace(items, resourceVersion)
78 return nil
79}
watchHandler方法
watchHandler函数,处理watch到的各种事件,所有的事件都存放在ResultChan中,包括:事件类型,资源对象
1func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
2 ...
3 for {
4 select {
5 ...
6 // 获取watch接口中的事件的channel
7 case event, ok := <-w.ResultChan():
8 ...
9 switch event.Type {
10 // 处理Add函数
11 case watch.Added:
12 // store是DeltaFIFO类
13 err := r.store.Add(event.Object)
14 // 处理Modified函数
15 case watch.Modified:
16 err := r.store.Update(event.Object)
17 // 处理Deleted函数
18 case watch.Deleted:
19 err := r.store.Delete(event.Object)
20 }
21 *resourceVersion = newResourceVersion
22 // 设置资源版本
23 r.setLastSyncResourceVersion(newResourceVersion)
24 eventCount++
25 }
26 }
27 ...
28}
DeltaFIFO
后面的源码分析会用到DeltaFIFO,这里先介绍一下。
DeltaFIFO用来存储Watch API返回的各种事件,队列中会存在拥有不同操作类型的同一个资源对象 DeltaFIFO实现了Queue接口,Queue继承了Store接口
源码路径:vendor/k8s.io/client-go/tools/cache/delta_fifo.go
1type DeltaFIFO struct {
2 // lock/cond protects access to 'items' and 'queue'.
3 lock sync.RWMutex
4 cond sync.Cond
5
6 // We depend on the property that items in the set are in
7 // the queue and vice versa, and that all Deltas in this
8 // map have at least one Delta.
9
10 // map结构存储:key是资源对象的key,value是对象的Deltas数组
11 items map[string]Deltas
12
13 // 存储资源对象的key
14 queue []string
15
16 // populated is true if the first batch of items inserted by Replace() has been populated
17 // or Delete/Add/Update was called first.
18 populated bool
19 // initialPopulationCount is the number of items inserted by the first call of Replace()
20 initialPopulationCount int
21
22 // keyFunc is used to make the key used for queued item
23 // insertion and retrieval, and should be deterministic.
24 keyFunc KeyFunc
25
26 // Index本地存储对象
27 knownObjects KeyListerGetter
28
29 closed bool
30 closedLock sync.Mutex
31}
核心功能包括:
- 生产者方法
- 消费者方法
- Resync机制
生产者方法
Reflector监听到资源变化后,将Add、Delete、Update等资源变更信息加入到DeltaFIFO。也就是队列的生产者,方法如下,内部都调用了
入口函数是 r.store.Add(event.Object),在前面的watchHandler介绍过
1func (f *DeltaFIFO) Add(obj interface{}) error {
2 ...
3 return f.queueActionLocked(Added, obj)
4}
5
6func (f *DeltaFIFO) Update(obj interface{}) error {
7 ...
8 return f.queueActionLocked(Updated, obj)
9}
10
11func (f *DeltaFIFO) Delete(obj interface{}) error {
12 ...
13 return f.queueActionLocked(Deleted, obj)
14}
15
16// 举例说明其中一个处理函数,其他的类似,内部都调用queueActionLocked
17func (f *DeltaFIFO) Update(obj interface{}) error {
18 f.lock.Lock()
19 defer f.lock.Unlock()
20 f.populated = true
21 // 内部调用queueActionLocked
22 return f.queueActionLocked(Updated, obj)
23}
queueActionLocked方法
1// 内部主要是封装Delta事件,并加入队列,供消费者消费(HandleDeltas函数)
2func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
3 // 根据资源对象,得到key,一般是 namespace/name 格式
4 id, err := f.KeyOf(obj)
5
6 // 将watch到的事件类型和资源对象,封装成Delta对象
7 newDeltas := append(f.items[id], Delta{actionType, obj})
8 // 去重操作
9 newDeltas = dedupDeltas(newDeltas)
10
11 // 将Delta对象加入到队列中
12 if len(newDeltas) > 0 {
13 if _, exists := f.items[id]; !exists {
14 f.queue = append(f.queue, id)
15 }
16 f.items[id] = newDeltas
17 f.cond.Broadcast()
18 } else {
19 delete(f.items, id)
20 }
21 return nil
22}
消费者方法-processLoop
前面的源码分析,分析完了Reflector的Run方法,下一步便是controller的processLoop方法
1func (c *controller) Run(stopCh <-chan struct{}) {
2 // 调用NewReflector初始化一个Reflector
3 // 必须传入ListWatcher数据接口对象
4 r := NewReflector(
5 c.config.ListerWatcher,
6 c.config.ObjectType,
7 c.config.Queue,
8 c.config.FullResyncPeriod,
9 )
10 ...
11 // 调用Reflector的Run方法,启动监控,并处理监控事件
12 wg.StartWithChannel(stopCh, r.Run)
13
14 // processLoop负责从DeltaFIFO取出数据并消费
15 wait.Until(c.processLoop, time.Second, stopCh)
16}
1func (c *controller) processLoop() {
2 for {
3 // 从DeltaFIFO队列中取出数据,并交给process处理
4 // process函数保存在config.Process中,也就是前面传入的 HandleDeltas
5 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
6 ...
7 }
8}
Pop方法
DeltaFIFO的消费方法为Pop,该函数需要传入process函数,用于接收并处理对象的回调方法
1func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
2 for {
3 for len(f.queue) == 0 {
4 // 当队列为空时,Pop函数阻塞住,知道新的数据入队列才唤醒
5 // 如果Close函数被调用,closed状态被设置,并且广播
6 if f.closed {
7 return nil, ErrFIFOClosed
8 }
9
10 f.cond.Wait()
11 }
12 // 走到这里,说明队列中有数据,取出数据
13 id := f.queue[0]
14 ...
15 // 将数据交给上层回调函数处理
16 err := process(item)
17 // 出错则将数据重新放入队列
18 if e, ok := err.(ErrRequeue); ok {
19 f.addIfNotPresent(id, item)
20 err = e.Err
21 }
22 return item, err
23 }
24}
process函数:HandleDeltas
- Run方法中传入一个回调函数 HandleDeltas
- processLoop内部执行的pop对象就是上面传入的HandleDeltas
核心逻辑包括:
- 更新本地缓存cacheStorage,其实就是更新 threadSafeMap 这个数据结构
- 将事件通知到所有的listener,其实就是往listener的addCh中放入数据,供消费者消费
1func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
2 // 获取所有的Delta资源
3 for _, d := range obj.(Deltas) {
4 // 判断资源类型
5 switch d.Type {
6 // 如果是下列类型,将资源存储到Indexer
7 case Sync, Replaced, Added, Updated:
8 s.cacheMutationDetector.AddObject(d.Object)
9 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
10 // indexer的实现类是前面介绍过的cache
11 if err := s.indexer.Update(d.Object); err != nil {
12 return err
13 }
14
15 isSync := false
16 switch {
17 case d.Type == Sync:
18 isSync = true
19 case d.Type == Replaced:
20 if accessor, err := meta.Accessor(d.Object); err == nil {
21 if oldAccessor, err := meta.Accessor(old); err == nil {
22 isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
23 }
24 }
25 }
26 // 将资源对象分发至 SharedInformer 的事件处理函数中
27 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
28 } else {
29 if err := s.indexer.Add(d.Object); err != nil {
30 return err
31 }
32 s.processor.distribute(addNotification{newObj: d.Object}, false)
33 }
34 case Deleted:
35 if err := s.indexer.Delete(d.Object); err != nil {
36 return err
37 }
38 s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
39 }
40 }
41 return nil
42}
43
44// 更新本地缓存cacheStorage
45// 其实就是更新 threadSafeMap 这个数据结构,threadSafeMap的初始化在前面介绍过
46func (c *cache) Update(obj interface{}) error {
47 key, err := c.keyFunc(obj)
48 if err != nil {
49 return KeyError{obj, err}
50 }
51 c.cacheStorage.Update(key, obj)
52 return nil
53}
54
55// distribute函数
56func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
57 ...
58 if sync {
59 for _, listener := range p.syncingListeners {
60 // 核心方法
61 listener.add(obj)
62 }
63 } else {
64 for _, listener := range p.listeners {
65 listener.add(obj)
66 }
67 }
68}
69
70// listener.add方法,这里将事件添加到listener的addCh通道中
71// 至此,也回答的前面的问题-- 谁往p.addCh中生产数据
72func (p *processorListener) add(notification interface{}) {
73 // 不同更新类型的对象加入到channel中
74 // 供给processorListener的Run方法使用
75 p.addCh <- notification
76}
Resync机制
ListAndWatch方法中的三步:
- List
- Rsync
- Watch
RSync负责将Indexer本地存储的资源对象同步到DeltaFIFO中,并设置资源类型为Sync类型。在Reflector中定时执行
1func (f *DeltaFIFO) Resync() error {
2 // 获取indexer本地存储对象
3 keys := f.knownObjects.ListKeys()
4 for _, k := range keys {
5 if err := f.syncKeyLocked(k); err != nil {
6 return err
7 }
8 }
9 return nil
10}
Indexer
前面的分析中说到,资源的变更会保存到本地的Indexer,这里介绍一下。
- Indexer是client-go用来存储资源对象,并自带索引功能的本地存储
- Reflector从DeltaFIFO中消费出来的资源对象存储至Indexer
- Indexer数据与Etcd一致,client-go可以方便的从本地读取,减轻api-server的压力
四个重要的数据结构
源码位置:k8s.io/client-go/tools/cache/index.go
1// 存储缓存数据
2// type Empty struct{}
3// type String map[string]Empty
4// 这里的sets.String是用map模拟set,map中的value都是空结构体
5type Index map[string]sets.String
6
7// 索引器函数,接收资源对象,返回检索结果列表
8type IndexFunc func(obj interface{}) ([]string, error)
9
10// 存储索引器,key为索引器名称,value为索引器实现函数
11type Indexers map[string]IndexFunc
12
13// 存储缓存器,key为缓存器名称,value为缓存数据
14type Indices map[string]Index
ThreadSafeMap
Indexer基于ThreadSafeMap做了封装,先看一下ThreadSafeMap
- ThreadSafeMap是一个内存存储,数据不会存入磁盘
- 增删改查都会加锁,保证数据一致性
- 内部用到了索引器、缓存器
源码位置:k8s.io/tools/cache/thread_safe_store.go
1type threadSafeMap struct {
2 lock sync.RWMutex
3 // map结构存储资源数据
4 // map中的key是通过keyFunc函数计算得出,默认使用MetaNamespaceFunc函数
5 // 该函数根据资源对象计算出<namespace>/<name>格式的key
6 // value是Delta对象,包括Type和Object资源对象
7 items map[string]interface{}
8 // 存储索引器,key为索引器名称,value为索引器实现函数
9 indexers Indexers
10 // 存储缓存器,key为缓存器名称,value为缓存的资源对象数据
11 indices Indices
12}
13
14// 通过执行索引器函数得到索引结果
15// 需要两个参数:索引器名称、需要检索的key
16func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
17 // 查找指定的索引器函数
18 indexFunc := c.indexers[indexName]
19 if indexFunc == nil {
20 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
21 }
22 // 查找指定的缓存器函数
23 index := c.indices[indexName]
24
25 // 从缓存数据中查找并返回数据
26 set := index[indexedValue]
27 list := make([]interface{}, 0, set.Len())
28 for key := range set {
29 list = append(list, c.items[key])
30 }
31
32 return list, nil
33}
总结
Informer机制在k8s中占据重要的角色,它的源码也是非常的复杂。学习的过程中一定要配合文章开始的那个图,否则很容易就绕进去了。里面使用Queue和Channel来解耦各个组件。个人的一个心得是:围绕着一个核心思路,可能分析起来逻辑会清晰一点,那就是:谁往channel放了数据,谁又从channel取了数据。