| 阅读 |,阅读约 5 分钟
| 复制链接:

pilot-agent源码分析

组件概述

sidecar容器的镜像为proxy,支持v2 API的sidecar容器的镜像为proxyv2,每个镜像都包含两个进程:

  • pilot-agent:生成envoy配置文件,管理envoy生命周期
  • envoy(proxy):连接discovery-service,从k8s集群获取服务信息

proxy 和 proxyv2 中的二进制完全相同,只是 envoy 进程的 bootstrap 配置不一样:

  • proxy:envoy_bootstrap_tmpl.json
  • proxyv2:envoy_bootstrap_v2.json

功能概述

pilot-agent功能概述

在proxy镜像中,pilot-agent负责的工作包括:

  • 生成envoy配置
  • 启动envoy
  • 监控并管理envoy的运行状态:
    • 出错重启
    • 配置变更reload

envoy功能概述

envoy负责的工作包括:

  • 接收所有发往该pod的网络流量
  • 分发所有从pod发出的网络流量

功能详解

生成Envoy配置功能

配置信息保存在ProxyConfig对象中

pkg/envoy/proxy.go

 1type ProxyConfig struct {
 2	Config              meshconfig.ProxyConfig
 3	Node                string
 4	LogLevel            string
 5	ComponentLogLevel   string
 6	PilotSubjectAltName []string
 7	NodeIPs             []string
 8	STSPort             int
 9	OutlierLogPath      string
10	PilotCertProvider   string
11	ProvCert            string
12	Sidecar             bool
13	ProxyViaAgent       bool
14	LogAsJSON           bool
15}

pilot/pkg/model/context.go

 1type Proxy struct {
 2	sync.RWMutex
 3
 4	// Type specifies the node type. First part of the ID.
 5	Type NodeType
 6
 7	// IPAddresses is the IP addresses of the proxy used to identify it and its
 8	// co-located service instances. Example: "10.60.1.6". In some cases, the host
 9	// where the poxy and service instances reside may have more than one IP address
10	IPAddresses []string
11
12	// ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and
13	// namespace <podName.namespace>.
14	ID string
15
16	// Locality is the location of where Envoy proxy runs. This is extracted from
17	// the registry where possible. If the registry doesn't provide a locality for the
18	// proxy it will use the one sent via ADS that can be configured in the Envoy bootstrap
19	Locality *core.Locality
20
21	// DNSDomain defines the DNS domain suffix for short hostnames (e.g.
22	// "default.svc.cluster.local")
23	DNSDomain string
24
25	// ConfigNamespace defines the namespace where this proxy resides
26	// for the purposes of network scoping.
27	// NOTE: DO NOT USE THIS FIELD TO CONSTRUCT DNS NAMES
28	ConfigNamespace string
29
30	// Metadata key-value pairs extending the Node identifier
31	Metadata *NodeMetadata
32
33	// the sidecarScope associated with the proxy
34	SidecarScope *SidecarScope
35
36	// the sidecarScope associated with the proxy previously
37	PrevSidecarScope *SidecarScope
38
39	// The merged gateways associated with the proxy if this is a Router
40	MergedGateway *MergedGateway
41
42	// service instances associated with the proxy
43	ServiceInstances []*ServiceInstance
44
45	// Istio version associated with the Proxy
46	IstioVersion *IstioVersion
47
48	// VerifiedIdentity determines whether a proxy had its identity verified. This
49	// generally occurs by JWT or mTLS authentication. This can be false when
50	// connecting over plaintext. If this is set to true, we can verify the proxy has
51	// access to ConfigNamespace namespace. However, other options such as node type
52	// are not part of an Istio identity and thus are not verified.
53	VerifiedIdentity *spiffe.Identity
54
55	// Indicates whether proxy supports IPv6 addresses
56	ipv6Support bool
57
58	// Indicates whether proxy supports IPv4 addresses
59	ipv4Support bool
60
61	// GlobalUnicastIP stores the global unicast IP if available, otherwise nil
62	GlobalUnicastIP string
63
64	// XdsResourceGenerator is used to generate resources for the node, based on the PushContext.
65	// If nil, the default networking/core v2 generator is used. This field can be set
66	// at connect time, based on node metadata, to trigger generation of a different style
67	// of configuration.
68	XdsResourceGenerator XdsResourceGenerator
69
70	// WatchedResources contains the list of watched resources for the proxy, keyed by the DiscoveryRequest TypeUrl.
71	WatchedResources map[string]*WatchedResource
72}

Envoy监控与管理

流程包括:

  • 创建envoy对象
  • 创建agent对象
  • 创建watcher,并启动协程执行watcher.Run
    • agent.Run:agent主循环,通过监听channel来监控envoy进程
      • 监听configCh,调用agent.reconcile来reload envoy
      • 监听statusCh,处理envoy退出状态
      • time.After,监听是否到时间执行schedule的reconcile了,到了则执行agent.reconcile
      • ctx.Done,执行agent.terminate
    • watcher.Reload
      • agent.ScheduleConfigUpdate
  • 创建context

Envoy启动流程

源码分析

启动入口

支持的子命令

  • proxy
  • istio-iptables
  • istio-clean-iptables
  • collateral
  • version

proxy命令

