| 阅读 |,阅读约 15 分钟
| 复制链接:

Istio源码分析-pilot-discovery

概述

pilot-discovery扮演服务注册中心、istio控制平面到Envoy之间的桥梁作用。pilot-discovery的主要功能包括:

  1. 监控服务注册中心(如Kubernetes)的服务注册情况。在Kubernetes环境下,会监控serviceendpointpodnode等资源信息
  2. 监控istio控制面信息变化,在Kubernetes环境下,会监控包括RouteRuleVirtualServiceGatewayEgressRuleServiceEntry等以Kubernetes CRD形式存在的istio控制面配置信息。
  3. 将上述两类信息合并组合为Envoy可以理解的(即遵循Envoy data plane api的)配置信息,并将这些信息以gRPC协议提供给Envoy

功能概述

  • 初始化
  • istio控制面信息监控与处理
  • 服务注册信息监控与处理
  • Envoy控制面信息服务

初始化

  • 初始化kubeclient
  • 初始化xdsserver
  • 初始化controller
    • 初始化configController
    • 初始化serviceController
  • 初始化discoveryservice

istio控制面信息监控与处理

服务注册信息监控与处理

Envoy控制面信息服务

pilot-discovery创建 envoy xds server对外提供grpc协议discovery服务

源码分析

启动入口

代码路径:istio/pilot/cmd/pilot-discovery/,包括两个文件

  • main.go:定义了根命令及discovery命令,是启动服务发现和配置下发的主流程
  • request.go:用来请求pilot中的metrics/debug接口,多用来调试

支持的子命令

  • discovery:启动代理发现服务
  • collateral:
  • request:
  • version:

discovery命令

istio/pilot/cmd/pilot-discovery/main.go

 1
 2// 创建一个空的channel,用来停止所有的servers
 3stop := make(chan struct{})
 4// 创建服务发现的server
 5discoveryServer, err := bootstrap.NewServer(serverArgs)
 6...
 7// 运行server中注册的所有服务
 8if err := discoveryServer.Start(stop); err != nil {
 9    return fmt.Errorf("failed to start discovery service: %v", err)
10}
11// 等待退出信号并关闭channel
12cmd.WaitSignal(stop)
13

request命令

 1// 请求 15014 端口数据
 2command := &request.Command{
 3    Address: "127.0.0.1:15014",
 4    Client: &http.Client{
 5        Timeout: 60 * time.Second,
 6    },
 7}
 8body := ""
 9if len(args) > 2 {
10    body = args[2]
11}
12return command.Do(args[0], args[1], body)

Server

bootstrap.NewServer返回的对象是Server,看下Server的数据结构

pilot/pkg/bootstrap/server.go

 1type Server struct {
 2    // xds 服务
 3    XDSServer *xds.DiscoveryServer
 4	// 集群id
 5    clusterID   string
 6    // pilot 环境所需的 API 集合
 7    environment *model.Environment
 8	...
 9    // 处理 k8s 多个集群的注册中心
10    // istio支持使用一个istio control plane来管理跨多个Kubernetes集群上的service mesh
11    // stio的控制平面组件(如pilot-discovery)运行所在的Kubernetes集群叫本地集群,通过这个istio控制面板连接的其他Kubernetes集群叫远程集群(remote cluster)
12    multicluster *kubecontroller.Multicluster
13    // 统一处理配置数据(比如VirtualService等)的 controller
14    configController  model.ConfigStoreCache
15    // 不同配置信息的缓存器,提供Get、List、Create方法
16    ConfigStores      []model.ConfigStoreCache
17    // 单独处理 ServiceEntry 的 Controller
18    serviceEntryStore *serviceentry.ServiceEntryStore
19	...
20    // 文件监听器,主要监听配置文件
21    // fileWatcher used to watch mesh config, networks and certificates.
22    fileWatcher filewatcher.FileWatcher
23	...
24}

