| 阅读 | 共 2040 字,阅读约
pilot-agent源码分析
组件概述
sidercar容器的镜像为proxy,支持v2 api的sidecar容器的镜像为proxyv2,镜像包含两个进程:
- pilot-agent:生成envoy配置文件,管理envoy生命周期
- envoy(proxy):连接discovery-service,从k8s集群获取服务信息
proxy 和 proxyv2 中的二进制完全相同,只是 envoy 进程的 bootstrap 配置不一样:
- proxy:envoy_bootstrap_tmpl.json
- proxyv2:envoy_bootstrap_v2.json
功能概述
pilot-agent功能概述
在proxy镜像中,pilot-agent负责的工作包括:
- 生成envoy配置
- 启动envoy
- 监控并管理envoy的运行状态:
- 出错重启
- 配置变更reload
envoy功能概述
envoy负责的工作包括:
- 接收所有发往该pod的网络流程
- 分发所有从pod发出的网络流量
功能详解
生成Envoy配置功能
配置信息保存在ProxyConfig对象中
pkg/envoy/proxy.go
1type ProxyConfig struct {
2 Config meshconfig.ProxyConfig
3 Node string
4 LogLevel string
5 ComponentLogLevel string
6 PilotSubjectAltName []string
7 NodeIPs []string
8 STSPort int
9 OutlierLogPath string
10 PilotCertProvider string
11 ProvCert string
12 Sidecar bool
13 ProxyViaAgent bool
14 LogAsJSON bool
15}
pilot/pkg/model/context.go
1type Proxy struct {
2 sync.RWMutex
3
4 // Type specifies the node type. First part of the ID.
5 Type NodeType
6
7 // IPAddresses is the IP addresses of the proxy used to identify it and its
8 // co-located service instances. Example: "10.60.1.6". In some cases, the host
9 // where the poxy and service instances reside may have more than one IP address
10 IPAddresses []string
11
12 // ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and
13 // namespace <podName.namespace>.
14 ID string
15
16 // Locality is the location of where Envoy proxy runs. This is extracted from
17 // the registry where possible. If the registry doesn't provide a locality for the
18 // proxy it will use the one sent via ADS that can be configured in the Envoy bootstrap
19 Locality *core.Locality
20
21 // DNSDomain defines the DNS domain suffix for short hostnames (e.g.
22 // "default.svc.cluster.local")
23 DNSDomain string
24
25 // ConfigNamespace defines the namespace where this proxy resides
26 // for the purposes of network scoping.
27 // NOTE: DO NOT USE THIS FIELD TO CONSTRUCT DNS NAMES
28 ConfigNamespace string
29
30 // Metadata key-value pairs extending the Node identifier
31 Metadata *NodeMetadata
32
33 // the sidecarScope associated with the proxy
34 SidecarScope *SidecarScope
35
36 // the sidecarScope associated with the proxy previously
37 PrevSidecarScope *SidecarScope
38
39 // The merged gateways associated with the proxy if this is a Router
40 MergedGateway *MergedGateway
41
42 // service instances associated with the proxy
43 ServiceInstances []*ServiceInstance
44
45 // Istio version associated with the Proxy
46 IstioVersion *IstioVersion
47
48 // VerifiedIdentity determines whether a proxy had its identity verified. This
49 // generally occurs by JWT or mTLS authentication. This can be false when
50 // connecting over plaintext. If this is set to true, we can verify the proxy has
51 // access to ConfigNamespace namespace. However, other options such as node type
52 // are not part of an Istio identity and thus are not verified.
53 VerifiedIdentity *spiffe.Identity
54
55 // Indicates whether proxy supports IPv6 addresses
56 ipv6Support bool
57
58 // Indicates whether proxy supports IPv4 addresses
59 ipv4Support bool
60
61 // GlobalUnicastIP stores the global unicast IP if available, otherwise nil
62 GlobalUnicastIP string
63
64 // XdsResourceGenerator is used to generate resources for the node, based on the PushContext.
65 // If nil, the default networking/core v2 generator is used. This field can be set
66 // at connect time, based on node metadata, to trigger generation of a different style
67 // of configuration.
68 XdsResourceGenerator XdsResourceGenerator
69
70 // WatchedResources contains the list of watched resources for the proxy, keyed by the DiscoveryRequest TypeUrl.
71 WatchedResources map[string]*WatchedResource
72}
Envoy监控与管理
流程包括:
- 创建envoy对象
- 创建agent对象
- 创建watcher,并启动协程执行watcher.Run
- agent.Run:agent主循环,通过监听channel来监控envoy进程
- 监听configCh,调用agent.reconcile来reload envoy
- 监听statusCh,处理envoy退出状态
- time.After,监听是否到时间执行schedule的reconcile了,到了则执行agent.reconcile
- ctx.Done,执行agent.terminate
- watcher.Reload
- agent.ScheduleConfigUpdate
- agent.Run:agent主循环,通过监听channel来监控envoy进程
- 创建context
Envoy启动流程
源码分析
启动入口
支持的子命令
- proxy
- istio-iptables
- istio-clean-iptables
- collateral
- version
proxy命令
pilot/cmd/pilot-agent/main.go
1// 初始化 proxy 实例,包括配置、启动参数等
2envoyProxy := envoy.NewProxy(envoy.ProxyConfig{
3 Config: proxyConfig,
4 Node: proxy.ServiceNode(),
5 LogLevel: proxyLogLevel,
6 ComponentLogLevel: proxyComponentLogLevel,
7 LogAsJSON: loggingOptions.JSONEncoding,
8 PilotSubjectAltName: pilotSAN,
9 NodeIPs: proxy.IPAddresses,
10 STSPort: stsPort,
11 OutlierLogPath: outlierLogPath,
12 PilotCertProvider: secOpts.PilotCertProvider,
13 ProvCert: provCert,
14 Sidecar: proxy.Type == model.SidecarProxy,
15 ProxyViaAgent: agentOptions.ProxyXDSViaAgent,
16})
17
18// 通过 proxy 构造 agent 实例
19envoyAgent := envoy.NewAgent(envoyProxy, drainDuration)
20// 构造 watcher 实例
21watcher := envoy.NewWatcher(agent.Restart)
22// 启动 watcher
23go watcher.Run(ctx)
24// 优雅退出
25go cmd.WaitSignalFunc(cancel)
26// 运行 proxy 实例
27return envoyAgent.Run(ctx)
istio-iptables子命令
tools/istio-iptables/pkg/cmd/root.go
1cfg := constructConfig()
2var ext dep.Dependencies
3if cfg.DryRun {
4 ext = &dep.StdoutStubDependencies{}
5} else {
6 ext = &dep.RealDependencies{}
7}
8
9iptConfigurator := NewIptablesConfigurator(cfg, ext)
10if !cfg.SkipRuleApply {
11 iptConfigurator.run()
12}
13if cfg.RunValidation {
14 hostIP, err := getLocalIP()
15 if err != nil {
16 // Assume it is not handled by istio-cni and won't reuse the ValidationErrorCode
17 panic(err)
18 }
19 validator := validation.NewValidator(cfg, hostIP)
20
21 if err := validator.Run(); err != nil {
22 handleErrorWithCode(err, constants.ValidationErrorCode)
23 }
24}
istio-clean-iptables子命令
tools/istio-clean-iptables/pkg/cmd/root.go
1cfg := constructConfig()
2cleanup(cfg)
Agent
Agent初始化
1func NewAgent(proxy Proxy, terminationDrainDuration time.Duration) Agent {
2 return &agent{
3 proxy: proxy,
4 // 管理 envoy 状态的 channel,监视进程状态
5 statusCh: make(chan exitStatus),
6 // 活跃的 epoch 集合
7 activeEpochs: map[int]chan error{},
8 terminationDrainDuration: terminationDrainDuration,
9 // 当前的 epoch
10 currentEpoch: -1,
11 }
12}
agent启动
agent重启
Restart方法注册到watcher中的update中,便于后续watcher调用
1func (a *agent) Restart(config interface{}) {
2 // Only allow one restart to execute at a time.
3 a.restartMutex.Lock()
4 defer a.restartMutex.Unlock()
5
6 // Protect access to internal state.
7 a.mutex.Lock()
8
9 // 校验传入的参数是否有变更
10 if reflect.DeepEqual(a.currentConfig, config) {
11 // Same configuration - nothing to do.
12 a.mutex.Unlock()
13 return
14 }
15
16 // 活跃的 epoch
17 hasActiveEpoch := len(a.activeEpochs) > 0
18 // 获取当前的 epoch
19 activeEpoch := a.currentEpoch
20
21 // Increment the latest running epoch
22 // 配置变化了,epoch + 1
23 epoch := a.currentEpoch + 1
24 log.Infof("Received new config, creating new Envoy epoch %d", epoch)
25
26 // 更新当前的 epoch 及配置
27 a.currentEpoch = epoch
28 a.currentConfig = config
29
30 // Add the new epoch to the map.
31 abortCh := make(chan error, 1)
32 a.activeEpochs[a.currentEpoch] = abortCh
33
34 // Unlock before the wait to avoid delaying envoy exit logic.
35 a.mutex.Unlock()
36
37 // Wait for previous epoch to go live (if one exists) before performing a hot restart.
38 if hasActiveEpoch {
39 a.waitUntilLive(activeEpoch)
40 }
41 // 启动 envoy
42 go a.runWait(config, epoch, abortCh)
43}
Watcher
Watcher初始化
1func NewWatcher(updates func(interface{})) Watcher {
2 return &watcher{
3 updates: updates,
4 }
5}
6
7type watcher struct {
8 updates func(interface{})
9}
Watcher启动
1func (w *watcher) Run(ctx context.Context) {
2 // Run 中主要是调用 SendConfig 函数
3 w.SendConfig()
4
5 <-ctx.Done()
6 log.Info("Watcher has successfully terminated")
7}
8
9func (w *watcher) SendConfig() {
10 h := sha256.New()
11 // 调用前面 newWatcher 时传入的回调函数updates,
12 // watcher := envoy.NewWatcher(agent.Restart), 也就是 agent.Restart 方法
13 w.updates(h.Sum(nil))
14}
Envoy
启动envoy
runWait方法用于启动envoy
1func (a *agent) runWait(config interface{}, epoch int, abortCh <-chan error) {
2 log.Infof("Epoch %d starting", epoch)
3 // 启动 envoy
4 err := a.proxy.Run(config, epoch, abortCh)
5 // 删除 epoch 对应的配置文件
6 a.proxy.Cleanup(epoch)
7 a.statusCh <- exitStatus{epoch: epoch, err: err}
8}
envoy对象
1type envoy struct {
2 ProxyConfig
3 extraArgs []string
4}