pilot/cmd/pilot-agent/main.go

 1// 初始化 proxy 实例,包括配置、启动参数等
 2envoyProxy := envoy.NewProxy(envoy.ProxyConfig{
 3    Config:              proxyConfig,
 4    Node:                proxy.ServiceNode(),
 5    LogLevel:            proxyLogLevel,
 6    ComponentLogLevel:   proxyComponentLogLevel,
 7    LogAsJSON:           loggingOptions.JSONEncoding,
 8    PilotSubjectAltName: pilotSAN,
 9    NodeIPs:             proxy.IPAddresses,
10    STSPort:             stsPort,
11    OutlierLogPath:      outlierLogPath,
12    PilotCertProvider:   secOpts.PilotCertProvider,
13    ProvCert:            provCert,
14    Sidecar:             proxy.Type == model.SidecarProxy,
15    ProxyViaAgent:       agentOptions.ProxyXDSViaAgent,
16})
17
18// 通过 proxy 构造 agent 实例
19envoyAgent := envoy.NewAgent(envoyProxy, drainDuration)
20// 构造 watcher 实例
21watcher := envoy.NewWatcher(agent.Restart)
22// 启动 watcher
23go watcher.Run(ctx)
24// 优雅退出
25go cmd.WaitSignalFunc(cancel)
26// 运行 proxy 实例
27return envoyAgent.Run(ctx)

istio-iptables子命令

tools/istio-iptables/pkg/cmd/root.go

 1cfg := constructConfig()
 2var ext dep.Dependencies
 3if cfg.DryRun {
 4    ext = &dep.StdoutStubDependencies{}
 5} else {
 6    ext = &dep.RealDependencies{}
 7}
 8
 9iptConfigurator := NewIptablesConfigurator(cfg, ext)
10if !cfg.SkipRuleApply {
11    iptConfigurator.run()
12}
13if cfg.RunValidation {
14    hostIP, err := getLocalIP()
15    if err != nil {
16        // Assume it is not handled by istio-cni and won't reuse the ValidationErrorCode
17        panic(err)
18    }
19    validator := validation.NewValidator(cfg, hostIP)
20
21    if err := validator.Run(); err != nil {
22        handleErrorWithCode(err, constants.ValidationErrorCode)
23    }
24}

istio-clean-iptables子命令

tools/istio-clean-iptables/pkg/cmd/root.go

1cfg := constructConfig()
2cleanup(cfg)

Agent

Agent初始化

 1func NewAgent(proxy Proxy, terminationDrainDuration time.Duration) Agent {
 2	return &agent{
 3		proxy:                    proxy,
 4    // 管理 envoy 状态的 channel,监视进程状态
 5		statusCh:                 make(chan exitStatus),
 6    // 活跃的 epoch 集合
 7		activeEpochs:             map[int]chan error{},
 8		terminationDrainDuration: terminationDrainDuration,
 9    // 当前的 epoch
10		currentEpoch:             -1,
11	}
12}

agent启动

agent重启

Restart方法作为回调函数注册到watcher的updates字段中,供watcher后续调用

 1func (a *agent) Restart(config interface{}) {
 2	// Only allow one restart to execute at a time.
 3	a.restartMutex.Lock()
 4	defer a.restartMutex.Unlock()
 5
 6	// Protect access to internal state.
 7	a.mutex.Lock()
 8
 9  // 校验传入的参数是否有变更
10	if reflect.DeepEqual(a.currentConfig, config) {
11		// Same configuration - nothing to do.
12		a.mutex.Unlock()
13		return
14	}
15
16  // 活跃的 epoch
17	hasActiveEpoch := len(a.activeEpochs) > 0
18  // 获取当前的 epoch
19	activeEpoch := a.currentEpoch
20
21	// Increment the latest running epoch
22  // 配置变化了,epoch + 1
23	epoch := a.currentEpoch + 1
24	log.Infof("Received new config, creating new Envoy epoch %d", epoch)
25
26  // 更新当前的 epoch 及配置
27	a.currentEpoch = epoch
28	a.currentConfig = config
29
30	// Add the new epoch to the map.
31	abortCh := make(chan error, 1)
32	a.activeEpochs[a.currentEpoch] = abortCh
33
34	// Unlock before the wait to avoid delaying envoy exit logic.
35	a.mutex.Unlock()
36
37	// Wait for previous epoch to go live (if one exists) before performing a hot restart.
38	if hasActiveEpoch {
39		a.waitUntilLive(activeEpoch)
40	}
41  // 启动 envoy
42	go a.runWait(config, epoch, abortCh)
43}

Watcher

Watcher初始化

1func NewWatcher(updates func(interface{})) Watcher {
2	return &watcher{
3		updates: updates,
4	}
5}
6
// watcher is the Watcher implementation returned by NewWatcher.
type watcher struct {
	// updates is the callback that receives each configuration value.
	updates func(interface{})
}

Watcher启动

 1func (w *watcher) Run(ctx context.Context) {
 2  // Run 中主要是调用 SendConfig 函数
 3	w.SendConfig()
 4
 5	<-ctx.Done()
 6	log.Info("Watcher has successfully terminated")
 7}
 8
 9func (w *watcher) SendConfig() {
10	h := sha256.New()
11  // 调用前面 newWatcher 时传入的回调函数updates,
12  // watcher := envoy.NewWatcher(agent.Restart), 也就是 agent.Restart 方法
13	w.updates(h.Sum(nil))
14}

Envoy

启动envoy

runWait方法用于启动envoy

1func (a *agent) runWait(config interface{}, epoch int, abortCh <-chan error) {
2	log.Infof("Epoch %d starting", epoch)
3  // 启动 envoy
4	err := a.proxy.Run(config, epoch, abortCh)
5  // 删除 epoch 对应的配置文件
6	a.proxy.Cleanup(epoch)
7	a.statusCh <- exitStatus{epoch: epoch, err: err}
8}

envoy对象

1type envoy struct {
2	ProxyConfig
3	extraArgs []string
4}