| 阅读 |,阅读约 2 分钟
| 复制链接:

xds

概述

  • DS协议是Envoy获取配置信息的传输协议,也是Istio与Envoy连接的桥梁。
  • Envoy动态的发现服务以及相关资源的API就是指xDS
  • xDS可以通过两种方式承载: gRPC、 REST,这两种方式都是通过xDS-API发送DiscoveryRequest请求,然后资源通过DiscoveryResponse下发

ADS

ADS是一种xDS的实现, 它基于gRPC长连接。 gRPC的实现是承载在HTTP/2之上。

为什么需要ADS

Istio 0.8以前, Pilot提供的的单一资源的DS

每种资源需要一条单独的连接 Istio高可用环境下,可能部署多个Pilot 带来的挑战:

没办法保证配置资源更新的顺序 多Pilot配置资源的一致性没法保证 综合以上两个问题,很容易出现配置更新过程中网络流量丢失带来网络错误( 虚假的)

ADS允许通过一条连接( gRPC的同一stream),发送多种资源的请求和响应。

能够保证请求一定落在同一Pilot上,解决多个管理服务器配置不一致的问题 通过顺序的配置分发,轻松解决资源更新顺序的问题

源码

pilot/pkg/xds/ads.go

 1func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
 2	return s.Stream(stream)
 3}
 4
 5func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
 6	...
 7  // 建立新的连接
 8	con := newConnection(peerAddr, stream)
 9	...
10  // 创建一个接受请求的channel
11	reqChannel := make(chan *discovery.DiscoveryRequest, 1)
12  // 处理客户端的请求
13	go s.receive(con, reqChannel, &receiveError)
14
15	// Wait for the proxy to be fully initialized before we start serving traffic. Because
16	// initialization doesn't have dependencies that will block, there is no need to add any timeout
17	// here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
18	// reqChannel and the connection not being enqueued for pushes to pushChannel until the
19	// initialization is complete.
20	<-con.initialized
21
22	for {
23		// Block until either a request is received or a push is triggered.
24		// We need 2 go routines because 'read' blocks in Recv().
25		//
26		// To avoid 2 routines, we tried to have Recv() in StreamAggregateResource - and the push
27		// on different short-lived go routines started when the push is happening. This would cut in 1/2
28		// the number of long-running go routines, since push is throttled. The main problem is with
29		// closing - the current gRPC library didn't allow closing the stream.
30		select {
31		case req, ok := <-reqChannel:
32			if !ok {
33				// Remote side closed connection or error processing the request.
34				return receiveError
35			}
36			// processRequest is calling pushXXX, accessing common structs with pushConnection.
37			// Adding sync is the second issue to be resolved if we want to save 1/2 of the threads.
38			err := s.processRequest(req, con)
39			if err != nil {
40				return err
41			}
42
43    // 获取从 Server 端推送的 PushRequest 相关Event
44		case pushEv := <-con.pushChannel:
45			err := s.pushConnection(con, pushEv)
46			pushEv.done()
47			if err != nil {
48				return err
49			}
50		case <-con.stop:
51			return nil
52		}
53	}
54}

pushConnection

处理服务端发来的响应

主要调用 pushXds 执行推送,gen.Generate生成Response,pushXds内部调用con.send(resp) 将 EDS 发送至数据面的客户端

processRequest

处理客户端请求, reqChannel 中获取 DiscoveryRequest 看客户端订阅了哪些 xDS ,组装推送即可。