kubeapps源码分析-asset-syncer


| 阅读 |,阅读约 6 分钟
| 复制链接:

Overview

kubeapps源码分析-asset-syncer

概述

asset-syncer模块负责同步chart仓库的资源,并保存到数据库。

提供了三个命令:

  • sync:同步repo的所有chart记录到本地数据库
  • delete:删除某个repo仓库的所有信息
  • invalidate-cache:删除所有数据

数据库使用pg实现,提供了三张表:

  • repos:存放repo信息
  • charts:存放repo的所有chart信息
  • files:存放chart文件信息

源码分析

源码位置:cmd/asset-syncer

代码入口

main.go 是所有代码的入口,命令行的处理使用了知名的Cobra框架

 1// 声明全局命令行对象
 2var rootCmd = &cobra.Command{
 3	Use:   "asset-syncer",
 4	Short: "Asset Synchronization utility",
 5	Run: func(cmd *cobra.Command, args []string) {
 6		cmd.Help()
 7	},
 8}
 9
10// main函数,启动命令行
11func main() {
12	cmd := rootCmd
13	if err := cmd.Execute(); err != nil {
14		os.Exit(1)
15	}
16}
17
18// init函数会先与main函数
19// 这里主要处理传入的参数,并注册子命令
20func init() {
21  // 启动参数处理
22	...
23  // 注册子命令,包括sync、delete、invalidate-cache
24	cmds := []*cobra.Command{syncCmd, deleteCmd, invalidateCacheCmd}
25	for _, cmd := range cmds {
26		rootCmd.AddCommand(cmd)
27	}
28	rootCmd.AddCommand(versionCmd)
29}

synCmd

  • 对应asset-syncer sync 子命令,需要传入三个参数
    • repo name:待同步资源的仓库名
    • repo url:待同步资源的仓库链接地址
    • repo type:仓库类型,包括 helm 和 oci
 1var syncCmd = &cobra.Command{
 2  // 使用说明
 3	Use:   "sync [REPO NAME] [REPO URL] [REPO TYPE]",
 4	Short: "add a new chart repository, and resync its charts periodically",
 5	Run: func(cmd *cobra.Command, args []string) {
 6		...
 7		// 初始化数据库配置
 8		dbConfig := datastore.Config{URL: databaseURL, Database: databaseName, Username: databaseUser, Password: databasePassword}
 9		kubeappsNamespace := os.Getenv("POD_NAMESPACE")
10    
11    // 创建一个manager,用于管理所有的操作
12		manager, err := newManager(dbConfig, kubeappsNamespace)
13		...
14    
15    // manager 初始化
16		err = manager.Init()
17	  ...
18
19    // 初始化 http client
20		netClient, err := initNetClient(additionalCAFile, tlsInsecureSkipVerify)
21		...
22    
23    // 根据 repo type 初始化 Repo 接口的实现类
24    // 类型支持 helm、oci两种
25		var repoIface Repo
26		if args[2] == "helm" {
27			repoIface, err = getHelmRepo(namespace, args[0], args[1], authorizationHeader, filters, netClient)
28		} else {
29			repoIface, err = getOCIRepo(namespace, args[0], args[1], authorizationHeader, filters, ociRepositories, netClient)
30		}
31		...
32    
33    // 获取 repo
34		repo := repoIface.Repo()
35		checksum, err := repoIface.Checksum()
36		...
37   
38    // 判断repo是否被处理过
39		if manager.RepoAlreadyProcessed(models.Repo{Namespace: repo.Namespace, Name: repo.Name}, checksum) {
40			logrus.WithFields(logrus.Fields{"url": repo.URL}).Info("Skipping repository since there are no updates")
41			return
42		}
43
44    // 如果没有处理过,获取 repo 中所有的 chart
45    // 每个chart使用 Chart 这个对象保存
46		charts, err := repoIface.Charts()
47		if err != nil {
48			logrus.Fatal(err)
49		}
50
51    // 调用manager的sync模块同步 chart 信息到数据库
52		if err = manager.Sync(models.Repo{Name: repo.Name, Namespace: repo.Namespace}, charts); err != nil {
53			logrus.Fatalf("Can't add chart repository to database: %v", err)
54		}
55
56		// Fetch and store chart icons
57		fImporter := fileImporter{manager, netClient}
58		fImporter.fetchFiles(charts, repoIface)
59
60		// Update cache in the database
61		if err = manager.UpdateLastCheck(repo.Namespace, repo.Name, checksum, time.Now()); err != nil {
62			logrus.Fatal(err)
63		}
64		logrus.WithFields(logrus.Fields{"url": repo.URL}).Info("Stored repository update in cache")
65
66		logrus.Infof("Successfully added the chart repository %s to database", args[0])
67	},
68}

