k8s存储组件Etcd核心源码分析
| 阅读 | 共 4458 字,阅读约
Overview
概述
- Etcd是一个分布式键值对存储数据库
- etcd以一致和容错的方式存储元数据
- 通过分布式锁、leader选举、写屏障来实现可靠的分布式协作
为什么取名"Etcd"
etcd = etc + d
- etc: unix系统下存储配置的地方
- d:distribute,分布式系统
etcd寓意存储大规模分布式系统的配置信息
etcd与k8s
- k8s使用Etcd作为集群唯一存储,生产环境一般以集群部署,称为Etcd集群。
- 集群存储k8s的集群状态和元信息,包括所有的k8s资源对象信息、资源对象状态、集群节点信息等。
- k8s将所有数据存储至etcd集群前缀为/registry的目录下
存储架构设计
k8s对etcd存储进行大量封装,其架构是分层的,而每一层封装设计又拥有高度的可扩展性。
- RESTStorage:实现Restful风格的对外资源存储服务的api接口
- RegistryStore:实现了资源的通用操作,在存储资源对象之前和之后可以执行某些函数
- Storage.Interface:通用存储接口,该接口定义了资源的操作方法
- CacheStorage:带有缓存功能的资源存储对象
- UnderlyingStorage:底层存储,是真正与etcd集群交互的资源存储对象
存储概述
资源对象在etcd集群中以二进制形式存储,存储与获取过程都通过protobufSerializer编解码器进行数据的编码和解码
RESTStorage
k8s的每种资源都提供了restful风格的对外资源存储服务接口,所有对外暴露restful的资源都必须实现RESTStorage接口。
源码位置:vendor/k8s.io/apiserver/pkg/registry/rest/rest.go
1type Storage interface {
2 New() runtime.Object
3}
以Deployment资源为例,看下源码实现
源码位置:pkg/registry/apps/deployment/storage/storage.go
1type REST struct {
2 *genericregistry.Store
3 categories []string
4}
5
6type StatusREST struct {
7 store *genericregistry.Store
8}
9
10type ScaleREST struct {
11 store *genericregistry.Store
12}
13
14type DeploymentStorage struct {
15 // 实现deployment资源的RESTStorage接口
16 Deployment *REST
17 // 实现deployment/status资源的RESTStorage接口
18 Status *StatusREST
19 Scale *ScaleREST
20 Rollback *RollbackREST
21}
RegistryStore
RegistryStore实现了资源存储的通用操作,例如在存储资源对象之前执行某个函数,在存储资源对象之后执行某个函数。 RegistryStore定义了4种Strategy预处理方法,3种创建资源对象后的处理方法。
1type Store struct {
2 // 创建资源对象时的预处理操作
3 CreateStrategy rest.RESTCreateStrategy
4 // 创建资源对象后的处理操作
5 AfterCreate ObjectFunc
6 // 更新资源对象时的预处理操作
7 UpdateStrategy rest.RESTUpdateStrategy
8 // 更新资源对象后的处理操作
9 AfterUpdate ObjectFunc
10 // 删除资源对象时的预处理操作
11 DeleteStrategy rest.RESTDeleteStrategy
12 // 删除资源对象后的处理操作
13 AfterDelete ObjectFunc
14 // 导出资源对象时的预处理操作
15 ExportStrategy rest.RESTExportStrategy
16
17 // 对Storage.Interface通用存储接口进行的封装,实现了对Etcd集群的读、写操作
18 Storage DryRunnableStorage
19 ...
20}
21
22type DryRunnableStorage struct {
23 Storage storage.Interface
24 Codec runtime.Codec
25}
26
27// 创建资源对象的方法
28func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
29 // 资源创建前操作
30 if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
31 return nil, err
32 }
33 ...
34 // 创建资源对象
35 if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
36 ...
37 }
38 // 资源创建后操作
39 if e.AfterCreate != nil {
40 if err := e.AfterCreate(out); err != nil {
41 return nil, err
42 }
43 }
44 if e.Decorator != nil {
45 if err := e.Decorator(out); err != nil {
46 return nil, err
47 }
48 }
49 return out, nil
50}
Storage.Interface
Storage.Interface通用存储接口定义了资源的操作方法。实现了该接口的类有:
- CacherStorage:带有缓存功能的资源存储对象
- UnderlyingStorage:底层存储对象,真正与Etcd集群交互的资源存储对象
源码位置:vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
1type Interface interface {
2 // 资源版本管理器,用于管理etcd集群中的数据版本对象
3 Versioner() Versioner
4 // 创建资源对象的方法
5 Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
6 // 删除资源对象的方法
7 Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
8 // 通过watch机制监控资源对象变化方法,只应用于单个key
9 Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
10 // 通过watch机制监控资源对象变化方法,应用于多个key
11 WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
12 // 获取资源对象的方法
13 Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
14 // 获取资源对象的方法,以列表的形式返回
15 GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
16 // 获取资源对象的方法,以列表的形式返回
17 List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
18 // 保证传入的tryUpdate函数运行成功
19 GuaranteedUpdate(
20 ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
21 precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
22 // 获取指定key下的条目数量
23 Count(key string) (int64, error)
24}
CacheStorage
- CacheStorage缓存层的设计有利于快速响应请求并返回所需数据
- 对应某些操作,为保证数据一致性,不需要缓存,直接操作UnderlyingStorage
- 另外一些操作(Get)是需要基于缓存设计的
- Watch操作的事件缓存机制(watchCache)使用缓存滑动窗口来保证历史事件不会丢失
缓存层设计
CacheStorage缓存层包含三大部分:
- cacheWatcher:Watcher观察者管理
- watchCache:通过Reflector框架与UnderlyingStorage底层存储对象交互
- Cacher:用于分发给目前所有已连接的观察者
cacheWatcher
每一个发送watch请求的客户端都会分配一个cacheWatcher,用于客户端接收Watch事件
源码位置:vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go
1func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
2 // 创建cacherWatcher对象
3 watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType)
4 ...
5 func() {
6 // 将对象添加到c.watchers中进行统一管理
7 c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
8 ...
9 c.watcherIdx++
10 }()
11 // 调用process
12 // process用于监控c.input channel中的数据
13 go watcher.process(ctx, initEvents, watchRV)
14 return watcher, nil
15}
16
17func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
18 ...
19 // 计算id
20 bucketID := nextTime.Unix()
21 watchers, _ := t.watchersBuckets[bucketID]
22 // 对象添加到map中
23 t.watchersBuckets[bucketID] = append(watchers, w)
24 return true
25}
26
27type watcherBookmarkTimeBuckets struct {
28 lock sync.Mutex
29 // 存储所有的cacheMatcher,key为id
30 watchersBuckets map[int64][]*cacheWatcher
31 startBucketID int64
32 clock clock.Clock
33 bookmarkFrequency time.Duration
34}
35
36// process
37func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
38 ...
39 for _, event := range initEvents {
40 c.sendWatchCacheEvent(event)
41 }
42 ...
43 for {
44 select {
45 // 读取c.input中的数据
46 case event, ok := <-c.input:
47 if !ok {
48 return
49 }
50 // 资源版本大于ResourceVersion的数据,发送通知
51 if event.ResourceVersion > resourceVersion {
52 c.sendWatchCacheEvent(event)
53 }
54 case <-ctx.Done():
55 return
56 }
57 }
58}
59
60func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
61 watchEvent := c.convertToWatchEvent(event)
62 ...
63 select {
64 // 将watchEvent放入到c.result这个channel中
65 case c.result <- *watchEvent:
66 case <-c.done:
67 }
68}
watchCache
watchCache接收Reflector框架的回调事件,实现了增删改查方法,分别用于监听相关事件,并将事件存放到3个地方:
- w.onEvent:将事件回调给CacherStorage,CacherStorage将其分发给目前所有已经连接的观察者
- w.cache:将事件存储至缓存滑动窗口
- cache.Store:将事件存储至本地缓存
Cacher
资源版本号
缓存滑动窗口
UnderlyingStorage
UnderlyingStorage是底层存储,也称为后端存储,是真正与Etcd集群交互的资源对象。UnderlyingStorage对Etcd的官方库进行了封装。早期版本的k8s支持v2和v3版本的etcd,新版本只支持etcd3。UnderlyingStorage通过newETCD3Storage函数进行实例化
源码位置:vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
1func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
2 switch c.Type {
3 // etcd2,会提示不再支持
4 case "etcd2":
5 return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
6 // etcd3,通过newETCD3Storage初始化
7 case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
8 return newETCD3Storage(c)
9 default:
10 return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
11 }
12}
13
14func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
15 stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
16
17 client, err := newETCD3Client(c.Transport)
18
19 stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
20 ...
21 return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
22}
23
24func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
25 return newStore(c, pagingEnabled, codec, prefix, transformer)
26}
Get操作
源码位置:vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
1func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
2 ...
3 // 通过client获取Etcd集群中资源对象数据
4 getResp, err := s.client.KV.Get(ctx, key)
5 ...
6 // 解码
7 return decode(s.codec, s.versioner, data, out, kv.ModRevision)
8}
9
10func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
11 ...
12 // 解码
13 _, _, err := codec.Decode(value, nil, objPtr)
14 ...
15 // 更新资源对象版本号
16 if err := versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
17 klog.Errorf("failed to update object version: %v", err)
18 }
19 return nil
20}
Codec编解码数据
Strategy预处理
k8s中每种资源都有自己的预处理操作,用于资源对象创建、更新、删除、导出资源对象之前对资源执行预处理操作。每个资源的特殊需求都可以在自己的Strategy预处理接口中实现。Strategy预处理接口定义如下:
源码位置:vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
1type GenericStore interface {
2 // 创建资源对象时的预处理操作
3 GetCreateStrategy() rest.RESTCreateStrategy
4 // 更新资源对象时的预处理操作
5 GetUpdateStrategy() rest.RESTUpdateStrategy
6 // 删除资源对象时的预处理操作
7 GetDeleteStrategy() rest.RESTDeleteStrategy
8 // 导出资源对象时的预处理操作
9 GetExportStrategy() rest.RESTExportStrategy
10}
创建资源对象时的预处理操作
源码位置:vendor/k8s.io/apiserver/pkg/registry/rest/create.go
1type RESTCreateStrategy interface {
2 runtime.ObjectTyper
3 names.NameGenerator
4
5 // 判断资源对象是否有所属的资源对象
6 NamespaceScoped() bool
7 // 创建资源对象之前的处理函数
8 PrepareForCreate(ctx context.Context, obj runtime.Object)
9 // 创建当前资源对象之前的校验操作,验证资源对象的字段信息。不会修改资源信息
10 Validate(ctx context.Context, obj runtime.Object) field.ErrorList
11 // 创建资源对象之前将资源对象规范化
12 Canonicalize(obj runtime.Object)
13}
14
15// BeforeCreate将RESTCreateStrategy操作的方法进行打包、封装,并按顺序执行
16// 调用者只需要执行BeforeCreate
17func BeforeCreate(strategy RESTCreateStrategy, ctx context.Context, obj runtime.Object) error {
18 ...
19 if strategy.NamespaceScoped() {
20 ...
21 }
22 ...
23 strategy.PrepareForCreate(ctx, obj)
24 ...
25 if errs := strategy.Validate(ctx, obj); len(errs) > 0 {
26 return errors.NewInvalid(kind.GroupKind(), objectMeta.GetName(), errs)
27 }
28 ...
29 strategy.Canonicalize(obj)
30
31 return nil
32}
更新资源对象时的预处理操作
源码位置:vendor/k8s.io/apiserver/pkg/registry/rest/update.go
1type RESTUpdateStrategy interface {
2 runtime.ObjectTyper
3 NamespaceScoped() bool
4 // 是否允许重复创建资源对象
5 AllowCreateOnUpdate() bool
6 // 更新当前资源对象之前的处理函数
7 PrepareForUpdate(ctx context.Context, obj, old runtime.Object)
8 // 创建当前资源对象之前的校验操作,验证资源对象的字段信息。不会修改资源信息
9 ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList
10 // 创建资源对象之前将资源对象规范化
11 Canonicalize(obj runtime.Object)
12 // 更新资源对象时,未指定资源版本是否要执行更新
13 AllowUnconditionalUpdate() bool
14}
15
16// BeforeUpdate将RESTUpdateStrategy操作的方法进行打包、封装,并按顺序执行
17// 调用者只需要执行BeforeUpdate
18func BeforeUpdate(strategy RESTUpdateStrategy, ctx context.Context, obj, old runtime.Object) error {
19 ...
20 if strategy.NamespaceScoped() {
21 ...
22 }
23 ...
24 strategy.PrepareForUpdate(ctx, obj, old)
25 ...
26 errs = append(errs, strategy.ValidateUpdate(ctx, obj, old)...)
27 ...
28 strategy.Canonicalize(obj)
29
30 return nil
31}
32
删除资源对象时的预处理操作
源码位置:vendor/k8s.io/apiserver/pkg/registry/rest/delete.go
1// 删除预处理操作没有定义预处理方法
2type RESTDeleteStrategy interface {
3 runtime.ObjectTyper
4}
5
6type RESTGracefulDeleteStrategy interface {
7 CheckGracefulDelete(ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) bool
8}
9
10func BeforeDelete(strategy RESTDeleteStrategy, ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) (graceful, gracefulPending bool, err error) {
11 ...
12 gracefulStrategy, ok := strategy.(RESTGracefulDeleteStrategy)
13 ...
14 if !gracefulStrategy.CheckGracefulDelete(ctx, obj, options) {
15 return false, false, nil
16 }
17 ...
18}
导出资源对象时的预处理操作
只有部分资源实现了该方法
1type RESTExportStrategy interface {
2 // 只定义了一个方法
3 Export(ctx context.Context, obj runtime.Object, exact bool) error
4}