kubeapps源码分析-apprepository-controller
| 阅读 | 共 2591 字,阅读约
Overview
kubeapps源码分析-apprepository-controller
概述
apprepository-controller是一个标准的k8s operator实现,负责监听CRD资源,并实现调谐过程,达到声明式的效果。
所有operator的开发套路都是一样的,通过operator框架生成代码的基本骨架,只需要关心调谐部分的代码即可。
源码分析
CRD
源码位置:cmd/apprepository-controller/pkg/apis/apprepository/v1alpha1/types.go
1// AppRepository CRD
2type AppRepository struct {
3 metav1.TypeMeta `json:",inline"`
4 metav1.ObjectMeta `json:"metadata,omitempty"`
5
6 Spec AppRepositorySpec `json:"spec"`
7 Status AppRepositoryStatus `json:"status"`
8}
9
10type AppRepositorySpec struct {
11 Type string `json:"type"`
12 URL string `json:"url"`
13 Auth AppRepositoryAuth `json:"auth,omitempty"`
14 ResyncRequests uint `json:"resyncRequests"`
15 SyncJobPodTemplate corev1.PodTemplateSpec `json:"syncJobPodTemplate"`
16 // DockerRegistrySecrets is a list of dockerconfigjson secrets which exist
17 // in the same namespace as the AppRepository and should be included
18 // automatically for matching images.
19 DockerRegistrySecrets []string `json:"dockerRegistrySecrets,omitempty"`
20 // In case of an OCI type, the list of repositories is needed
21 // as there is no API for the index
22 OCIRepositories []string `json:"ociRepositories,omitempty"`
23 // TLSInsecureSkipVerify skips TLS verification
24 TLSInsecureSkipVerify bool `json:"tlsInsecureSkipVerify,omitempty"`
25 // FilterRule allows to filter packages based on a JQuery
26 FilterRule FilterRuleSpec `json:"filterRule,omitempty"`
27}
main
该模块和其他operator的开发没有区别,代码结构也一致
源码位置:cmd/apprepository-controller/main.go
1func main() {
2 ...
3 // 声明k8s客户端对象
4 kubeClient, err := kubernetes.NewForConfig(cfg)
5 ...
6 // 声明AppRepository的客户端对象
7 apprepoClient, err := clientset.NewForConfig(cfg)
8 ...
9 // 创建SharedInformerFactory对象
10 kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(conf.KubeappsNamespace))
11 // Enable app repo scanning to be manually set to scan the kubeapps repo only. See #1923.
12 var apprepoInformerFactory informers.SharedInformerFactory
13 if conf.ReposPerNamespace {
14 apprepoInformerFactory = informers.NewSharedInformerFactory(apprepoClient, 0)
15 } else {
16 apprepoInformerFactory = informers.NewFilteredSharedInformerFactory(apprepoClient, 0, conf.KubeappsNamespace, nil)
17 }
18
19 conf.ImagePullSecretsRefs = getImagePullSecretsRefs(conf.RepoSyncImagePullSecrets)
20
21 // 初始化Controller
22 controller := NewController(kubeClient, apprepoClient, kubeInformerFactory, apprepoInformerFactory, conf)
23
24 // 启动informer
25 go kubeInformerFactory.Start(stopCh)
26 go apprepoInformerFactory.Start(stopCh)
27 ...
28}
Controller
- 这个controller的逻辑主要是启动Job或者CronJob等k8s原生资源
- Job使用的镜像是 bitnami/kubeapps-asset-syncer,使用的参数是sync命令
- asset-syncer负责同步给定chart包仓库的资源信息,参考asset-syncer组件的源码分析
源码位置:cmd/apprepository-controller/controller.go
NewController
1func NewController(
2 kubeclientset kubernetes.Interface,
3 apprepoclientset clientset.Interface,
4 kubeInformerFactory kubeinformers.SharedInformerFactory,
5 apprepoInformerFactory informers.SharedInformerFactory,
6 conf *Config) *Controller {
7
8 // 获取 CronJob 的informer对象
9 cronjobInformer := kubeInformerFactory.Batch().V1beta1().CronJobs()
10
11 // 获取 AppRepository 的informer对象
12 apprepoInformer := apprepoInformerFactory.Kubeapps().V1alpha1().AppRepositories()
13 ...
14
15 // controller初始化
16 controller := &Controller{
17 kubeclientset: kubeclientset,
18 apprepoclientset: apprepoclientset,
19 cronjobsLister: cronjobInformer.Lister(),
20 cronjobsSynced: cronjobInformer.Informer().HasSynced,
21 appreposLister: apprepoInformer.Lister(),
22 appreposSynced: apprepoInformer.Informer().HasSynced,
23 workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AppRepositories"),
24 recorder: recorder,
25 conf: *conf,
26 }
27
28 log.Info("Setting up event handlers")
29 // Set up an event handler for when AppRepository resources change
30
31 // 注册 AppRepository 资源变化时的事件处理函数
32 apprepoInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
33
34 // crd 添加事件处理函数
35 AddFunc: controller.enqueueAppRepo,
36
37 // crd 更新事件处理函数
38 // 这里跟添加事件使用同一个函数 enqueueAppRepo,但只有 URL 或 ResyncRequests 发生变化时才会入队
39 UpdateFunc: func(oldObj, newObj interface{}) {
40 oldApp := oldObj.(*apprepov1alpha1.AppRepository)
41 newApp := newObj.(*apprepov1alpha1.AppRepository)
42 if oldApp.Spec.URL != newApp.Spec.URL || oldApp.Spec.ResyncRequests != newApp.Spec.ResyncRequests {
43 controller.enqueueAppRepo(newApp)
44 }
45 },
46
47 // crd 删除事件处理函数
48 // 这里就是将key添加到workqueue中
49 DeleteFunc: func(obj interface{}) {
50 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
51 if err == nil {
52 controller.workqueue.AddRateLimited(key)
53 }
54 },
55 })
56
57 // cronjob 资源删除时,做一些特殊的处理逻辑
58 cronjobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
59 DeleteFunc: controller.handleObject,
60 })
61
62 return controller
63}
enqueueAppRepo
1func (c *Controller) enqueueAppRepo(obj interface{}) {
2 var key string
3 var err error
4 if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
5 runtime.HandleError(err)
6 return
7 }
8 // 将事件添加到workqueue中
9 c.workqueue.AddRateLimited(key)
10}
handleObject
1
Run
1func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
2 ...
3 // 等待缓存资源同步
4 if ok := cache.WaitForCacheSync(stopCh, c.cronjobsSynced, c.appreposSynced); !ok {
5 return fmt.Errorf("failed to wait for caches to sync")
6 }
7
8 log.Info("Starting workers")
9 // Launch two workers to process AppRepository resources
10
11 // 启动协程处理 AppRepository 资源
12 for i := 0; i < threadiness; i++ {
13 go wait.Until(c.runWorker, time.Second, stopCh)
14 }
15
16 // 阻塞等待停止信号(stopCh),收到后才返回,并不是等待协程执行结束
17 log.Info("Started workers")
18 <-stopCh
19 log.Info("Shutting down workers")
20
21 return nil
22}
runWorker
1func (c *Controller) runWorker() {
2 for c.processNextWorkItem() {
3 }
4}
5
6func (c *Controller) processNextWorkItem() bool {
7 obj, shutdown := c.workqueue.Get()
8
9 if shutdown {
10 return false
11 }
12
13 // We wrap this block in a func so we can defer c.workqueue.Done.
14 err := func(obj interface{}) error {
15 // We call Done here so the workqueue knows we have finished
16 // processing this item. We also must remember to call Forget if we
17 // do not want this work item being re-queued. For example, we do
18 // not call Forget if a transient error occurs, instead the item is
19 // put back on the workqueue and attempted again after a back-off
20 // period.
21 defer c.workqueue.Done(obj)
22 var key string
23 var ok bool
24 // We expect strings to come off the workqueue. These are of the
25 // form namespace/name. We do this as the delayed nature of the
26 // workqueue means the items in the informer cache may actually be
27 // more up to date that when the item was initially put onto the
28 // workqueue.
29 if key, ok = obj.(string); !ok {
30 // As the item in the workqueue is actually invalid, we call
31 // Forget here else we'd go into a loop of attempting to
32 // process a work item that is invalid.
33 c.workqueue.Forget(obj)
34 runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
35 return nil
36 }
37 // Run the syncHandler, passing it the namespace/name string of the
38 // AppRepository resource to be synced.
39 if err := c.syncHandler(key); err != nil {
40 return fmt.Errorf("error syncing '%s': %s", key, err.Error())
41 }
42 // Finally, if no error occurs we Forget this item so it does not
43 // get queued again until another change happens.
44 c.workqueue.Forget(obj)
45 log.Infof("Successfully synced '%s'", key)
46 return nil
47 }(obj)
48
49 if err != nil {
50 runtime.HandleError(err)
51 return true
52 }
53
54 return true
55}
syncHandler
1func (c *Controller) syncHandler(key string) error {
2 // Convert the namespace/name string into a distinct namespace and name
3 namespace, name, err := cache.SplitMetaNamespaceKey(key)
4 if err != nil {
5 runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
6 return nil
7 }
8
9 // 根据 namespace 和 name 从 lister 缓存中获取 AppRepository 资源对象
10 apprepo, err := c.appreposLister.AppRepositories(namespace).Get(name)
11 if err != nil {
12 // The AppRepository resource may no longer exist, in which case we stop
13 // processing.
14 if errors.IsNotFound(err) {
15 log.Infof("AppRepository '%s' no longer exists so performing cleanup of charts from the DB", key)
16 // Trigger a Job to perfrom the cleanup of the charts in the DB corresponding to deleted AppRepository
17 _, err = c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newCleanupJob(c.conf.KubeappsNamespace, namespace, name, c.conf), metav1.CreateOptions{})
18 if err != nil {
19 log.Errorf("Unable to create cleanup job: %v", err)
20 return err
21 }
22
23 // TODO: Workaround until the sync jobs are moved to the repoNamespace (#1647)
24 // Delete the cronjob in the Kubeapps namespace to avoid re-syncing the repository
25 err = c.kubeclientset.BatchV1beta1().CronJobs(c.conf.KubeappsNamespace).Delete(context.TODO(), cronJobName(namespace, name), metav1.DeleteOptions{})
26 if err != nil && !errors.IsNotFound(err) {
27 log.Errorf("Unable to delete sync cronjob: %v", err)
28 return err
29 }
30 return nil
31 }
32 return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
33 }
34
35 // Get the cronjob with the same name as AppRepository
36 cronjobName := cronJobName(namespace, name)
37 cronjob, err := c.cronjobsLister.CronJobs(c.conf.KubeappsNamespace).Get(cronjobName)
38 // If the resource doesn't exist, we'll create it
39 if errors.IsNotFound(err) {
40 log.Infof("Creating CronJob %q for AppRepository %q", cronjobName, apprepo.GetName())
41
42 // 创建一个 CronJob 对象
43 cronjob, err = c.kubeclientset.BatchV1beta1().CronJobs(c.conf.KubeappsNamespace).Create(context.TODO(), newCronJob(apprepo, c.conf), metav1.CreateOptions{})
44 if err != nil {
45 return err
46 }
47
48 // Trigger a manual Job for the initial sync
49 _, err = c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newSyncJob(apprepo, c.conf), metav1.CreateOptions{})
50 } else if err == nil {
51 // If the resource already exists, we'll update it
52 log.Infof("Updating CronJob %q in namespace %q for AppRepository %q in namespace %q", cronjobName, c.conf.KubeappsNamespace, apprepo.GetName(), apprepo.GetNamespace())
53 cronjob, err = c.kubeclientset.BatchV1beta1().CronJobs(c.conf.KubeappsNamespace).Update(context.TODO(), newCronJob(apprepo, c.conf), metav1.UpdateOptions{})
54 if err != nil {
55 return err
56 }
57
58 // The AppRepository has changed, launch a manual Job
59
60 // 创建一个新的Job(不是CronJob),调用newSyncJob初始化Job资源,立即触发一次同步
61 _, err = c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newSyncJob(apprepo, c.conf), metav1.CreateOptions{})
62 }
63
64 // If an error occurs during Get/Create, we'll requeue the item so we can
65 // attempt processing again later. This could have been caused by a
66 // temporary network failure, or any other transient reason.
67 if err != nil {
68 return err
69 }
70
71 // If the CronJob is not controlled by this AppRepository resource and it is not a
72 // cronjob for an app repo in another namespace, then we should
73 // log a warning to the event recorder and return it.
74 if !metav1.IsControlledBy(cronjob, apprepo) && !objectBelongsTo(cronjob, apprepo) {
75 msg := fmt.Sprintf(MessageResourceExists, cronjob.Name)
76 c.recorder.Event(apprepo, corev1.EventTypeWarning, ErrResourceExists, msg)
77 return fmt.Errorf(msg)
78 }
79
80 if apprepo.GetNamespace() == c.conf.KubeappsNamespace {
81 c.recorder.Event(apprepo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
82 }
83 return nil
84}
newSyncJob
1func newSyncJob(apprepo *apprepov1alpha1.AppRepository, config Config) *batchv1.Job {
2 return &batchv1.Job{
3 // 设置Job资源的metadata
4 ObjectMeta: metav1.ObjectMeta{
5 GenerateName: cronJobName(apprepo.Namespace, apprepo.Name) + "-",
6 OwnerReferences: ownerReferencesForAppRepo(apprepo, config.KubeappsNamespace),
7 },
8 // 设置Job资源的Spec字段
9 Spec: syncJobSpec(apprepo, config),
10 }
11}
syncJobSpec
1func syncJobSpec(apprepo *apprepov1alpha1.AppRepository, config Config) batchv1.JobSpec {
2 ...
3
4 // 这里是设置Job的Pod信息的地方
5 podTemplateSpec.Spec.Containers[0].Name = "sync"
6
7 // 指定的镜像是config.RepoSyncImage
8 // 使用的镜像是 bitnami/kubeapps-asset-syncer
9 podTemplateSpec.Spec.Containers[0].Image = config.RepoSyncImage
10 podTemplateSpec.Spec.Containers[0].ImagePullPolicy = "IfNotPresent"
11 podTemplateSpec.Spec.Containers[0].Command = []string{config.RepoSyncCommand}
12
13 // 启动参数
14 // apprepoSyncJobArgs函数拼接参数,最终的参数如下:
15 // sync --database-url=xxx --database-user=xxx --database-name=xxx --user-agent-comment=xxx --namespace=xxx {appRepoName} {appRepoURL} {appRepoType}......
16 podTemplateSpec.Spec.Containers[0].Args = apprepoSyncJobArgs(apprepo, config)
17 ...
18 return batchv1.JobSpec{
19 TTLSecondsAfterFinished: ttlLifetimeJobs(config),
20 Template: podTemplateSpec,
21 }
22}
newCronJob
代码和newSyncJob类似,只不过创建的资源类型不同,新增了Schedule字段设置定时规则
1func newCronJob(apprepo *apprepov1alpha1.AppRepository, config Config) *batchv1beta1.CronJob {
2 return &batchv1beta1.CronJob{
3 ObjectMeta: metav1.ObjectMeta{
4 Name: cronJobName(apprepo.Namespace, apprepo.Name),
5 OwnerReferences: ownerReferencesForAppRepo(apprepo, config.KubeappsNamespace),
6 Labels: jobLabels(apprepo),
7 },
8 Spec: batchv1beta1.CronJobSpec{
9 // 设置定时规则
10 Schedule: config.Crontab,
11 // Set to replace as short-circuit in k8s <1.12
12 // TODO re-evaluate ConcurrentPolicy when 1.12+ is mainstream (i.e 1.14)
13 // https://github.com/kubernetes/kubernetes/issues/54870
14 ConcurrencyPolicy: "Replace",
15 JobTemplate: batchv1beta1.JobTemplateSpec{
16 Spec: syncJobSpec(apprepo, config),
17 },
18 },
19 }
20}