NewServer

 1func NewServer(args *PilotArgs) (*Server, error) {
 2    // Environment 初始化
 3	e := &model.Environment{
 4		PushContext:  model.NewPushContext(),
 5		DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
 6	}
 7	...
 8	s := &Server{
 9		clusterID:           getClusterID(args),
10		environment:         e,
11		fileWatcher:         filewatcher.NewWatcher(),
12		httpMux:             http.NewServeMux(),
13		monitoringMux:       http.NewServeMux(),
14		readinessProbes:     make(map[string]readinessProbe),
15		workloadTrustBundle: tb.NewTrustBundle(nil),
16		server:              server.New(),
17	}
18	// XDS server 初始化
19	s.XDSServer = xds.NewDiscoveryServer(e, args.Plugins, args.PodName, args.Namespace)
20	...
21	// Apply the arguments to the configuration.
22	if err := s.initKubeClient(args); err != nil {
23		return nil, fmt.Errorf("error initializing kube client: %v", err)
24	}
25
26    // mesh config 初始化
27	s.initMeshConfiguration(args, s.fileWatcher)
28	spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())
29
30    // mesh network 初始化
31	s.initMeshNetworks(args, s.fileWatcher)
32    // mesh handler 初始化
33	s.initMeshHandlers()
34	s.environment.Init()
35	...
36    // controller 初始化
37	if err := s.initControllers(args); err != nil {
38		return nil, err
39	}
40	...
41    // sidercar 自动注入初始化
42	wh, err := s.initSidecarInjector(args)
43	...
44    // 初始化 RegistryEnventHandler
45	s.initRegistryEventHandlers()
46
47    // 初始化服务发现
48	s.initDiscoveryService(args)
49	s.initSDSServer(args)
50	...
51    // 最后启动 kubeclient 的运行和监听
52	if s.kubeClient != nil {
53		s.addStartFunc(func(stop <-chan struct{}) error {
54			s.kubeClient.RunAndWait(stop)
55			return nil
56		})
57	}
58    ...
59}

初始化Environment

Environment为pilot提供了一个汇总的、运行中所需的API集合

pilot/pkg/model/context.go

 1type Environment struct {
 2    // 服务发现的接口,用于列出 services 和 instances
 3	ServiceDiscovery
 4    // 配置接口,用于列出配置规则
 5	IstioConfigStore
 6    // 下面两个监听器负责监听istiod启动时挂载的两个配置文件,通过configmap挂载进去
 7    // 监听到配置文件变化时,运行预先注册的handler
 8    // mesh config 文件的监听器
 9	mesh.Watcher
10    // mesh network config 文件的监听器
11	mesh.NetworksWatcher
12    // 持有推送xds信息期间的上下文信息
13	PushContext *PushContext
14	// 默认的后缀域名                       
15	DomainSuffix string
16	...
17}

Environment初始化

 1// 初始化 Environment
 2e := &model.Environment{
 3		PushContext:  model.NewPushContext(),
 4		DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
 5	}
 6
 7// 初始化 PushContext,并传给 Environment
 8func NewPushContext() *PushContext {
 9	// TODO: detect push in progress, don't update status if set
10	return &PushContext{
11        // 存储所有 Service 的 map
12		ServiceIndex:            newServiceIndex(),
13        // 存储所有 VirtualService 的 map
14		virtualServiceIndex:     newVirtualServiceIndex(),
15        // 存储所有 DestinationRule 的 map
16		destinationRuleIndex:    newDestinationRuleIndex(),
17		sidecarsByNamespace:     map[string][]*SidecarScope{},
18		envoyFiltersByNamespace: map[string][]*EnvoyFilterWrapper{},
19        // 存储所有 Gateway 的 map
20		gatewayIndex:            newGatewayIndex(),
21		ProxyStatus:             map[string]map[string]ProxyPushStatus{},
22		ServiceAccounts:         map[host.Name]map[int][]string{},
23	}
24}

初始化聚合所有注册中心的controller

 1ac := aggregate.NewController(aggregate.Options{
 2		MeshHolder: e,
 3	})
 4// 初始化后的聚合服务,传给Environment
 5e.ServiceDiscovery = ac
 6
 7// 创建一个聚合的controller
 8func NewController(opt Options) *Controller {
 9	return &Controller{
10		registries: make([]serviceregistry.Instance, 0),
11		meshHolder: opt.MeshHolder,
12		running:    atomic.NewBool(false),
13	}
14}

初始化XDSServer

xds服务类是 DiscoveryServer

1s.XDSServer = xds.NewDiscoveryServer(e, args.Plugins, args.PodName, args.Namespace)

DiscoveryServer

