client-go源码分析(二) - Informer机制


| 阅读 |,阅读约 21 分钟
| 复制链接:

Overview

client-go源码分析(二) - Informer机制

最近加入云原生社区组织的k8s源码研习社,开始学习k8s底层源码,并整理成笔记。欢迎感兴趣的同学一起加入,共同学习进步。群里和社区里有各种大佬,随时可以帮你答疑解惑。https://github.com/cloudnativeto/sig-k8s-source-code

先放一张调用关系图

高清地址

由于Informer这部分的源码比较复杂,调用链路也很长,后面的源码分析,都会围绕这一张图展开。

Informer调用关系图

概述

k8s中,组件之间通过http通讯,在不依赖任何中间件的情况下,需要保证消息的可靠性、实时性、顺序性等?k8s是如何做到的呢?— 答案就是Informer。k8s的其他组件都是通过informer与api-server通讯的。

Informer运行原理

k8s-informer

各个组件包括:

  • Reflector:用于监控(watch)指定的资源,当监控的资源发生变化时,触发相应的变更事件。并将资源对象存放到本地缓存DeltaFIFO中
  • DeltaFIFO:对资源对象的的操作类型进行队列的基本操作
    • FIFO:先进先出队列,提供资源对象的增删改查等操作
    • Dealta:资源对象存储,可以保存资源对象的操作类型。如:添加操作类型、更新操作类型、删除操作类型、同步操作类型
  • Indexer:存储资源对象,并自带索引功能的本地存储。
    • Reflect从DeltaFIFO中将消费出来的资源对象存储至Indexer
    • Indexer中的数据与Etcd完全一致,client-go可以从本地读取,减轻etcd和api-server的压力

Informer使用示例

  • 通过kubernetes.NewForConfig创建clientset对象。informer需要通过clientset与apiserver进行交互
  • 创建一个用于停止的channel,用于进程退出前通知informer提前退出。因为informer是一个持久运行的Groutine
  • informers.NewSharedInformerFactory实例化sharedInformer对象
    • 第一个参数是ClientSet
    • 第二个参数是多久同步一次
  • Informer方法可以获得特定资源的informer对象
  • AddEventHandler函数可以为对象添加回调方法,支持三种对象的回调方法
    • AddFunc:创建资源对象时触发的回调方法
    • UpdateFunc:更新资源对象时触发的回调方法
    • DeleteFunc:删除资源对象时触发的回调方法
  • Run方法运行当前的informer
 1// 通过informer机制,实现k8s资源的监控
 2func informer() {
 3  // 因为informer是一个持久运行的groutine,channel作用:进程退出前通知informer退出
 4  stopChan := make(chan struct{})
 5  defer close(stopChan)
 6
 7  // 创建连接k8s的client对象
 8  clientSet, err := kubernetes.NewForConfig(config)
 9  if err != nil {
10    log.Printf("init clientset error.")
11    return
12  }
13
14  // 第一步:创建sharedInformer对象,第二个参数为重新同步数据的间隔时间
15  sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
16  // 第二步:每个资源都有informer对象,这里获取pod资源的informer对象
17  podInformer := sharedInformers.Core().V1().Pods().Informer()
18  // 第三步:添加自定义回调函数
19  podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
20    // 添加资源的回调函数,返回的是接口类型,需要强制转换为真正的类型
21    AddFunc: func(obj interface{}) {
22      mObj := obj.(v1.Object)
23      log.Printf("New pod added: %s", mObj.GetName())
24    },
25    // 更新资源的回调函数
26    UpdateFunc: func(oldObj, newObj interface{}) {
27      oObj := oldObj.(v1.Object)
28      nObj := newObj.(v1.Object)
29      log.Printf("%s pod updated to %s", oObj.GetName(), nObj.GetName())
30    },
31    // 删除资源的回调函数
32    DeleteFunc: func(obj interface{}) {
33      mObj := obj.(v1.Object)
34      log.Printf("pod deleted from store: %s", mObj.GetName())
35    },
36  })
37  // 第四步:开始运行informer对象
38  podInformer.Run(stopChan)
39}

informer数据流转图

