| 阅读 |,阅读约 9 分钟
| 复制链接:

Overview

client-go源码分析(一) - 四种客户端对象

概述

  • k8s使用client-go作为go语言的官方客户端编程工具,提供对api-server的交互访问
  • client-go非常重要,所有k8s核心组件内部都集成了它
  • 开发者要掌握k8s的二次开发,必须要掌握client-go的使用

源码结构

 1├── discovery  // 提供discovery发现客户端
 2├── dynamic    // 提供dynamic发现客户端
 3├── examples
 4├── informers  // 每种资源的k8s实现
 5├── kubernetes // 提供clientset客户端
 6├── kubernetes_test
 7├── listers    // 为每一个k8s资源提供Lister功能,该功能对Get和List请求提供只读的缓存数据
 8├── pkg
 9├── plugin     // 云服务商提供授权插件
10├── rest       // 提供restclient客户端
11├── restmapper
12├── scale      // 提供ScaleClient客户端,用于扩缩容资源对象
13├── testing
14├── third_party
15├── tools      // 提供常用工具
16├── transport  // 提供安全的tcp连接,用于客户端和容器直接传递二进制流。由内部spby包提供支持
17└── util      // 提供常用方法

client-go的四种客户端对象

client-go支持4种客户端对象与api-server的交互,都可以通过kubeconfig配置信息连接到集群:

  • RestClient:最基础的客户端,对http request进行了封装
  • ClientSet:RestClient基础上封装了Resource和Version的管理方法,只能处理k8s内置资源
  • DynamicClient:不仅能处理内置资源,也能处理CRD资源
  • DiscoveryClient:发现客户端,用于发现apiserver支持的资源组、资源版本、资源信息

kubeconfig文件结构

kubeconfig文件通常包括三个部分:

  • clusters:定义k8s集群信息,包括服务地址、证书等信息
  • users:用户身份验证的客户端凭据
  • contexts:用户信息和命名空间等,用于将请求发送到指定的集群

client-go读取kubeconfig文件后,生成config对象,用于与api通讯

 1package main
 2
 3import (
 4  "k8s.io/client-go/tools/clientcmd"
 5  "log"
 6)
 7
 8func main() {
 9  config, err := clientcmd.BuildConfigFromFlags("", "~/.kube/config")
10  if err != nil {
11    log.Printf("init kubeconfig error.", err)
12  }
13  log.Printf(config.Host)
14}

加载kubeconfig源码分析

  • BuildConfigFromFlags
  • ClientConfig
  • createClientConfig
  • Load