pilot/pkg/xds/discovery.go

 1type DiscoveryServer struct {
 2    // 前面介绍的 Environment
 3    Env *model.Environment
 4    // 用于调试和加载测试数据
 5    MemRegistry *memory.ServiceDiscovery
 6    // 控制面 Istio 配置生成器
 7    ConfigGenerator core.ConfigGenerator
 8    // 针对不同配置的定制化生成器
 9    // key 是生成器类型
10    Generators map[string]model.XdsResourceGenerator
11    ...
12    // 不同服务的所有实例集合
13    // 增林更新,key 为 service、namespace
14    EndpointShardsByService map[string]map[string]*EndpointShards
15	// 接受push请求的channel
16    pushChannel chan *model.PushRequest
17
18    // mutex used for config update scheduling (former cache update mutex)
19    updateMutex sync.RWMutex
20
21    // push xds 之前的缓冲队列
22    pushQueue *PushQueue
23
24    // debugHandlers is the list of all the supported debug handlers.
25    debugHandlers map[string]string
26
27    // adsClients reflect active gRPC channels, for both ADS and EDS.
28    // xds 和 eds 之间的 grpc 连接
29    adsClients      map[string]*Connection
30
31    // StatusGen is notified of connect/disconnect/nack on all connections
32    // 状态更新生成器
33    // 状态更新后向所有 connection 推送 DiscoveryResponse
34    StatusGen               *StatusGen
35    WorkloadEntryController *workloadentry.Controller
36
37    // serverReady indicates caches have been synced up and server is ready to process requests.
38    // 缓存已同步,可以接收请求
39    serverReady atomic.Bool
40    ...
41    // XDS 资源缓存
42    Cache model.XdsCache
43    ...
44}

初始化kubeclient

根据服务注册中心配置是否包含k8s创建kubeClient,保存在Server.kubeClient成员中,有两种创建方式:

  • 用户提供kubeConfig文件,kubeconfig参数指定,默认为空
  • 集群内模式,通过自动感知pod所在环境,自动完成配置
1if err := s.initKubeClient(args); err != nil {
2    return nil, fmt.Errorf("error initializing kube client: %v", err)
3}

初始化 mesh config & mesh network

通过 fileWatcher 对 Istiod 从 configmap 中挂载的两个配置文件 mesh 和 meshNetworks 进行监听,当配置文件发生变化时重载配置并触发相应的handlers

filewatcher 在单独的项目中,底层用到了 fsnotify 库来推送文件变化事件

1s.initMeshConfiguration(args, s.fileWatcher)
2s.initMeshNetworks(args, s.fileWatcher)

Mesh配置由MeshConfig结构体定义

初始化 mesh handler

为前面介绍的 mesh 和 meshNetworks 配置文件注册两个handler,当配置文件发生变化时触发全量xDS下发

1s.initMeshHandlers

初始化 controller(核心)

1if err := s.initControllers(args); err != nil {
2    return nil, err
3}

初始化了三种控制器:

  • 证书处理控制器(不重要)
  • 配置信息控制器:当CRD资源变更时,触发回调函数,下发xds
  • 注册信息控制器:当k8s原生资源发生变更时,触发回调函数,下发xds
 1func (s *Server) initControllers(args *PilotArgs) error {
 2	log.Info("initializing controllers")
 3	// Certificate controller is created before MCP controller in case MCP server pod
 4	// waits to mount a certificate to be provisioned by the certificate controller.
 5	if err := s.initCertController(args); err != nil {
 6		return fmt.Errorf("error initializing certificate controller: %v", err)
 7	}
 8	if err := s.initConfigController(args); err != nil {
 9		return fmt.Errorf("error initializing config controller: %v", err)
10	}
11	if err := s.initServiceControllers(args); err != nil {
12		return fmt.Errorf("error initializing service controllers: %v", err)
13	}
14	return nil
15}