资源Informer和SharedInformer

前面demo中,第一步便是创建SharedInformer对象,下面先介绍一下Informer和SharedInformer

资源Informer

  • 每一种资源都实现了Informer机制,允许监控不同的资源事件
  • 每一个Informer都会实现Informer和Lister方法
1type PodInformer interface {
2  Informer() cache.SharedIndexInformer
3  Lister() v1.PodLister
4}

SharedInformer

若同一个资源的Informer被实例化了多次,每个Informer使用一个Reflector,那么会运行过多相同的ListAndWatch,太多重复的序列化和反序列化操作会导致api-server负载过重

SharedInformer可以使同一类资源Informer共享一个Reflector。内部定义了一个map字段,用于存放所有Infromer的字段。

前面demo中第一步创建SharedInformer,sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute),内部初始化了一个sharedInformerFactory对象,先看下sharedInformerFactory

源码位置:vendor/k8s.io/client-go/informer/factory.go

 1type sharedInformerFactory struct {
 2  client           kubernetes.Interface
 3  namespace        string
 4  tweakListOptions internalinterfaces.TweakListOptionsFunc
 5  lock             sync.Mutex
 6  defaultResync    time.Duration
 7  customResync     map[reflect.Type]time.Duration
 8
 9  // 按照类型存放共享的informer
10  informers map[reflect.Type]cache.SharedIndexInformer
11
12  // 这个字段用来追踪informers是否被启动了
13  // 可以保证Start()方法安全的重复调用多次(幂等性)
14  startedInformers map[reflect.Type]bool
15}

Start方法

k8s中的Controller-Manager组件,源码中的Run方法调用了SharedInformerFactory的Start方法

源码位置:cmd/kube-controller-manager/app/controllermanager.go

1func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
2  ...
3  controllerContext.InformerFactory.Start(controllerContext.Stop)
4  ...
5}

源码位置:k8s.io/client-go/informers/factory.go

 1func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
 2  ...
 3  // 遍历所有的informers
 4  for informerType, informer := range f.informers {
 5    if !f.startedInformers[informerType] {
 6      // 每一种informer启动一个协程,运行Run方法
 7      go informer.Run(stopCh)
 8      f.startedInformers[informerType] = true
 9    }
10  }
11}

获取Informer

前面demo中,创建好sharedInformer对象后,第二步是调用podInformer := sharedInformers.Core().V1().Pods().Informer(),获取具体的informer实例,下面开始分析Informer方法

关键逻辑包括:

  • sharedProcessor的初始化
  • List和Watch方法的注册:注册具体某个资源类型的list和watch方法
  • Indexer的初始化:实现类是cache类

以pod为例,源码位置:client-go/informers/core/v1/pod.go

 1// 获取pod的informer,内部调用InformerFor,参数需要传入f.defaultInformer
 2func (f *podInformer) Informer() cache.SharedIndexInformer {
 3  return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
 4}
 5
 6func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
 7  // 最后一个参数,初始化indexers
 8  return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
 9}
10
11func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
12  return cache.NewSharedIndexInformer(
13    // 注册List、Watch方法
14    &cache.ListWatch{
15      ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
16        if tweakListOptions != nil {
17          tweakListOptions(&options)
18        }
19        // List方法是该种资源对象的List方法(这里是pod)
20        return client.CoreV1().Pods(namespace).List(context.TODO(), options)
21      },
22      WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
23        if tweakListOptions != nil {
24          tweakListOptions(&options)
25        }
26        // Watch方法是该种资源对象的Watch方法(这里是pod)
27        return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
28      },
29    },
30    &corev1.Pod{},
31    resyncPeriod,
32    indexers,
33  )
34}
35
36func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
37  realClock := &clock.RealClock{}
38  sharedIndexInformer := &sharedIndexInformer{
39    // 这里是processor的初始化
40    processor:                       &sharedProcessor{clock: realClock},
41    // 这里是Indexer的初始化,接口为Indexer,实现类为cache
42    indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
43    listerWatcher:                   lw,
44    objectType:                      exampleObject,
45    resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
46    defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
47    cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
48    clock:                           realClock,
49  }
50  return sharedIndexInformer
51}
52
53// Index接口,实现类是cache类
54func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
55  return &cache{
56    cacheStorage: NewThreadSafeStore(indexers, Indices{}),
57    keyFunc:      keyFunc,
58  }
59}

