| 阅读 |,阅读约 10 分钟
| 复制链接:

istio源码分析-启动命令

概述

源码分析

代码入口

istoctl/cmd/istioctl/main.go

 1func init() {
 2    // 设置viper的值
 3	viper.SetDefault("istioNamespace", controller.IstioNamespace)
 4	viper.SetDefault("xds-port", 15012)
 5}
 6...
 7func main() {
 8    // 配置和环境变量处理
 9	if err := cmd.ConfigAndEnvProcessing(); err != nil {
10		fmt.Fprintf(os.Stderr, "Could not initialize: %v\n", err)
11		exitCode := cmd.GetExitCode(err)
12		os.Exit(exitCode)
13	}
14
15    // 注册启动命令
16	rootCmd := cmd.GetRootCmd(os.Args[1:])
17
18	if err := rootCmd.Execute(); err != nil {
19		exitCode := cmd.GetExitCode(err)
20		os.Exit(exitCode)
21	}
22}

ConfigAndEnvProcessing

 1func ConfigAndEnvProcessing() error {
 2    // 读取配置文件,默认配置文件存放路径:"$HOME/.istioctl/config.yaml"
 3    // 得到 $HOME/.istioctl/
 4	configPath := filepath.Dir(IstioConfig)
 5    // 得到 config.yaml
 6	baseName := filepath.Base(IstioConfig)
 7    // 得到 .yaml
 8	configType := filepath.Ext(IstioConfig)
 9    // 得到 config
10	configName := baseName[0 : len(baseName)-len(configType)]
11	if configType != "" {
12        // 得到 yaml
13		configType = configType[1:]
14	}
15
16	// Allow users to override some variables through $HOME/.istioctl/config.yaml
17	// and environment variables.
18	viper.SetEnvPrefix("ISTIOCTL")
19	viper.AutomaticEnv()
20	viper.AllowEmptyEnv(true) // So we can say ISTIOCTL_CERT_DIR="" to suppress certs
21    
22    // 下面配置是viper组件需要做的配置
23	viper.SetConfigName(configName)
24	viper.SetConfigType(configType)
25	viper.AddConfigPath(configPath)
26	viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
27    // 读取配置文件,并放过如viper
28	err := viper.ReadInConfig()
29	// Ignore errors reading the configuration unless the file is explicitly customized
30	if IstioConfig != defaultIstioctlConfig {
31		return err
32	}
33
34	return nil
35}

源码结构

pilot

 1  istio git:(master)  tree pilot/pkg -L 1
 2pilot/pkg
 3├── bootstrap // 包含pilot-discovery启动相关的核心类 Server
 4├── config
 5├── controller
 6├── dns
 7├── features
 8├── leaderelection
 9├── model // 统一的数据模型,以支持不同的注册中心
10├── networking
11├── proto
12├── request
13├── secrets
14├── security
15├── serviceregistry // 实现不同注册中心数据到抽象数据模型的转换
16├── simulation
17├── status
18├── util
19└── xds

模型模型

  • CRD数据

    pilot/pkg/config/kube/crd/conversion.go里包括将 DestinationRule 等 CRD 转换为 Abstract Model 的 Config 对象的函数

  • k8s服务

    pilot/pkg/serviceregistry/kube/conversion.go里包括一系列将Kubernetes服务注册中心中的labelpodserviceservice port等Kubernetes资源对象转换为Abstract Model中的对应资源对象的函数

Server

代表pilot-discovery的核心数据,包含3个重要的成员:

  • ConfigStroeCache

    继承了ConfigStore对象,ConfigStore利用client-go库从k8s获取CRD,转换为model包下的Config对象,对外提供GetListCreateUpdate、Delete等CRUD服务。

  • IstioConfigStore

  • ServiceController

核心数据结构

在istio中,包括两类数据信息

  • 使用istioctl配置的VirtualService、DestinationRule等被称为configuration,

  • 从Kubernetes等服务注册中心获取的信息被称为service信息。

    ConfigStoreCacheIstioConfigStore负责处理第一类信息,ServiceController负责第二类。

ConfigStore

ConfigStore利用client-go库从k8s获取CRD,转换为model包下的Config对象,对外提供增删改查

