kubeapps源码分析-apprepository-controller


| 阅读 |,阅读约 6 分钟
| 复制链接:

Overview

kubeppas源码分析-apprepository-controller

概述

apprepository-controller是一个标准的k8s operator实现,负责监听CRD资源,并实现调谐过程,达到声明式的效果。

所有operator的开发套路都是一样的,通过operator框架生成代码的基本骨架,只需要关心调谐部分的代码即可。

源码分析

CRD

源码位置:cmd/apprepository-controller/pkg/apis/apprepository/v1alpha1/types.go

 1// AppRepository CRD
 2type AppRepository struct {
 3	metav1.TypeMeta   `json:",inline"`
 4	metav1.ObjectMeta `json:"metadata,omitempty"`
 5
 6	Spec   AppRepositorySpec   `json:"spec"`
 7	Status AppRepositoryStatus `json:"status"`
 8}
 9
10type AppRepositorySpec struct {
11	Type               string                 `json:"type"`
12	URL                string                 `json:"url"`
13	Auth               AppRepositoryAuth      `json:"auth,omitempty"`
14	ResyncRequests     uint                   `json:"resyncRequests"`
15	SyncJobPodTemplate corev1.PodTemplateSpec `json:"syncJobPodTemplate"`
16	// DockerRegistrySecrets is a list of dockerconfigjson secrets which exist
17	// in the same namespace as the AppRepository and should be included
18	// automatically for matching images.
19	DockerRegistrySecrets []string `json:"dockerRegistrySecrets,omitempty"`
20	// In case of an OCI type, the list of repositories is needed
21	// as there is no API for the index
22	OCIRepositories []string `json:"ociRepositories,omitempty"`
23	// TLSInsecureSkipVerify skips TLS verification
24	TLSInsecureSkipVerify bool `json:"tlsInsecureSkipVerify,omitempty"`
25	// FilterRule allows to filter packages based on a JQuery
26	FilterRule FilterRuleSpec `json:"filterRule,omitempty"`
27}

main

该模块和其他operator的开发没有区别,代码结构也一致

源码位置:cmd/apprepository-controller/main.go

 1func main() {
 2	...
 3  // 声明k8s客户端对象
 4	kubeClient, err := kubernetes.NewForConfig(cfg)
 5	...
 6  // 声明AppRepository的客户端对象
 7	apprepoClient, err := clientset.NewForConfig(cfg)
 8	...
 9	// 创建SharedInformerFactory对象
10	kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(conf.KubeappsNamespace))
11	// Enable app repo scanning to be manually set to scan the kubeapps repo only. See #1923.
12	var apprepoInformerFactory informers.SharedInformerFactory
13	if conf.ReposPerNamespace {
14		apprepoInformerFactory = informers.NewSharedInformerFactory(apprepoClient, 0)
15	} else {
16		apprepoInformerFactory = informers.NewFilteredSharedInformerFactory(apprepoClient, 0, conf.KubeappsNamespace, nil)
17	}
18
19	conf.ImagePullSecretsRefs = getImagePullSecretsRefs(conf.RepoSyncImagePullSecrets)
20
21  // 初始化Controller
22	controller := NewController(kubeClient, apprepoClient, kubeInformerFactory, apprepoInformerFactory, conf)
23
24  // 启动informer
25	go kubeInformerFactory.Start(stopCh)
26	go apprepoInformerFactory.Start(stopCh)
27  ...
28}

Controller

  • 这个controller的逻辑主要是启动Job或者CronJob等k8s原生资源
  • Job使用的是镜像是bitnami/kubeapps-asset-syncer,使用的参数是sync命令
  • asset-syncer负责同步给定chart包仓库的资源信息,参考asset-syncer组件的源码分析

源码位置:cmd/apprepository-controller/controller.go