注册自定义回调函数

得到Informer对象之后,第三步是给该infomer注册自定义回调函数,当k8s的资源发送变更时,可以实现自己的业务逻辑。 下面分析一下podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{...})的逻辑

 1// 开始注册事件处理函数
 2func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
 3  s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
 4}
 5
 6func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
 7  ...
 8  // 每一个监听者,都会注册为一个listner实例
 9  // 每个listener中持有一个handler对象,后面事件发生时,框架会调用handler方法,也就走到用户注册的代码逻辑了
10  listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
11  ...
12  // 将listner添加到sharedProcessor中
13  s.processor.addListener(listener)
14  for _, item := range s.indexer.List() {
15    listener.add(addNotification{newObj: item})
16  }
17}
18
19// 将listner添加到sharedProcessor中
20func (p *sharedProcessor) addListener(listener *processorListener) {
21  ...
22  if p.listenersStarted {
23    // listener后台启动了两个协程,这两个协程很关键,后面会介绍
24    p.wg.Start(listener.run)
25    p.wg.Start(listener.pop)
26  }
27}

Run方法

前面demo中,得到informer实例之后,最后一步就是调用Run方法,下面开始解析Run方法的逻辑, 核心逻辑包括:

  • DeltaFIFO的初始化
  • Controller的初始化
  • 运行process.Run方法
  • 运行controller.Run方法

源码位置:k8s.io/client-go/tools/cache/shared_informer.go

 1func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 2  // 初始化DeltaFIFO
 3  fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
 4    KnownObjects:          s.indexer,
 5    EmitDeltaTypeReplaced: true,
 6  })
 7  // config初始化
 8  // 这里重点关注ListerWatcher对象和Process对象,Process关联的是HandleDeltas函数
 9  // HandleDeltas是消费增量信息(Delta对象)的核心方法
10  cfg := &Config{
11    Queue:            fifo,
12    // ListAndWatch对象
13    ListerWatcher:    s.listerWatcher,
14    ObjectType:       s.objectType,
15    FullResyncPeriod: s.resyncCheckPeriod,
16    RetryOnError:     false,
17    ShouldResync:     s.processor.shouldResync,
18    // 注册回调函数 HandleDeltas,资源变更时,存到到本地Indexer
19    Process:           s.HandleDeltas,
20    WatchErrorHandler: s.watchErrorHandler,
21  }
22  // 这里主要是controller的初始化
23  func() {
24    s.startedLock.Lock()
25    defer s.startedLock.Unlock()
26    // 初始化Controller对象
27    s.controller = New(cfg)
28    s.controller.(*controller).clock = s.clock
29    s.started = true
30  }()
31  // s.cacheMutationDetector.Run检查缓存对象是否存在
32  wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
33  // 执行sharedProcessor.run方法
34  // 这个方法非常重要
35  wg.StartWithChannel(processorStopCh, s.processor.run)
36  ...
37  // 调用Controller的Run方法
38  s.controller.Run(stopCh)
39}

process的Run方法

 1func (p *sharedProcessor) run(stopCh <-chan struct{}) {
 2  func() {
 3    ...
 4    // sharedProcessor的所有Listner,每个后台启动两个协程
 5    // 分别指向run和pop方法
 6    for _, listener := range p.listeners {
 7      p.wg.Start(listener.run)
 8      p.wg.Start(listener.pop)
 9    }
10    p.listenersStarted = true
11  }()
12  ...
13}

processorListener的run方法

processorListener代表一个消费者对象,该函数周期性执行,主要是从自己的nextCh通道中获取从api-server得到的增量信息,然后调用handler的相关方法,handler方法就是前面用户自定义传进来的方法。

