| 阅读 | 共 4712 字,阅读约
istio源码分析-启动命令
概述
源码分析
代码入口
istoctl/cmd/istioctl/main.go
1func init() {
2 // 设置viper的值
3 viper.SetDefault("istioNamespace", controller.IstioNamespace)
4 viper.SetDefault("xds-port", 15012)
5}
6...
7func main() {
8 // 配置和环境变量处理
9 if err := cmd.ConfigAndEnvProcessing(); err != nil {
10 fmt.Fprintf(os.Stderr, "Could not initialize: %v\n", err)
11 exitCode := cmd.GetExitCode(err)
12 os.Exit(exitCode)
13 }
14
15 // 注册启动命令
16 rootCmd := cmd.GetRootCmd(os.Args[1:])
17
18 if err := rootCmd.Execute(); err != nil {
19 exitCode := cmd.GetExitCode(err)
20 os.Exit(exitCode)
21 }
22}
ConfigAndEnvProcessing
1func ConfigAndEnvProcessing() error {
2 // 读取配置文件,默认配置文件存放路径:"$HOME/.istioctl/config.yaml"
3 // 得到 $HOME/.istioctl/
4 configPath := filepath.Dir(IstioConfig)
5 // 得到 config.yaml
6 baseName := filepath.Base(IstioConfig)
7 // 得到 .yaml
8 configType := filepath.Ext(IstioConfig)
9 // 得到 config
10 configName := baseName[0 : len(baseName)-len(configType)]
11 if configType != "" {
12 // 得到 yaml
13 configType = configType[1:]
14 }
15
16 // Allow users to override some variables through $HOME/.istioctl/config.yaml
17 // and environment variables.
18 viper.SetEnvPrefix("ISTIOCTL")
19 viper.AutomaticEnv()
20 viper.AllowEmptyEnv(true) // So we can say ISTIOCTL_CERT_DIR="" to suppress certs
21
22 // 下面配置是viper组件需要做的配置
23 viper.SetConfigName(configName)
24 viper.SetConfigType(configType)
25 viper.AddConfigPath(configPath)
26 viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
27 // 读取配置文件,并放过如viper
28 err := viper.ReadInConfig()
29 // Ignore errors reading the configuration unless the file is explicitly customized
30 if IstioConfig != defaultIstioctlConfig {
31 return err
32 }
33
34 return nil
35}
源码结构
pilot
1➜ istio git:(master) ✗ tree pilot/pkg -L 1
2pilot/pkg
3├── bootstrap // 包含pilot-discovery启动相关的核心类 Server
4├── config
5├── controller
6├── dns
7├── features
8├── leaderelection
9├── model // 统一的数据模型,以支持不同的注册中心
10├── networking
11├── proto
12├── request
13├── secrets
14├── security
15├── serviceregistry // 实现不同注册中心数据到抽象数据模型的转换
16├── simulation
17├── status
18├── util
19└── xds
模型模型
-
CRD数据
pilot/pkg/config/kube/crd/conversion.go里包括将 DestinationRule 等 CRD 转换为 Abstract Model 的 Config 对象的函数
-
k8s服务
pilot/pkg/serviceregistry/kube/conversion.go里包括一系列将Kubernetes服务注册中心中的
label
、pod
、service
、service port
等Kubernetes资源对象转换为Abstract Model中的对应资源对象的函数
Server
代表pilot-discovery的核心数据,包含3个重要的成员:
-
ConfigStroeCache
继承了ConfigStore对象,ConfigStore利用client-go库从k8s获取CRD,转换为model包下的Config对象,对外提供
Get
、List
、Create
、Update、Delete
等CRUD服务。 -
IstioConfigStore
-
ServiceController
核心数据结构
在istio中,包括两类数据信息
-
使用istioctl配置的VirtualService、DestinationRule等被称为configuration,
-
从Kubernetes等服务注册中心获取的信息被称为service信息。
ConfigStoreCache
、IstioConfigStore
负责处理第一类信息,ServiceController
负责第二类。
ConfigStore
ConfigStore利用client-go库从k8s获取CRD,转换为model包下的Config对象,对外提供增删改查
pilot/pkg/model/config.go
1type ConfigStore interface {
2 // Schemas exposes the configuration type schema known by the config store.
3 // The type schema defines the bidrectional mapping between configuration
4 // types and the protobuf encoding schema.
5 Schemas() collection.Schemas
6
7 // Get retrieves a configuration element by a type and a key
8 Get(typ config.GroupVersionKind, name, namespace string) *config.Config
9
10 // List returns objects by type and namespace.
11 // Use "" for the namespace to list across namespaces.
12 List(typ config.GroupVersionKind, namespace string) ([]config.Config, error)
13
14 // Create adds a new configuration object to the store. If an object with the
15 // same name and namespace for the type already exists, the operation fails
16 // with no side effects.
17 Create(config config.Config) (revision string, err error)
18
19 // Update modifies an existing configuration object in the store. Update
20 // requires that the object has been created. Resource version prevents
21 // overriding a value that has been changed between prior _Get_ and _Put_
22 // operation to achieve optimistic concurrency. This method returns a new
23 // revision if the operation succeeds.
24 Update(config config.Config) (newRevision string, err error)
25
26 UpdateStatus(config config.Config) (newRevision string, err error)
27
28 // Patch applies only the modifications made in the PatchFunc rather than doing a full replace. Useful to avoid
29 // read-modify-write conflicts when there are many concurrent-writers to the same resource.
30 Patch(orig config.Config, patchFn config.PatchFunc) (string, error)
31
32 // Delete removes an object from the store by key
33 // For k8s, resourceVersion must be fulfilled before a deletion is carried out.
34 // If not possible, a 409 Conflict status will be returned.
35 Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error
36}
ConfigStoreCache
继承ConfigStore,允许注册控制面信息变更处理函数
pilot/pkg/model/config.go
1type ConfigStoreCache interface {
2 ConfigStore
3
4 // RegisterEventHandler adds a handler to receive config update events for a
5 // configuration type
6 RegisterEventHandler(kind config.GroupVersionKind, handler func(config.Config, config.Config, Event))
7
8 // Run until a signal is received
9 Run(stop <-chan struct{})
10
11 // HasSynced returns true after initial cache synchronization is complete
12 HasSynced() bool
13}
IstioConfigStore
继承了ConfigStore,主要目的是为访问route rule、virtual service等数据提供更加方便的接口。相对于ConfigStore提供的Get
、List
、Create
、Update、Delete
接口,IstioConfigStore直接提供更为方便的RouteRules、VirtualServices接口。
pilot/pkg/model/config.go
1type IstioConfigStore interface {
2 ConfigStore
3
4 // ServiceEntries lists all service entries
5 ServiceEntries() []config.Config
6
7 // Gateways lists all gateways bound to the specified workload labels
8 Gateways(workloadLabels labels.Collection) []config.Config
9
10 // AuthorizationPolicies selects AuthorizationPolicies in the specified namespace.
11 AuthorizationPolicies(namespace string) []config.Config
12}
ServiceController
利用client-go库从k8s获取pod、service、node、endpoint,并将这些CRD转换为model包下的Service、ServiceInstance对象
XDS服务
基于统一的数据存储模型,pilot-discovery为数据面提供控制信息服务,也就是所谓的discovery service或者xds服务,x是一个代词,类似于云计算的Xaas(iaas、paas、saas)。Istio中xds包括:
- cds:cluster,一个应用集群,对应提供相同服务的endpoint,类似于k8s的service
- lds:listener
- rds:route,灰度发布时,一个服务对应多个版本,一个版本对应一个cluster,通过route规则规定请求如何路由到某个cluster
- eds:endpoint,一个具体的实例,对应ip+port,类似k8s的pod
- ads:aggregated,聚合多个xds,并保证按照顺序实现配置
xds就是Envoy从pilot-discovery动态获取endpoint、cluster等配置信息的协议实现
DiscoveryRequest
envoy为方便第三放开发者开发控制面,提供了go-control-plane库。基于go-control-plane库,开发者可以方便地实现基于gRPC协议的discovery service。
根据discovery server收到的DiscoveryRequest
中指定的请求服务类型(TypeUrl
),istio的ads
服务统一封装了cds
、lds
、rds
和eds
4种服务,即在同一个双向streaming的gRPC连接上提供这4种服务
discovery-server从Envoy收到的请求类型,这个数据结构在go-control-plane包下
1type DiscoveryRequest struct {
2 state protoimpl.MessageState
3 sizeCache protoimpl.SizeCache
4 unknownFields protoimpl.UnknownFields
5
6 // Envoy在收到一个DiscoveryResponse之后会马上再发送一个DiscoveryRequest作为ACK/NACK,从而告诉discovery service消息是否成功处理。VersionInfo用来表示Envoy端到目前为止成功处理的最新的消息版本。
7 VersionInfo string `protobuf:"bytes,1,opt,name=version_info,json=versionInfo,proto3" json:"version_info,omitempty"`
8
9 // 连上discovery service的Envoy的唯一标识
10 Node *v3.Node `protobuf:"bytes,2,opt,name=node,proto3" json:"node,omitempty"`
11
12 // Envoy sidecar关注的资源列表,对于cds、lds来说,ResourceName通常是空的,因为Envoy总是需要知道所有的相关数据。而对于eds,rds来讲,Envoy则可以选择性的指明需要监控的资源对象列表。
13 ResourceNames []string `protobuf:"bytes,3,rep,name=resource_names,json=resourceNames,proto3" json:"resource_names,omitempty"`
14
15 // ads服务将原来分开的单独xds服务,如cds、lds等,合并在同一个双向streaming的gRPC连接上。所以当Envoy向discovery server发送DiscoveryRequest时,需要使用TypeUrl来指明当前请求的服务类型。TypeUrl值可以是cds、lds等
16 TypeUrl string `protobuf:"bytes,4,opt,name=type_url,json=typeUrl,proto3" json:"type_url,omitempty"`
17
18 // Envoy使用ReponseNonce指定当前DiscoveryRequestACK的是之前的哪个DiscoveryResponse。具体设置方式就是把ReponseNonce指定为需要ACK的DiscoveryResponse中的Nonce值
19 ResponseNonce string `protobuf:"bytes,5,opt,name=response_nonce,json=responseNonce,proto3" json:"response_nonce,omitempty"`
20
21 // 当Envoy处理来自discovery server的DiscoveryResponse的过程中发生错误时,会在ACK/NACK的DiscoveryRequest中带上具体错误信息ErrorDetail
22 ErrorDetail *status.Status `protobuf:"bytes,6,opt,name=error_detail,json=errorDetail,proto3" json:"error_detail,omitempty"`
23}
DiscoveryServer(EnvoyXdsServer)
EnvoyXdsServer主要负责pilot中xDS协议的生成和下发,接收并处理configController和serviceController推送的PushRequest,与集群中所有的数据面代理进行grpc通讯,并处理请求。
DiscoveryServer实际上实现了AggregatedDiscoveryServiceServer接口
1// pilot server
2type Server struct {
3 XDSServer *xds.DiscoveryServer
4 ...
5}
6
7// xdsServer
8type DiscoveryServer struct {
9 // 和 pilot 的 Server 结构中的 env 是同一个
10 Env *model.Environment
11 // xds 数据的生成器接口
12 ConfigGenerator core.ConfigGenerator
13 // 统一接收从其他组件发来的 PushRequest 的 channel
14 pushChannel chan *model.PushRequest
15 // 在真正 xds 推送前的防抖缓存
16 pushQueue *PushQueue
17 // 保存了所有生效的 grpc 连接
18 adsClients map[string]*Connection
19 ...
20}
AggregatedDiscoveryServiceServer 接口
1type AggregatedDiscoveryServiceServer interface {
2 // 全量 ADS Stream 接口
3 StreamAggregatedResources(AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
4 // 增量 ADS Stream 接口
5 DeltaAggregatedResources(AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error
6}
7
CDS
- cds 即 Cluster Discovery Service,是pilot-discovery为Envoy动态提供cluster相关信息的协议
- Envoy可以向pilot-discovery的GRPC Server发送一个DiscoveryRequest,并将配置类型TypeUrl设置为cds
- discovery-service,即ads服务的实现类,在收到DiscoveryRequest后,将Abstract Model中的配置信息组装成cluster,然后封装在DiscoveryResponse返回给Envoy
为了组装cluster,ads需要提取Abstract Model中的两类信息:
- 服务注册信息:如k8s服务注册转化后的service
- 配置信息:如DestinationRule
处理流程
- 获取Service信息,为每个service创建一个空的cluster。service的两个来源:
- 在k8s中定义的service资源对象
- 通过istioctl配置的ServiceEntry资源对象,代表那些没有注册在k8s中的服务
- 设置cluster名称
- 设置cluster的默认流量控制策略
- 配置与cluster相关的eds更新方式
- 根据service的HostName属性查找对应的DestinationRule,根据ds的subnet创建subcluster
- 根据DR里定义的traffic policy,为cluster、subcluster配置流量控制策略
组装成cluster对象后,将得到的所有cluster封装在一个DiscoveryResponse中,并将TypeUrl设置为type.googleapis.com/envoy.api.v2.Cluster,启动单独的协程与Envoy建立双向stream grpc连接发送给envoy
!(/Users/kinnylee/Library/Application Support/typora-user-images/image-20210327214840359.png)
EDS
Envoy知道cluster信息后,还需要知道集群每个成员信息,即endpoint。因此发送完cds请求后,会继续发送TypeUrl为eds的DiscoveryRequest请求。
处理流程
- 根据cluster的名称,把k8s service对象的name和namespace解析出来,使用client-go中的SharedIndexInformer获取Service资源对象
- 使用SharedIndexInformer获取与name和namespace匹配的Endpoint资源对象
- 使用subnet的label过滤endpoint列表
- 根据endpoint,获取ip、端口、可用域等信息
- 根据可用域将endpoint分组
获取到enpoint后,组装成DiscoveryResponse对象,将TypeUrl设置为type.googleapis.com/envoy.api.v2.ClusterLoadAssignment,启动单独的协程通过与Envoy建立的双向stream gRPC连接发送给Envoy。