配置信息控制器初始化

  • 配置信息是系列CRD,比如 VirtualService、DestinationRule
  • 配置信息的来源有三种:
    • k8s原生服务注册中心
    • MCP协议的服务注册中心
    • 本地文件服务注册中心
  • 配置信息保存在ConfigStore对象中
  • 这一步主要是通过client-go的informer机制注册CRD资源的Add、Update、Delete事件处理函数
 1func (s *Server) initConfigController(args *PilotArgs) error {
 2  s.initStatusController(args, features.EnableStatus)
 3	meshConfig := s.environment.Mesh()
 4	if len(meshConfig.ConfigSources) > 0 {
 5    // 处理 MCP 配置
 6		if err := s.initConfigSources(args); err != nil {
 7			return err
 8		}
 9	} else if args.RegistryOptions.FileDir != "" {
10    // 处理本地文件
11		store := memory.Make(collections.Pilot)
12		configController := memory.NewController(store)
13
14		err := s.makeFileMonitor(args.RegistryOptions.FileDir, args.RegistryOptions.KubeOptions.DomainSuffix, configController)
15		if err != nil {
16			return err
17		}
18		s.ConfigStores = append(s.ConfigStores, configController)
19	} else {
20    // 初始化 k8s 配置存储
21		err2 := s.initK8SConfigStore(args)
22		if err2 != nil {
23			return err2
24		}
25	}
26	
27	...
28    // 将所有的 configStore 聚合并缓存
29	aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)
30	// 通过 configController 统一操作 configStore
31	s.configController = aggregateConfigController
32  
33  // 初始化 IstioConfigStore 对象
34	// configStore 保存到 Environment 中
35	s.environment.IstioConfigStore = model.MakeIstioStore(s.configController)
36
37    // 当服务创建成功后,会启动controller
38	s.addStartFunc(func(stop <-chan struct{}) error {
39        // 注册启动函数到 StartFunc中
40		go s.configController.Run(stop)
41		return nil
42	})
43
44	return nil
45}
  • 读取所有的CRD资源,获取Infromer,并注册 AddHandler、UpdateHandler、DeleteHandler。当这些CRD资源发生变更时,触发相应的handler
 1func (s *Server) initK8SConfigStore(args *PilotArgs) error {
 2	// 初始化 config controller
 3	configController, err := s.makeKubeConfigController(args)
 4	...
 5}
 6
 7// makeKubeConfigController
 8func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) {
 9	// 初始化一个处理 Istio CRD 的 client
10    c, err := crdclient.New(s.kubeClient, args.Revision, args.RegistryOptions.KubeOptions.DomainSuffix)
11	...
12}
13
14// 初始化 Client
15func New(client kube.Client, revision, domainSuffix string) (model.ConfigStoreCache, error) {
16	...
17	return NewForSchemas(client, revision, domainSuffix, schemas)
18}
19
20// NewForSchemas
21func NewForSchemas(client kube.Client, revision, domainSuffix string, schemas collection.Schemas) (model.ConfigStoreCache, error) {
22    // 创建 client 对象
23	out := &Client{
24		domainSuffix:     domainSuffix,
25		schemas:          schemas,
26		revision:         revision,
27		queue:            queue.NewQueue(1 * time.Second),
28		kinds:            map[config.GroupVersionKind]*cacheHandler{},
29		istioClient:      client.Istio(),
30		gatewayAPIClient: client.GatewayAPI(),
31	}
32    // 获取所有的 CRD
33	known := knownCRDs(client.Ext())
34	for _, s := range out.schemas.All() {
35        ...
36		if _, f := known[name]; f {
37			...
38             // 为每种 CRD 创建 handler
39			out.kinds[s.Resource().GroupVersionKind()] = createCacheHandler(out, s, i)
40		} 
41	}
42	...
43}
44
45// 监听 infomer, 注册增、删、改、查的事件处理函数
46func createCacheHandler(cl *Client, schema collection.Schema, i informers.GenericInformer) *cacheHandler {
47	...
48	i.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
49        // 新增事件
50		AddFunc: func(obj interface{}) {
51			incrementEvent(kind, "add")
52			cl.queue.Push(func() error {
53				return h.onEvent(nil, obj, model.EventAdd)
54			})
55		},
56        // 更新事件
57		UpdateFunc: func(old, cur interface{}) {
58			if !reflect.DeepEqual(old, cur) {
59				incrementEvent(kind, "update")
60				cl.queue.Push(func() error {
61					return h.onEvent(old, cur, model.EventUpdate)
62				})
63			} else {
64				incrementEvent(kind, "updatesame")
65			}
66		},
67        // 删除事件
68		DeleteFunc: func(obj interface{}) {
69			incrementEvent(kind, "delete")
70			cl.queue.Push(func() error {
71				return h.onEvent(nil, obj, model.EventDelete)
72			})
73		},
74	})
75	return h
76}

