k8s存储组件Etcd核心源码分析


| 阅读 |,阅读约 9 分钟
| 复制链接:

Overview

概述

  • Etcd是一个分布式键值对存储数据库
  • etcd以一致和容错的方式存储元数据
  • 通过分布式锁、leader选举、写屏障来实现可靠的分布式协作

为什么取名"Etcd"

etcd = etc + d

  • etc: unix系统下存储配置的地方
  • d:distribute,分布式系统

etcd寓意存储大规模分布式系统的配置信息

etcd与k8s

  • k8s使用Etcd作为集群唯一存储,生产环境一般以集群部署,称为Etcd集群。
  • 集群存储k8s的集群状态和元信息,包括所有的k8s资源对象信息、资源对象状态、集群节点信息等。
  • k8s将所有数据存储至etcd集群前缀为/registry的目录下

存储架构设计

高清地址

etcd存储架构

k8s对etcd存储进行大量封装,其架构是分层的,而每一层封装设计又拥有高度的可扩展性。

  • RESTStorage:实现Restful风格的对外资源存储服务的api接口
  • RegistryStore:实现了资源的通用操作,在存储资源对象之前和之后可以执行某些函数
  • Storage.Interface:通用存储接口,该接口定义了资源的操作方法
  • CacheStorage:带有缓存功能的资源存储对象
  • UnderlyingStorage:底层存储,是真正与etcd集群交互的资源存储对象

存储概述

资源对象在etcd集群中以二进制形式存储,存储与获取过程都通过protobufSerializer编解码器进行数据的编码和解码

RESTStorage

k8s的每种资源都提供了restful风格的对外资源存储服务接口,所有对外暴露restful的资源都必须实现RESTStorage接口。

源码位置:vendor/k8s.io/apiserver/pkg/registry/rest/rest.go

1type Storage interface {
2  New() runtime.Object
3}

以Deployment资源为例,看下源码实现

源码位置:pkg/registry/apps/deployment/storage/storage.go

 1type REST struct {
 2  *genericregistry.Store
 3  categories []string
 4}
 5
 6type StatusREST struct {
 7  store *genericregistry.Store
 8}
 9
10type ScaleREST struct {
11  store *genericregistry.Store
12}
13
14type DeploymentStorage struct {
15  // 实现deployment资源的RESTStorage接口
16  Deployment *REST
17  // 实现deployment/status资源的RESTStorage接口
18  Status     *StatusREST
19  Scale      *ScaleREST
20  Rollback   *RollbackREST
21}

RegistryStore

RegistryStore实现了资源存储的通用操作,例如在存储资源对象之前执行某个函数,在存储资源对象之后执行某个函数。 RegistryStore定义了4种Strategy预处理方法,3种创建资源对象后的处理方法。

 1type Store struct {
 2  // 创建资源对象时的预处理操作
 3  CreateStrategy rest.RESTCreateStrategy
 4  // 创建资源对象后的处理操作
 5  AfterCreate ObjectFunc
 6  // 更新资源对象时的预处理操作
 7  UpdateStrategy rest.RESTUpdateStrategy
 8  // 更新资源对象后的处理操作
 9  AfterUpdate ObjectFunc
10  // 删除资源对象时的预处理操作
11  DeleteStrategy rest.RESTDeleteStrategy
12  // 删除资源对象后的处理操作
13  AfterDelete ObjectFunc
14  // 导出资源对象时的预处理操作
15  ExportStrategy rest.RESTExportStrategy
16
17  // 对Storage.Interface通用存储接口进行的封装,实现了对Etcd集群的读、写操作
18  Storage DryRunnableStorage
19  ...
20}
21
22type DryRunnableStorage struct {
23  Storage storage.Interface
24  Codec   runtime.Codec
25}
26
27// 创建资源对象的方法
28func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
29  // 资源创建前操作
30  if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
31    return nil, err
32  }
33  ...
34  // 创建资源对象
35  if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
36    ...
37  }
38  // 资源创建后操作
39  if e.AfterCreate != nil {
40    if err := e.AfterCreate(out); err != nil {
41      return nil, err
42    }
43  }
44  if e.Decorator != nil {
45    if err := e.Decorator(out); err != nil {
46      return nil, err
47    }
48  }
49  return out, nil
50}

Storage.Interface

Storage.Interface通用存储接口定义了资源的操作方法。实现了该接口的类有:

  • CacherStorage:带有缓存功能的资源存储对象
  • UnderlyingStorage:底层存储对象,真正与Etcd集群交互的资源存储对象