这里我们只需要知道,run方法是消费者方法,负责消费事件。后面会介绍到谁是生产者,谁往processorLister的nextCh通道中放入增量信息(其实就是下面的pop方法)

 1func (p *processorListener) run() {
 2
 3  stopCh := make(chan struct{})
 4  wait.Until(func() {
 5    // 消费者方法,不断从通道中获取事件
 6    for next := range p.nextCh {
 7      switch notification := next.(type) {
 8      case updateNotification:
 9        // 调用handler的方法,
10        p.handler.OnUpdate(notification.oldObj, notification.newObj)
11      case addNotification:
12        p.handler.OnAdd(notification.newObj)
13      case deleteNotification:
14        p.handler.OnDelete(notification.oldObj)
15      default:
16        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
17      }
18    }
19    // the only way to get here is if the p.nextCh is empty and closed
20    close(stopCh)
21  }, 1*time.Second, stopCh)
22}
23
24// OnUpdate方法,内部就是调用demo中注册的UpdateFunc方法
25// 其他方法类似
26func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
27  if r.UpdateFunc != nil {
28    r.UpdateFunc(oldObj, newObj)
29  }
30}

processorListener的pop方法

这个方法的实现非常复杂,但是总体的目的很简单:就是往nextCh中生产消息,然后前面介绍的run方法就可以消费到

这里主要用到了类似缓冲区的方法,并不是一次就获取一个事件

 1func (p *processorListener) pop() {
 2  ...
 3  var nextCh chan<- interface{}
 4  var notification interface{}
 5  for {
 6    select {
 7    // 函数第一次进来,这个notification一定是空的,这个case会被阻塞住
 8    // 当第二个case调用完成后,notification被赋值为notificationToAdd,才会进入到这里
 9    case nextCh <- notification:
10      // Notification dispatched
11      var ok bool
12      notification, ok = p.pendingNotifications.ReadOne()
13      if !ok { // Nothing to pop
14        nextCh = nil // Disable this select case
15      }
16    // 第一次调用时,会进入这里,首先从addCh中获取数据(后面会介绍谁往addCh中放数据)
17    case notificationToAdd, ok := <-p.addCh:
18      if !ok {
19        return
20      }
21      if notification == nil { // No notification to pop (and pendingNotifications is empty)
22        notification = notificationToAdd
23        // channel是引用类型,将p.nextCh指向nextCh,对nextCh的操作就是操作p.nextCh
24        // 这里也解答了前面的run方法里面提到的疑问,谁是nextCh的生产者,往nextCh放入数据
25        nextCh = p.nextCh
26      } else { // There is already a notification waiting to be dispatched
27        p.pendingNotifications.WriteOne(notificationToAdd)
28      }
29    }
30  }
31}

pop函数中我们看到,主要是消费p.addCh,后面详细介绍p.addCh的生产者是谁。这里简单提一下,watch函数监测到api-server的事件变化时,触发HandlerDelta函数,这个函数会更新Indexer,同时调用distribute方法,将该事件通知给所有的listener,内部实现就是:往每个listener的addCh这个通道中放入数据。

Controller的Run方法

Run方法内部关键逻辑包括:

  • 初始化Reflector对象
  • 调用Reflector的Run方法
    • 调用List获取全部资源数据
    • 调用Watch实时监控资源变更情况,并放入队列
  • 调用controller的processLoop方法
    • 消费队列中的数据

源码位置:k8s.io/client-go/tools/cache/controller.go

 1func (c *controller) Run(stopCh <-chan struct{}) {
 2  // 调用NewReflector初始化一个Reflector
 3  // 必须传入ListWatcher数据接口对象
 4  r := NewReflector(
 5    c.config.ListerWatcher,
 6    c.config.ObjectType,
 7    c.config.Queue,
 8    c.config.FullResyncPeriod,
 9  )
10  ...
11  // 调用Reflector的Run方法,启动监控,并处理监控事件
12  wg.StartWithChannel(stopCh, r.Run)
13
14  // processLoop负责从DeltaFIFO取出数据并消费
15  wait.Until(c.processLoop, time.Second, stopCh)
16}

Reflector

这里先暂停源码分析,简单介绍一下Reflector,Reflector用于监控指定的k8s资源,并触发相应的变更事件。

  • NewReflector:创建Reflector对象,需传入ListerWatcher数据接口对象
  • Run:启动监控,并处理事件