pilot/pkg/model/config.go

 1type ConfigStore interface {
 2	// Schemas exposes the configuration type schema known by the config store.
 3	// The type schema defines the bidrectional mapping between configuration
 4	// types and the protobuf encoding schema.
 5	Schemas() collection.Schemas
 6
 7	// Get retrieves a configuration element by a type and a key
 8	Get(typ config.GroupVersionKind, name, namespace string) *config.Config
 9
10	// List returns objects by type and namespace.
11	// Use "" for the namespace to list across namespaces.
12	List(typ config.GroupVersionKind, namespace string) ([]config.Config, error)
13
14	// Create adds a new configuration object to the store. If an object with the
15	// same name and namespace for the type already exists, the operation fails
16	// with no side effects.
17	Create(config config.Config) (revision string, err error)
18
19	// Update modifies an existing configuration object in the store.  Update
20	// requires that the object has been created.  Resource version prevents
21	// overriding a value that has been changed between prior _Get_ and _Put_
22	// operation to achieve optimistic concurrency. This method returns a new
23	// revision if the operation succeeds.
24	Update(config config.Config) (newRevision string, err error)
25
26	UpdateStatus(config config.Config) (newRevision string, err error)
27
28	// Patch applies only the modifications made in the PatchFunc rather than doing a full replace. Useful to avoid
29	// read-modify-write conflicts when there are many concurrent-writers to the same resource.
30	Patch(orig config.Config, patchFn config.PatchFunc) (string, error)
31
32	// Delete removes an object from the store by key
33	// For k8s, resourceVersion must be fulfilled before a deletion is carried out.
34	// If not possible, a 409 Conflict status will be returned.
35	Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error
36}

ConfigStoreCache

继承ConfigStore,允许注册控制面信息变更处理函数

pilot/pkg/model/config.go

 1type ConfigStoreCache interface {
 2	ConfigStore
 3
 4	// RegisterEventHandler adds a handler to receive config update events for a
 5	// configuration type
 6	RegisterEventHandler(kind config.GroupVersionKind, handler func(config.Config, config.Config, Event))
 7
 8	// Run until a signal is received
 9	Run(stop <-chan struct{})
10
11	// HasSynced returns true after initial cache synchronization is complete
12	HasSynced() bool
13}

IstioConfigStore

继承了ConfigStore,主要目的是为访问route rule、virtual service等数据提供更加方便的接口。相对于ConfigStore提供的GetListCreateUpdate、Delete接口,IstioConfigStore直接提供更为方便的RouteRules、VirtualServices接口。

pilot/pkg/model/config.go

 1type IstioConfigStore interface {
 2	ConfigStore
 3
 4	// ServiceEntries lists all service entries
 5	ServiceEntries() []config.Config
 6
 7	// Gateways lists all gateways bound to the specified workload labels
 8	Gateways(workloadLabels labels.Collection) []config.Config
 9
10	// AuthorizationPolicies selects AuthorizationPolicies in the specified namespace.
11	AuthorizationPolicies(namespace string) []config.Config
12}

ServiceController

利用client-go库从k8s获取pod、service、node、endpoint,并将这些CRD转换为model包下的Service、ServiceInstance对象

XDS服务

参考

基于统一的数据存储模型,pilot-discovery为数据面提供控制信息服务,也就是所谓的discovery service或者xds服务,x是一个代词,类似于云计算的Xaas(iaas、paas、saas)。Istio中xds包括:

  • cds:cluster,一个应用集群,对应提供相同服务的endpoint,类似于k8s的service
  • lds:listener
  • rds:route,灰度发布时,一个服务对应多个版本,一个版本对应一个cluster,通过route规则规定请求如何路由到某个cluster
  • eds:endpoint,一个具体的实例,对应ip+port,类似k8s的pod
  • ads:aggregated,聚合多个xds,并保证按照顺序实现配置

xds就是Envoy从pilot-discovery动态获取endpoint、cluster等配置信息的协议实现

DiscoveryRequest

envoy为方便第三放开发者开发控制面,提供了go-control-plane库。基于go-control-plane库,开发者可以方便地实现基于gRPC协议的discovery service。

根据discovery server收到的DiscoveryRequest中指定的请求服务类型(TypeUrl),istio的ads服务统一封装了cdsldsrdseds4种服务,即在同一个双向streaming的gRPC连接上提供这4种服务

discovery-server从Envoy收到的请求类型,这个数据结构在go-control-plane包下

 1type DiscoveryRequest struct {
 2	state         protoimpl.MessageState
 3	sizeCache     protoimpl.SizeCache
 4	unknownFields protoimpl.UnknownFields
 5
 6  // Envoy在收到一个DiscoveryResponse之后会马上再发送一个DiscoveryRequest作为ACK/NACK,从而告诉discovery service消息是否成功处理。VersionInfo用来表示Envoy端到目前为止成功处理的最新的消息版本。
 7	VersionInfo string `protobuf:"bytes,1,opt,name=version_info,json=versionInfo,proto3" json:"version_info,omitempty"`
 8	
 9  // 连上discovery service的Envoy的唯一标识
10	Node *v3.Node `protobuf:"bytes,2,opt,name=node,proto3" json:"node,omitempty"`
11	
12  // Envoy sidecar关注的资源列表,对于cds、lds来说,ResourceName通常是空的,因为Envoy总是需要知道所有的相关数据。而对于eds,rds来讲,Envoy则可以选择性的指明需要监控的资源对象列表。
13	ResourceNames []string `protobuf:"bytes,3,rep,name=resource_names,json=resourceNames,proto3" json:"resource_names,omitempty"`
14	
15  // ads服务将原来分开的单独xds服务,如cds、lds等,合并在同一个双向streaming的gRPC连接上。所以当Envoy向discovery server发送DiscoveryRequest时,需要使用TypeUrl来指明当前请求的服务类型。TypeUrl值可以是cds、lds等
16	TypeUrl string `protobuf:"bytes,4,opt,name=type_url,json=typeUrl,proto3" json:"type_url,omitempty"`
17	
18  // Envoy使用ReponseNonce指定当前DiscoveryRequestACK的是之前的哪个DiscoveryResponse。具体设置方式就是把ReponseNonce指定为需要ACK的DiscoveryResponse中的Nonce值
19	ResponseNonce string `protobuf:"bytes,5,opt,name=response_nonce,json=responseNonce,proto3" json:"response_nonce,omitempty"`
20	
21  // 当Envoy处理来自discovery server的DiscoveryResponse的过程中发生错误时,会在ACK/NACK的DiscoveryRequest中带上具体错误信息ErrorDetail
22	ErrorDetail *status.Status `protobuf:"bytes,6,opt,name=error_detail,json=errorDetail,proto3" json:"error_detail,omitempty"`
23}