manager

创建manager
 1// 创建manager
 2func newManager(config datastore.Config, kubeappsNamespace string) (assetManager, error) {
 3  // 支持pg数据库
 4	return newPGManager(config, kubeappsNamespace)
 5}
 6
 7func newPGManager(config datastore.Config, kubeappsNamespace string) (assetManager, error) {
 8	m, err := dbutils.NewPGManager(config, kubeappsNamespace)
 9	if err != nil {
10		return nil, err
11	}
12	return &postgresAssetManager{m}, nil
13}
14
15func NewPGManager(config datastore.Config, kubeappsNamespace string) (*PostgresAssetManager, error) {
16	url := strings.Split(config.URL, ":")
17	if len(url) != 2 {
18		return nil, fmt.Errorf("Can't parse database URL: %s", config.URL)
19	}
20  // 获取pg数据库连接信息
21	connStr := fmt.Sprintf(
22		"host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
23		url[0], url[1], config.Username, config.Password, config.Database,
24	)
25	return &PostgresAssetManager{connStr, nil, kubeappsNamespace}, nil
26}
manager初始化
1func (m *PostgresAssetManager) Init() error {
2  // 创建pg数据库连接
3	db, err := sql.Open("postgres", m.connStr)
4	if err != nil {
5		return err
6	}
7	m.DB = db
8	return nil
9}
manager.RepoAlreadyProcessed

查询数据库记录,判断数据库存储的校验码是否匹配当前的校验码

1SELECT checksum FROM %s WHERE name = $1 AND namespace = $2
 1func (m *postgresAssetManager) RepoAlreadyProcessed(repo models.Repo, repoChecksum string) bool {
 2	var lastChecksum string
 3  
 4  // 拼接sql语句
 5	row := m.DB.QueryRow(fmt.Sprintf("SELECT checksum FROM %s WHERE name = $1 AND namespace = $2", dbutils.RepositoryTable), repo.Name, repo.Namespace)
 6	if row != nil {
 7		err := row.Scan(&lastChecksum)
 8		log.Errorf("lastChecksum: %+v", lastChecksum)
 9    
10    // 对比checksum数据是否一致
11		return err == nil && lastChecksum == repoChecksum
12	}
13	return false
14}
manager.Sync

同步操作包括以下几步:

  • 从repo索引获取chart元信息,并更新数据库记录
  • 并发处理chart的icon信息
  • 并发处理chart包最新版本的Readme、values.yaml等文件
  • 并发处理chart包历史版本的Readme、values.yaml等文件
 1func (m *postgresAssetManager) Sync(repo models.Repo, charts []models.Chart) error {
 2  
 3  // 初始化表结构
 4  // 创建了三张表:repos、charts、files
 5  // 注意这里设置了外键关联,并且设置数据级联删除
 6	m.InitTables()
 7
 8  // 将repo信息插入repos表
 9	_, err := m.EnsureRepoExists(repo.Namespace, repo.Name)
10	if err != nil {
11		return err
12	}
13
14  // 导入index中的chart到数据库
15  // for循环执行sql语句,将chart信息插入charts表
16	err = m.importCharts(charts, repo)
17	if err != nil {
18		return err
19	}
20
21	// 移除不在索引中的数据库中的chart记录
22	return m.removeMissingCharts(repo, charts)
23}

Repo接口

repo定义了从给定仓库获取信息的方法,是一个接口类,具体实现类有两个:

  • HelmRepo
  • OCIRegistry

后续主要围绕HelmRpo的实现类做分析

1type Repo interface {
2	Checksum() (string, error)
3	Repo() *models.RepoInternal
4	Charts() ([]models.Chart, error)
5	FetchFiles(name string, cv models.ChartVersion) (map[string]string, error)
6}
Repo实例化
 1func getHelmRepo(namespace, name, repoURL, authorizationHeader string, filter *apprepov1alpha1.FilterRuleSpec, netClient httpClient) (Repo, error) {
 2	url, err := parseRepoURL(repoURL)
 3	...
 4
 5  // 读取 helm 仓库的 index.yaml文件,并读取内容
 6  // 这部分内容请参考helm源码分析系列文章
 7	repoBytes, err := fetchRepoIndex(url.String(), authorizationHeader, netClient)
 8	if err != nil {
 9		return nil, err
10	}
11
12	return &HelmRepo{
13    // 索引内容赋值给content字段
14		content: repoBytes,
15		RepoInternal: &models.RepoInternal{
16			Namespace:           namespace,
17			Name:                name,
18			URL:                 url.String(),
19			AuthorizationHeader: authorizationHeader,
20		},
21		netClient: netClient,
22		filter:    filter,
23	}, nil
24}
repo.Repo方法

两种实现都返回内部的RepoInternal属性

