helm源码分析-storage


| 阅读 |,阅读约 5 分钟
| 复制链接:

Overview

helm源码分析-storage

storage模块主要用于管理和操作发布的release信息,当我们通过`helm list`、`helm history`等命令查询release信息时,就涉及到存储相关的知识

概述

storage模块定义了存储的基本接口,并提供了不同的实现方式,包括:

  • secret:默认方式
  • configmap
  • memory
  • sql:目前只支持postgresql

源码分析

目录结构

storage相关的源码都在 pkg/storage 包下

 1➜  helm git:(041ce5a2) ✗ tree pkg/storage 
 2pkg/storage
 3├── driver
 4│   ├── cfgmaps.go // configmap 这种实现的相关代码
 5│   ├── cfgmaps_test.go
 6│   ├── driver.go // 定义了存储的公共接口
 7│   ├── labels.go
 8│   ├── labels_test.go
 9│   ├── memory.go // memory 这种实现的相关代码
10│   ├── memory_test.go
11│   ├── mock_test.go
12│   ├── records.go
13│   ├── records_test.go
14│   ├── secrets.go // secret 这种实现的相关代码
15│   ├── secrets_test.go
16│   ├── sql.go // postgresql 这种实现的相关代码
17│   ├── sql_test.go
18│   └── util.go
19├── storage.go // 存储接口定义
20└── storage_test.go

driver

1type Driver interface {
2	Creator
3	Updator
4	Deletor
5	Queryor
6	Name() string
7}

Creator

1type Creator interface {
2	Create(key string, rls *rspb.Release) error
3}

Updator

1type Updator interface {
2	Update(key string, rls *rspb.Release) error
3}

Deletor

1type Deletor interface {
2	Delete(key string) (*rspb.Release, error)
3}

Queryor

1type Queryor interface {
2	Get(key string) (*rspb.Release, error)
3	List(filter func(*rspb.Release) bool) ([]*rspb.Release, error)
4	Query(labels map[string]string) ([]*rspb.Release, error)
5}

storage

 1type Storage struct {
 2	driver.Driver
 3
 4	// MaxHistory specifies the maximum number of historical releases that will
 5	// be retained, including the most recent release. Values of 0 or less are
 6	// ignored (meaning no limits are imposed).
 7	MaxHistory int
 8
 9	Log func(string, ...interface{})
10}

secret实现

secret的实现主要是将Release对象做base64编码,并保存到k8s的secret这种资源类型中

pkg/storage/driver/secrets.go

1type Secrets struct {
2	impl corev1.SecretInterface
3	Log  func(string, ...interface{})
4}

Get

1func (secrets *Secrets) Get(key string) (*rspb.Release, error) {
2  // 通过给定的key,调用k8s的api查询secret
3	obj, err := secrets.impl.Get(context.Background(), key, metav1.GetOptions{})
4	...
5  // 找到secret中的数据,并做base64解码,就得到了Release对象
6	r, err := decodeRelease(string(obj.Data["release"]))
7	return r, errors.Wrapf(err, "get: failed to decode data %q", key)
8}

List

 1func (secrets *Secrets) List(filter func(*rspb.Release) bool) ([]*rspb.Release, error) {
 2  // 构造Label选择器,选择owner=helm的label
 3	lsel := kblabels.Set{"owner": "helm"}.AsSelector()
 4	opts := metav1.ListOptions{LabelSelector: lsel.String()}
 5
 6  // 调用k8s的原生api,获取所有owner=helm的secret
 7	list, err := secrets.impl.List(context.Background(), opts)
 8	...
 9  // 结果处理
10	for _, item := range list.Items {
11    // 获取每个secret中的数据,转换为Release对象
12		rls, err := decodeRelease(string(item.Data["release"]))
13		...
14	}
15	return results, nil
16}