源码位置:k8s.io/client-go/tools/clientcmd/client-config.go

  1func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
  2  ...
  3  return NewNonInteractiveDeferredLoadingClientConfig(
  4    &ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
  5    &ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
  6}
  7
  8func (config *DeferredLoadingClientConfig) ClientConfig() (*restclient.Config, error) {
  9  mergedClientConfig, err := config.createClientConfig()
 10  if err != nil {
 11    return nil, err
 12  }
 13
 14  // load the configuration and return on non-empty errors and if the
 15  // content differs from the default config
 16  mergedConfig, err := mergedClientConfig.ClientConfig()
 17  switch {
 18  case err != nil:
 19    if !IsEmptyConfig(err) {
 20      // return on any error except empty config
 21      return nil, err
 22    }
 23  case mergedConfig != nil:
 24    // the configuration is valid, but if this is equal to the defaults we should try
 25    // in-cluster configuration
 26    if !config.loader.IsDefaultConfig(mergedConfig) {
 27      return mergedConfig, nil
 28    }
 29  }
 30
 31  // check for in-cluster configuration and use it
 32  if config.icc.Possible() {
 33    klog.V(4).Infof("Using in-cluster configuration")
 34    return config.icc.ClientConfig()
 35  }
 36
 37  // 返回合并后的客户端配置
 38  return mergedConfig, err
 39}
 40
 41func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) {
 42  if config.clientConfig == nil {
 43    config.loadingLock.Lock()
 44    defer config.loadingLock.Unlock()
 45
 46    if config.clientConfig == nil {
 47      // 加载配置
 48      mergedConfig, err := config.loader.Load()
 49      if err != nil {
 50        return nil, err
 51      }
 52
 53      var mergedClientConfig ClientConfig
 54      if config.fallbackReader != nil {
 55        mergedClientConfig = NewInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.fallbackReader, config.loader)
 56      } else {
 57        mergedClientConfig = NewNonInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.loader)
 58      }
 59
 60      config.clientConfig = mergedClientConfig
 61    }
 62  }
 63
 64  return config.clientConfig, nil
 65}
 66
 67func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
 68  if err := rules.Migrate(); err != nil {
 69    return nil, err
 70  }
 71
 72  errlist := []error{}
 73
 74  kubeConfigFiles := []string{}
 75
 76  // Make sure a file we were explicitly told to use exists
 77  // 支持两种方式获取配置信息路径
 78  if len(rules.ExplicitPath) > 0 {
 79    if _, err := os.Stat(rules.ExplicitPath); os.IsNotExist(err) {
 80      return nil, err
 81    }
 82    kubeConfigFiles = append(kubeConfigFiles, rules.ExplicitPath)
 83
 84  } else {
 85    kubeConfigFiles = append(kubeConfigFiles, rules.Precedence...)
 86  }
 87
 88  kubeconfigs := []*clientcmdapi.Config{}
 89  // read and cache the config files so that we only look at them once
 90  for _, filename := range kubeConfigFiles {
 91    if len(filename) == 0 {
 92      // no work to do
 93      continue
 94    }
 95
 96    config, err := LoadFromFile(filename)
 97    if os.IsNotExist(err) {
 98      // skip missing files
 99      continue
100    }
101    if err != nil {
102      errlist = append(errlist, fmt.Errorf("Error loading config file \"%s\": %v", filename, err))
103      continue
104    }
105
106    kubeconfigs = append(kubeconfigs, config)
107  }
108
109  // first merge all of our maps
110  mapConfig := clientcmdapi.NewConfig()
111
112  for _, kubeconfig := range kubeconfigs {
113    mergo.MergeWithOverwrite(mapConfig, kubeconfig)
114  }
115
116  // merge all of the struct values in the reverse order so that priority is given correctly
117  // errors are not added to the list the second time
118  nonMapConfig := clientcmdapi.NewConfig()
119  for i := len(kubeconfigs) - 1; i >= 0; i-- {
120    kubeconfig := kubeconfigs[i]
121    mergo.MergeWithOverwrite(nonMapConfig, kubeconfig)
122  }
123
124  // since values are overwritten, but maps values are not, we can merge the non-map config on top of the map config and
125  // get the values we expect.
126  // 合并配置信息
127  config := clientcmdapi.NewConfig()
128  mergo.MergeWithOverwrite(config, mapConfig)
129  mergo.MergeWithOverwrite(config, nonMapConfig)
130
131  if rules.ResolvePaths() {
132    if err := ResolveLocalPaths(config); err != nil {
133      errlist = append(errlist, err)
134    }
135  }
136  return config, utilerrors.NewAggregate(errlist)
137}

RestClient客户端

RestClient是最基础的客户端,其他三个都是基于RestClient实现的。RestClient对Http Request进行了封装,实现了Restful风格的api

RestClient使用示例

  • rest.RESTClientFor函数通过kubeconfig对象实例化RestClient对象
  • RestClient对象构建http请求参数
    • 请求方法:Get(),Post(),Delete(),Put(),Patch()
    • Namespace():设置命名空间
    • Resource():设置请求的资源名称
    • VersionedParams():设置查询选项
    • Do():执行请求
 1func restClient() {
 2  config.APIPath = "api"
 3  config.GroupVersion = &schema.GroupVersion{}
 4  config.NegotiatedSerializer = scheme.Codecs
 5  restClient, err := rest.RESTClientFor(config)
 6  if err != nil {
 7    log.Printf("init restclient error.", err)
 8    return
 9  }
10
11  result := &corev1.PodList{}
12  err = restClient.Get().
13    Namespace("default").
14    Resource("pods").
15    VersionedParams(&v1.ListOptions{Limit: 5000}, scheme.ParameterCodec).
16    Do(nil).Into(result)
17  if err != nil {
18    log.Printf("get pod error.", err)
19    return
20  }
21
22  for _, d := range result.Items {
23    log.Printf("namespace: %v \t name: %v \t status: %v \t", d.Namespace, d.Name, d.Status.Phase)
24  }
25}

Do函数实现