Run中的核心函数是ListAndWatch,流程包括:

  • 获取资源列表数据
  • 监控资源对象:使用http协议的分块传输编码

Reflector类

 1type Reflector struct {
 2  // 名称
 3  name string
 4  expectedTypeName string
 5  // 期望放入缓存store的资源类型
 6  expectedType reflect.Type
 7  // The GVK of the object we expect to place in the store if unstructured.
 8  expectedGVK *schema.GroupVersionKind
 9  // 存放同步监听到的资源,这里是DeltaFIFO类
10  store Store
11  // 用来执行List和Watch的对象
12  listerWatcher ListerWatcher
13  backoffManager wait.BackoffManager
14  // resync周期
15  resyncPeriod time.Duration
16  ShouldResync func() bool
17  clock clock.Clock
18  paginatedResult bool
19  // 最新一次看到的资源版本号
20  lastSyncResourceVersion string
21  isLastSyncResourceVersionUnavailable bool
22  lastSyncResourceVersionMutex sync.RWMutex
23  WatchListPageSize int64
24  watchErrorHandler WatchErrorHandler
25}

核心方法:Run

核心逻辑包括:

  • 调用List方法获取资源对象下所有的数据
  • 将资源数据转换为资源对象列表
  • 将资源信息存储到DeltaFIFO中,全量替换本地缓存
  • 调用Watch方法监听资源
  • 调用watchHandler函数,处理watch到的各种事件
 1// Run函数
 2func (r *Reflector) Run(stopCh <-chan struct{}) {
 3  ...
 4  wait.BackoffUntil(func() {
 5    // 核心函数:ListAndWatch
 6    if err := r.ListAndWatch(stopCh); err != nil {
 7      r.watchErrorHandler(r, err)
 8    }
 9  }, r.backoffManager, true, stopCh)
10  ...
11}

ListAndWatch方法

 1// ListAndWatch函数
 2func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 3  ...
 4  if err := func() error {
 5    ...
 6    go func() {
 7      ...
 8      pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
 9        // 调用List方法获取资源对象下所有的数据
10        return r.listerWatcher.List(opts)
11      }))
12      ...
13    }()
14    ...
15    // 获取资源版本号
16    resourceVersion = listMetaInterface.GetResourceVersion()
17    initTrace.Step("Resource version extracted")
18    // 将资源数据转换为资源对象列表
19    items, err := meta.ExtractList(list)
20    // 将资源信息存储到DeltaFIFO中,全量替换本地缓存
21    // 内部调用了replace方法
22    if err := r.syncWith(items, resourceVersion); err != nil {
23      return fmt.Errorf("unable to sync list result: %v", err)
24    }
25    // 设置最新的资源版本号
26    r.setLastSyncResourceVersion(resourceVersion)
27    return nil
28  }(); err != nil {
29    return err
30  }
31
32  go func() {
33    ...
34    for {
35      ...
36      // 同步资源
37      if r.ShouldResync == nil || r.ShouldResync() {
38        // 调用DeltaFIFO的Resync方法
39        if err := r.store.Resync(); err != nil {
40          ...
41        }
42      }
43      resyncCh, cleanup = r.resyncChan()
44    }
45  }()
46
47  for {
48    ...
49    // 监听资源
50    w, err := r.listerWatcher.Watch(options)
51    // 处理handler事件,用户注册的Add,Delete,Update函数
52    if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh);
53    ...
54  }
55}
56
57// syncWith函数
58func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
59  found := make([]interface{}, 0, len(items))
60  for _, item := range items {
61    found = append(found, item)
62  }
63  // 调用cache.Replace
64  return r.store.Replace(found, resourceVersion)
65}
66
67// cache.Replace
68func (c *cache) Replace(list []interface{}, resourceVersion string) error {
69  items := make(map[string]interface{}, len(list))
70  for _, item := range list {
71    key, err := c.keyFunc(item)
72    if err != nil {
73      return KeyError{item, err}
74    }
75    items[key] = item
76  }
77  c.cacheStorage.Replace(items, resourceVersion)
78  return nil
79}

