kubeapps源码分析-asset-syncer
| 阅读 | 共 2805 字,阅读约
Overview
kubeapps源码分析-asset-syncer
概述
asset-syncer模块负责同步chart仓库的资源,并保存到数据库。
提供了三个命令:
- sync:同步repo的所有chart记录到本地数据库
- delete:删除某个repo仓库的所有信息
- invalidate-cache:删除所有数据
数据库使用pg实现,提供了三张表:
- repos:存放repo信息
- charts:存放repo的所有chart信息
- files:存放chart文件信息
源码分析
源码位置:cmd/asset-syncer
代码入口
main.go 是所有代码的入口,命令行的处理使用了知名的Cobra框架
1// 声明全局命令行对象
2var rootCmd = &cobra.Command{
3 Use: "asset-syncer",
4 Short: "Asset Synchronization utility",
5 Run: func(cmd *cobra.Command, args []string) {
6 cmd.Help()
7 },
8}
9
10// main函数,启动命令行
11func main() {
12 cmd := rootCmd
13 if err := cmd.Execute(); err != nil {
14 os.Exit(1)
15 }
16}
17
18// init函数会先与main函数
19// 这里主要处理传入的参数,并注册子命令
20func init() {
21 // 启动参数处理
22 ...
23 // 注册子命令,包括sync、delete、invalidate-cache
24 cmds := []*cobra.Command{syncCmd, deleteCmd, invalidateCacheCmd}
25 for _, cmd := range cmds {
26 rootCmd.AddCommand(cmd)
27 }
28 rootCmd.AddCommand(versionCmd)
29}
synCmd
- 对应asset-syncer sync 子命令,需要传入三个参数
- repo name:待同步资源的仓库名
- repo url:待同步资源的仓库链接地址
- repo type:仓库类型,包括 helm 和 oci
1var syncCmd = &cobra.Command{
2 // 使用说明
3 Use: "sync [REPO NAME] [REPO URL] [REPO TYPE]",
4 Short: "add a new chart repository, and resync its charts periodically",
5 Run: func(cmd *cobra.Command, args []string) {
6 ...
7 // 初始化数据库配置
8 dbConfig := datastore.Config{URL: databaseURL, Database: databaseName, Username: databaseUser, Password: databasePassword}
9 kubeappsNamespace := os.Getenv("POD_NAMESPACE")
10
11 // 创建一个manager,用于管理所有的操作
12 manager, err := newManager(dbConfig, kubeappsNamespace)
13 ...
14
15 // manager 初始化
16 err = manager.Init()
17 ...
18
19 // 初始化 http client
20 netClient, err := initNetClient(additionalCAFile, tlsInsecureSkipVerify)
21 ...
22
23 // 根据 repo type 初始化 Repo 接口的实现类
24 // 类型支持 helm、oci两种
25 var repoIface Repo
26 if args[2] == "helm" {
27 repoIface, err = getHelmRepo(namespace, args[0], args[1], authorizationHeader, filters, netClient)
28 } else {
29 repoIface, err = getOCIRepo(namespace, args[0], args[1], authorizationHeader, filters, ociRepositories, netClient)
30 }
31 ...
32
33 // 获取 repo
34 repo := repoIface.Repo()
35 checksum, err := repoIface.Checksum()
36 ...
37
38 // 判断repo是否被处理过
39 if manager.RepoAlreadyProcessed(models.Repo{Namespace: repo.Namespace, Name: repo.Name}, checksum) {
40 logrus.WithFields(logrus.Fields{"url": repo.URL}).Info("Skipping repository since there are no updates")
41 return
42 }
43
44 // 如果没有处理过,获取 repo 中所有的 chart
45 // 每个chart使用 Chart 这个对象保存
46 charts, err := repoIface.Charts()
47 if err != nil {
48 logrus.Fatal(err)
49 }
50
51 // 调用manager的sync模块同步 chart 信息到数据库
52 if err = manager.Sync(models.Repo{Name: repo.Name, Namespace: repo.Namespace}, charts); err != nil {
53 logrus.Fatalf("Can't add chart repository to database: %v", err)
54 }
55
56 // Fetch and store chart icons
57 fImporter := fileImporter{manager, netClient}
58 fImporter.fetchFiles(charts, repoIface)
59
60 // Update cache in the database
61 if err = manager.UpdateLastCheck(repo.Namespace, repo.Name, checksum, time.Now()); err != nil {
62 logrus.Fatal(err)
63 }
64 logrus.WithFields(logrus.Fields{"url": repo.URL}).Info("Stored repository update in cache")
65
66 logrus.Infof("Successfully added the chart repository %s to database", args[0])
67 },
68}
manager
创建manager
1// 创建manager
2func newManager(config datastore.Config, kubeappsNamespace string) (assetManager, error) {
3 // 支持pg数据库
4 return newPGManager(config, kubeappsNamespace)
5}
6
7func newPGManager(config datastore.Config, kubeappsNamespace string) (assetManager, error) {
8 m, err := dbutils.NewPGManager(config, kubeappsNamespace)
9 if err != nil {
10 return nil, err
11 }
12 return &postgresAssetManager{m}, nil
13}
14
15func NewPGManager(config datastore.Config, kubeappsNamespace string) (*PostgresAssetManager, error) {
16 url := strings.Split(config.URL, ":")
17 if len(url) != 2 {
18 return nil, fmt.Errorf("Can't parse database URL: %s", config.URL)
19 }
20 // 获取pg数据库连接信息
21 connStr := fmt.Sprintf(
22 "host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
23 url[0], url[1], config.Username, config.Password, config.Database,
24 )
25 return &PostgresAssetManager{connStr, nil, kubeappsNamespace}, nil
26}
manager初始化
1func (m *PostgresAssetManager) Init() error {
2 // 创建pg数据库连接
3 db, err := sql.Open("postgres", m.connStr)
4 if err != nil {
5 return err
6 }
7 m.DB = db
8 return nil
9}
manager.RepoAlreadyProcessed
查询数据库记录,判断数据库存储的校验码是否匹配当前的校验码
1SELECT checksum FROM %s WHERE name = $1 AND namespace = $2
1func (m *postgresAssetManager) RepoAlreadyProcessed(repo models.Repo, repoChecksum string) bool {
2 var lastChecksum string
3
4 // 拼接sql语句
5 row := m.DB.QueryRow(fmt.Sprintf("SELECT checksum FROM %s WHERE name = $1 AND namespace = $2", dbutils.RepositoryTable), repo.Name, repo.Namespace)
6 if row != nil {
7 err := row.Scan(&lastChecksum)
8 log.Errorf("lastChecksum: %+v", lastChecksum)
9
10 // 对比checksum数据是否一致
11 return err == nil && lastChecksum == repoChecksum
12 }
13 return false
14}
manager.Sync
同步操作包括以下几步:
- 从repo索引获取chart元信息,并更新数据库记录
- 并发处理chart的icon信息
- 并发处理chart包最新版本的Readme、values.yaml等文件
- 并发处理chart包历史版本的Readme、values.yaml等文件
1func (m *postgresAssetManager) Sync(repo models.Repo, charts []models.Chart) error {
2
3 // 初始化表结构
4 // 创建了三张表:repos、charts、files
5 // 注意这里设置了外键关联,并且设置数据级联删除
6 m.InitTables()
7
8 // 将repo信息插入repos表
9 _, err := m.EnsureRepoExists(repo.Namespace, repo.Name)
10 if err != nil {
11 return err
12 }
13
14 // 导入index中的chart到数据库
15 // for循环执行sql语句,将chart信息插入charts表
16 err = m.importCharts(charts, repo)
17 if err != nil {
18 return err
19 }
20
21 // 移除不在索引中的数据库中的chart记录
22 return m.removeMissingCharts(repo, charts)
23}
Repo接口
repo定义了从给定仓库获取信息的方法,是一个接口类,具体实现类有两个:
- HelmRepo
- OCIRegistry
后续主要围绕HelmRpo的实现类做分析
1type Repo interface {
2 Checksum() (string, error)
3 Repo() *models.RepoInternal
4 Charts() ([]models.Chart, error)
5 FetchFiles(name string, cv models.ChartVersion) (map[string]string, error)
6}
Repo实例化
1func getHelmRepo(namespace, name, repoURL, authorizationHeader string, filter *apprepov1alpha1.FilterRuleSpec, netClient httpClient) (Repo, error) {
2 url, err := parseRepoURL(repoURL)
3 ...
4
5 // 读取 helm 仓库的 index.yaml文件,并读取内容
6 // 这部分内容请参考helm源码分析系列文章
7 repoBytes, err := fetchRepoIndex(url.String(), authorizationHeader, netClient)
8 if err != nil {
9 return nil, err
10 }
11
12 return &HelmRepo{
13 // 索引内容赋值给content字段
14 content: repoBytes,
15 RepoInternal: &models.RepoInternal{
16 Namespace: namespace,
17 Name: name,
18 URL: url.String(),
19 AuthorizationHeader: authorizationHeader,
20 },
21 netClient: netClient,
22 filter: filter,
23 }, nil
24}
repo.Repo方法
两种实现都返回内部的RepoInternal属性
1func (r *HelmRepo) Repo() *models.RepoInternal {
2 return r.RepoInternal
3}
4
5func (r *OCIRegistry) Repo() *models.RepoInternal {
6 return r.RepoInternal
7}
repo.Charts方法
1func (r *HelmRepo) Charts() ([]models.Chart, error) {
2 // 这里的r.content在前面分析过,主要是helm仓库中index.yaml文件下载后的内容
3 // 这一步将索引的二进制流反序列化为IndexFile对象,这部分内容可以参考helm源码分析系列文章
4 index, err := parseRepoIndex(r.content)
5 ...
6 repo := &models.Repo{
7 Namespace: r.Namespace,
8 Name: r.Name,
9 URL: r.URL,
10 Type: r.Type,
11 }
12
13 // 将索引的IndexFile对象转换为Chart对象
14 charts := chartsFromIndex(index, repo)
15 if len(charts) == 0 {
16 return []models.Chart{}, fmt.Errorf("no charts in repository index")
17 }
18
19 // 过滤结果
20 return filterCharts(charts, r.filter)
21}
fetchFiles方法
1func (f *fileImporter) fetchFiles(charts []models.Chart, repo Repo) {
2 ...
3 // 启动10个协程,并发处理icon和file
4 // 根据Chart对象中的url地址,下载图标和文件,并更新数据库记录
5 // icon:更新charts表的info字段中关于icon部分的记录
6 // file:插入记录到files表
7 for i := 0; i < numWorkers; i++ {
8 wg.Add(1)
9 go f.importWorker(&wg, iconJobs, chartFilesJobs, repo)
10 }
11 ...
12 // Iterate through the list of charts and enqueue the latest chart version to
13 // be processed. Append the rest of the chart versions to a list to be
14 // enqueued later
15 var toEnqueue []importChartFilesJob
16 for _, c := range charts {
17 chartFilesJobs <- importChartFilesJob{c.Name, c.Repo, c.ChartVersions[0]}
18 for _, cv := range c.ChartVersions[1:] {
19 toEnqueue = append(toEnqueue, importChartFilesJob{c.Name, c.Repo, cv})
20 }
21 }
22
23 // Enqueue all the remaining chart versions
24 for _, cfj := range toEnqueue {
25 chartFilesJobs <- cfj
26 }
27 // Close the chartFilesJobs channel to signal the worker pools that there are
28 // no more jobs to process
29 close(chartFilesJobs)
30
31 // Wait for the worker pools to finish processing
32 wg.Wait()
33}
deleteCmd
这部分代码和前面syncCmd类似,只是最后调用的是manager的Delete方法
1var deleteCmd = &cobra.Command{
2 Use: "delete [REPO NAME]",
3 Short: "delete a chart repository",
4 Run: func(cmd *cobra.Command, args []string) {
5 ...
6 // Delete方法
7 if err = manager.Delete(repo); err != nil {
8 logrus.Fatalf("Can't delete chart repository %s from database: %v", args[0], err)
9 }
10
11 logrus.Infof("Successfully deleted the chart repository %s from database", args[0])
12 },
13}
manager.Delete
- 实现比较简单,就是删除repos表中的记录。
- 这里没有删除charts表和files表,是因为前面的InitTable函数中初始化表结构时,设置了外键级联删除,只把主键所在的记录删除,有外键关联的字段自动删除了
1
2func (m *postgresAssetManager) Delete(repo models.Repo) error {
3 rows, err := m.DB.Query(fmt.Sprintf("DELETE FROM %s WHERE name = $1 AND namespace = $2", dbutils.RepositoryTable), repo.Name, repo.Namespace)
4 if rows != nil {
5 defer rows.Close()
6 }
7 return err
8}
invalidateCacheCmd
这部分代码和前面syncCmd类似,只是最后调用的是manager的InvalidateCache方法
1var invalidateCacheCmd = &cobra.Command{
2 Use: "invalidate-cache",
3 Short: "removes all data so the cache can be rebuilt",
4 Run: func(cmd *cobra.Command, args []string) {
5 ...
6 // 调用manager的方法
7 err = manager.InvalidateCache()
8 if err != nil {
9 logrus.Fatal(err)
10 }
11 logrus.Infof("Successfully invalidated cache")
12 },
13}
manager.InvalidateCache
实现逻辑就是删除三张表
1func (m *PostgresAssetManager) InvalidateCache() error {
2 tables := strings.Join([]string{RepositoryTable, ChartTable, ChartFilesTable}, ",")
3 _, err := m.DB.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s", tables))
4 if err != nil {
5 return err
6 }
7
8 return m.InitTables()
9}