Create

 1func (secrets *Secrets) Create(key string, rls *rspb.Release) error {
 2	// set labels for secrets object meta data
 3	var lbs labels
 4
 5  // 初始化label信息
 6	lbs.init()
 7	lbs.set("createdAt", strconv.Itoa(int(time.Now().Unix())))
 8
 9  // 创建一个持有release信息的secret
10	obj, err := newSecretsObject(key, rls, lbs)
11	...
12  
13  // 调用k8s的接口,创建secret这个资源
14	if _, err := secrets.impl.Create(context.Background(), obj, metav1.CreateOptions{}); err != nil {
15		...
16	}
17	return nil
18}
初始化secret对象
 1func newSecretsObject(key string, rls *rspb.Release, lbs labels) (*v1.Secret, error) {
 2	const owner = "helm"
 3
 4	// release做base64编码
 5	s, err := encodeRelease(rls)
 6	...
 7	// 设置label,包括:name、owner、status、version
 8  // 其中owner和name是后续查询release的关键参数
 9	lbs.set("name", rls.Name)
10	lbs.set("owner", owner)
11	lbs.set("status", rls.Info.Status.String())
12	lbs.set("version", strconv.Itoa(rls.Version))
13
14  // 添加了Type这个字段,并设置data
15	return &v1.Secret{
16		ObjectMeta: metav1.ObjectMeta{
17			Name:   key,
18			Labels: lbs.toMap(),
19		},
20		Type: "helm.sh/release.v1",
21		Data: map[string][]byte{"release": []byte(s)},
22	}, nil
23}

Query

 1func (secrets *Secrets) Query(labels map[string]string) ([]*rspb.Release, error) {
 2	ls := kblabels.Set{}
 3	for k, v := range labels {
 4		if errs := validation.IsValidLabelValue(v); len(errs) != 0 {
 5			return nil, errors.Errorf("invalid label value: %q: %s", v, strings.Join(errs, "; "))
 6		}
 7		ls[k] = v
 8	}
 9
10	opts := metav1.ListOptions{LabelSelector: ls.AsSelector().String()}
11
12	list, err := secrets.impl.List(context.Background(), opts)
13	if err != nil {
14		return nil, errors.Wrap(err, "query: failed to query with labels")
15	}
16
17	if len(list.Items) == 0 {
18		return nil, ErrReleaseNotFound
19	}
20
21	var results []*rspb.Release
22	for _, item := range list.Items {
23		rls, err := decodeRelease(string(item.Data["release"]))
24		if err != nil {
25			secrets.Log("query: failed to decode release: %s", err)
26			continue
27		}
28		results = append(results, rls)
29	}
30	return results, nil
31}

Update

 1func (secrets *Secrets) Update(key string, rls *rspb.Release) error {
 2	...
 3  // 设置资源更新时间
 4	lbs.set("modifiedAt", strconv.Itoa(int(time.Now().Unix())))
 5
 6  // 创建一个新的secret资源对象
 7	obj, err := newSecretsObject(key, rls, lbs)
 8	if err != nil {
 9		return errors.Wrapf(err, "update: failed to encode release %q", rls.Name)
10	}
11	// 调用k8s的接口,更新secret资源
12	_, err = secrets.impl.Update(context.Background(), obj, metav1.UpdateOptions{})
13	return errors.Wrap(err, "update: failed to update")
14}
15

Delete

1func (secrets *Secrets) Delete(key string) (rls *rspb.Release, err error) {
2	// 先获取对应的资源信息
3	if rls, err = secrets.Get(key); err != nil {
4		return nil, err
5	}
6	// 调用k8s的api删除对应的secret
7	err = secrets.impl.Delete(context.Background(), key, metav1.DeleteOptions{})
8	return rls, err
9}

configmap实现

configmap的实现和secret类似,不再重复分析了。唯一的区别就是,操作的资源是k8s中的configmap,而不是secret

sql实现

sql实现是将release信息存储到postgresql数据库中。

表结构信息

其中涉及到的表结构信息如下

pkg/storage/driver/sql.go

 1// 支持的数据库:postgresql
 2const postgreSQLDialect = "postgres"
 3
 4// SQLDriverName is the string name of this driver.
 5const SQLDriverName = "SQL"
 6
 7// 存储数据的表名
 8const sqlReleaseTableName = "releases_v1"
 9