watchHandler方法

watchHandler函数,处理watch到的各种事件,所有的事件都存放在ResultChan中,包括:事件类型,资源对象

 1func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
 2  ...
 3  for {
 4    select {
 5    ...
 6    // 获取watch接口中的事件的channel
 7    case event, ok := <-w.ResultChan():
 8      ...
 9      switch event.Type {
10      // 处理Add函数
11      case watch.Added:
12        // store是DeltaFIFO类
13        err := r.store.Add(event.Object)
14      // 处理Modified函数
15      case watch.Modified:
16        err := r.store.Update(event.Object)
17      // 处理Deleted函数
18      case watch.Deleted:
19        err := r.store.Delete(event.Object)
20      }
21      *resourceVersion = newResourceVersion
22      // 设置资源版本
23      r.setLastSyncResourceVersion(newResourceVersion)
24      eventCount++
25    }
26  }
27  ...
28}

DeltaFIFO

后面的源码分析会用到DeltaFIFO,这里先介绍一下。

DeltaFIFO用来存储Watch API返回的各种事件,队列中会存在拥有不同操作类型的同一个资源对象 DeltaFIFO实现了Queue接口,Queue继承了Store接口

源码路径:vendor/k8s.io/client-go/tools/cache/delta_fifo.go

 1type DeltaFIFO struct {
 2  // lock/cond protects access to 'items' and 'queue'.
 3  lock sync.RWMutex
 4  cond sync.Cond
 5
 6  // We depend on the property that items in the set are in
 7  // the queue and vice versa, and that all Deltas in this
 8  // map have at least one Delta.
 9
10  // map结构存储:key是资源对象的key,value是对象的Deltas数组
11  items map[string]Deltas
12
13  // 存储资源对象的key
14  queue []string
15
16  // populated is true if the first batch of items inserted by Replace() has been populated
17  // or Delete/Add/Update was called first.
18  populated bool
19  // initialPopulationCount is the number of items inserted by the first call of Replace()
20  initialPopulationCount int
21
22  // keyFunc is used to make the key used for queued item
23  // insertion and retrieval, and should be deterministic.
24  keyFunc KeyFunc
25
26  // Index本地存储对象
27  knownObjects KeyListerGetter
28
29  closed     bool
30  closedLock sync.Mutex
31}

核心功能包括:

  • 生产者方法
  • 消费者方法
  • Resync机制

生产者方法

Reflector监听到资源变化后,将Add、Delete、Update等资源变更信息加入到DeltaFIFO。也就是队列的生产者,方法如下,内部都调用了

入口函数是 r.store.Add(event.Object),在前面的watchHandler介绍过

 1func (f *DeltaFIFO) Add(obj interface{}) error {
 2  ...
 3  return f.queueActionLocked(Added, obj)
 4}
 5
 6func (f *DeltaFIFO) Update(obj interface{}) error {
 7  ...
 8  return f.queueActionLocked(Updated, obj)
 9}
10
11func (f *DeltaFIFO) Delete(obj interface{}) error {
12  ...
13  return f.queueActionLocked(Deleted, obj)
14}
15
16// 举例说明其中一个处理函数,其他的类似,内部都调用queueActionLocked
17func (f *DeltaFIFO) Update(obj interface{}) error {
18  f.lock.Lock()
19  defer f.lock.Unlock()
20  f.populated = true
21  // 内部调用queueActionLocked
22  return f.queueActionLocked(Updated, obj)
23}

queueActionLocked方法

 1// 内部主要是封装Delta事件,并加入队列,供消费者消费(HandleDeltas函数)
 2func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
 3  // 根据资源对象,得到key,一般是 namespace/name 格式
 4  id, err := f.KeyOf(obj)
 5
 6  // 将watch到的事件类型和资源对象,封装成Delta对象
 7  newDeltas := append(f.items[id], Delta{actionType, obj})
 8  // 去重操作
 9  newDeltas = dedupDeltas(newDeltas)
10
11  // 将Delta对象加入到队列中
12  if len(newDeltas) > 0 {
13    if _, exists := f.items[id]; !exists {
14      f.queue = append(f.queue, id)
15    }
16    f.items[id] = newDeltas
17    f.cond.Broadcast()
18  } else {
19    delete(f.items, id)
20  }
21  return nil
22}