DiscoveryServer(EnvoyXdsServer)

EnvoyXdsServer主要负责pilot中xDS协议的生成和下发,接收并处理configController和serviceController推送的PushRequest,与集群中所有的数据面代理进行grpc通讯,并处理请求。

DiscoveryServer实际上实现了AggregatedDiscoveryServiceServer接口

 1// pilot server
 2type Server struct {
 3	XDSServer *xds.DiscoveryServer
 4	...
 5}
 6
 7// xdsServer
 8type DiscoveryServer struct {
 9	// 和 pilot 的 Server 结构中的 env 是同一个
10	Env *model.Environment
11  // xds 数据的生成器接口
12	ConfigGenerator core.ConfigGenerator
13  // 统一接收从其他组件发来的 PushRequest 的 channel
14	pushChannel chan *model.PushRequest
15  // 在真正 xds 推送前的防抖缓存
16	pushQueue *PushQueue
17  // 保存了所有生效的 grpc 连接
18	adsClients      map[string]*Connection
19	...
20}

AggregatedDiscoveryServiceServer 接口

1type AggregatedDiscoveryServiceServer interface {
2	 // 全量 ADS Stream 接口
3	StreamAggregatedResources(AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
4	// 增量 ADS Stream 接口
5	DeltaAggregatedResources(AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error
6}
7

CDS

  • cds 即 Cluster Discovery Service,是pilot-discovery为Envoy动态提供cluster相关信息的协议
  • Envoy可以向pilot-discovery的GRPC Server发送一个DiscoveryRequest,并将配置类型TypeUrl设置为cds
  • discovery-service,即ads服务的实现类,在收到DiscoveryRequest后,将Abstract Model中的配置信息组装成cluster,然后封装在DiscoveryResponse返回给Envoy

为了组装cluster,ads需要提取Abstract Model中的两类信息:

  • 服务注册信息:如k8s服务注册转化后的service
  • 配置信息:如DestinationRule

处理流程

  • 获取Service信息,为每个service创建一个空的cluster。service的两个来源:
    • 在k8s中定义的service资源对象
    • 通过istioctl配置的ServiceEntry资源对象,代表那些没有注册在k8s中的服务
  • 设置cluster名称
  • 设置cluster的默认流量控制策略
  • 配置与cluster相关的eds更新方式
  • 根据service的HostName属性查找对应的DestinationRule,根据ds的subnet创建subcluster
  • 根据DR里定义的traffic policy,为cluster、subcluster配置流量控制策略

组装成cluster对象后,将得到的所有cluster封装在一个DiscoveryResponse中,并将TypeUrl设置为type.googleapis.com/envoy.api.v2.Cluster,启动单独的协程与Envoy建立双向stream grpc连接发送给envoy

!(/Users/kinnylee/Library/Application Support/typora-user-images/image-20210327214840359.png)

EDS

Envoy知道cluster信息后,还需要知道集群每个成员信息,即endpoint。因此发送完cds请求后,会继续发送TypeUrl为eds的DiscoveryRequest请求。

处理流程

  • 根据cluster的名称,把k8s service对象的name和namespace解析出来,使用client-go中的SharedIndexInformer获取Service资源对象
  • 使用SharedIndexInformer获取与name和namespace匹配的Endpoint资源对象
  • 使用subnet的label过滤endpoint列表
  • 根据endpoint,获取ip、端口、可用域等信息
  • 根据可用域将endpoint分组

获取到enpoint后,组装成DiscoveryResponse对象,将TypeUrl设置为type.googleapis.com/envoy.api.v2.ClusterLoadAssignment,启动单独的协程通过与Envoy建立的双向stream gRPC连接发送给Envoy。