源码位置:vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go

 1type Interface interface {
 2  // 资源版本管理器,用于管理etcd集群中的数据版本对象
 3  Versioner() Versioner
 4  // 创建资源对象的方法
 5  Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
 6  // 删除资源对象的方法
 7  Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
 8  // 通过watch机制监控资源对象变化方法,只应用于单个key
 9  Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
10  // 通过watch机制监控资源对象变化方法,应用于多个key
11  WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
12  // 获取资源对象的方法
13  Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
14  // 获取资源对象的方法,以列表的形式返回
15  GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
16  // 获取资源对象的方法,以列表的形式返回
17  List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
18  // 保证传入的tryUpdate函数运行成功
19  GuaranteedUpdate(
20    ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
21    precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
22  // 获取指定key下的条目数量
23  Count(key string) (int64, error)
24}

CacheStorage

  • CacheStorage缓存层的设计有利于快速响应请求并返回所需数据
  • 对应某些操作,为保证数据一致性,不需要缓存,直接操作UnderlyingStorage
  • 另外一些操作(Get)是需要基于缓存设计的
  • Watch操作的事件缓存机制(watchCache)使用缓存滑动窗口来保证历史事件不会丢失

缓存层设计

CacheStorage缓存层包含三大部分:

  • cacheWatcher:Watcher观察者管理
  • watchCache:通过Reflector框架与UnderlyingStorage底层存储对象交互
  • Cacher:用于分发给目前所有已连接的观察者

cacheWatcher

每一个发送watch请求的客户端都会分配一个cacheWatcher,用于客户端接收Watch事件

源码位置:vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go

 1func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
 2  // 创建cacherWatcher对象
 3  watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType)
 4  ...
 5  func() {
 6    // 将对象添加到c.watchers中进行统一管理
 7    c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
 8    ...
 9    c.watcherIdx++
10  }()
11  // 调用process
12  // process用于监控c.input channel中的数据
13  go watcher.process(ctx, initEvents, watchRV)
14  return watcher, nil
15}
16
17func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
18  ...
19  // 计算id
20  bucketID := nextTime.Unix()
21  watchers, _ := t.watchersBuckets[bucketID]
22  // 对象添加到map中
23  t.watchersBuckets[bucketID] = append(watchers, w)
24  return true
25}
26
27type watcherBookmarkTimeBuckets struct {
28  lock              sync.Mutex
29  // 存储所有的cacheMatcher,key为id
30  watchersBuckets   map[int64][]*cacheWatcher
31  startBucketID     int64
32  clock             clock.Clock
33  bookmarkFrequency time.Duration
34}
35
36// process
37func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
38  ...
39  for _, event := range initEvents {
40    c.sendWatchCacheEvent(event)
41  }
42  ...
43  for {
44    select {
45    // 读取c.input中的数据
46    case event, ok := <-c.input:
47      if !ok {
48        return
49      }
50      // 资源版本大于ResourceVersion的数据,发送通知
51      if event.ResourceVersion > resourceVersion {
52        c.sendWatchCacheEvent(event)
53      }
54    case <-ctx.Done():
55      return
56    }
57  }
58}
59
60func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
61  watchEvent := c.convertToWatchEvent(event)
62  ...
63  select {
64  // 将watchEvent放入到c.result这个channel中
65  case c.result <- *watchEvent:
66  case <-c.done:
67  }
68}

watchCache

watchCache接收Reflector框架的回调事件,实现了增删改查方法,分别用于监听相关事件,并将事件存放到3个地方:

  • w.onEvent:将事件回调给CacherStorage,CacherStorage将其分发给目前所有已经连接的观察者
  • w.cache:将事件存储至缓存滑动窗口
  • cache.Store:将事件存储至本地缓存

Cacher

资源版本号

缓存滑动窗口

UnderlyingStorage

UnderlyingStorage是底层存储,也称为后端存储,是真正与Etcd集群交互的资源对象。UnderlyingStorage对Etcd的官方库进行了封装。早期版本的k8s支持v2和v3版本的etcd,新版本只支持etcd3。UnderlyingStorage通过newETCD3Storage函数进行实例化

源码位置:vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go

 1func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
 2  switch c.Type {
 3  // etcd2,会提示不再支持
 4  case "etcd2":
 5    return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
 6  // etcd3,通过newETCD3Storage初始化
 7  case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
 8    return newETCD3Storage(c)
 9  default:
10    return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
11  }
12}
13
14func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
15  stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
16
17  client, err := newETCD3Client(c.Transport)
18
19  stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
20  ...
21  return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
22}
23
24func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
25  return newStore(c, pagingEnabled, codec, prefix, transformer)
26}

Get操作

源码位置:vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go

 1func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
 2  ...
 3  // 通过client获取Etcd集群中资源对象数据
 4  getResp, err := s.client.KV.Get(ctx, key)
 5  ...
 6  // 解码
 7  return decode(s.codec, s.versioner, data, out, kv.ModRevision)
 8}
 9
10func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
11  ...
12  // 解码
13  _, _, err := codec.Decode(value, nil, objPtr)
14  ...
15  // 更新资源对象版本号
16  if err := versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
17    klog.Errorf("failed to update object version: %v", err)
18  }
19  return nil
20}