对go语言标准的net/http进行了封装

 1func (r *Request) Do(ctx context.Context) Result {
 2  var result Result
 3  err := r.request(ctx, func(req *http.Request, resp *http.Response) {
 4    result = r.transformResponse(resp, req)
 5  })
 6  if err != nil {
 7    return Result{err: err}
 8  }
 9  return result
10}
11
12/**
13* request方法主要调用go的http请求方法
14*/
15func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
16  // 采集请求延时指标
17  start := time.Now()
18  defer func() {
19    metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
20  }()
21
22  // 开始重试请求
23  retries := 0
24  for {
25    url := r.URL().String()
26    // 调用go原生的请求接口创建请求
27    req, err := http.NewRequest(r.verb, url, r.body)
28    if err != nil {
29      return err
30    }
31    req = req.WithContext(ctx)
32    req.Header = r.headers
33
34    // 调用go原生的请求接口开始发送请求
35    resp, err := client.Do(req)
36    // 更新指标信息
37    updateURLMetrics(r, resp, err)
38
39    if err != nil {
40      // For connection errors and apiserver shutdown errors retry.
41      if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
42        // 构造返回信息
43        resp = &http.Response{
44          StatusCode: http.StatusInternalServerError,
45          Header:     http.Header{"Retry-After": []string{"1"}},
46          Body:       ioutil.NopCloser(bytes.NewReader([]byte{})),
47        }
48      } else {
49        return err
50      }
51    }
52  }
53}

ClientSet客户端

  • RestClient是最基础的客户端,使用时需要指定Resource和Version等信息,编写代码前需要提前知道Resource所在的group和对应的version信息
  • ClientSet使用则更加便捷,二次开发时通常使用ClientSet
  • ClientSet在RestClient基础上封装了对Resource和Version的管理方法
  • 每个Resource是一个客户端,ClientSet是多个客户端的集合

ClientSet使用示例

  • kubernetes.newForConfig通过配置信息实例化ClientSet对象
 1func clientSet() {
 2  clientSet, err := kubernetes.NewForConfig(config)
 3  if err != nil {
 4    log.Printf("init clientset error.")
 5    return
 6  }
 7
 8  podClient := clientSet.CoreV1().Pods(corev1.NamespaceDefault)
 9  list, err := podClient.List(context.TODO(), v1.ListOptions{Limit: 500})
10  if err != nil {
11    log.Printf("pod list error.", err)
12    return
13  }
14  for _, d := range list.Items {
15    log.Printf("namespace: %v \t name: %v \t status: %v \t", d.Namespace, d.Name, d.Status.Phase)
16  }
17}

clientSet

ClientSet是一堆Client的集合

 1type Clientset struct {
 2  *discovery.DiscoveryClient
 3  ...
 4  appsV1                       *appsv1.AppsV1Client
 5  appsV1beta1                  *appsv1beta1.AppsV1beta1Client
 6  appsV1beta2                  *appsv1beta2.AppsV1beta2Client
 7  coreV1                       *corev1.CoreV1Client
 8  ...
 9}
10
11// client都实现了Interface接口
12type AppsV1Client struct {
13  restClient rest.Interface
14}
15
16// Interface接口定义了以下操作
17type Interface interface {
18  GetRateLimiter() flowcontrol.RateLimiter
19  Verb(verb string) *Request
20  Post() *Request
21  Put() *Request
22  Patch(pt types.PatchType) *Request
23  Get() *Request
24  Delete() *Request
25  APIVersion() schema.GroupVersion
26}

DynamicClient客户端

  • 一种动态客户端,可以对任意k8s资源进行restful操作,包括crd资源
  • 内部实现了Unstructured,用于处理非结构化的数据结构,这是能处理crd资源的关键
  • 处理过程会将Resource转换成Unstructured结构类型,处理完后再转换回去

DynamicClient使用示例

  • dynamic.NewForConfig通过配置信息实例化dynamicClient对象
  • dynaicClient.Resource用于设置请求的资源组、资源版本、资源名称
 1func dynamicClient() {
 2  dynamicClient, err := dynamic.NewForConfig(config)
 3  if err != nil {
 4    log.Printf("dynamic client init error.", err)
 5    return
 6  }
 7
 8  gvr := schema.GroupVersionResource{
 9    Version:  "v1",
10    Resource: "pods",
11  }
12
13  unstructObj, err := dynamicClient.Resource(gvr).
14    Namespace(corev1.NamespaceDefault).
15    List(context.TODO(), v1.ListOptions{Limit: 500})
16
17  if err != nil {
18    log.Panicf("dynamic list error.", err)
19    return
20  }
21
22  podList := &corev1.PodList{}
23  err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), podList)
24  if err != nil {
25    log.Printf("unstruct error.", err)
26    return
27  }
28
29  for _, d := range podList.Items {
30    log.Printf("namespace: %v \t name: %v \t status: %v \t", d.Namespace, d.Name, d.Status.Phase)
31  }
32}

