| 阅读 | 共 935 字,阅读约
xds
概述
- DS协议是Envoy获取配置信息的传输协议,也是Istio与Envoy连接的桥梁。
- Envoy动态的发现服务以及相关资源的API就是指xDS
- xDS可以通过两种方式承载: gRPC、 REST,这两种方式都是通过xDS-API发送DiscoveryRequest请求,然后资源通过DiscoveryResponse下发
ADS
ADS是一种xDS的实现, 它基于gRPC长连接。 gRPC的实现是承载在HTTP/2之上。
为什么需要ADS
Istio 0.8以前, Pilot提供的的单一资源的DS
每种资源需要一条单独的连接 Istio高可用环境下,可能部署多个Pilot 带来的挑战:
没办法保证配置资源更新的顺序 多Pilot配置资源的一致性没法保证 综合以上两个问题,很容易出现配置更新过程中网络流量丢失带来网络错误( 虚假的)
ADS允许通过一条连接( gRPC的同一stream),发送多种资源的请求和响应。
能够保证请求一定落在同一Pilot上,解决多个管理服务器配置不一致的问题 通过顺序的配置分发,轻松解决资源更新顺序的问题
源码
pilot/pkg/xds/ads.go
1func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
2 return s.Stream(stream)
3}
4
5func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
6 ...
7 // 建立新的连接
8 con := newConnection(peerAddr, stream)
9 ...
10 // 创建一个接受请求的channel
11 reqChannel := make(chan *discovery.DiscoveryRequest, 1)
12 // 处理客户端的请求
13 go s.receive(con, reqChannel, &receiveError)
14
15 // Wait for the proxy to be fully initialized before we start serving traffic. Because
16 // initialization doesn't have dependencies that will block, there is no need to add any timeout
17 // here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
18 // reqChannel and the connection not being enqueued for pushes to pushChannel until the
19 // initialization is complete.
20 <-con.initialized
21
22 for {
23 // Block until either a request is received or a push is triggered.
24 // We need 2 go routines because 'read' blocks in Recv().
25 //
26 // To avoid 2 routines, we tried to have Recv() in StreamAggregateResource - and the push
27 // on different short-lived go routines started when the push is happening. This would cut in 1/2
28 // the number of long-running go routines, since push is throttled. The main problem is with
29 // closing - the current gRPC library didn't allow closing the stream.
30 select {
31 case req, ok := <-reqChannel:
32 if !ok {
33 // Remote side closed connection or error processing the request.
34 return receiveError
35 }
36 // processRequest is calling pushXXX, accessing common structs with pushConnection.
37 // Adding sync is the second issue to be resolved if we want to save 1/2 of the threads.
38 err := s.processRequest(req, con)
39 if err != nil {
40 return err
41 }
42
43 // 获取从 Server 端推送的 PushRequest 相关Event
44 case pushEv := <-con.pushChannel:
45 err := s.pushConnection(con, pushEv)
46 pushEv.done()
47 if err != nil {
48 return err
49 }
50 case <-con.stop:
51 return nil
52 }
53 }
54}
pushConnection
处理服务端发来的响应
主要调用 pushXds 执行推送,gen.Generate生成Response,pushXds内部调用con.send(resp) 将 EDS 发送至数据面的客户端
processRequest
处理客户端请求, reqChannel
中获取 DiscoveryRequest
看客户端订阅了哪些 xDS ,组装推送即可。