helm源码分析-storage
| 阅读 | 共 2285 字,阅读约
Overview
helm源码分析-storage
storage模块主要用于管理和操作发布的release信息,当我们通过`helm list`、`helm history`等命令查询release信息时,就涉及到存储相关的知识
概述
storage模块定义了存储的基本接口,并提供了不同的实现方式,包括:
- secret:默认方式
- configmap
- memory
- sql:目前只支持postgresql
源码分析
目录结构
storage相关的源码都在 pkg/storage 包下
1➜ helm git:(041ce5a2) ✗ tree pkg/storage
2pkg/storage
3├── driver
4│ ├── cfgmaps.go // configmap 这种实现的相关代码
5│ ├── cfgmaps_test.go
6│ ├── driver.go // 定义了存储的公共接口
7│ ├── labels.go
8│ ├── labels_test.go
9│ ├── memory.go // memory 这种实现的相关代码
10│ ├── memory_test.go
11│ ├── mock_test.go
12│ ├── records.go
13│ ├── records_test.go
14│ ├── secrets.go // secret 这种实现的相关代码
15│ ├── secrets_test.go
16│ ├── sql.go // postgresql 这种实现的相关代码
17│ ├── sql_test.go
18│ └── util.go
19├── storage.go // 存储接口定义
20└── storage_test.go
driver
1type Driver interface {
2 Creator
3 Updator
4 Deletor
5 Queryor
6 Name() string
7}
Creator
1type Creator interface {
2 Create(key string, rls *rspb.Release) error
3}
Updator
1type Updator interface {
2 Update(key string, rls *rspb.Release) error
3}
Deletor
1type Deletor interface {
2 Delete(key string) (*rspb.Release, error)
3}
Queryor
1type Queryor interface {
2 Get(key string) (*rspb.Release, error)
3 List(filter func(*rspb.Release) bool) ([]*rspb.Release, error)
4 Query(labels map[string]string) ([]*rspb.Release, error)
5}
storage
1type Storage struct {
2 driver.Driver
3
4 // MaxHistory specifies the maximum number of historical releases that will
5 // be retained, including the most recent release. Values of 0 or less are
6 // ignored (meaning no limits are imposed).
7 MaxHistory int
8
9 Log func(string, ...interface{})
10}
secret实现
secret的实现主要是将Release对象做base64编码,并保存到k8s的secret这种资源类型中
pkg/storage/driver/secrets.go
1type Secrets struct {
2 impl corev1.SecretInterface
3 Log func(string, ...interface{})
4}
Get
1func (secrets *Secrets) Get(key string) (*rspb.Release, error) {
2 // 通过给定的key,调用k8s的api查询secret
3 obj, err := secrets.impl.Get(context.Background(), key, metav1.GetOptions{})
4 ...
5 // 找到secret中的数据,并做base64解码,就得到了Release对象
6 r, err := decodeRelease(string(obj.Data["release"]))
7 return r, errors.Wrapf(err, "get: failed to decode data %q", key)
8}
List
1func (secrets *Secrets) List(filter func(*rspb.Release) bool) ([]*rspb.Release, error) {
2 // 构造Label选择器,选择owner=helm的label
3 lsel := kblabels.Set{"owner": "helm"}.AsSelector()
4 opts := metav1.ListOptions{LabelSelector: lsel.String()}
5
6 // 调用k8s的原生api,获取所有owner=helm的secret
7 list, err := secrets.impl.List(context.Background(), opts)
8 ...
9 // 结果处理
10 for _, item := range list.Items {
11 // 获取每个secret中的数据,转换为Release对象
12 rls, err := decodeRelease(string(item.Data["release"]))
13 ...
14 }
15 return results, nil
16}
Create
1func (secrets *Secrets) Create(key string, rls *rspb.Release) error {
2 // set labels for secrets object meta data
3 var lbs labels
4
5 // 初始化label信息
6 lbs.init()
7 lbs.set("createdAt", strconv.Itoa(int(time.Now().Unix())))
8
9 // 创建一个持有release信息的secret
10 obj, err := newSecretsObject(key, rls, lbs)
11 ...
12
13 // 调用k8s的接口,创建secret这个资源
14 if _, err := secrets.impl.Create(context.Background(), obj, metav1.CreateOptions{}); err != nil {
15 ...
16 }
17 return nil
18}
初始化secret对象
1func newSecretsObject(key string, rls *rspb.Release, lbs labels) (*v1.Secret, error) {
2 const owner = "helm"
3
4 // release做base64编码
5 s, err := encodeRelease(rls)
6 ...
7 // 设置label,包括:name、owner、status、version
8 // 其中owner和name是后续查询release的关键参数
9 lbs.set("name", rls.Name)
10 lbs.set("owner", owner)
11 lbs.set("status", rls.Info.Status.String())
12 lbs.set("version", strconv.Itoa(rls.Version))
13
14 // 添加了Type这个字段,并设置data
15 return &v1.Secret{
16 ObjectMeta: metav1.ObjectMeta{
17 Name: key,
18 Labels: lbs.toMap(),
19 },
20 Type: "helm.sh/release.v1",
21 Data: map[string][]byte{"release": []byte(s)},
22 }, nil
23}
Query
1func (secrets *Secrets) Query(labels map[string]string) ([]*rspb.Release, error) {
2 ls := kblabels.Set{}
3 for k, v := range labels {
4 if errs := validation.IsValidLabelValue(v); len(errs) != 0 {
5 return nil, errors.Errorf("invalid label value: %q: %s", v, strings.Join(errs, "; "))
6 }
7 ls[k] = v
8 }
9
10 opts := metav1.ListOptions{LabelSelector: ls.AsSelector().String()}
11
12 list, err := secrets.impl.List(context.Background(), opts)
13 if err != nil {
14 return nil, errors.Wrap(err, "query: failed to query with labels")
15 }
16
17 if len(list.Items) == 0 {
18 return nil, ErrReleaseNotFound
19 }
20
21 var results []*rspb.Release
22 for _, item := range list.Items {
23 rls, err := decodeRelease(string(item.Data["release"]))
24 if err != nil {
25 secrets.Log("query: failed to decode release: %s", err)
26 continue
27 }
28 results = append(results, rls)
29 }
30 return results, nil
31}
Update
1func (secrets *Secrets) Update(key string, rls *rspb.Release) error {
2 ...
3 // 设置资源更新时间
4 lbs.set("modifiedAt", strconv.Itoa(int(time.Now().Unix())))
5
6 // 创建一个新的secret资源对象
7 obj, err := newSecretsObject(key, rls, lbs)
8 if err != nil {
9 return errors.Wrapf(err, "update: failed to encode release %q", rls.Name)
10 }
11 // 调用k8s的接口,更新secret资源
12 _, err = secrets.impl.Update(context.Background(), obj, metav1.UpdateOptions{})
13 return errors.Wrap(err, "update: failed to update")
14}
15
Delete
1func (secrets *Secrets) Delete(key string) (rls *rspb.Release, err error) {
2 // 先获取对应的资源信息
3 if rls, err = secrets.Get(key); err != nil {
4 return nil, err
5 }
6 // 调用k8s的api删除对应的secret
7 err = secrets.impl.Delete(context.Background(), key, metav1.DeleteOptions{})
8 return rls, err
9}
configmap实现
configmap的实现和secret类似,不再重复分析了。唯一的区别就是,操作的资源是k8s中的configmap,而不是secret
sql实现
sql实现是将release信息存储到postgresql数据库中。
表结构信息
其中涉及到的表结构信息如下
pkg/storage/driver/sql.go
1// 支持的数据库:postgresql
2const postgreSQLDialect = "postgres"
3
4// SQLDriverName is the string name of this driver.
5const SQLDriverName = "SQL"
6
7// 存储数据的表名
8const sqlReleaseTableName = "releases_v1"
9
10// 存储数据的字段名
11const (
12 sqlReleaseTableKeyColumn = "key"
13 sqlReleaseTableTypeColumn = "type"
14 sqlReleaseTableBodyColumn = "body"
15 sqlReleaseTableNameColumn = "name"
16 sqlReleaseTableNamespaceColumn = "namespace"
17 sqlReleaseTableVersionColumn = "version"
18 sqlReleaseTableStatusColumn = "status"
19 sqlReleaseTableOwnerColumn = "owner"
20 sqlReleaseTableCreatedAtColumn = "createdAt"
21 sqlReleaseTableModifiedAtColumn = "modifiedAt"
22)
创建表结构
1func NewSQL(connectionString string, logger func(string, ...interface{}), namespace string) (*SQL, error) {
2 // 连接数据库
3 db, err := sqlx.Connect(postgreSQLDialect, connectionString)
4 if err != nil {
5 return nil, err
6 }
7
8 driver := &SQL{
9 db: db,
10 Log: logger,
11 statementBuilder: sq.StatementBuilder.PlaceholderFormat(sq.Dollar),
12 }
13
14 // 表结构初始化
15 if err := driver.ensureDBSetup(); err != nil {
16 return nil, err
17 }
18
19 driver.namespace = namespace
20 return driver, nil
21}
22
23func (s *SQL) ensureDBSetup() error {
24 ...
25}
Get
1func (s *SQL) Get(key string) (*rspb.Release, error) {
2 var record SQLReleaseWrapper
3 // 拼接sql语句:select body from releases_v1 where key = ? and namespace = ?
4 qb := s.statementBuilder.
5 Select(sqlReleaseTableBodyColumn).
6 From(sqlReleaseTableName).
7 Where(sq.Eq{sqlReleaseTableKeyColumn: key}).
8 Where(sq.Eq{sqlReleaseTableNamespaceColumn: s.namespace})
9
10 query, args, err := qb.ToSql()
11 ...
12 // 获取记录
13 if err := s.db.Get(&record, query, args...); err != nil {
14 s.Log("got SQL error when getting release %s: %v", key, err)
15 return nil, ErrReleaseNotFound
16 }
17
18 // base64解码
19 release, err := decodeRelease(record.Body)
20 ...
21 return release, nil
22}
List
对应的sql语句为:
1select body from releases_v1 where owner = 'helm'
Create
对应的sql语句为:
1insert into releases_v1(key1, key2, ...) values(val1, val2, ...)
Query
对应的sql语句为:
1select body from releases_v1 where label1 = value1 and ...
Update
对应的sql语句为:
1update releases_v1 set key1=val1 where key = ? and namespace = ?
Delete
对应的sql语句为:
1delete from releases_v1 where key = ? and namespace = ?
memory实现
memory是基于内存实现的存储,数据存放在Memory这个对象中。
该对象按照命名空间、名称的层级划分,memory的存储驱动就是对这个数据结构的增删改查,本文只分析Get操作。
Memory对象
1type Memory struct {
2 sync.RWMutex
3 namespace string
4 // key是命名空间
5 cache map[string]memReleases
6}
7
8// key是release名称,value是release对应的多个版本数据
9type memReleases map[string]records
10
11type records []*record
Get
1func (mem *Memory) Get(key string) (*rspb.Release, error) {
2 ...
3 keyWithoutPrefix := strings.TrimPrefix(key, "sh.helm.release.v1.")
4 switch elems := strings.Split(keyWithoutPrefix, ".v"); len(elems) {
5 case 2:
6 ...
7 // 根据namespace、name的层级结构定位到数据
8 if recs, ok := mem.cache[mem.namespace][name]; ok {
9 if r := recs.Get(key); r != nil {
10 return r.rls, nil
11 }
12 }
13 return nil, ErrReleaseNotFound
14 default:
15 return nil, ErrInvalidKey
16 }
17}