服务注册控制器初始化

 1func (s *Server) initServiceControllers(args *PilotArgs) error {
 2    // 从之前初始化的 Environment.ServiceDiscovery 中获取已经注册的服务中心(比如k8s默认就有)
 3	serviceControllers := s.ServiceController()
 4
 5    // 初始化 k8s 集群之外的服务,这些服务是通过 ServiceEntry 注册到控制面的
 6    // 所有的 ServiceEntry 配置目前还在前面的介绍的 configController 中
 7    // 将 ServiceEntry 初始化一个 ServiceEntry 注册中心,
 8	s.serviceEntryStore = serviceentry.NewServiceDiscovery(s.configController, s.environment.IstioConfigStore, s.XDSServer)
 9    // 将 serviceEntry 注册中心加入到 ServiceController
10	serviceControllers.AddRegistry(s.serviceEntryStore)
11
12	registered := make(map[serviceregistry.ProviderID]bool)
13	for _, r := range args.RegistryOptions.Registries {
14		serviceRegistry := serviceregistry.ProviderID(r)
15		...
16		switch serviceRegistry {
17         // 如果注册中心是 k8s,执行 initKubeRegistry 方法
18		case serviceregistry.Kubernetes:
19			if err := s.initKubeRegistry(args); err != nil {
20				return err
21            }
22            ...
23	}
24
25    // 注册 StartFunc,使得 Serveice 启动后,执行 Run 方法
26	s.addStartFunc(func(stop <-chan struct{}) error {
27		go serviceControllers.Run(stop)
28		return nil
29	})
30
31	return nil
32}

initKubeRegistry

 1func (s *Server) initKubeRegistry(args *PilotArgs) (err error) {
 2	...
 3	// initialize the "main" cluster registry before starting controllers for remote clusters
 4	if err := mc.AddMemberCluster(s.kubeClient, args.RegistryOptions.KubeOptions.ClusterID); err != nil {
 5        ...
 6    }
 7}
 8
 9func (m *Multicluster) AddMemberCluster(client kubelib.Client, clusterID string) error {
10	...
11    // 为k8s原生资源创建controller
12	kubeRegistry := NewController(client, options)
13	m.serviceController.AddRegistry(kubeRegistry)
14    ...
15}

Controller初始化

 1func NewController(kubeClient kubelib.Client, options Options) *Controller {
 2	// The queue requires a time duration for a retry delay after a handler error
 3	c := &Controller{
 4		...
 5	}
 6    ...
 7	c.initDiscoveryHandlers(kubeClient, options.EndpointMode, options.MeshWatcher, c.discoveryNamespacesFilter)
 8	// 对 Service 资源初始化 Informer,Lister、Map
 9    // 各类hander在informer监听到资源变更时推送事件到queue,再更新对应的map
10	c.serviceInformer = filter.NewFilteredSharedIndexInformer(c.discoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Services().Informer())
11	c.serviceLister = listerv1.NewServiceLister(c.serviceInformer.GetIndexer())
12	registerHandlers(c.serviceInformer, c.queue, "Services", c.onServiceEvent, nil)
13	...
14    // node 处理
15	c.nodeInformer = kubeClient.KubeInformer().Core().V1().Nodes().Informer()
16	c.nodeLister = kubeClient.KubeInformer().Core().V1().Nodes().Lister()
17	registerHandlers(c.nodeInformer, c.queue, "Nodes", c.onNodeEvent, nil)
18	// pod 处理
19	podInformer := filter.NewFilteredSharedIndexInformer(c.discoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Pods().Informer())
20	...
21	registerHandlers(c.pods.informer, c.queue, "Pods", c.pods.onEvent, nil)
22	return c
23}

初始化RegistryEventHanders

这个函数为配置或服务变更时,设置三个事件处理器:

  • serviceHander:服务本身发生变化时,触发 xds 全量下发,所有与该服务相关的代理都会受到推送
  • instanceHander:实例变动触发xds全量下发,不过仅在链接Consul时有效。k8s和MCP的handler在别的地方注册
  • configHandler:

三个事件处理都是构造 PushRequest 请求,并加入到 XDSServeice 的 pushChannel中

 1func (s *Server) initRegistryEventHandlers() {
 2	log.Info("initializing registry event handlers")
 3	// Flush cached discovery responses whenever services configuration change.
 4    // 服务本身发生变化时,触发xds全量下发
 5	serviceHandler := func(svc *model.Service, _ model.Event) {
 6        // 构造下发对象 PushRequest
 7		pushReq := &model.PushRequest{
 8			Full: true,
 9			ConfigsUpdated: map[model.ConfigKey]struct{}{{
10				Kind:      gvk.ServiceEntry,
11				Name:      string(svc.Hostname),
12				Namespace: svc.Attributes.Namespace,
13			}: {}},
14			Reason: []model.TriggerReason{model.ServiceUpdate},
15		}
16        // 下发 xds
17		s.XDSServer.ConfigUpdate(pushReq)
18	}
19	s.ServiceController().AppendServiceHandler(serviceHandler)
20
21    // 前面初始化的配置对象(VirtualService、DestinationRule)
22    // 这些配置的变化也会触发XDS的全量下发,所有与该配置有关的代理都会收到推送
23    // 不过 ServiceEntry 和 WorkloadEntry 除外,这两个资源的配置是由 ServiceEntryStore 管理的
24	if s.configController != nil {
25		configHandler := func(old config.Config, curr config.Config, event model.Event) {
26			pushReq := &model.PushRequest{
27				Full: true,
28				ConfigsUpdated: map[model.ConfigKey]struct{}{{
29					Kind:      curr.GroupVersionKind,
30					Name:      curr.Name,
31					Namespace: curr.Namespace,
32				}: {}},
33				Reason: []model.TriggerReason{model.ConfigUpdate},
34			}
35			s.XDSServer.ConfigUpdate(pushReq)
36			if event != model.EventDelete {
37				s.statusReporter.AddInProgressResource(curr)
38			} else {
39				s.statusReporter.DeleteInProgressResource(curr)
40			}
41		}
42		schemas := collections.Pilot.All()
43		if features.EnableServiceApis {
44			schemas = collections.PilotServiceApi.All()
45		}
46        // 下面是跳过 ServiceEntry 和 WorkloadEntry 的逻辑
47		for _, schema := range schemas {
48			// This resource type was handled in external/servicediscovery.go, no need to rehandle here.
49			if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Serviceentries.
50				Resource().GroupVersionKind() {
51				continue
52			}
53			if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Workloadentries.
54				Resource().GroupVersionKind() {
55				continue
56			}
57			if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Workloadgroups.
58				Resource().GroupVersionKind() {
59				continue
60			}
61
62			s.configController.RegisterEventHandler(schema.Resource().GroupVersionKind(), configHandler)
63		}
64	}
65}

初始化DiscoveryServevice

sidecar通过连接pilot-discovery中的discovery-service服务获取服务注册情况、流量控制策略等控制面的控制信息。

DiscoveryServer实际上实现了AggregatedDiscoveryServiceServer接口

具体初始化包括以下步骤:

  • 创建对外提供Rest协议的discovery service对象
  • 创建对外提供grpc协议的 envoy xds server
 1func (s *Server) initDiscoveryService(args *PilotArgs) {
 2    ...
 3	s.addStartFunc(func(stop <-chan struct{}) error {
 4		log.Infof("Starting ADS server")
 5    // 注册 XDSServer.Start() 方法到 startFunc中
 6		s.XDSServer.Start(stop)
 7		return nil
 8	})
 9
10    // 初始化GRPC服务
11	s.initGrpcServer(args.KeepaliveOptions)
12	...
13  // 启动http服务
14  err := m.Serve()
15}

initGrpcServer

 1
 2// 初始化 GRPC
 3func (s *Server) initGrpcServer(options *istiokeepalive.Options) {
 4	grpcOptions := s.grpcServerOptions(options)
 5	s.grpcServer = grpc.NewServer(grpcOptions...)
 6	s.XDSServer.Register(s.grpcServer)
 7	reflection.Register(s.grpcServer)
 8}
 9
10// 向 Grpc 注册服务
11// ADS 的 gRPC 服务包含两个流式方法,一个是全量推送,一个是增量推送。
12func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
13	// Register v3 server
14	discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
15}
16
17// 调用 grpc 的实现
18func RegisterAggregatedDiscoveryServiceServer(s *grpc.Server, srv AggregatedDiscoveryServiceServer) {
19	s.RegisterService(&_AggregatedDiscoveryService_serviceDesc, srv)
20}
21