DiscoveryClient客户端

  • 是发现客户端,用于发现apiserver支持的资源组、资源版本、资源信息
  • kubectl的api-versions、api-resources命令输出也是通过DiscoveryClient实现的
  • 还可以将这些信息存储到本地,用于缓存。默认路径:~/.kube/cache和~/.kube/http-cache

DiscoveryClient示例代码

  • discovery.NewDiscoveryClientForConfig通过配置信息实例化discoveryClient对象
  • discoveryclient.ServerGroupsAndResources函数返回所支持的资源信息
 1func discoveryClient() {
 2  discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
 3  if err != nil {
 4    log.Printf("discovery init error. %s", err)
 5    return
 6  }
 7
 8  _, resources, err := discoveryClient.ServerGroupsAndResources()
 9  if err != nil {
10    log.Printf("group resource error. %s", err)
11  }
12
13  for _, list := range resources {
14    gv, err := schema.ParseGroupVersion(list.GroupVersion)
15    if err != nil {
16      log.Printf("parse group error. %s", err)
17      continue
18    }
19
20    for _, resources := range list.APIResources {
21      fmt.Printf("name: %v, group: %v, version: %v\n", resources.Name, gv.Group, gv.Version)
22    }
23  }
24}

获取资源核心实现

  • DiscoveryClient通过RestClient请求api
  • 将请求结果存放在APIGroupList结构体中

源码位置:vendor/k8s.io/client-go/discovery/discovery_client.go

 1func (d *DiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
 2  return withRetries(defaultRetries, func() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
 3    return ServerGroupsAndResources(d)
 4  })
 5}
 6
 7func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
 8  sgs, err := d.ServerGroups()
 9  ...
10}
11
12func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) {
13  // 通过 /api 接口获取APIVersions信息
14  v := &metav1.APIVersions{}
15  err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do(context.TODO()).Into(v)
16  apiGroup := metav1.APIGroup{}
17  if err == nil && len(v.Versions) != 0 {
18    apiGroup = apiVersionsToAPIGroup(v)
19  }
20
21  // 通过 /apis 接口获取 groupVersions信息
22  apiGroupList = &metav1.APIGroupList{}
23  err = d.restClient.Get().AbsPath("/apis").Do(context.TODO()).Into(apiGroupList)
24  if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
25    return nil, err
26  }
27
28  // 将两者合并返回
29  if len(v.Versions) != 0 {
30    apiGroupList.Groups = append([]metav1.APIGroup{apiGroup}, apiGroupList.Groups...)
31  }
32  return apiGroupList, nil
33}

缓存核心实现

  • DiscoveryClient可以将资源相关信息存储于本地,默认存储位置为~/.kube/cache,~/.kube/http-cache
  • 默认10分钟同步一次
 1# 切换到~/.kube/cache 目录
 2(base) ➜  cache pwd
 3/Users/kinnylee/.kube/cache
 4# 查看目录结构
 5# 每一个group对应的version下都有一个serverresources.jon文件
 6(base) ➜  cache tree .
 7.
 8└── discovery
 9    └── rancher_dev.xxx.com
10        └── k8s
11            └── clusters
12                └── c_cflfm
13                    ├── admissionregistration.k8s.io
14                    │   ├── v1
15                    │   │   └── serverresources.json
16                    │   └── v1beta1
17                    │       └── serverresources.json
18                    ├── apiextensions.k8s.io
19                    │   ├── v1
20                    │   │   └── serverresources.json
21                    │   └── v1beta1
22                    │       └── serverresources.json
23                    ├── apiregistration.k8s.io
24                    │   ├── v1
25                    │   │   └── serverresources.json
26                    │   └── v1beta1
27                    │       └── serverresources.json
28                    ├── apps
29                    │   └── v1
30                    │       └── serverresources.json

源码位置:源码位置:vendor/k8s.io/client-go/discovery/cached/disk/cached_discovery.go

 1func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
 2  // 找到资源对应的缓存文件 ~/.kube/cache/.../group/version/serverresources.json
 3  filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
 4  // 读取文件内容
 5  cachedBytes, err := d.getCachedFile(filename)
 6  // 能获取到文件,就直接返回
 7  if err == nil {
 8    cachedResources := &metav1.APIResourceList{}
 9    if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedResources); err == nil {
10      klog.V(10).Infof("returning cached discovery info from %v", filename)
11      return cachedResources, nil
12    }
13  }
14  // 取不到缓存,就从服务器获取
15  liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
16
17  // 获取到的信息写入本地缓存
18  if err := d.writeCachedFile(filename, liveResources); err != nil {
19    klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
20  }
21
22  return liveResources, nil
23}