消费者方法-processLoop

前面的源码分析,分析完了Reflector的Run方法,下一步便是controller的processLoop方法

 1func (c *controller) Run(stopCh <-chan struct{}) {
 2  // 调用NewReflector初始化一个Reflector
 3  // 必须传入ListWatcher数据接口对象
 4  r := NewReflector(
 5    c.config.ListerWatcher,
 6    c.config.ObjectType,
 7    c.config.Queue,
 8    c.config.FullResyncPeriod,
 9  )
10  ...
11  // 调用Reflector的Run方法,启动监控,并处理监控事件
12  wg.StartWithChannel(stopCh, r.Run)
13
14  // processLoop负责从DeltaFIFO取出数据并消费
15  wait.Until(c.processLoop, time.Second, stopCh)
16}
1func (c *controller) processLoop() {
2  for {
3    // 从DeltaFIFO队列中取出数据,并交给process处理
4    // process函数保存在config.Process中,也就是前面传入的 HandleDeltas
5    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
6    ...
7  }
8}

Pop方法

DeltaFIFO的消费方法为Pop,该函数需要传入process函数,用于接收并处理对象的回调方法

 1func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
 2  for {
 3    for len(f.queue) == 0 {
 4      // 当队列为空时,Pop函数阻塞住,知道新的数据入队列才唤醒
 5      // 如果Close函数被调用,closed状态被设置,并且广播
 6      if f.closed {
 7        return nil, ErrFIFOClosed
 8      }
 9
10      f.cond.Wait()
11    }
12    // 走到这里,说明队列中有数据,取出数据
13    id := f.queue[0]
14    ...
15    // 将数据交给上层回调函数处理
16    err := process(item)
17    // 出错则将数据重新放入队列
18    if e, ok := err.(ErrRequeue); ok {
19      f.addIfNotPresent(id, item)
20      err = e.Err
21    }
22    return item, err
23  }
24}

process函数:HandleDeltas

  • Run方法中传入一个回调函数 HandleDeltas
  • processLoop内部执行的pop对象就是上面传入的HandleDeltas

核心逻辑包括:

  • 更新本地缓存cacheStorage,其实就是更新 threadSafeMap 这个数据结构
  • 将事件通知到所有的listener,其实就是往listener的addCh中放入数据,供消费者消费
 1func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
 2  // 获取所有的Delta资源
 3  for _, d := range obj.(Deltas) {
 4    // 判断资源类型
 5    switch d.Type {
 6    // 如果是下列类型,将资源存储到Indexer
 7    case Sync, Replaced, Added, Updated:
 8      s.cacheMutationDetector.AddObject(d.Object)
 9      if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
10        // indexer的实现类是前面介绍过的cache
11        if err := s.indexer.Update(d.Object); err != nil {
12          return err
13        }
14
15        isSync := false
16        switch {
17        case d.Type == Sync:
18          isSync = true
19        case d.Type == Replaced:
20          if accessor, err := meta.Accessor(d.Object); err == nil {
21            if oldAccessor, err := meta.Accessor(old); err == nil {
22              isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
23            }
24          }
25        }
26        // 将资源对象分发至 SharedInformer 的事件处理函数中
27        s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
28      } else {
29        if err := s.indexer.Add(d.Object); err != nil {
30          return err
31        }
32        s.processor.distribute(addNotification{newObj: d.Object}, false)
33      }
34    case Deleted:
35      if err := s.indexer.Delete(d.Object); err != nil {
36        return err
37      }
38      s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
39    }
40  }
41  return nil
42}
43
44// 更新本地缓存cacheStorage
45// 其实就是更新 threadSafeMap 这个数据结构,threadSafeMap的初始化在前面介绍过
46func (c *cache) Update(obj interface{}) error {
47  key, err := c.keyFunc(obj)
48  if err != nil {
49    return KeyError{obj, err}
50  }
51  c.cacheStorage.Update(key, obj)
52  return nil
53}
54
55// distribute函数
56func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
57  ...
58  if sync {
59    for _, listener := range p.syncingListeners {
60      // 核心方法
61      listener.add(obj)
62    }
63  } else {
64    for _, listener := range p.listeners {
65      listener.add(obj)
66    }
67  }
68}
69
70// listener.add方法,这里将事件添加到listener的addCh通道中
71// 至此,也回答的前面的问题-- 谁往p.addCh中生产数据
72func (p *processorListener) add(notification interface{}) {
73  // 不同更新类型的对象加入到channel中
74  // 供给processorListener的Run方法使用
75  p.addCh <- notification
76}