Start 方法

1func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
2	go s.WorkloadEntryController.Run(stopCh)
3  // 处理收到的请求以及防抖
4	go s.handleUpdates(stopCh)
5	go s.periodicRefreshMetrics(stopCh)
6  // 负责具体的推送
7	go s.sendPushes(stopCh)
8}

handleUpdates

负责处理 pushChannel 中收到的推送请求及防抖

 1func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
 2	debounce(s.pushChannel, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates)
 3}
 4
 5// debounce 防抖,这个函数主要是处理防抖的
 6func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
 7	var timeChan <-chan time.Time
 8	var startDebounce time.Time
 9	var lastConfigUpdateTime time.Time
10
11	pushCounter := 0
12	debouncedEvents := 0
13
14	// Keeps track of the push requests. If updates are debounce they will be merged.
15	var req *model.PushRequest
16
17	free := true
18	freeCh := make(chan struct{}, 1)
19
20  // push() 方法也是 debounce 方法中包装的一个过程函数,
21  // 它会在真正的 pushFn() 完成后向 freeCh 发送消息表示这次防抖处理完成了,可以开始下一次防抖
22	push := func(req *model.PushRequest, debouncedEvents int) {
23		pushFn(req)
24		updateSent.Add(int64(debouncedEvents))
25		freeCh <- struct{}{}
26	}
27
28  // 防抖的判断逻辑
29  // 当事件的延迟时间大于等于最大延迟时间或静默时间大于等于最小静默时间,才会执行 push() 方法。
30	pushWorker := func() {
31		eventDelay := time.Since(startDebounce)
32		quietTime := time.Since(lastConfigUpdateTime)
33		// it has been too long or quiet enough
34		if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
35			if req != nil {
36				pushCounter++
37				adsLog.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push, full=%v",
38					pushCounter, debouncedEvents,
39					quietTime, eventDelay, req.Full)
40
41				free = false
42				go push(req, debouncedEvents)
43				req = nil
44				debouncedEvents = 0
45			}
46		} else {
47			timeChan = time.After(opts.debounceAfter - quietTime)
48		}
49	}
50
51  // 等待 channel 的处理逻辑
52	for {
53		select {
54		case <-freeCh:
55			free = true
56			pushWorker()
57    // 收到 pushRequest请求
58		case r := <-ch:
59			// If reason is not set, record it as an unknown reason
60			if len(r.Reason) == 0 {
61				r.Reason = []model.TriggerReason{model.UnknownTrigger}
62			}
63			if !opts.enableEDSDebounce && !r.Full {
64				// trigger push now, just for EDS
65				go pushFn(r)
66				continue
67			}
68      
69			lastConfigUpdateTime = time.Now()
70      // 收到第一个 pushRequest请求,延时一个最小静默时间
71			if debouncedEvents == 0 {
72				timeChan = time.After(opts.debounceAfter)
73				startDebounce = lastConfigUpdateTime
74			}
75      // 累积防抖事件数
76			debouncedEvents++
77      // 收到的请求进行合并
78			req = req.Merge(r)
79    // 最小静默时间结束
80		case <-timeChan:
81      // 判断是否有在进行防抖的操作
82			if free {
83        // 没有的话做一次防抖判断,看是否需要推送
84				pushWorker()
85			}
86		case <-stopCh:
87			return
88		}
89	}
90}