1func (r *HelmRepo) Repo() *models.RepoInternal {
2	return r.RepoInternal
3}
4
5func (r *OCIRegistry) Repo() *models.RepoInternal {
6	return r.RepoInternal
7}
repo.Charts方法
 1func (r *HelmRepo) Charts() ([]models.Chart, error) {
 2   // 这里的r.content在前面分析过,主要是helm仓库中index.yaml文件下载后的内容
 3   // 这一步将索引的二进制流反序列化为IndexFile对象,这部分内容可以参考helm源码分析系列文章
 4   index, err := parseRepoIndex(r.content)
 5   ...
 6   repo := &models.Repo{
 7      Namespace: r.Namespace,
 8      Name:      r.Name,
 9      URL:       r.URL,
10      Type:      r.Type,
11   }
12   
13   // 将索引的IndexFile对象转换为Chart对象
14   charts := chartsFromIndex(index, repo)
15   if len(charts) == 0 {
16      return []models.Chart{}, fmt.Errorf("no charts in repository index")
17   }
18
19   // 过滤结果
20   return filterCharts(charts, r.filter)
21}

fetchFiles方法

 1func (f *fileImporter) fetchFiles(charts []models.Chart, repo Repo) {
 2	...
 3  // 启动10个协程,并发处理icon和file
 4  // 根据Chart对象中的url地址,下载图标和文件,并更新数据库记录
 5  // icon:更新charts表的info字段中关于icon部分的记录
 6  // file:插入记录到files表
 7	for i := 0; i < numWorkers; i++ {
 8		wg.Add(1)
 9		go f.importWorker(&wg, iconJobs, chartFilesJobs, repo)
10	}
11  ...
12	// Iterate through the list of charts and enqueue the latest chart version to
13	// be processed. Append the rest of the chart versions to a list to be
14	// enqueued later
15	var toEnqueue []importChartFilesJob
16	for _, c := range charts {
17		chartFilesJobs <- importChartFilesJob{c.Name, c.Repo, c.ChartVersions[0]}
18		for _, cv := range c.ChartVersions[1:] {
19			toEnqueue = append(toEnqueue, importChartFilesJob{c.Name, c.Repo, cv})
20		}
21	}
22
23	// Enqueue all the remaining chart versions
24	for _, cfj := range toEnqueue {
25		chartFilesJobs <- cfj
26	}
27	// Close the chartFilesJobs channel to signal the worker pools that there are
28	// no more jobs to process
29	close(chartFilesJobs)
30
31	// Wait for the worker pools to finish processing
32	wg.Wait()
33}

deleteCmd

这部分代码和前面syncCmd类似,只是最后调用的是manager的Delete方法

 1var deleteCmd = &cobra.Command{
 2	Use:   "delete [REPO NAME]",
 3	Short: "delete a chart repository",
 4	Run: func(cmd *cobra.Command, args []string) {
 5		...
 6    // Delete方法
 7		if err = manager.Delete(repo); err != nil {
 8			logrus.Fatalf("Can't delete chart repository %s from database: %v", args[0], err)
 9		}
10
11		logrus.Infof("Successfully deleted the chart repository %s from database", args[0])
12	},
13}
manager.Delete
  • 实现比较简单,就是删除repos表中的记录。
  • 这里没有删除charts表和files表,是因为前面的InitTable函数中初始化表结构时,设置了外键级联删除,只把主键所在的记录删除,有外键关联的字段自动删除了
1
2func (m *postgresAssetManager) Delete(repo models.Repo) error {
3	rows, err := m.DB.Query(fmt.Sprintf("DELETE FROM %s WHERE name = $1 AND namespace = $2", dbutils.RepositoryTable), repo.Name, repo.Namespace)
4	if rows != nil {
5		defer rows.Close()
6	}
7	return err
8}

invalidateCacheCmd

这部分代码和前面syncCmd类似,只是最后调用的是manager的InvalidateCache方法

 1var invalidateCacheCmd = &cobra.Command{
 2	Use:   "invalidate-cache",
 3	Short: "removes all data so the cache can be rebuilt",
 4	Run: func(cmd *cobra.Command, args []string) {
 5		...
 6    // 调用manager的方法
 7		err = manager.InvalidateCache()
 8		if err != nil {
 9			logrus.Fatal(err)
10		}
11		logrus.Infof("Successfully invalidated cache")
12	},
13}
manager.InvalidateCache

实现逻辑就是删除三张表

1func (m *PostgresAssetManager) InvalidateCache() error {
2	tables := strings.Join([]string{RepositoryTable, ChartTable, ChartFilesTable}, ",")
3	_, err := m.DB.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s", tables))
4	if err != nil {
5		return err
6	}
7
8	return m.InitTables()
9}