Codec编解码数据

Strategy预处理

k8s中每种资源都有自己的预处理操作,用于资源对象创建、更新、删除、导出资源对象之前对资源执行预处理操作。每个资源的特殊需求都可以在自己的Strategy预处理接口中实现。Strategy预处理接口定义如下:

源码位置:vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go

 1type GenericStore interface {
 2  // 创建资源对象时的预处理操作
 3  GetCreateStrategy() rest.RESTCreateStrategy
 4  // 更新资源对象时的预处理操作
 5  GetUpdateStrategy() rest.RESTUpdateStrategy
 6  // 删除资源对象时的预处理操作
 7  GetDeleteStrategy() rest.RESTDeleteStrategy
 8  // 导出资源对象时的预处理操作
 9  GetExportStrategy() rest.RESTExportStrategy
10}

创建资源对象时的预处理操作

源码位置:vendor/k8s.io/apiserver/pkg/registry/rest/create.go

 1type RESTCreateStrategy interface {
 2  runtime.ObjectTyper
 3  names.NameGenerator
 4
 5  // 判断资源对象是否有所属的资源对象
 6  NamespaceScoped() bool
 7  // 创建资源对象之前的处理函数
 8  PrepareForCreate(ctx context.Context, obj runtime.Object)
 9  // 创建当前资源对象之前的校验操作,验证资源对象的字段信息。不会修改资源信息
10  Validate(ctx context.Context, obj runtime.Object) field.ErrorList
11  // 创建资源对象之前将资源对象规范化
12  Canonicalize(obj runtime.Object)
13}
14
15// BeforeCreate将RESTCreateStrategy操作的方法进行打包、封装,并按顺序执行
16// 调用者只需要执行BeforeCreate
17func BeforeCreate(strategy RESTCreateStrategy, ctx context.Context, obj runtime.Object) error {
18  ...
19  if strategy.NamespaceScoped() {
20    ...
21  }
22  ...
23  strategy.PrepareForCreate(ctx, obj)
24  ...
25  if errs := strategy.Validate(ctx, obj); len(errs) > 0 {
26    return errors.NewInvalid(kind.GroupKind(), objectMeta.GetName(), errs)
27  }
28  ...
29  strategy.Canonicalize(obj)
30
31  return nil
32}

更新资源对象时的预处理操作

源码位置:vendor/k8s.io/apiserver/pkg/registry/rest/update.go

 1type RESTUpdateStrategy interface {
 2  runtime.ObjectTyper
 3  NamespaceScoped() bool
 4  // 是否允许重复创建资源对象
 5  AllowCreateOnUpdate() bool
 6  // 更新当前资源对象之前的处理函数
 7  PrepareForUpdate(ctx context.Context, obj, old runtime.Object)
 8  // 创建当前资源对象之前的校验操作,验证资源对象的字段信息。不会修改资源信息
 9  ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList
10  // 创建资源对象之前将资源对象规范化
11  Canonicalize(obj runtime.Object)
12  // 更新资源对象时,未指定资源版本是否要执行更新
13  AllowUnconditionalUpdate() bool
14}
15
16// BeforeUpdate将RESTUpdateStrategy操作的方法进行打包、封装,并按顺序执行
17// 调用者只需要执行BeforeUpdate
18func BeforeUpdate(strategy RESTUpdateStrategy, ctx context.Context, obj, old runtime.Object) error {
19  ...
20  if strategy.NamespaceScoped() {
21   ...
22  }
23  ...
24  strategy.PrepareForUpdate(ctx, obj, old)
25  ...
26  errs = append(errs, strategy.ValidateUpdate(ctx, obj, old)...)
27  ...
28  strategy.Canonicalize(obj)
29
30  return nil
31}
32

删除资源对象时的预处理操作

源码位置:vendor/k8s.io/apiserver/pkg/registry/rest/delete.go

 1// 删除预处理操作没有定义预处理方法
 2type RESTDeleteStrategy interface {
 3  runtime.ObjectTyper
 4}
 5
 6type RESTGracefulDeleteStrategy interface {
 7  CheckGracefulDelete(ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) bool
 8}
 9
10func BeforeDelete(strategy RESTDeleteStrategy, ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) (graceful, gracefulPending bool, err error) {
11  ...
12  gracefulStrategy, ok := strategy.(RESTGracefulDeleteStrategy)
13  ...
14  if !gracefulStrategy.CheckGracefulDelete(ctx, obj, options) {
15    return false, false, nil
16  }
17  ...
18}

导出资源对象时的预处理操作

只有部分资源实现了该方法

1type RESTExportStrategy interface {
2  // 只定义了一个方法
3  Export(ctx context.Context, obj runtime.Object, exact bool) error
4}