NewController

 1func NewController(
 2	kubeclientset kubernetes.Interface,
 3	apprepoclientset clientset.Interface,
 4	kubeInformerFactory kubeinformers.SharedInformerFactory,
 5	apprepoInformerFactory informers.SharedInformerFactory,
 6	conf *Config) *Controller {
 7
 8  // 获取 CronJob 的informer对象
 9	cronjobInformer := kubeInformerFactory.Batch().V1beta1().CronJobs()
10  
11  // 获取 AppRepository 的informer对象
12	apprepoInformer := apprepoInformerFactory.Kubeapps().V1alpha1().AppRepositories()
13  ...
14	
15  // controller初始化
16	controller := &Controller{
17		kubeclientset:    kubeclientset,
18		apprepoclientset: apprepoclientset,
19		cronjobsLister:   cronjobInformer.Lister(),
20		cronjobsSynced:   cronjobInformer.Informer().HasSynced,
21		appreposLister:   apprepoInformer.Lister(),
22		appreposSynced:   apprepoInformer.Informer().HasSynced,
23		workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AppRepositories"),
24		recorder:         recorder,
25		conf:             *conf,
26	}
27
28	log.Info("Setting up event handlers")
29	// Set up an event handler for when AppRepository resources change
30  
31  // 注册 AppRepository 资源变化时的事件处理函数
32	apprepoInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
33    
34    // crd 添加事件处理函数
35		AddFunc: controller.enqueueAppRepo,
36    
37    // crd 更新事件处理函数
38    // 这里调用的根添加事件使用同一个函数 enqueueAppRepo
39		UpdateFunc: func(oldObj, newObj interface{}) {
40			oldApp := oldObj.(*apprepov1alpha1.AppRepository)
41			newApp := newObj.(*apprepov1alpha1.AppRepository)
42			if oldApp.Spec.URL != newApp.Spec.URL || oldApp.Spec.ResyncRequests != newApp.Spec.ResyncRequests {
43				controller.enqueueAppRepo(newApp)
44			}
45		},
46    
47    // crd 删除事件处理函数
48    // 这里就是将key添加到workqueue中
49		DeleteFunc: func(obj interface{}) {
50			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
51			if err == nil {
52				controller.workqueue.AddRateLimited(key)
53			}
54		},
55	})
56
57  // cronjob 资源删除时,做一些特殊的处理逻辑
58	cronjobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
59		DeleteFunc: controller.handleObject,
60	})
61
62	return controller
63}
enqueueAppRepo
 1func (c *Controller) enqueueAppRepo(obj interface{}) {
 2	var key string
 3	var err error
 4	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
 5		runtime.HandleError(err)
 6		return
 7	}
 8  // 将事件添加到workqueue中
 9	c.workqueue.AddRateLimited(key)
10}
handleObject
1

Run

 1func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
 2	...
 3  // 等待缓存资源同步
 4	if ok := cache.WaitForCacheSync(stopCh, c.cronjobsSynced, c.appreposSynced); !ok {
 5		return fmt.Errorf("failed to wait for caches to sync")
 6	}
 7
 8	log.Info("Starting workers")
 9	// Launch two workers to process AppRepository resources