10// 存储数据的字段名
11const (
12	sqlReleaseTableKeyColumn        = "key"
13	sqlReleaseTableTypeColumn       = "type"
14	sqlReleaseTableBodyColumn       = "body"
15	sqlReleaseTableNameColumn       = "name"
16	sqlReleaseTableNamespaceColumn  = "namespace"
17	sqlReleaseTableVersionColumn    = "version"
18	sqlReleaseTableStatusColumn     = "status"
19	sqlReleaseTableOwnerColumn      = "owner"
20	sqlReleaseTableCreatedAtColumn  = "createdAt"
21	sqlReleaseTableModifiedAtColumn = "modifiedAt"
22)

创建表结构

 1func NewSQL(connectionString string, logger func(string, ...interface{}), namespace string) (*SQL, error) {
 2	// 连接数据库
 3  db, err := sqlx.Connect(postgreSQLDialect, connectionString)
 4	if err != nil {
 5		return nil, err
 6	}
 7
 8	driver := &SQL{
 9		db:               db,
10		Log:              logger,
11		statementBuilder: sq.StatementBuilder.PlaceholderFormat(sq.Dollar),
12	}
13
14  // 表结构初始化
15	if err := driver.ensureDBSetup(); err != nil {
16		return nil, err
17	}
18
19	driver.namespace = namespace
20	return driver, nil
21}
22
23func (s *SQL) ensureDBSetup() error {
24  ...
25}

Get

 1func (s *SQL) Get(key string) (*rspb.Release, error) {
 2	var record SQLReleaseWrapper
 3  // 拼接sql语句:select body from releases_v1 where key = ? and namespace = ?
 4	qb := s.statementBuilder.
 5		Select(sqlReleaseTableBodyColumn).
 6		From(sqlReleaseTableName).
 7		Where(sq.Eq{sqlReleaseTableKeyColumn: key}).
 8		Where(sq.Eq{sqlReleaseTableNamespaceColumn: s.namespace})
 9
10	query, args, err := qb.ToSql()
11	...
12  // 获取记录
13	if err := s.db.Get(&record, query, args...); err != nil {
14		s.Log("got SQL error when getting release %s: %v", key, err)
15		return nil, ErrReleaseNotFound
16	}
17
18  // base64解码
19	release, err := decodeRelease(record.Body)
20	...
21	return release, nil
22}

List

对应的sql语句为:

1select body from releases_v1 where owner = 'helm'

Create

对应的sql语句为:

1insert into releases_v1(key1, key2, ...) values(val1, val2, ...)

Query

对应的sql语句为:

1select body from releases_v1 where label1 = value1 and ...

Update

对应的sql语句为:

1update releases_v1 set key1=val1 where key = ? and namespace = ?

Delete

对应的sql语句为:

1delete from releases_v1 where key = ? and namespace = ?

memory实现

memory是基于内存实现的存储,数据存放在Memory这个对象中。

该对象按照命名空间、名称的层级划分,memory的存储驱动就是对这个数据结构的增删改查,本文只分析Get操作。

Memory对象

 1type Memory struct {
 2	sync.RWMutex
 3	namespace string
 4	// key是命名空间
 5	cache map[string]memReleases
 6}
 7
 8// key是release名称,value是release对应的多个版本数据
 9type memReleases map[string]records
10
11type records []*record

Get

 1func (mem *Memory) Get(key string) (*rspb.Release, error) {
 2  ...
 3	keyWithoutPrefix := strings.TrimPrefix(key, "sh.helm.release.v1.")
 4	switch elems := strings.Split(keyWithoutPrefix, ".v"); len(elems) {
 5	case 2:
 6		...
 7    // 根据namespace、name的层级结构定位到数据
 8		if recs, ok := mem.cache[mem.namespace][name]; ok {
 9			if r := recs.Get(key); r != nil {
10				return r.rls, nil
11			}
12		}
13		return nil, ErrReleaseNotFound
14	default:
15		return nil, ErrInvalidKey
16	}
17}