合并Push请求函数

 1// pilot/pkg/model/push_context.go
 2func (first *PushRequest) Merge(other *PushRequest) *PushRequest {
 3	if first == nil {
 4		return other
 5	}
 6	if other == nil {
 7		return first
 8	}
 9
10	reason := make([]TriggerReason, 0, len(first.Reason)+len(other.Reason))
11	reason = append(reason, first.Reason...)
12	reason = append(reason, other.Reason...)
13  
14	merged := &PushRequest{
15		// Keep the first (older) start time
16    // 保持最旧的请求的开始时间
17		Start: first.Start,
18
19    // 如果有一个是全量,合并后必须是全量
20		Full: first.Full || other.Full,
21
22    // context 信息使用最新请求的
23		Push: other.Push,
24
25		// Merge the two reasons. Note that we shouldn't deduplicate here, or we would under count
26    // 触发推送请求的原因
27		Reason: reason,
28	}
29
30	// Do not merge when any one is empty
31	if len(first.ConfigsUpdated) > 0 && len(other.ConfigsUpdated) > 0 {
32		merged.ConfigsUpdated = make(map[ConfigKey]struct{}, len(first.ConfigsUpdated)+len(other.ConfigsUpdated))
33		for conf := range first.ConfigsUpdated {
34			merged.ConfigsUpdated[conf] = struct{}{}
35		}
36		for conf := range other.ConfigsUpdated {
37			merged.ConfigsUpdated[conf] = struct{}{}
38		}
39	}
40
41	return merged
42}

推送请求的原因

 1type TriggerReason string
 2
 3const (
 4	// Describes a push triggered by an Endpoint change
 5	EndpointUpdate TriggerReason = "endpoint"
 6	// Describes a push triggered by a config (generally and Istio CRD) change.
 7	ConfigUpdate TriggerReason = "config"
 8	// Describes a push triggered by a Service change
 9	ServiceUpdate TriggerReason = "service"
10	// Describes a push triggered by a change to an individual proxy (such as label change)
11	ProxyUpdate TriggerReason = "proxy"
12	// Describes a push triggered by a change to global config, such as mesh config
13	GlobalUpdate TriggerReason = "global"
14	// Describes a push triggered by an unknown reason
15	UnknownTrigger TriggerReason = "unknown"
16	// Describes a push triggered for debugging
17	DebugTrigger TriggerReason = "debug"
18	// Describes a push triggered for a Secret change
19	SecretTrigger TriggerReason = "secret"
20	// Describes a push triggered for Networks change
21	NetworksTrigger TriggerReason = "networks"
22)

