| 阅读 | 共 1430 字,阅读约
argocd源码分析-argocd-application-controller
概述
- cluster存放在secret中
- 缓存支持两种:redis、memory
CRD
pkg/apis/application/v1alpha1/types.go
// AppProject is the project custom resource type: it embeds the standard
// TypeMeta/ObjectMeta and carries a Spec (AppProjectSpec) and a Status
// (AppProjectStatus).
1type AppProject struct {
2 metav1.TypeMeta `json:",inline"`
3 metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
4 Spec AppProjectSpec `json:"spec" protobuf:"bytes,2,opt,name=spec"`
5 Status AppProjectStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
6}
7
// AppProjectSpec is the specification part of an AppProject: source
// repositories, deployment destinations, roles, resource white/blacklists,
// orphaned-resource monitoring, sync windows and signature keys.
8type AppProjectSpec struct {
9 // SourceRepos is the list of repository addresses used for deployment
10 SourceRepos []string `json:"sourceRepos,omitempty" protobuf:"bytes,1,name=sourceRepos"`
11 // Destinations is the list of deployment targets
12 Destinations []ApplicationDestination `json:"destinations,omitempty" protobuf:"bytes,2,name=destination"`
13 // Description of the project
14 Description string `json:"description,omitempty" protobuf:"bytes,3,opt,name=description"`
15 // Roles are user defined RBAC roles associated with this project
16 Roles []ProjectRole `json:"roles,omitempty" protobuf:"bytes,4,rep,name=roles"`
17 // ClusterResourceWhitelist contains list of whitelisted cluster level resources
18 ClusterResourceWhitelist []metav1.GroupKind `json:"clusterResourceWhitelist,omitempty" protobuf:"bytes,5,opt,name=clusterResourceWhitelist"`
19 // NamespaceResourceBlacklist contains list of blacklisted namespace level resources
20 NamespaceResourceBlacklist []metav1.GroupKind `json:"namespaceResourceBlacklist,omitempty" protobuf:"bytes,6,opt,name=namespaceResourceBlacklist"`
21 // OrphanedResources specifies if controller should monitor orphaned resources of apps in this project
22 OrphanedResources *OrphanedResourcesMonitorSettings `json:"orphanedResources,omitempty" protobuf:"bytes,7,opt,name=orphanedResources"`
23 // SyncWindows controls when syncs can be run for apps in this project
24 SyncWindows SyncWindows `json:"syncWindows,omitempty" protobuf:"bytes,8,opt,name=syncWindows"`
25 // NamespaceResourceWhitelist contains list of whitelisted namespace level resources
26 NamespaceResourceWhitelist []metav1.GroupKind `json:"namespaceResourceWhitelist,omitempty" protobuf:"bytes,9,opt,name=namespaceResourceWhitelist"`
27 // List of PGP key IDs that commits to be synced to must be signed with
28 SignatureKeys []SignatureKey `json:"signatureKeys,omitempty" protobuf:"bytes,10,opt,name=signatureKeys"`
29 // ClusterResourceBlacklist contains list of blacklisted cluster level resources
30 ClusterResourceBlacklist []metav1.GroupKind `json:"clusterResourceBlacklist,omitempty" protobuf:"bytes,11,opt,name=clusterResourceBlacklist"`
31}
32
33// ApplicationDestination holds the target cluster information for an application deployment
34type ApplicationDestination struct {
35 // Server overrides the environment server value in the ksonnet app.yaml
36 Server string `json:"server,omitempty" protobuf:"bytes,1,opt,name=server"`
37 // Namespace overrides the environment namespace value in the ksonnet app.yaml
38 // Namespace in the target cluster
39 Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"`
40 // Name of the destination cluster which can be used instead of server (url) field
41 Name string `json:"name,omitempty" protobuf:"bytes,3,opt,name=name"`
42
 // isServerInferred is unexported and excluded from JSON (`json:"-"`).
 // NOTE(review): presumably records that Server was derived from Name — confirm against its users.
43 // nolint:govet
44 isServerInferred bool `json:"-"`
45}
Controller
函数入口
cmd/argocd-application-controller/commands/argocd-application-controller.go
// Entry-point excerpt: construct the application controller from the
// namespace, clients, cache, kubectl wrapper and tuning parameters, then
// (after code elided with "...") run it in its own goroutine.
1appController, err := controller.NewApplicationController(
2 namespace,
3 settingsMgr,
4 kubeClient,
5 appClient,
6 repoClientset,
7 cache,
8 kubectl,
9 resyncDuration,
 // selfHealTimeoutSeconds is a plain number of seconds; convert it to a time.Duration.
10 time.Duration(selfHealTimeoutSeconds)*time.Second,
11 metricsPort,
12 kubectlParallelismLimit,
13 clusterFilter)
14...
 // Run is handed the number of status and operation processors to start.
15go appController.Run(ctx, statusProcessors, operationProcessors)
NewApplicationController
负责为 project informer 注册增、删、改三类事件的回调函数(informer 没有"查"事件);每个回调都把对象的 key 加入 projectRefreshQueue
// Project informer handlers: every add, update and delete event enqueues the
// object's namespace/name key on projectRefreshQueue. If the key cannot be
// computed the event is dropped without logging.
1projInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
2 AddFunc: func(obj interface{}) {
3 if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
4 ctrl.projectRefreshQueue.Add(key)
5 }
6 },
7 UpdateFunc: func(old, new interface{}) {
 // Only the new object is used to derive the key.
8 if key, err := cache.MetaNamespaceKeyFunc(new); err == nil {
9 ctrl.projectRefreshQueue.Add(key)
10 }
11 },
12 DeleteFunc: func(obj interface{}) {
 // Deletes use the deletion-handling variant of the key function.
13 if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
14 ctrl.projectRefreshQueue.Add(key)
15 }
16 },
17 })
newApplicationInformerAndLister
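注册的增、删、改回调函数,分别如下:
- AddFunc:将 key 同时加入 appRefreshQueue 和 appOperationQueue
- UpdateFunc:若 automatedSyncEnabled 判断本次更新开启了自动同步,则以 CompareWithLatest 级别请求刷新;随后将 key 加入 appOperationQueue
- DeleteFunc:使用 DeletionHandlingMetaNamespaceKeyFunc 计算 key,并只加入 appRefreshQueue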
// newApplicationInformerAndLister builds a shared index informer over the
// Applications in ctrl.namespace, a lister backed by that informer's indexer,
// and registers the add/update/delete event handlers that feed the
// controller's work queues. It returns the informer and the lister.
1func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.SharedIndexInformer, applisters.ApplicationLister) {
2 informer := cache.NewSharedIndexInformer(
3 &cache.ListWatch{
4 ListFunc: func(options metav1.ListOptions) (apiruntime.Object, error) {
5 return ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.namespace).List(context.TODO(), options)
6 },
7 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
8 return ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.namespace).Watch(context.TODO(), options)
9 },
10 },
11 &appv1.Application{},
 // ctrl.statusRefreshTimeout is passed as the informer's resync period.
12 ctrl.statusRefreshTimeout,
13 cache.Indexers{
 // Namespace index: before delegating to MetaNamespaceIndexFunc it validates
 // the app destination and, on failure, sets an InvalidSpecError condition.
14 cache.NamespaceIndex: func(obj interface{}) ([]string, error) {
15 app, ok := obj.(*appv1.Application)
16 if ok {
17 if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
18 ctrl.setAppCondition(app, appv1.ApplicationCondition{Type: appv1.ApplicationConditionInvalidSpecError, Message: err.Error()})
19 }
20 }
21
22 return cache.MetaNamespaceIndexFunc(obj)
23 },
 // Orphaned index: an app is indexed by its destination namespace only when
 // its project sets OrphanedResources; non-Application objects and project
 // lookup failures yield no index entry (and no error).
24 orphanedIndex: func(obj interface{}) (i []string, e error) {
25 app, ok := obj.(*appv1.Application)
26 if !ok {
27 return nil, nil
28 }
29
30 proj, err := ctrl.getAppProj(app)
31 if err != nil {
32 return nil, nil
33 }
34 if proj.Spec.OrphanedResources != nil {
35 return []string{app.Spec.Destination.Namespace}, nil
36 }
37 return nil, nil
38 },
39 },
40 )
41 lister := applisters.NewApplicationLister(informer.GetIndexer())
42 informer.AddEventHandler(
43 cache.ResourceEventHandlerFuncs{
 // Add: enqueue the key on both the refresh queue and the operation queue.
44 AddFunc: func(obj interface{}) {
45 if !ctrl.canProcessApp(obj) {
46 return
47 }
48 key, err := cache.MetaNamespaceKeyFunc(obj)
49 if err == nil {
50 ctrl.appRefreshQueue.Add(key)
51 ctrl.appOperationQueue.Add(key)
52 }
53 },
 // Update: request a refresh (compared with latest when automated sync was
 // just enabled, otherwise with a nil comparison level) and enqueue the key
 // on the operation queue.
54 UpdateFunc: func(old, new interface{}) {
55 if !ctrl.canProcessApp(new) {
56 return
57 }
58
59 key, err := cache.MetaNamespaceKeyFunc(new)
60 if err != nil {
61 return
62 }
63 var compareWith *CompareWith
64 oldApp, oldOK := old.(*appv1.Application)
65 newApp, newOK := new.(*appv1.Application)
66 if oldOK && newOK && automatedSyncEnabled(oldApp, newApp) {
67 log.WithField("application", newApp.Name).Info("Enabled automated sync")
68 compareWith = CompareWithLatest.Pointer()
69 }
 // NOTE(review): newApp is dereferenced here without checking newOK;
 // presumably canProcessApp above rejects non-Application objects — confirm.
70 ctrl.requestAppRefresh(newApp.Name, compareWith, nil)
71 ctrl.appOperationQueue.Add(key)
72 },
 // Delete: only the refresh queue receives the key.
73 DeleteFunc: func(obj interface{}) {
74 if !ctrl.canProcessApp(obj) {
75 return
76 }
77 // IndexerInformer uses a delta queue, therefore for deletes we have to use this
78 // key function.
79 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
80 if err == nil {
81 ctrl.appRefreshQueue.Add(key)
82 }
83 },
84 },
85 )
86 return informer, lister
87}
Run函数
// Run (excerpt; the beginning is elided with "...") starts the application and
// project informers, initializes the state cache, waits for the informer
// caches to sync, then launches the state cache, the metrics server and the
// queue workers, and finally blocks until ctx is done.
//
// statusProcessors and operationProcessors are the numbers of goroutines that
// drain the app refresh queue and the app operation queue respectively.
1func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int, operationProcessors int) {
2 ...
3 // Start the informers
4 go ctrl.appInformer.Run(ctx.Done())
5 go ctrl.projInformer.Run(ctx.Done())
6
7 errors.CheckError(ctrl.stateCache.Init())
8
 // Give up (after logging) if the caches do not sync before ctx is done.
9 if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced, ctrl.projInformer.HasSynced) {
10 log.Error("Timed out waiting for caches to sync")
11 return
12 }
13
14 go func() { errors.CheckError(ctrl.stateCache.Run(ctx)) }()
15 go func() { errors.CheckError(ctrl.metricsServer.ListenAndServe()) }()
16
 // Each worker drains its queue until the process function returns false;
 // wait.Until re-invokes it every second until ctx is done.
17 for i := 0; i < statusProcessors; i++ {
18 go wait.Until(func() {
19 // Application processing logic
20 for ctrl.processAppRefreshQueueItem() {
21 }
22 }, time.Second, ctx.Done())
23 }
24
25 for i := 0; i < operationProcessors; i++ {
26 go wait.Until(func() {
27 for ctrl.processAppOperationQueueItem() {
28 }
29 }, time.Second, ctx.Done())
30 }
31
 // The comparison-type queue and the project queue each get a single worker.
32 go wait.Until(func() {
33 for ctrl.processAppComparisonTypeQueueItem() {
34 }
35 }, time.Second, ctx.Done())
36
37 go wait.Until(func() {
38 for ctrl.processProjectQueueItem() {
39 }
40 }, time.Second, ctx.Done())
41 <-ctx.Done()
42}
processAppRefreshQueueItem
// processAppRefreshQueueItem (excerpt; parts are elided with "...") refreshes
// the status of one application and persists it.
//
// With ComparisonWithNothing it only rebuilds the resource tree from the
// cached managed resources. Otherwise it refreshes the app conditions,
// compares the app state against the chosen revision, caches the managed
// resources, attempts an auto-sync when the project's sync windows allow it,
// and writes sync/health/resources/source type into the status.
//
// NOTE(review): the named result processNext is returned via naked returns;
// where it is assigned is in the elided part — confirm there.
1func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext bool) {
2 ...
3 // The Application has been fetched at this point; work on a deep copy
4 app := origApp.DeepCopy()
5 ...
6 if comparisonLevel == ComparisonWithNothing {
7 managedResources := make([]*appv1.ResourceDiff, 0)
8 // Read the cached managed resources
9 if err := ctrl.cache.GetAppManagedResources(app.Name, &managedResources); err != nil {
10 logCtx.Warnf("Failed to get cached managed resources for tree reconciliation, fallback to full reconciliation")
11 } else {
12 var tree *appv1.ApplicationTree
13 // Build the resource tree
14 if tree, err = ctrl.getResourceTree(app, managedResources); err == nil {
15 app.Status.Summary = tree.GetSummary()
16 if err := ctrl.cache.SetAppResourcesTree(app.Name, tree); err != nil {
17 logCtx.Errorf("Failed to cache resources tree: %v", err)
18 return
19 }
20 }
21
 // The status is persisted even when getResourceTree failed.
22 ctrl.persistAppStatus(origApp, &app.Status)
23 return
24 }
25 }
26
 // Condition errors short-circuit the comparison: mark sync and health as unknown.
27 project, hasErrors := ctrl.refreshAppConditions(app)
28 if hasErrors {
29 app.Status.Sync.Status = appv1.SyncStatusCodeUnknown
30 app.Status.Health.Status = health.HealthStatusUnknown
31 ctrl.persistAppStatus(origApp, &app.Status)
32 return
33 }
34
 // Manifests attached to a sync operation recorded in the status are passed to the comparison.
35 var localManifests []string
36 if opState := app.Status.OperationState; opState != nil && opState.Operation.Sync != nil {
37 localManifests = opState.Operation.Sync.Manifests
38 }
39
 // CompareWithRecent reuses the revision recorded in the status instead of the spec's target revision.
40 revision := app.Spec.Source.TargetRevision
41 if comparisonLevel == CompareWithRecent {
42 revision = app.Status.Sync.Revision
43 }
44
45 now := metav1.Now()
46 compareResult := ctrl.appStateManager.CompareAppState(app, project, revision, app.Spec.Source, refreshType == appv1.RefreshTypeHard, localManifests)
 // Attach each timing from the comparison, in milliseconds, to the log context.
47 for k, v := range compareResult.timings {
48 logCtx = logCtx.WithField(k, v.Milliseconds())
49 }
50
51 ctrl.normalizeApplication(origApp, app)
52
53 tree, err := ctrl.setAppManagedResources(app, compareResult)
54 if err != nil {
55 logCtx.Errorf("Failed to cache app resources: %v", err)
56 } else {
57 app.Status.Summary = tree.GetSummary()
58 }
59
 // Auto-sync only when the matching sync windows allow it; the SyncError
 // condition is either set to the returned error or cleared.
60 if project.Spec.SyncWindows.Matches(app).CanSync(false) {
61 syncErrCond := ctrl.autoSync(app, compareResult.syncStatus, compareResult.resources)
62 if syncErrCond != nil {
63 app.Status.SetConditions(
64 []appv1.ApplicationCondition{*syncErrCond},
65 map[appv1.ApplicationConditionType]bool{appv1.ApplicationConditionSyncError: true},
66 )
67 } else {
68 app.Status.SetConditions(
69 []appv1.ApplicationCondition{},
70 map[appv1.ApplicationConditionType]bool{appv1.ApplicationConditionSyncError: true},
71 )
72 }
73 } else {
74 logCtx.Info("Sync prevented by sync window")
75 }
76
 // ReconciledAt is stamped on the first reconciliation or on a compare-with-latest run.
77 if app.Status.ReconciledAt == nil || comparisonLevel == CompareWithLatest {
78 app.Status.ReconciledAt = &now
79 }
80 app.Status.Sync = *compareResult.syncStatus
81 app.Status.Health = *compareResult.healthStatus
82 app.Status.Resources = compareResult.resources
 // Order the resources by resourceStatusKey.
83 sort.Slice(app.Status.Resources, func(i, j int) bool {
84 return resourceStatusKey(app.Status.Resources[i]) < resourceStatusKey(app.Status.Resources[j])
85 })
86 app.Status.SourceType = compareResult.appSourceType
87 ctrl.persistAppStatus(origApp, &app.Status)
88 return
89}