10  
11  // 启动协程处理 AppRepository 资源
12	for i := 0; i < threadiness; i++ {
13		go wait.Until(c.runWorker, time.Second, stopCh)
14	}
15
16  // 等待协程结束
17	log.Info("Started workers")
18	<-stopCh
19	log.Info("Shutting down workers")
20
21	return nil
22}
runWorker
 1func (c *Controller) runWorker() {
 2	for c.processNextWorkItem() {
 3	}
 4}
 5
 6func (c *Controller) processNextWorkItem() bool {
 7	obj, shutdown := c.workqueue.Get()
 8
 9	if shutdown {
10		return false
11	}
12
13	// We wrap this block in a func so we can defer c.workqueue.Done.
14	err := func(obj interface{}) error {
15		// We call Done here so the workqueue knows we have finished
16		// processing this item. We also must remember to call Forget if we
17		// do not want this work item being re-queued. For example, we do
18		// not call Forget if a transient error occurs, instead the item is
19		// put back on the workqueue and attempted again after a back-off
20		// period.
21		defer c.workqueue.Done(obj)
22		var key string
23		var ok bool
24		// We expect strings to come off the workqueue. These are of the
25		// form namespace/name. We do this as the delayed nature of the
26		// workqueue means the items in the informer cache may actually be
27		// more up to date that when the item was initially put onto the
28		// workqueue.
29		if key, ok = obj.(string); !ok {
30			// As the item in the workqueue is actually invalid, we call
31			// Forget here else we'd go into a loop of attempting to
32			// process a work item that is invalid.
33			c.workqueue.Forget(obj)
34			runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
35			return nil
36		}
37		// Run the syncHandler, passing it the namespace/name string of the
38		// AppRepository resource to be synced.
39		if err := c.syncHandler(key); err != nil {
40			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
41		}
42		// Finally, if no error occurs we Forget this item so it does not
43		// get queued again until another change happens.
44		c.workqueue.Forget(obj)
45		log.Infof("Successfully synced '%s'", key)
46		return nil
47	}(obj)
48
49	if err != nil {
50		runtime.HandleError(err)
51		return true
52	}
53
54	return true
55}
syncHandler
 1func (c *Controller) syncHandler(key string) error {
 2	// Convert the namespace/name string into a distinct namespace and name
 3	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 4	if err != nil {
 5		runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
 6		return nil
 7	}
 8
 9  // 获取 AppRepository 对应的 CronJob资源对象
10	apprepo, err := c.appreposLister.AppRepositories(namespace).Get(name)
11	if err != nil {
12		// The AppRepository resource may no longer exist, in which case we stop
13		// processing.
14		if errors.IsNotFound(err) {
15			log.Infof("AppRepository '%s' no longer exists so performing cleanup of charts from the DB", key)
16			// Trigger a Job to perfrom the cleanup of the charts in the DB corresponding to deleted AppRepository
17			_, err = c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newCleanupJob(c.conf.KubeappsNamespace, namespace, name, c.conf), metav1.CreateOptions{})
18			if err != nil {
19				log.Errorf("Unable to create cleanup job: %v", err)
20				return err
21			}
22
23			// TODO: Workaround until the sync jobs are moved to the repoNamespace (#1647)
24			// Delete the cronjob in the Kubeapps namespace to avoid re-syncing the repository
25			err = c.kubeclientset.BatchV1beta1().CronJobs(c.conf.KubeappsNamespace).Delete(context.TODO(), cronJobName(namespace, name), metav1.DeleteOptions{})
26			if err != nil && !errors.IsNotFound(err) {
27				log.Errorf("Unable to delete sync cronjob: %v", err)
28				return err
29			}
30			return nil
31		}
32		return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
33	}
34
35	// Get the cronjob with the same name as AppRepository
36	cronjobName := cronJobName(namespace, name)
37	cronjob, err := c.cronjobsLister.CronJobs(c.conf.KubeappsNamespace).Get(cronjobName)
38	// If the resource doesn't exist, we'll create it
39	if errors.IsNotFound(err) {
40		log.Infof("Creating CronJob %q for AppRepository %q", cronjobName, apprepo.GetName())
41    
42    // 创建一个 CronJob 对象
43		cronjob, err = c.kubeclientset.BatchV1beta1().CronJobs(c.conf.KubeappsNamespace).Create(context.TODO(), newCronJob(apprepo, c.conf), metav1.CreateOptions{})
44		if err != nil {
45			return err
46		}
47
48		// Trigger a manual Job for the initial sync
49		_, err = c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newSyncJob(apprepo, c.conf), metav1.CreateOptions{})
50	} else if err == nil {
51		// If the resource already exists, we'll update it
52		log.Infof("Updating CronJob %q in namespace %q for AppRepository %q in namespace %q", cronjobName, c.conf.KubeappsNamespace, apprepo.GetName(), apprepo.GetNamespace())
53		cronjob, err = c.kubeclientset.BatchV1beta1().CronJobs(c.conf.KubeappsNamespace).Update(context.TODO(), newCronJob(apprepo, c.conf), metav1.UpdateOptions{})
54		if err != nil {
55			return err
56		}
57
58		// The AppRepository has changed, launch a manual Job
59    
60    // 创建一个新的CronJob,调用newSyncJob初始化CronJob资源
61		_, err = c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newSyncJob(apprepo, c.conf), metav1.CreateOptions{})
62	}
63
64	// If an error occurs during Get/Create, we'll requeue the item so we can
65	// attempt processing again later. This could have been caused by a
66	// temporary network failure, or any other transient reason.
67	if err != nil {
68		return err
69	}
70
71	// If the CronJob is not controlled by this AppRepository resource and it is not a
72	// cronjob for an app repo in another namespace, then we should
73	// log a warning to the event recorder and return it.
74	if !metav1.IsControlledBy(cronjob, apprepo) && !objectBelongsTo(cronjob, apprepo) {
75		msg := fmt.Sprintf(MessageResourceExists, cronjob.Name)
76		c.recorder.Event(apprepo, corev1.EventTypeWarning, ErrResourceExists, msg)
77		return fmt.Errorf(msg)
78	}
79
80	if apprepo.GetNamespace() == c.conf.KubeappsNamespace {
81		c.recorder.Event(apprepo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
82	}
83	return nil
84}
newSyncJob
 1func newSyncJob(apprepo *apprepov1alpha1.AppRepository, config Config) *batchv1.Job {
 2	return &batchv1.Job{
 3    // 设置Job资源的metadata
 4		ObjectMeta: metav1.ObjectMeta{
 5			GenerateName:    cronJobName(apprepo.Namespace, apprepo.Name) + "-",
 6			OwnerReferences: ownerReferencesForAppRepo(apprepo, config.KubeappsNamespace),
 7		},
 8    // 设置Job资源的Spec字段
 9		Spec: syncJobSpec(apprepo, config),
10	}
11}
syncJobSpec
 1func syncJobSpec(apprepo *apprepov1alpha1.AppRepository, config Config) batchv1.JobSpec {
 2	...
 3
 4  // 这里是设置Job的Pod信息的地方
 5	podTemplateSpec.Spec.Containers[0].Name = "sync"
 6  
 7  // 指定的镜像是config.RepoSyncImage
 8  // 使用的镜像是 bitnami/kubeapps-asset-syncer
 9	podTemplateSpec.Spec.Containers[0].Image = config.RepoSyncImage
10	podTemplateSpec.Spec.Containers[0].ImagePullPolicy = "IfNotPresent"
11	podTemplateSpec.Spec.Containers[0].Command = []string{config.RepoSyncCommand}
12  
13  // 启动参数
14  // apprepoSyncJobArgs函数拼接参数,最终的参数如下:
15  // sync --database-url=xxx ----database-user=xxx --database-name=xxx --user-agent-comment=xxx --namespace=xxx {appRepoName} {appRepoURL} {appRepoType}......
16	podTemplateSpec.Spec.Containers[0].Args = apprepoSyncJobArgs(apprepo, config)
17	...
18	return batchv1.JobSpec{
19		TTLSecondsAfterFinished: ttlLifetimeJobs(config),
20		Template:                podTemplateSpec,
21	}
22}
newCronJob

代码和newSyncJob类似,只不过创建的资源类型不同,新增了Schedule字段设置定时规则

 1func newCronJob(apprepo *apprepov1alpha1.AppRepository, config Config) *batchv1beta1.CronJob {
 2	return &batchv1beta1.CronJob{
 3		ObjectMeta: metav1.ObjectMeta{
 4			Name:            cronJobName(apprepo.Namespace, apprepo.Name),
 5			OwnerReferences: ownerReferencesForAppRepo(apprepo, config.KubeappsNamespace),
 6			Labels:          jobLabels(apprepo),
 7		},
 8		Spec: batchv1beta1.CronJobSpec{
 9      // 设置定时规则
10			Schedule: config.Crontab,
11			// Set to replace as short-circuit in k8s <1.12
12			// TODO re-evaluate ConcurrentPolicy when 1.12+ is mainstream (i.e 1.14)
13			// https://github.com/kubernetes/kubernetes/issues/54870
14			ConcurrencyPolicy: "Replace",
15			JobTemplate: batchv1beta1.JobTemplateSpec{
16				Spec: syncJobSpec(apprepo, config),
17			},
18		},
19	}
20}