阅读约 3 分钟
| 复制链接:

argocd源码分析-argocd-application-controller

概述

  • 集群(cluster)信息以 Secret 的形式保存
  • 缓存支持两种后端:Redis 和内存(memory)

CRD

pkg/apis/application/v1alpha1/types.go

 // AppProject combines the standard Kubernetes type and object metadata with
 // a project specification (Spec) and its status (Status).
 1type AppProject struct {
 2	metav1.TypeMeta   `json:",inline"`
 3	metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
 4	Spec              AppProjectSpec   `json:"spec" protobuf:"bytes,2,opt,name=spec"`
 5	Status            AppProjectStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
 6}
 7
 // AppProjectSpec is the specification of an AppProject.
 8type AppProjectSpec struct {
 9  // SourceRepos lists the repository URLs that may be used for deployment
10	SourceRepos []string `json:"sourceRepos,omitempty" protobuf:"bytes,1,name=sourceRepos"`
11  // Destinations lists the deployment targets
12	Destinations []ApplicationDestination `json:"destinations,omitempty" protobuf:"bytes,2,name=destination"`
13  // Description of the project
14	Description string `json:"description,omitempty" protobuf:"bytes,3,opt,name=description"`
15	// Roles are user defined RBAC roles associated with this project
16	Roles []ProjectRole `json:"roles,omitempty" protobuf:"bytes,4,rep,name=roles"`
17	// ClusterResourceWhitelist contains list of whitelisted cluster level resources
18	ClusterResourceWhitelist []metav1.GroupKind `json:"clusterResourceWhitelist,omitempty" protobuf:"bytes,5,opt,name=clusterResourceWhitelist"`
19	// NamespaceResourceBlacklist contains list of blacklisted namespace level resources
20	NamespaceResourceBlacklist []metav1.GroupKind `json:"namespaceResourceBlacklist,omitempty" protobuf:"bytes,6,opt,name=namespaceResourceBlacklist"`
21	// OrphanedResources specifies if controller should monitor orphaned resources of apps in this project
22	OrphanedResources *OrphanedResourcesMonitorSettings `json:"orphanedResources,omitempty" protobuf:"bytes,7,opt,name=orphanedResources"`
23	// SyncWindows controls when syncs can be run for apps in this project
24	SyncWindows SyncWindows `json:"syncWindows,omitempty" protobuf:"bytes,8,opt,name=syncWindows"`
25	// NamespaceResourceWhitelist contains list of whitelisted namespace level resources
26	NamespaceResourceWhitelist []metav1.GroupKind `json:"namespaceResourceWhitelist,omitempty" protobuf:"bytes,9,opt,name=namespaceResourceWhitelist"`
27	// List of PGP key IDs that commits to be synced to must be signed with
28	SignatureKeys []SignatureKey `json:"signatureKeys,omitempty" protobuf:"bytes,10,opt,name=signatureKeys"`
29	// ClusterResourceBlacklist contains list of blacklisted cluster level resources
30	ClusterResourceBlacklist []metav1.GroupKind `json:"clusterResourceBlacklist,omitempty" protobuf:"bytes,11,opt,name=clusterResourceBlacklist"`
31}
32
33// ApplicationDestination describes the target cluster an application is deployed to
34type ApplicationDestination struct {
35	// Server overrides the environment server value in the ksonnet app.yaml
36	Server string `json:"server,omitempty" protobuf:"bytes,1,opt,name=server"`
37	// Namespace overrides the environment namespace value in the ksonnet app.yaml
38  // Namespace is the namespace in the target cluster
39	Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"`
40	// Name of the destination cluster which can be used instead of server (url) field
41	Name string `json:"name,omitempty" protobuf:"bytes,3,opt,name=name"`
42
43	// nolint:govet
44	isServerInferred bool `json:"-"`
45}

Controller

函数入口

cmd/argocd-application-controller/commands/argocd-application-controller.go

 // Entry-point excerpt: construct the application controller, then start it.
 1appController, err := controller.NewApplicationController(
 2				namespace,
 3				settingsMgr,
 4				kubeClient,
 5				appClient,
 6				repoClientset,
 7				cache,
 8				kubectl,
 9				resyncDuration,
10				time.Duration(selfHealTimeoutSeconds)*time.Second, // seconds value converted to a time.Duration
11				metricsPort,
12				kubectlParallelismLimit,
13				clusterFilter)
14... // (intermediate code elided in the article)
15go appController.Run(ctx, statusProcessors, operationProcessors) // Run is launched in its own goroutine

NewApplicationController

负责为 informer 注册增(Add)、改(Update)、删(Delete)三类事件的回调函数

 // Register project informer callbacks: every add, update and delete event
 // enqueues the object's key on ctrl.projectRefreshQueue. If the key function
 // returns an error the event is silently dropped (nothing is enqueued).
 1projInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 2		AddFunc: func(obj interface{}) {
 3			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
 4				ctrl.projectRefreshQueue.Add(key)
 5			}
 6		},
 7		UpdateFunc: func(old, new interface{}) {
 8			if key, err := cache.MetaNamespaceKeyFunc(new); err == nil { // keyed on the new object; old is unused
 9				ctrl.projectRefreshQueue.Add(key)
10			}
11		},
12		DeleteFunc: func(obj interface{}) {
  			// Deletes use the deletion-handling key function, which per the client-go
  			// documentation also accepts the DeletedFinalStateUnknown tombstone.
13			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
14				ctrl.projectRefreshQueue.Add(key)
15			}
16		},
17	})

newApplicationInformerAndLister

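为 Application informer 注册的增、改、删回调函数,分别如下:

  • AddFunc:将应用的 key 同时加入 appRefreshQueue 和 appOperationQueue
  • UpdateFunc:若 automatedSyncEnabled 判断本次更新开启了自动同步,则以 CompareWithLatest 级别请求刷新;随后将 key 加入 appOperationQueue
  • DeleteFunc:将应用的 key 加入 appRefreshQueue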
 // newApplicationInformerAndLister builds a shared index informer that lists and
 // watches Applications in ctrl.namespace, installs two indexers (by namespace
 // and by orphaned-resource namespace), and registers event handlers that feed
 // ctrl.appRefreshQueue and ctrl.appOperationQueue. It returns the informer
 // together with a lister backed by the informer's indexer.
 1func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.SharedIndexInformer, applisters.ApplicationLister) {
 2	informer := cache.NewSharedIndexInformer(
 3		&cache.ListWatch{
 4			ListFunc: func(options metav1.ListOptions) (apiruntime.Object, error) {
 5				return ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.namespace).List(context.TODO(), options)
 6			},
 7			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 8				return ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.namespace).Watch(context.TODO(), options)
 9			},
10		},
11		&appv1.Application{},
12		ctrl.statusRefreshTimeout, // passed as the informer's resync period
13		cache.Indexers{
  			// Namespace index. As a side effect, each indexed Application has its
  			// destination validated; a validation failure is recorded on the app as
  			// an InvalidSpecError condition. The index value itself always comes from
  			// cache.MetaNamespaceIndexFunc.
14			cache.NamespaceIndex: func(obj interface{}) ([]string, error) {
15				app, ok := obj.(*appv1.Application)
16				if ok {
17					if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
18						ctrl.setAppCondition(app, appv1.ApplicationCondition{Type: appv1.ApplicationConditionInvalidSpecError, Message: err.Error()})
19					}
20				}
21
22				return cache.MetaNamespaceIndexFunc(obj)
23			},
  			// Orphaned-resources index: an app is indexed under its destination
  			// namespace only when its project has OrphanedResources set. Objects that
  			// are not Applications, and apps whose project lookup fails, produce no
  			// index entries (the lookup error is discarded).
24			orphanedIndex: func(obj interface{}) (i []string, e error) {
25				app, ok := obj.(*appv1.Application)
26				if !ok {
27					return nil, nil
28				}
29
30				proj, err := ctrl.getAppProj(app)
31				if err != nil {
32					return nil, nil
33				}
34				if proj.Spec.OrphanedResources != nil {
35					return []string{app.Spec.Destination.Namespace}, nil
36				}
37				return nil, nil
38			},
39		},
40	)
41	lister := applisters.NewApplicationLister(informer.GetIndexer())
42	informer.AddEventHandler(
43		cache.ResourceEventHandlerFuncs{
  			// Add: enqueue the app for both a status refresh and operation processing.
44			AddFunc: func(obj interface{}) {
45				if !ctrl.canProcessApp(obj) {
46					return
47				}
48				key, err := cache.MetaNamespaceKeyFunc(obj)
49				if err == nil {
50					ctrl.appRefreshQueue.Add(key)
51					ctrl.appOperationQueue.Add(key)
52				}
53			},
  			// Update: request a refresh (at CompareWithLatest level when automated
  			// sync was just enabled, otherwise with a nil level) and enqueue the app
  			// for operation processing.
54			UpdateFunc: func(old, new interface{}) {
55				if !ctrl.canProcessApp(new) {
56					return
57				}
58
59				key, err := cache.MetaNamespaceKeyFunc(new)
60				if err != nil {
61					return
62				}
63				var compareWith *CompareWith
64				oldApp, oldOK := old.(*appv1.Application)
65				newApp, newOK := new.(*appv1.Application)
66				if oldOK && newOK && automatedSyncEnabled(oldApp, newApp) {
67					log.WithField("application", newApp.Name).Info("Enabled automated sync")
68					compareWith = CompareWithLatest.Pointer()
69				}
  				// NOTE(review): newApp is dereferenced here without checking newOK;
  				// presumably canProcessApp above already rejects non-Application
  				// objects, otherwise this would be a nil dereference — confirm.
70				ctrl.requestAppRefresh(newApp.Name, compareWith, nil)
71				ctrl.appOperationQueue.Add(key)
72			},
  			// Delete: only the refresh queue is notified; the operation queue is not.
73			DeleteFunc: func(obj interface{}) {
74				if !ctrl.canProcessApp(obj) {
75					return
76				}
77				// IndexerInformer uses a delta queue, therefore for deletes we have to use this
78				// key function.
79				key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
80				if err == nil {
81					ctrl.appRefreshQueue.Add(key)
82				}
83			},
84		},
85	)
86	return informer, lister
87}

Run函数

 // Run starts the application and project informers, initializes the state
 // cache, waits for both informer caches to sync, then launches the state-cache
 // and metrics-server goroutines plus the queue worker loops, and finally blocks
 // until ctx is done.
 //
 // statusProcessors is the number of goroutines draining the app refresh queue;
 // operationProcessors is the number draining the app operation queue. The
 // comparison-type and project queues get one goroutine each.
 1func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int, operationProcessors int) {
 2	...
 3  // Start the informers
 4	go ctrl.appInformer.Run(ctx.Done())
 5	go ctrl.projInformer.Run(ctx.Done())
 6
 7	errors.CheckError(ctrl.stateCache.Init()) // NOTE(review): CheckError presumably aborts on a non-nil error — confirm
 8
 9	if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced, ctrl.projInformer.HasSynced) {
10		log.Error("Timed out waiting for caches to sync")
11		return
12	}
13
14	go func() { errors.CheckError(ctrl.stateCache.Run(ctx)) }()
15	go func() { errors.CheckError(ctrl.metricsServer.ListenAndServe()) }()
16
  	// Each worker is wrapped in wait.Until with a one-second period and
  	// ctx.Done() as the stop channel, so the inner loop is re-entered after it
  	// exits until the context is cancelled.
17	for i := 0; i < statusProcessors; i++ {
18		go wait.Until(func() {
19      // Application refresh processing: keep going while the processor returns true
20			for ctrl.processAppRefreshQueueItem() {
21			}
22		}, time.Second, ctx.Done())
23	}
24
25	for i := 0; i < operationProcessors; i++ {
26		go wait.Until(func() {
27			for ctrl.processAppOperationQueueItem() {
28			}
29		}, time.Second, ctx.Done())
30	}
31
32	go wait.Until(func() {
33		for ctrl.processAppComparisonTypeQueueItem() {
34		}
35	}, time.Second, ctx.Done())
36
37	go wait.Until(func() {
38		for ctrl.processProjectQueueItem() {
39		}
40	}, time.Second, ctx.Done())
41	<-ctx.Done() // block until the context is cancelled
42}

processAppRefreshQueueItem

 // processAppRefreshQueueItem refreshes the status of one Application: it
 // compares the app's state, caches its managed resources, optionally triggers
 // an automated sync, and persists the resulting status.
 //
 // This is an excerpt; "..." marks code elided by the article. origApp,
 // comparisonLevel, refreshType and logCtx are defined in the elided parts. The
 // named result processNext is not assigned anywhere in the visible code, so the
 // naked returns below yield whatever the elided code set.
 1func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext bool) {
 2	...
 3  // Work on a deep copy of the Application (origApp comes from the elided code above)
 4	app := origApp.DeepCopy()
 5	...
  	// ComparisonWithNothing: only rebuild the resource tree from the cached
  	// managed resources. If the cache read fails, fall through to the full
  	// reconciliation below.
 6	if comparisonLevel == ComparisonWithNothing {
 7		managedResources := make([]*appv1.ResourceDiff, 0)
 8    // Load the managed resources cached for this app
 9		if err := ctrl.cache.GetAppManagedResources(app.Name, &managedResources); err != nil {
10			logCtx.Warnf("Failed to get cached managed resources for tree reconciliation, fallback to full reconciliation")
11		} else {
12			var tree *appv1.ApplicationTree
13      // Build the resource tree from the cached managed resources
14			if tree, err = ctrl.getResourceTree(app, managedResources); err == nil {
15				app.Status.Summary = tree.GetSummary()
16				if err := ctrl.cache.SetAppResourcesTree(app.Name, tree); err != nil {
17					logCtx.Errorf("Failed to cache resources tree: %v", err)
18					return // status is NOT persisted when caching the tree fails
19				}
20			}
21
  			// Reached even when getResourceTree failed; that error is not logged here.
22			ctrl.persistAppStatus(origApp, &app.Status)
23			return
24		}
25	}
26
  	// If refreshing the conditions reports errors, mark sync and health as
  	// unknown, persist, and stop.
27	project, hasErrors := ctrl.refreshAppConditions(app)
28	if hasErrors {
29		app.Status.Sync.Status = appv1.SyncStatusCodeUnknown
30		app.Status.Health.Status = health.HealthStatusUnknown
31		ctrl.persistAppStatus(origApp, &app.Status)
32		return
33	}
34
  	// Reuse the manifests of the recorded sync operation, if there is one.
35	var localManifests []string
36	if opState := app.Status.OperationState; opState != nil && opState.Operation.Sync != nil {
37		localManifests = opState.Operation.Sync.Manifests
38	}
39
  	// Compare against the spec's target revision by default; CompareWithRecent
  	// compares against the revision recorded in the sync status instead.
40	revision := app.Spec.Source.TargetRevision
41	if comparisonLevel == CompareWithRecent {
42		revision = app.Status.Sync.Revision
43	}
44
45	now := metav1.Now() // captured before the comparison; used for ReconciledAt below
46	compareResult := ctrl.appStateManager.CompareAppState(app, project, revision, app.Spec.Source, refreshType == appv1.RefreshTypeHard, localManifests)
  	// Attach each comparison timing (in milliseconds) to the log context.
47	for k, v := range compareResult.timings {
48		logCtx = logCtx.WithField(k, v.Milliseconds())
49	}
50
51	ctrl.normalizeApplication(origApp, app)
52
  	// A failure to cache managed resources is logged but does not abort the
  	// refresh; the summary is simply left unchanged.
53	tree, err := ctrl.setAppManagedResources(app, compareResult)
54	if err != nil {
55		logCtx.Errorf("Failed to cache app resources: %v", err)
56	} else {
57		app.Status.Summary = tree.GetSummary()
58	}
59
  	// Auto-sync is attempted only when the project's sync windows allow it.
  	// NOTE(review): presumably the map passed to SetConditions names the
  	// condition types being replaced, so the empty list clears a stale SyncError
  	// — confirm against SetConditions.
60	if project.Spec.SyncWindows.Matches(app).CanSync(false) {
61		syncErrCond := ctrl.autoSync(app, compareResult.syncStatus, compareResult.resources)
62		if syncErrCond != nil {
63			app.Status.SetConditions(
64				[]appv1.ApplicationCondition{*syncErrCond},
65				map[appv1.ApplicationConditionType]bool{appv1.ApplicationConditionSyncError: true},
66			)
67		} else {
68			app.Status.SetConditions(
69				[]appv1.ApplicationCondition{},
70				map[appv1.ApplicationConditionType]bool{appv1.ApplicationConditionSyncError: true},
71			)
72		}
73	} else {
74		logCtx.Info("Sync prevented by sync window")
75	}
76
  	// ReconciledAt is updated only on the first reconciliation or on a
  	// CompareWithLatest comparison.
77	if app.Status.ReconciledAt == nil || comparisonLevel == CompareWithLatest {
78		app.Status.ReconciledAt = &now
79	}
80	app.Status.Sync = *compareResult.syncStatus
81	app.Status.Health = *compareResult.healthStatus
82	app.Status.Resources = compareResult.resources
  	// Order the resources by their status key.
83	sort.Slice(app.Status.Resources, func(i, j int) bool {
84		return resourceStatusKey(app.Status.Resources[i]) < resourceStatusKey(app.Status.Resources[j])
85	})
86	app.Status.SourceType = compareResult.appSourceType
87	ctrl.persistAppStatus(origApp, &app.Status)
88	return
89}

processAppOperationQueueItem

processAppComparisonTypeQueueItem

processProjectQueueItem