| 阅读 | 共 4341 字,阅读约
Overview
client-go源码分析(一) - 四种客户端对象
概述
- k8s使用client-go作为go语言的官方客户端编程工具,提供对api-server的交互访问
- client-go非常重要,所有k8s核心组件内部都集成了它
- 开发者要掌握k8s的二次开发,必须要掌握client-go的使用
源码结构
1├── discovery // 提供discovery发现客户端
2├── dynamic // 提供dynamic发现客户端
3├── examples
4├── informers // 每种资源的k8s实现
5├── kubernetes // 提供clientset客户端
6├── kubernetes_test
7├── listers // 为每一个k8s资源提供Lister功能,该功能对Get和List请求提供只读的缓存数据
8├── pkg
9├── plugin // 云服务商提供授权插件
10├── rest // 提供restclient客户端
11├── restmapper
12├── scale // 提供ScaleClient客户端,用于扩缩容资源对象
13├── testing
14├── third_party
15├── tools // 提供常用工具
16├── transport // 提供安全的tcp连接,用于客户端和容器直接传递二进制流。由内部spby包提供支持
17└── util // 提供常用方法
client-go的四种客户端对象
client-go支持4种客户端对象与api-server的交互,都可以通过kubeconfig配置信息连接到集群:
- RestClient:最基础的客户端,对http request进行了封装
- ClientSet:RestClient基础上封装了Resource和Version的管理方法,只能处理k8s内置资源
- DynamicClient:不仅能处理内置资源,也能处理CRD资源
- DiscoveryClient:发现客户端,用于发现apiserver支持的资源组、资源版本、资源信息
kubeconfig文件结构
kubeconfig文件通常包括三个部分:
- clusters:定义k8s集群信息,包括服务地址、证书等信息
- users:用户身份验证的客户端凭据
- contexts:用户信息和命名空间等,用于将请求发送到指定的集群
client-go读取kubeconfig文件后,生成config对象,用于与api通讯
1package main
2
3import (
4 "k8s.io/client-go/tools/clientcmd"
5 "log"
6)
7
8func main() {
9 config, err := clientcmd.BuildConfigFromFlags("", "~/.kube/config")
10 if err != nil {
11 log.Printf("init kubeconfig error.", err)
12 }
13 log.Printf(config.Host)
14}
加载kubeconfig源码分析
- BuildConfigFromFlags
- ClientConfig
- createClientConfig
- Load
源码位置:k8s.io/client-go/tools/clientcmd/client-config.go
1func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
2 ...
3 return NewNonInteractiveDeferredLoadingClientConfig(
4 &ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
5 &ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
6}
7
8func (config *DeferredLoadingClientConfig) ClientConfig() (*restclient.Config, error) {
9 mergedClientConfig, err := config.createClientConfig()
10 if err != nil {
11 return nil, err
12 }
13
14 // load the configuration and return on non-empty errors and if the
15 // content differs from the default config
16 mergedConfig, err := mergedClientConfig.ClientConfig()
17 switch {
18 case err != nil:
19 if !IsEmptyConfig(err) {
20 // return on any error except empty config
21 return nil, err
22 }
23 case mergedConfig != nil:
24 // the configuration is valid, but if this is equal to the defaults we should try
25 // in-cluster configuration
26 if !config.loader.IsDefaultConfig(mergedConfig) {
27 return mergedConfig, nil
28 }
29 }
30
31 // check for in-cluster configuration and use it
32 if config.icc.Possible() {
33 klog.V(4).Infof("Using in-cluster configuration")
34 return config.icc.ClientConfig()
35 }
36
37 // 返回合并后的客户端配置
38 return mergedConfig, err
39}
40
41func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) {
42 if config.clientConfig == nil {
43 config.loadingLock.Lock()
44 defer config.loadingLock.Unlock()
45
46 if config.clientConfig == nil {
47 // 加载配置
48 mergedConfig, err := config.loader.Load()
49 if err != nil {
50 return nil, err
51 }
52
53 var mergedClientConfig ClientConfig
54 if config.fallbackReader != nil {
55 mergedClientConfig = NewInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.fallbackReader, config.loader)
56 } else {
57 mergedClientConfig = NewNonInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.loader)
58 }
59
60 config.clientConfig = mergedClientConfig
61 }
62 }
63
64 return config.clientConfig, nil
65}
66
67func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
68 if err := rules.Migrate(); err != nil {
69 return nil, err
70 }
71
72 errlist := []error{}
73
74 kubeConfigFiles := []string{}
75
76 // Make sure a file we were explicitly told to use exists
77 // 支持两种方式获取配置信息路径
78 if len(rules.ExplicitPath) > 0 {
79 if _, err := os.Stat(rules.ExplicitPath); os.IsNotExist(err) {
80 return nil, err
81 }
82 kubeConfigFiles = append(kubeConfigFiles, rules.ExplicitPath)
83
84 } else {
85 kubeConfigFiles = append(kubeConfigFiles, rules.Precedence...)
86 }
87
88 kubeconfigs := []*clientcmdapi.Config{}
89 // read and cache the config files so that we only look at them once
90 for _, filename := range kubeConfigFiles {
91 if len(filename) == 0 {
92 // no work to do
93 continue
94 }
95
96 config, err := LoadFromFile(filename)
97 if os.IsNotExist(err) {
98 // skip missing files
99 continue
100 }
101 if err != nil {
102 errlist = append(errlist, fmt.Errorf("Error loading config file \"%s\": %v", filename, err))
103 continue
104 }
105
106 kubeconfigs = append(kubeconfigs, config)
107 }
108
109 // first merge all of our maps
110 mapConfig := clientcmdapi.NewConfig()
111
112 for _, kubeconfig := range kubeconfigs {
113 mergo.MergeWithOverwrite(mapConfig, kubeconfig)
114 }
115
116 // merge all of the struct values in the reverse order so that priority is given correctly
117 // errors are not added to the list the second time
118 nonMapConfig := clientcmdapi.NewConfig()
119 for i := len(kubeconfigs) - 1; i >= 0; i-- {
120 kubeconfig := kubeconfigs[i]
121 mergo.MergeWithOverwrite(nonMapConfig, kubeconfig)
122 }
123
124 // since values are overwritten, but maps values are not, we can merge the non-map config on top of the map config and
125 // get the values we expect.
126 // 合并配置信息
127 config := clientcmdapi.NewConfig()
128 mergo.MergeWithOverwrite(config, mapConfig)
129 mergo.MergeWithOverwrite(config, nonMapConfig)
130
131 if rules.ResolvePaths() {
132 if err := ResolveLocalPaths(config); err != nil {
133 errlist = append(errlist, err)
134 }
135 }
136 return config, utilerrors.NewAggregate(errlist)
137}
RestClient客户端
RestClient是最基础的客户端,其他三个都是基于RestClient实现的。RestClient对Http Request进行了封装,实现了Restful风格的api
RestClient使用示例
- rest.RESTClientFor函数通过kubeconfig对象实例化RestClient对象
- RestClient对象构建http请求参数
- 请求方法:Get(),Post(),Delete(),Put(),Patch()
- Namespace():设置命名空间
- Resource():设置请求的资源名称
- VersionedParams():设置查询选项
- Do():执行请求
1func restClient() {
2 config.APIPath = "api"
3 config.GroupVersion = &schema.GroupVersion{}
4 config.NegotiatedSerializer = scheme.Codecs
5 restClient, err := rest.RESTClientFor(config)
6 if err != nil {
7 log.Printf("init restclient error.", err)
8 return
9 }
10
11 result := &corev1.PodList{}
12 err = restClient.Get().
13 Namespace("default").
14 Resource("pods").
15 VersionedParams(&v1.ListOptions{Limit: 5000}, scheme.ParameterCodec).
16 Do(nil).Into(result)
17 if err != nil {
18 log.Printf("get pod error.", err)
19 return
20 }
21
22 for _, d := range result.Items {
23 log.Printf("namespace: %v \t name: %v \t status: %v \t", d.Namespace, d.Name, d.Status.Phase)
24 }
25}
Do函数实现
对go语言标准的net/http进行了封装
1func (r *Request) Do(ctx context.Context) Result {
2 var result Result
3 err := r.request(ctx, func(req *http.Request, resp *http.Response) {
4 result = r.transformResponse(resp, req)
5 })
6 if err != nil {
7 return Result{err: err}
8 }
9 return result
10}
11
12/**
13* request方法主要调用go的http请求方法
14*/
15func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
16 // 采集请求延时指标
17 start := time.Now()
18 defer func() {
19 metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
20 }()
21
22 // 开始重试请求
23 retries := 0
24 for {
25 url := r.URL().String()
26 // 调用go原生的请求接口创建请求
27 req, err := http.NewRequest(r.verb, url, r.body)
28 if err != nil {
29 return err
30 }
31 req = req.WithContext(ctx)
32 req.Header = r.headers
33
34 // 调用go原生的请求接口开始发送请求
35 resp, err := client.Do(req)
36 // 更新指标信息
37 updateURLMetrics(r, resp, err)
38
39 if err != nil {
40 // For connection errors and apiserver shutdown errors retry.
41 if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
42 // 构造返回信息
43 resp = &http.Response{
44 StatusCode: http.StatusInternalServerError,
45 Header: http.Header{"Retry-After": []string{"1"}},
46 Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
47 }
48 } else {
49 return err
50 }
51 }
52 }
53}
ClientSet客户端
- RestClient是最基础的客户端,使用时需要指定Resource和Version等信息,编写代码前需要提前知道Resource所在的group和对应的version信息
- ClientSet使用则更加便捷,二次开发时通常使用ClientSet
- ClientSet在RestClient基础上封装了对Resource和Version的管理方法
- 每个Resource是一个客户端,ClientSet是多个客户端的集合
ClientSet使用示例
- kubernetes.newForConfig通过配置信息实例化ClientSet对象
1func clientSet() {
2 clientSet, err := kubernetes.NewForConfig(config)
3 if err != nil {
4 log.Printf("init clientset error.")
5 return
6 }
7
8 podClient := clientSet.CoreV1().Pods(corev1.NamespaceDefault)
9 list, err := podClient.List(context.TODO(), v1.ListOptions{Limit: 500})
10 if err != nil {
11 log.Printf("pod list error.", err)
12 return
13 }
14 for _, d := range list.Items {
15 log.Printf("namespace: %v \t name: %v \t status: %v \t", d.Namespace, d.Name, d.Status.Phase)
16 }
17}
clientSet
ClientSet是一堆Client的集合
1type Clientset struct {
2 *discovery.DiscoveryClient
3 ...
4 appsV1 *appsv1.AppsV1Client
5 appsV1beta1 *appsv1beta1.AppsV1beta1Client
6 appsV1beta2 *appsv1beta2.AppsV1beta2Client
7 coreV1 *corev1.CoreV1Client
8 ...
9}
10
11// client都实现了Interface接口
12type AppsV1Client struct {
13 restClient rest.Interface
14}
15
16// Interface接口定义了以下操作
17type Interface interface {
18 GetRateLimiter() flowcontrol.RateLimiter
19 Verb(verb string) *Request
20 Post() *Request
21 Put() *Request
22 Patch(pt types.PatchType) *Request
23 Get() *Request
24 Delete() *Request
25 APIVersion() schema.GroupVersion
26}
DynamicClient客户端
- 一种动态客户端,可以对任意k8s资源进行restful操作,包括crd资源
- 内部实现了Unstructured,用于处理非结构化的数据结构,这是能处理crd资源的关键
- 处理过程会将Resource转换成Unstructured结构类型,处理完后再转换回去
DynamicClient使用示例
- dynamic.NewForConfig通过配置信息实例化dynamicClient对象
- dynaicClient.Resource用于设置请求的资源组、资源版本、资源名称
1func dynamicClient() {
2 dynamicClient, err := dynamic.NewForConfig(config)
3 if err != nil {
4 log.Printf("dynamic client init error.", err)
5 return
6 }
7
8 gvr := schema.GroupVersionResource{
9 Version: "v1",
10 Resource: "pods",
11 }
12
13 unstructObj, err := dynamicClient.Resource(gvr).
14 Namespace(corev1.NamespaceDefault).
15 List(context.TODO(), v1.ListOptions{Limit: 500})
16
17 if err != nil {
18 log.Panicf("dynamic list error.", err)
19 return
20 }
21
22 podList := &corev1.PodList{}
23 err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), podList)
24 if err != nil {
25 log.Printf("unstruct error.", err)
26 return
27 }
28
29 for _, d := range podList.Items {
30 log.Printf("namespace: %v \t name: %v \t status: %v \t", d.Namespace, d.Name, d.Status.Phase)
31 }
32}
DiscoveryClient客户端
- 是发现客户端,用于发现apiserver支持的资源组、资源版本、资源信息
- kubectl的api-versions、api-resources命令输出也是通过DiscoveryClient实现的
- 还可以将这些信息存储到本地,用于缓存。默认路径:~/.kube/cache和~/.kube/http-cache
DiscoveryClient示例代码
- discovery.NewDiscoveryClientForConfig通过配置信息实例化discoveryClient对象
- discoveryclient.ServerGroupsAndResources函数返回所支持的资源信息
1func discoveryClient() {
2 discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
3 if err != nil {
4 log.Printf("discovery init error. %s", err)
5 return
6 }
7
8 _, resources, err := discoveryClient.ServerGroupsAndResources()
9 if err != nil {
10 log.Printf("group resource error. %s", err)
11 }
12
13 for _, list := range resources {
14 gv, err := schema.ParseGroupVersion(list.GroupVersion)
15 if err != nil {
16 log.Printf("parse group error. %s", err)
17 continue
18 }
19
20 for _, resources := range list.APIResources {
21 fmt.Printf("name: %v, group: %v, version: %v\n", resources.Name, gv.Group, gv.Version)
22 }
23 }
24}
获取资源核心实现
- DiscoveryClient通过RestClient请求api
- 将请求结果存放在APIGroupList结构体中
源码位置:vendor/k8s.io/client-go/discovery/discovery_client.go
1func (d *DiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
2 return withRetries(defaultRetries, func() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
3 return ServerGroupsAndResources(d)
4 })
5}
6
7func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
8 sgs, err := d.ServerGroups()
9 ...
10}
11
12func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) {
13 // 通过 /api 接口获取APIVersions信息
14 v := &metav1.APIVersions{}
15 err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do(context.TODO()).Into(v)
16 apiGroup := metav1.APIGroup{}
17 if err == nil && len(v.Versions) != 0 {
18 apiGroup = apiVersionsToAPIGroup(v)
19 }
20
21 // 通过 /apis 接口获取 groupVersions信息
22 apiGroupList = &metav1.APIGroupList{}
23 err = d.restClient.Get().AbsPath("/apis").Do(context.TODO()).Into(apiGroupList)
24 if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
25 return nil, err
26 }
27
28 // 将两者合并返回
29 if len(v.Versions) != 0 {
30 apiGroupList.Groups = append([]metav1.APIGroup{apiGroup}, apiGroupList.Groups...)
31 }
32 return apiGroupList, nil
33}
缓存核心实现
- DiscoveryClient可以将资源相关信息存储于本地,默认存储位置为~/.kube/cache,~/.kube/http-cache
- 默认10分钟同步一次
1# 切换到~/.kube/cache 目录
2(base) ➜ cache pwd
3/Users/kinnylee/.kube/cache
4# 查看目录结构
5# 每一个group对应的version下都有一个serverresources.jon文件
6(base) ➜ cache tree .
7.
8└── discovery
9 └── rancher_dev.xxx.com
10 └── k8s
11 └── clusters
12 └── c_cflfm
13 ├── admissionregistration.k8s.io
14 │ ├── v1
15 │ │ └── serverresources.json
16 │ └── v1beta1
17 │ └── serverresources.json
18 ├── apiextensions.k8s.io
19 │ ├── v1
20 │ │ └── serverresources.json
21 │ └── v1beta1
22 │ └── serverresources.json
23 ├── apiregistration.k8s.io
24 │ ├── v1
25 │ │ └── serverresources.json
26 │ └── v1beta1
27 │ └── serverresources.json
28 ├── apps
29 │ └── v1
30 │ └── serverresources.json
源码位置:源码位置:vendor/k8s.io/client-go/discovery/cached/disk/cached_discovery.go
1func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
2 // 找到资源对应的缓存文件 ~/.kube/cache/.../group/version/serverresources.json
3 filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
4 // 读取文件内容
5 cachedBytes, err := d.getCachedFile(filename)
6 // 能获取到文件,就直接返回
7 if err == nil {
8 cachedResources := &metav1.APIResourceList{}
9 if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedResources); err == nil {
10 klog.V(10).Infof("returning cached discovery info from %v", filename)
11 return cachedResources, nil
12 }
13 }
14 // 取不到缓存,就从服务器获取
15 liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
16
17 // 获取到的信息写入本地缓存
18 if err := d.writeCachedFile(filename, liveResources); err != nil {
19 klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
20 }
21
22 return liveResources, nil
23}