Resync机制

ListAndWatch方法中的三步:

  • List
  • Rsync
  • Watch

RSync负责将Indexer本地存储的资源对象同步到DeltaFIFO中,并设置资源类型为Sync类型。在Reflector中定时执行

 1func (f *DeltaFIFO) Resync() error {
 2  // 获取indexer本地存储对象
 3  keys := f.knownObjects.ListKeys()
 4  for _, k := range keys {
 5    if err := f.syncKeyLocked(k); err != nil {
 6      return err
 7    }
 8  }
 9  return nil
10}

Indexer

前面的分析中说到,资源的变更会保存到本地的Indexer,这里介绍一下。

  • Indexer是client-go用来存储资源对象,并自带索引功能的本地存储
  • Reflector从DeltaFIFO中消费出来的资源对象存储至Indexer
  • Indexer数据与Etcd一致,client-go可以方便的从本地读取,减轻api-server的压力

四个重要的数据结构

源码位置:k8s.io/client-go/tools/cache/index.go

 1// 存储缓存数据
 2// type Empty struct{}
 3// type String map[string]Empty
 4// 这里的sets.String是用map模拟set,map中的value都是空结构体
 5type Index map[string]sets.String
 6
 7// 索引器函数,接收资源对象,返回检索结果列表
 8type IndexFunc func(obj interface{}) ([]string, error)
 9
10// 存储索引器,key为索引器名称,value为索引器实现函数
11type Indexers map[string]IndexFunc
12
13// 存储缓存器,key为缓存器名称,value为缓存数据
14type Indices map[string]Index

ThreadSafeMap

Indexer基于ThreadSafeMap做了封装,先看一下ThreadSafeMap

  • ThreadSafeMap是一个内存存储,数据不会存入磁盘
  • 增删改查都会加锁,保证数据一致性
  • 内部用到了索引器、缓存器

源码位置:k8s.io/tools/cache/thread_safe_store.go

 1type threadSafeMap struct {
 2  lock  sync.RWMutex
 3  // map结构存储资源数据
 4  // map中的key是通过keyFunc函数计算得出,默认使用MetaNamespaceFunc函数
 5  // 该函数根据资源对象计算出<namespace>/<name>格式的key
 6  // value是Delta对象,包括Type和Object资源对象
 7  items map[string]interface{}
 8  // 存储索引器,key为索引器名称,value为索引器实现函数
 9  indexers Indexers
10  // 存储缓存器,key为缓存器名称,value为缓存的资源对象数据
11  indices Indices
12}
13
14// 通过执行索引器函数得到索引结果
15// 需要两个参数:索引器名称、需要检索的key
16func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
17  // 查找指定的索引器函数
18  indexFunc := c.indexers[indexName]
19  if indexFunc == nil {
20    return nil, fmt.Errorf("Index with name %s does not exist", indexName)
21  }
22  // 查找指定的缓存器函数
23  index := c.indices[indexName]
24
25  // 从缓存数据中查找并返回数据
26  set := index[indexedValue]
27  list := make([]interface{}, 0, set.Len())
28  for key := range set {
29    list = append(list, c.items[key])
30  }
31
32  return list, nil
33}

总结

Informer机制在k8s中占据重要的角色,它的源码也是非常的复杂。学习的过程中一定要配合文章开始的那个图,否则很容易就绕进去了。里面使用Queue和Channel来解耦各个组件。个人的一个心得是:围绕着一个核心思路,可能分析起来逻辑会清晰一点,那就是:谁往channel放了数据,谁又从channel取了数据。