Push方法

 1func (s *DiscoveryServer) Push(req *model.PushRequest) {
 2	...
 3  // 调用 AdsPushAll,
 4	s.AdsPushAll(versionLocal, req)
 5}
 6
 7func (s *DiscoveryServer) AdsPushAll(version string, req *model.PushRequest) {
 8	...
 9  // 调用 startPush, 将PushRequest重新放入队列
10	s.startPush(req)
11}
12
13func (s *DiscoveryServer) startPush(req *model.PushRequest) {
14	...
15	for _, p := range s.AllClients() {
16    // 加入队列
17		s.pushQueue.Enqueue(p, req)
18	}
19}

PushQueue

当所有的 PushRequest 经过防抖等一系列处理后,重新入队到 pushQueue ,这时在 EnvoyXdsServer 启动时创建的协程 sendPushes 就开始工作了。

 1type PushQueue struct {
 2	cond *sync.Cond
 3
 4  // 保存了所有代理 grpc 的 PushRequest
 5  // 相同的的pushRequest入队列会被合并
 6	pending map[*Connection]*model.PushRequest
 7
 8	queue []*Connection
 9
10	// processing stores all connections that have been Dequeue(), but not MarkDone().
11	// The value stored will be initially be nil, but may be populated if the connection is Enqueue().
12	// If model.PushRequest is not nil, it will be Enqueued again once MarkDone has been called.
13	processing map[*Connection]*model.PushRequest
14
15	shuttingDown bool
16}

sendPushes

负责具体的推送

 1func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
 2  // 第二个参数用于节流,默认100
 3	doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
 4}
 5
 6func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
 7	for {
 8		select {
 9		case <-stopCh:
10			return
11		default:
12			// We can send to it until it is full, then it will block until a pushes finishes and reads from it.
13			// This limits the number of pushes that can happen concurrently
14			semaphore <- struct{}{}
15
16			// Get the next proxy to push. This will block if there are no updates required.
17      // 从 PushQueue 中取出 PushRequest
18			client, push, shuttingdown := queue.Dequeue()
19			if shuttingdown {
20				return
21			}
22			recordPushTriggers(push.Reason...)
23			// Signals that a push is done by reading from the semaphore, allowing another send on it.
24			doneFunc := func() {
25				queue.MarkDone(client)
26				<-semaphore
27			}
28
29			proxiesQueueTime.Record(time.Since(push.Start).Seconds())
30
31			go func() {
32        // 根据 PushRequest 生成 Event
33				pushEv := &Event{
34					pushRequest: push,
35					done:        doneFunc,
36				}
37
38				select {
39        // 将 Event 放入客户端的 PushChannel中,跟前面的 PushChannel 不是同一个(前面是服务端的)
40        // 放入到pushChannel管道中的消息会在StreamAggregatedResources方法中被处理
41				case client.pushChannel <- pushEv:
42					return
43        // 如果客户端的消息处理完成了,标记 PushRequest 处理完成
44				case <-client.stream.Context().Done(): // grpc stream was closed
45					doneFunc()
46					adsLog.Infof("Client closed connection %v", client.ConID)
47				}
48			}()
49		}
50	}
51}

注册kubeClient.RunAndWait

kubeClient.RunAndWait 方法注册至 startFuncs 中, RunAndWait 启动后所有 Informer 将开始缓存,并等待它们同步完成。之所以在最后运行,可以保证所有的 Informer 都已经注册。

1if s.kubeClient != nil {
2    s.addStartFunc(func(stop <-chan struct{}) error {
3        s.kubeClient.RunAndWait(stop)
4        return nil
5    })
6}

Server启动

start函数顺序执行之前初始化过程中在server对象上注册的一系列startFunc函数

 1func (s *Server) Start(stop <-chan struct{}) error {
 2	...
 3  // 前面注册了很多种startFuncs,在这里依次启动
 4	for _, fn := range s.startFuncs {
 5		if err := fn(stop); err != nil {
 6			return err
 7		}
 8	}
 9
10  // 等待需要监听资源的 Informer 缓存完毕
11	if !s.waitForCacheSync(stop) {
12		return fmt.Errorf("failed to sync cache")
13	}
14     // 启动服务
15	...
16}