Kubernetes Source Code Analysis: kube-apiserver




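Preface

I recently joined the Kubernetes source code study group organized by the cloud native community, started studying the underlying Kubernetes source code, and am writing up what I learn as notes. Anyone interested is welcome to join and learn together. The group and the community have plenty of experienced people who can answer your questions. https://github.com/cloudnativeto/sig-k8s-source-code

The previous post covered the Informer mechanism of the client-go framework (see the Informer source analysis). The api-server also uses the go-restful web framework; for its principles and source code, see the go-restful source analysis.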

First, a call graph of the kube-apiserver code (the original post links to a high-resolution version).

api-server

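Overview

kube-apiserver is the most central component of Kubernetes and the bridge between all the other components: components do not talk to each other directly, but always go through the api-server. See my earlier blog post for details; this article analyzes the api-server from the source code perspective.

Main responsibilities of kube-apiserver

  • Manages the API of the whole cluster and provides API registration and discovery (implemented with the go-restful framework)
  • Is the only entry point for resource operations (it reads and writes the resources stored in etcd)
  • Acts as the hub for the components inside the cluster
  • Provides security controls such as request authentication, authorization, and access control

The three servers

kube-apiserver runs three HTTP servers, which decouple the large body of functionality inside the kube-apiserver component. The three HTTP servers are: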

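Server | Description | Managing object | Resource registry
- | - | - | -
KubeAPIServer | Core server; serves the built-in core Kubernetes resources such as Pod and Service, which developers are not meant to modify freely | Master | legacyscheme.Scheme
APIExtensionsServer | API extension server; serves CRD custom resources | CustomResourceDefinitions | extensionsapiserver.Scheme
AggregatorServer | API aggregation server; provides the aggregation service | APIAggregator | aggregatorscheme.Scheme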

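All three servers are built on GenericAPIServer, which maps Kubernetes resources to REST APIs.

Overview of the kube-apiserver startup flow

kube-apiserver is the entry point for all resource control, and its startup flow is somewhat complex. The startup logic can be divided into 9 steps:

  • Resource registration
  • Cobra command-line argument parsing
  • Creating the apiserver generic configuration
  • Creating the APIExtensionsServer
  • Creating the KubeAPIServer
  • Creating the AggregatorServer
  • Creating the GenericAPIServer
  • Starting the HTTP service
  • Starting the HTTPS service

The three servers are linked together through the delegation pattern, and their initialization is similar:

  • First, create the config for each server
  • Then initialize the HTTP server, which includes:
    • Initializing the GoRestfulContainer
    • Installing the APIs that the server contains, in detail:
      • Create the backing RESTStorage for each API resource
      • Add a handler for each verb that the API resource supports
      • Register the handlers with the router
      • Register the router with the WebService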
0. Entry point

The kube-apiserver component is a standalone process. Its entry function is as follows:

Source location: cmd/kube-apiserver/apiserver.go

import (
  ...
  // Importing legacyscheme sets up the global resource registry (Scheme) during package initialization
  "k8s.io/kubernetes/pkg/api/legacyscheme"
  // Importing master triggers the init functions that register all Kubernetes resources
  "k8s.io/kubernetes/pkg/master"
  ...
)

func main() {
  rand.Seed(time.Now().UnixNano())

  // Create a Cobra Command object with default parameters
  command := app.NewAPIServerCommand()

  logs.InitLogs()
  defer logs.FlushLogs()

  if err := command.Execute(); err != nil {
    os.Exit(1)
  }
}

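1. Resource registration

The first thing kube-apiserver does after starting is register the resources that Kubernetes supports in the Scheme (the resource registry). Only then can the later startup logic look up resource information and start and run the three servers introduced above.

Resource registration is not triggered by an explicit function call but by Go's import and init mechanism, through the import of k8s.io/kubernetes/pkg/api/legacyscheme shown in step 0.

Resource registration consists of two steps:

  • Initialize the Scheme resource registry
  • Register the resources that Kubernetes supports

Initializing the Scheme resource registry

Source location: pkg/api/legacyscheme/scheme.go

This file defines three global variables that serve kube-apiserver and can be used anywhere in the component.

package legacyscheme

import (
  "k8s.io/apimachinery/pkg/runtime"
  "k8s.io/apimachinery/pkg/runtime/serializer"
)

// Scheme is the resource registry
var Scheme = runtime.NewScheme()

// Codecs is the codec factory for encoding and decoding resources
var Codecs = serializer.NewCodecFactory(Scheme)

// ParameterCodec is the codec for request parameters
var ParameterCodec = runtime.NewParameterCodec(Scheme)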

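Registering the resources that Kubernetes supports

At startup the apiserver imports the master package, through the import of k8s.io/kubernetes/pkg/master shown in step 0.

import_known_versions.go in the master package imports the install package of every Kubernetes API group, and importing a package triggers its init function. Each API group defines an install package whose init function performs the registration when the package is imported.

Source location: pkg/master/import_known_versions.go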

import (
  // These imports are the API groups the API server will support.
  _ "k8s.io/kubernetes/pkg/apis/admission/install"
  _ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
  _ "k8s.io/kubernetes/pkg/apis/apps/install"
  _ "k8s.io/kubernetes/pkg/apis/authentication/install"
  _ "k8s.io/kubernetes/pkg/apis/authorization/install"
  _ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
  _ "k8s.io/kubernetes/pkg/apis/batch/install"
  _ "k8s.io/kubernetes/pkg/apis/certificates/install"
  _ "k8s.io/kubernetes/pkg/apis/coordination/install"
  _ "k8s.io/kubernetes/pkg/apis/core/install"
  _ "k8s.io/kubernetes/pkg/apis/discovery/install"
  _ "k8s.io/kubernetes/pkg/apis/events/install"
  _ "k8s.io/kubernetes/pkg/apis/extensions/install"
  _ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
  _ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
  _ "k8s.io/kubernetes/pkg/apis/networking/install"
  _ "k8s.io/kubernetes/pkg/apis/node/install"
  _ "k8s.io/kubernetes/pkg/apis/policy/install"
  _ "k8s.io/kubernetes/pkg/apis/rbac/install"
  _ "k8s.io/kubernetes/pkg/apis/scheduling/install"
  _ "k8s.io/kubernetes/pkg/apis/settings/install"
  _ "k8s.io/kubernetes/pkg/apis/storage/install"
)

Take the core group as an example and look at its install package. Source location: pkg/apis/core/install/install.go

func init() {
  // legacyscheme.Scheme is the global resource registry introduced above
  Install(legacyscheme.Scheme)
}

// Install registers the API group and adds types to a scheme
func Install(scheme *runtime.Scheme) {
  // Register the types of the group (internal version and v1)
  utilruntime.Must(core.AddToScheme(scheme))
  utilruntime.Must(v1.AddToScheme(scheme))
  // Set the version priority
  utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion))
}
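
The import-and-init registration described in this step can be reproduced in miniature. The sketch below is only an illustration under simplifying assumptions: `scheme`, `addKnownTypes`, `install`, and `globalScheme` are hypothetical names, everything lives in one file instead of separate install packages, and the registry only records kind names. It relies on two Go guarantees: package-level variables are initialized before init functions run, and init functions run before main.

```go
package main

import (
	"fmt"
	"sort"
)

// scheme is a hypothetical, heavily reduced stand-in for runtime.Scheme.
// It only records which kinds were registered for each group/version.
type scheme struct {
	kinds map[string][]string
}

func newScheme() *scheme {
	return &scheme{kinds: map[string][]string{}}
}

func (s *scheme) addKnownTypes(groupVersion string, kinds ...string) {
	s.kinds[groupVersion] = append(s.kinds[groupVersion], kinds...)
}

// globalScheme plays the role of legacyscheme.Scheme. Package-level
// variables are initialized before any init function runs.
var globalScheme = newScheme()

// init runs automatically before main. In Kubernetes each install package
// has its own init, triggered by a blank import; here it is in the same file.
func init() {
	install(globalScheme)
}

// install registers a few example kinds for the "v1" group/version.
func install(s *scheme) {
	s.addKnownTypes("v1", "Pod", "Service")
}

// registeredGroupVersions returns the registered group/versions, sorted.
func registeredGroupVersions(s *scheme) []string {
	gvs := make([]string, 0, len(s.kinds))
	for gv := range s.kinds {
		gvs = append(gvs, gv)
	}
	sort.Strings(gvs)
	return gvs
}

func main() {
	// By the time main runs, init has already filled the registry.
	fmt.Println(registeredGroupVersions(globalScheme), globalScheme.kinds["v1"])
}
```

Taking the scheme as a parameter of `install`, as the real Install function does, keeps registration testable against a fresh scheme instead of only the global one.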

2. Parsing command-line arguments with Cobra

All Kubernetes components use Cobra to parse command-line arguments. kube-apiserver uses Cobra to fill in default values for its configuration and to validate it, starting from the app.NewAPIServerCommand() call shown in step 0.

Source location: cmd/kube-apiserver/app/server.go

func NewAPIServerCommand() *cobra.Command {
  // Initialize the default options; internally this calls the defaults of each module
  s := options.NewServerRunOptions()
  cmd := &cobra.Command{
    ...
    RunE: func(cmd *cobra.Command, args []string) error {
      ...
      // Fill in the default configuration
      completedOptions, err := Complete(s)
      if err != nil {
        return err
      }
      // Validate the options
      if errs := completedOptions.Validate(); len(errs) != 0 {
        return utilerrors.NewAggregate(errs)
      }
      // Start running as a long-lived process
      // The Run function is covered separately below
      return Run(completedOptions, genericapiserver.SetupSignalHandler())
    },
    ...
  }
  ...
}

// genericapiserver.SetupSignalHandler()
func SetupSignalHandler() <-chan struct{} {
  return SetupSignalContext().Done()
}

func SetupSignalContext() context.Context {
  ...
  // Listen for the OS signals os.Interrupt and syscall.SIGTERM
  // and bind them to the stop channel, so that goroutines exit gracefully when the process terminates
  signal.Notify(shutdownHandler, shutdownSignals...)
  ...
}

3. Creating the server chain

The core flow is the configuration and creation of the three kube-apiserver servers introduced above. It consists of:

  • Creating the KubeAPIServer generic configuration
  • Creating the APIExtensionsServer configuration
  • Creating the APIExtensionsServer
  • Creating the KubeAPIServer
  • Creating the AggregatorServer configuration
  • Creating the AggregatorServer
  • Starting the service

The apiserver generic configuration is what the different kube-apiserver modules need in order to be instantiated. It includes:

  • Instantiating genericConfig
  • OpenAPI and Swagger configuration
  • StorageFactory (etcd) configuration
  • Authentication configuration
  • Authorization configuration
  • Admission controller configuration

func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
  // Create the server chain
  server, err := CreateServerChain(completeOptions, stopCh)
  if err != nil {
    return err
  }
  // Prepare to run
  prepared, err := server.PrepareRun()
  if err != nil {
    return err
  }
  // Run
  return prepared.Run(stopCh)
}

// Create the server chain
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
  ...
  // Create the KubeAPIServer configuration
  kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
  if err != nil {
    return nil, err
  }

  // Create the APIExtensionsServer configuration
  apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
    serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))

  // Create the APIExtensionsServer; it is the end of the chain, so it gets an empty delegate
  apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())

  // Create the KubeAPIServer; it delegates to the APIExtensionsServer
  kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)

  // Create the AggregatorServer configuration
  aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)

  // Create the AggregatorServer; it delegates to the KubeAPIServer
  aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)

  if insecureServingInfo != nil {
    insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
    // Start the insecure HTTP service
    if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
      return nil, err
    }
  }
  return aggregatorServer, nil
}

3.1 Creating the KubeAPIServer generic configuration

func CreateKubeAPIServerConfig(...) (...) {
  // Build the generic configuration
  genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
  ...
  // Compute the Service cluster IP range and the API server's own Service IP
  serviceIPRange, apiServerServiceIP, err := master.ServiceIPRange(s.PrimaryServiceClusterIPRange)
  ...
  // Build master.Config
  config := &master.Config{
    GenericConfig: genericConfig,
    ExtraConfig: master.ExtraConfig{
      APIResourceConfigSource: storageFactory.APIResourceConfigSource,
      ...
    },
  }
  ...
}
3.1.1 buildGenericConfig

func buildGenericConfig(...) (...) {
  genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
  // Configure which group/versions (GVs) are enabled or disabled
  genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()
  ...
  // OpenAPI/Swagger configuration
  // OpenAPIConfig is used to generate the OpenAPI specification
  genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
  genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
  genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
    sets.NewString("watch", "proxy"),
    sets.NewString("attach", "exec", "proxy", "log", "portforward"),
  )

  kubeVersion := version.Get()
  genericConfig.Version = &kubeVersion
  // etcd configuration
  // storageFactoryConfig defines how kube-apiserver interacts with etcd, e.g. etcd authentication, addresses, and storage prefix
  // It also defines how resources are stored, e.g. resource information, resource encoding, and resource state
  storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
  storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
  completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
  if err != nil {
    lastErr = err
    return
  }
  storageFactory, lastErr = completedStorageFactoryConfig.New()
  if lastErr != nil {
    return
  }
  if genericConfig.EgressSelector != nil {
    storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
  }
  if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
    return
  }

  // Initialize the SharedInformerFactory
  versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)

  // Authentication configuration
  // Internally calls authenticatorConfig.New()
  // Kubernetes provides 9 authentication mechanisms; each one becomes an authenticator once instantiated
  if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
    return
  }

  // Authorization configuration
  // Kubernetes provides 6 authorization mechanisms; each one becomes an authorizer once instantiated
  genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)

  // Admission configuration
  // After authentication and authorization succeed, and before a resource is persisted to etcd, the request goes through admission control
  // Admission control can apply custom operations to the requested resource (validate, mutate, reject)
  // Kubernetes supports 31 admission controllers
  // Admission controllers are registered, stored, and managed through the Plugins data structure
  admissionConfig := &kubeapiserveradmission.Config{
    ExternalInformers:    versionedInformers,
    LoopbackClientConfig: genericConfig.LoopbackClientConfig,
    CloudConfigFile:      s.CloudProvider.CloudConfigFile,
  }
  serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
  pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
  if err != nil {
    lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
    return
  }

  err = s.Admission.ApplyTo(
    genericConfig,
    versionedInformers,
    kubeClientConfig,
    feature.DefaultFeatureGate,
    pluginInitializers...)
  if err != nil {
    lastErr = fmt.Errorf("failed to initialize admission: %v", err)
  }

  if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
    genericConfig.FlowControl = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
  }

  return
}

// Authentication initialization
func (o *BuiltInAuthenticationOptions) ApplyTo(...) error {
  ...
  authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
  ...
}

// Initialize up to 9 kinds of authenticators, depending on the configuration
func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
  // Lists of request authenticators and token authenticators
  var authenticators []authenticator.Request
  var tokenAuthenticators []authenticator.Token
  securityDefinitions := spec.SecurityDefinitions{}

  // Each authenticator below is configured only if its switch is set

  // RequestHeader authenticator
  if config.RequestHeaderConfig != nil {
    requestHeaderAuthenticator := headerrequest.NewDynamicVerifyOptionsSecure(
      config.RequestHeaderConfig.CAContentProvider.VerifyOptions,
      config.RequestHeaderConfig.AllowedClientNames,
      config.RequestHeaderConfig.UsernameHeaders,
      config.RequestHeaderConfig.GroupHeaders,
      config.RequestHeaderConfig.ExtraHeaderPrefixes,
    )
    authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
  }
  // X509 methods
  if config.ClientCAContentProvider != nil {
    certAuth := x509.NewDynamic(config.ClientCAContentProvider.VerifyOptions, x509.CommonNameUserConversion)
    authenticators = append(authenticators, certAuth)
  }

  // TokenAuth authenticator
  if len(config.TokenAuthFile) > 0 {
    tokenAuth, err := newAuthenticatorFromTokenFile(config.TokenAuthFile)
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, tokenAuth))
  }
  // ServiceAccountAuth authenticator
  if len(config.ServiceAccountKeyFiles) > 0 {
    serviceAccountAuth, err := newLegacyServiceAccountAuthenticator(config.ServiceAccountKeyFiles, config.ServiceAccountLookup, config.APIAudiences, config.ServiceAccountTokenGetter)
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
  }
  if utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) && config.ServiceAccountIssuer != "" {
    serviceAccountAuth, err := newServiceAccountAuthenticator(config.ServiceAccountIssuer, config.ServiceAccountKeyFiles, config.APIAudiences, config.ServiceAccountTokenGetter)
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
  }
  // BootstrapToken authenticator
  if config.BootstrapToken {
    if config.BootstrapTokenAuthenticator != nil {
      // TODO: This can sometimes be nil because of
      tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, config.BootstrapTokenAuthenticator))
    }
  }
  // NOTE(ericchiang): Keep the OpenID Connect after Service Accounts.
  //
  // Because both plugins verify JWTs whichever comes first in the union experiences
  // cache misses for all requests using the other. While the service account plugin
  // simply returns an error, the OpenID Connect plugin may query the provider to
  // update the keys, causing performance hits.
  if len(config.OIDCIssuerURL) > 0 && len(config.OIDCClientID) > 0 {
    oidcAuth, err := newAuthenticatorFromOIDCIssuerURL(oidc.Options{
      IssuerURL:            config.OIDCIssuerURL,
      ClientID:             config.OIDCClientID,
      CAFile:               config.OIDCCAFile,
      UsernameClaim:        config.OIDCUsernameClaim,
      UsernamePrefix:       config.OIDCUsernamePrefix,
      GroupsClaim:          config.OIDCGroupsClaim,
      GroupsPrefix:         config.OIDCGroupsPrefix,
      SupportedSigningAlgs: config.OIDCSigningAlgs,
      RequiredClaims:       config.OIDCRequiredClaims,
    })
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, oidcAuth))
  }
  // WebhookTokenAuth authenticator
  if len(config.WebhookTokenAuthnConfigFile) > 0 {
    webhookTokenAuth, err := newWebhookTokenAuthenticator(config)
    if err != nil {
      return nil, nil, err
    }

    tokenAuthenticators = append(tokenAuthenticators, webhookTokenAuth)
  }

  if len(tokenAuthenticators) > 0 {
    // Union the token authenticators
    tokenAuth := tokenunion.New(tokenAuthenticators...)
    // Optionally cache authentication results
    if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
      tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
    }
    authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
    securityDefinitions["BearerToken"] = &spec.SecurityScheme{
      SecuritySchemeProps: spec.SecuritySchemeProps{
        Type:        "apiKey",
        Name:        "authorization",
        In:          "header",
        Description: "Bearer Token authentication",
      },
    }
  }
  // Anonymous authenticator
  if len(authenticators) == 0 {
    if config.Anonymous {
      return anonymous.NewAuthenticator(), &securityDefinitions, nil
    }
    return nil, &securityDefinitions, nil
  }
  ...
  // Combine the authenticators into one
  authenticator := union.New(authenticators...)
  ...
}

// Authorization initialization
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
  ...
  return authorizationConfig.New()
}

// Configure the 6 kinds of authorizers
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
  ...
  // Declare the lists of authorizers and rule resolvers
  var (
    authorizers   []authorizer.Authorizer
    ruleResolvers []authorizer.RuleResolver
  )

  for _, authorizationMode := range config.AuthorizationModes {
    switch authorizationMode {
    // Node authorizer
    case modes.ModeNode:
      graph := node.NewGraph()
      node.AddGraphEventHandlers(
        graph,
        config.VersionedInformerFactory.Core().V1().Nodes(),
        config.VersionedInformerFactory.Core().V1().Pods(),
        config.VersionedInformerFactory.Core().V1().PersistentVolumes(),
        config.VersionedInformerFactory.Storage().V1().VolumeAttachments(),
      )
      nodeAuthorizer := node.NewAuthorizer(graph, nodeidentifier.NewDefaultNodeIdentifier(), bootstrappolicy.NodeRules())
      authorizers = append(authorizers, nodeAuthorizer)
      ruleResolvers = append(ruleResolvers, nodeAuthorizer)
    // AlwaysAllow authorizer
    case modes.ModeAlwaysAllow:
      alwaysAllowAuthorizer := authorizerfactory.NewAlwaysAllowAuthorizer()
      authorizers = append(authorizers, alwaysAllowAuthorizer)
      ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)
    // AlwaysDeny authorizer
    case modes.ModeAlwaysDeny:
      alwaysDenyAuthorizer := authorizerfactory.NewAlwaysDenyAuthorizer()
      authorizers = append(authorizers, alwaysDenyAuthorizer)
      ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)
    // ABAC authorizer
    case modes.ModeABAC:
      abacAuthorizer, err := abac.NewFromFile(config.PolicyFile)
      if err != nil {
        return nil, nil, err
      }
      authorizers = append(authorizers, abacAuthorizer)
      ruleResolvers = append(ruleResolvers, abacAuthorizer)
    // Webhook authorizer
    case modes.ModeWebhook:
      webhookAuthorizer, err := webhook.New(config.WebhookConfigFile,
        config.WebhookVersion,
        config.WebhookCacheAuthorizedTTL,
        config.WebhookCacheUnauthorizedTTL,
        config.CustomDial)
      if err != nil {
        return nil, nil, err
      }
      authorizers = append(authorizers, webhookAuthorizer)
      ruleResolvers = append(ruleResolvers, webhookAuthorizer)
    // RBAC authorizer
    case modes.ModeRBAC:
      rbacAuthorizer := rbac.New(
        &rbac.RoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().Roles().Lister()},
        &rbac.RoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().RoleBindings().Lister()},
        &rbac.ClusterRoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoles().Lister()},
        &rbac.ClusterRoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoleBindings().Lister()},
      )
      authorizers = append(authorizers, rbacAuthorizer)
      ruleResolvers = append(ruleResolvers, rbacAuthorizer)
    default:
      return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
    }
  }
  // Combine the enabled authorizers into a single union authorizer
  // When a request arrives, kube-apiserver walks the authorizer list; as soon as one of them allows the request, authorization succeeds
  return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}
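
The way the enabled authorizers are combined can be sketched in a few lines. This is a simplified illustration, not the union package from Kubernetes: the `decision`, `request`, `authorizer`, and `union` types and the two example authorizers are hypothetical. The sketch assumes the chain stops at the first authorizer that returns an explicit allow or deny and moves on when an authorizer has no opinion, which matches my reading of the upstream union authorizer.

```go
package main

import "fmt"

// decision is a hypothetical three-valued authorization result.
type decision int

const (
	noOpinion decision = iota
	allow
	deny
)

// request carries only the attributes this sketch needs.
type request struct {
	user, verb, resource string
}

type authorizer interface {
	authorize(r request) decision
}

// authorizerFunc adapts a plain function to the authorizer interface.
type authorizerFunc func(r request) decision

func (f authorizerFunc) authorize(r request) decision { return f(r) }

// union tries each authorizer in order and returns the first explicit
// decision. If nobody has an opinion, the result is noOpinion, which the
// caller treats as "not allowed".
type union []authorizer

func (u union) authorize(r request) decision {
	for _, a := range u {
		if d := a.authorize(r); d != noOpinion {
			return d
		}
	}
	return noOpinion
}

// newChain builds an example chain with two made-up authorizers, in the
// same order the modes would be listed in the configuration.
func newChain() authorizer {
	nodeLike := authorizerFunc(func(r request) decision {
		if r.user == "system:node:n1" && r.verb == "get" && r.resource == "pods" {
			return allow
		}
		return noOpinion
	})
	rbacLike := authorizerFunc(func(r request) decision {
		if r.user == "alice" && r.verb == "get" {
			return allow
		}
		return noOpinion
	})
	return union{nodeLike, rbacLike}
}

func main() {
	chain := newChain()
	for _, r := range []request{
		{"system:node:n1", "get", "pods"},
		{"alice", "get", "services"},
		{"bob", "delete", "pods"},
	} {
		fmt.Printf("%s %s %s allowed=%v\n", r.user, r.verb, r.resource, chain.authorize(r) == allow)
	}
}
```

Because evaluation stops at the first explicit decision, the order of the configured authorization modes matters.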

3.2 Creating the APIExtensionsServer configuration

func createAPIExtensionsConfig(...)(...) {
  ...
  apiextensionsConfig := &apiextensionsapiserver.Config{
    GenericConfig: &genericapiserver.RecommendedConfig{
      Config:                genericConfig,
      SharedInformerFactory: externalInformers,
    },
    ExtraConfig: apiextensionsapiserver.ExtraConfig{
      CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions),
      MasterCount:          masterCount,
      AuthResolverWrapper:  authResolverWrapper,
      ServiceResolver:      serviceResolver,
    },
  }
  ...
}

3.3 Creating the APIExtensionsServer

func createAPIExtensionsServer(...) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
  return apiextensionsConfig.Complete().New(delegateAPIServer)
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
  // APIExtensionsServer is built on GenericAPIServer
  // GenericConfig creates a server named apiextensions-apiserver
  genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)

  // APIExtensionsServer is managed through the CustomResourceDefinitions object
  // The resources under APIExtensionsServer can only be registered after this object is instantiated
  s := &CustomResourceDefinitions{
    GenericAPIServer: genericServer,
  }

  apiResourceConfig := c.GenericConfig.MergedResourceConfig

  // Instantiate APIGroupInfo, which describes a resource group
  apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)

  // Map resources to their storage objects
  // If the v1beta1 version is enabled, store version, resource, and storage in the APIGroupInfo map
  if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
    storage := map[string]rest.Storage{}
    // Create the resource storage object with NewREST
    customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
    if err != nil {
      return nil, err
    }
    storage["customresourcedefinitions"] = customResourceDefinitionStorage
    storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)

    apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
  }
  // If the v1 version is enabled, store version, resource, and storage in the APIGroupInfo map
  if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
    storage := map[string]rest.Storage{}
    // customresourcedefinitions
    customResourceDefintionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
    if err != nil {
      return nil, err
    }
    storage["customresourcedefinitions"] = customResourceDefintionStorage
    storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)

    apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
  }
  // Register the APIs; this function is covered separately below
  if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
    return nil, err
  }

  crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)

  s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
  // Initialize the establishing controller
  establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
  // Declare the CRD handler
  crdHandler, err := NewCustomResourceDefinitionHandler(
    versionDiscoveryHandler,
    groupDiscoveryHandler,
    s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
    delegateHandler,
    c.ExtraConfig.CRDRESTOptionsGetter,
    c.GenericConfig.AdmissionControl,
    establishingController,
    c.ExtraConfig.ServiceResolver,
    c.ExtraConfig.AuthResolverWrapper,
    c.ExtraConfig.MasterCount,
    s.GenericAPIServer.Authorizer,
    c.GenericConfig.MaxRequestBodyBytes,
  )
  if err != nil {
    return nil, err
  }
  // Register the handler
  s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
  s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

  // Initialize the crdController (discovery controller)
  crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
  // Initialize the namingController
  namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
  // Initialize the finalizingController
  finalizingController := finalizer.NewCRDFinalizer(
    s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
    crdClient.Apiextensions(),
    crdHandler,
  )
  // Initialize the openapiController
  var openapiController *openapicontroller.Controller
  if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
    openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions())
  }

  // Register a post-start hook: start the informers
  s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
    s.Informers.Start(context.StopCh)
    return nil
  })
  // Register a post-start hook: start the controllers
  s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
    if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
      go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
    }
    // Start the controllers initialized above
    go crdController.Run(context.StopCh)
    go namingController.Run(context.StopCh)
    go establishingController.Run(context.StopCh)
    go finalizingController.Run(5, context.StopCh)
    return nil
  })
  // Register a post-start hook: wait until the CRD informer has synced
  s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
    return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
      return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil
    }, context.StopCh)
  })

  return s, nil
}
3.3.1 Instantiating APIGroupInfo

APIGroupInfo describes a resource group: each resource group corresponds to one APIGroupInfo object, and each resource in the group corresponds to one resource storage object.

func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
  return APIGroupInfo{
    PrioritizedVersions:          scheme.PrioritizedVersionsForGroup(group),
    // This map stores the mapping between resources and their storage objects
    // Layout: version -> resource -> storage object
    // The storage object (RESTStorage) handles create, read, update, and delete for the resource
    // RESTStorage is later converted into HTTP handler functions
    VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
    // TODO unhardcode this.  It was hardcoded before, but we need to re-evaluate
    OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
    Scheme:                 scheme,
    ParameterCodec:         parameterCodec,
    NegotiatedSerializer:   codecs,
  }
}
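
The two-level map is the heart of APIGroupInfo, so here is a small sketch of how it is filled and queried. The types are hypothetical stand-ins (`storage` for rest.Storage, `apiGroupInfo` for APIGroupInfo), and the group and resource names are only examples.

```go
package main

import "fmt"

// storage is a hypothetical stand-in for rest.Storage.
type storage interface {
	kind() string
}

type crdStorage struct{}

func (crdStorage) kind() string { return "CustomResourceDefinition" }

// apiGroupInfo keeps the version -> resource -> storage mapping for one
// resource group.
type apiGroupInfo struct {
	group                        string
	versionedResourcesStorageMap map[string]map[string]storage
}

func newAPIGroupInfo(group string) *apiGroupInfo {
	return &apiGroupInfo{
		group:                        group,
		versionedResourcesStorageMap: map[string]map[string]storage{},
	}
}

// enableVersion stores the resources served under one version, mirroring
// the "if the version is enabled" branches in the article.
func (g *apiGroupInfo) enableVersion(version string, resources map[string]storage) {
	g.versionedResourcesStorageMap[version] = resources
}

// lookup finds the storage object for a version and resource. Indexing a
// missing version yields a nil inner map, which is safe to read from.
func (g *apiGroupInfo) lookup(version, resource string) (storage, bool) {
	s, ok := g.versionedResourcesStorageMap[version][resource]
	return s, ok
}

func main() {
	info := newAPIGroupInfo("apiextensions.k8s.io")
	info.enableVersion("v1", map[string]storage{
		"customresourcedefinitions": crdStorage{},
	})

	if s, ok := info.lookup("v1", "customresourcedefinitions"); ok {
		fmt.Println("v1 storage handles kind:", s.kind())
	}
	_, ok := info.lookup("v1beta1", "customresourcedefinitions")
	fmt.Println("v1beta1 enabled:", ok)
}
```

Keeping one inner map per version means a disabled version is simply absent, so no routes are generated for it later.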
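3.3.2 Registering APIs: InstallAPIGroup

The function that registers an APIGroupInfo is very important: it registers the resource objects in the APIGroupInfo with the APIExtensionsServer's HTTP handler. The process is:

  • Iterate over the APIGroupInfo
  • Map the resource group, resource version, and resource name to an HTTP request path
  • Use the InstallREST function to turn the resource storage objects into the handler methods of the resources
  • Finally, use go-restful's ws.Route to add the request paths and handler methods as routes in go-restful
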
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
  return s.InstallAPIGroups(apiGroupInfo)
}

// InstallAPIGroups
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
  ...
  // Iterate over all resource groups and install the handlers for each version in turn
  for _, apiGroupInfo := range apiGroupInfos {
    if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
      return fmt.Errorf("unable to install api resources: %v", err)
    }
    ...
    apiGroup := metav1.APIGroup{
      Name:             apiGroupInfo.PrioritizedVersions[0].Group,
      Versions:         apiVersionsForDiscovery,
      PreferredVersion: preferredVersionForDiscovery,
    }

    s.DiscoveryGroupManager.AddGroup(apiGroup)
    s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
  }
  return nil
}

// installAPIResources
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
  for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
    ...
    // Call InstallREST
    // The argument is the go-restful Container object
    if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
      return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
    }
  }

  return nil
}

// InstallREST
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
  // Build the HTTP request path
  // Format: <apiPrefix>/<group>/<version>
  // apiPrefix is either api or apis
  prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
  // Instantiate the APIInstaller
  installer := &APIInstaller{
    group:             g,
    prefix:            prefix,
    minRequestTimeout: g.MinRequestTimeout,
  }
  // Register the APIs; returns the go-restful WebService object
  apiResources, ws, registrationErrors := installer.Install()
  versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
  versionDiscoveryHandler.AddToWebService(ws)
  // This is standard go-restful usage: add the WebService to the Container
  container.Add(ws)
  return utilerrors.NewAggregate(registrationErrors)
}

// installer.Install
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
  // Build the WebService object
  ws := a.newWebService()
  ...
  // Iterate over all paths
  for _, path := range paths {
    // Convert Storage into routes and register them with the WebService
    apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
    ...
    if apiResource != nil {
      // Add it to the list
      apiResources = append(apiResources, *apiResource)
    }
  }
  return apiResources, ws, errors
}

// This method is very long. Its core job is to build a handler from the storage, combine the handler and the path into a go-restful Route, and add the Route to the WebService
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (...) {
  ...
  // Determine which REST interfaces the storage implements
  creater, isCreater := storage.(rest.Creater)
  namedCreater, isNamedCreater := storage.(rest.NamedCreater)
  ...
  // Build the list of actions
  actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
  ...
  for _, action := range actions {
    ...
    // Build the go-restful RouteBuilder objects
    routes := []*restful.RouteBuilder{}
    // Register a different handler depending on the action's verb
    switch action.Verb {
    case "GET":
      ...
      // Initialize the handler
      handler = restfulGetResource(getter, exporter, reqScope)
      ...
      // Build the route (further builder calls elided)
      route := ws.GET(action.Path).To(handler)
      ...
      // Append the route to routes
      routes = append(routes, route)
    ...
    case "POST":
      ...
      // Initialize the handler; covered separately below
      handler = restfulCreateResource(creater, reqScope, admit)
      // Build the route (further builder calls elided)
      route := ws.POST(action.Path).To(handler)
      routes = append(routes, route)
    ...

    // Iterate over all routes
    for _, route := range routes {
        route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
          Group:   reqScope.Kind.Group,
          Version: reqScope.Kind.Version,
119          Kind:    reqScope.Kind.Kind,
120        })
121        // 添加自定义扩展属性(k8s所有的扩展属性以x-打头)
122        route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
123        // 将route加入到WebService中
124        ws.Route(route)
125      }
126  }
127}
128
129// handler初始化
130func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
131  return func(req *restful.Request, res *restful.Response) {
132    handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
133  }
134}
135
136// 返回一个处理资源的handler
137func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
138  return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
139}
140
141// 返回一个http标准库handler函数,处理对应的路由请求
142func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
143  // http库标准的handler写法
144  return func(w http.ResponseWriter, req *http.Request) {
145    ...
146    // 找到合适的SerializeInfo
147    s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
148    ...
149    // 寻找合适的编解码器
150    decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)
151
152    // 解码
153    obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
154    // 处理请求
155    result, err := finishRequest(timeout, func() (runtime.Object, error) {
156      ...
157    })
158    ...
159  }
160}
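
The storage-to-route step in registerResourceHandlers can be sketched with nothing but the standard library. The example below is a minimal illustration under stated assumptions, not the real implementation: Getter, Creater, memStore, readOnlyStore, newDemoMux and status are hypothetical names, net/http's ServeMux stands in for the go-restful WebService, and a map stands in for etcd. What it keeps from the original is the idea that a type assertion on the storage object decides which verbs get a handler.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Getter and Creater are simplified, hypothetical stand-ins for rest.Getter
// and rest.Creater; the real interfaces work with contexts and runtime.Object.
type Getter interface {
	Get(name string) (string, bool)
}

type Creater interface {
	Create(name, value string)
}

// memStore implements both interfaces on top of a map (etcd in the real server).
type memStore struct{ items map[string]string }

func (m *memStore) Get(name string) (string, bool) { v, ok := m.items[name]; return v, ok }
func (m *memStore) Create(name, value string)      { m.items[name] = value }

// readOnlyStore implements only Getter, so POST is never wired up for it.
type readOnlyStore struct{ items map[string]string }

func (r *readOnlyStore) Get(name string) (string, bool) { v, ok := r.items[name]; return v, ok }

// registerResourceHandlers checks which interfaces the storage implements and
// serves only the verbs backed by one of them.
func registerResourceHandlers(mux *http.ServeMux, prefix, resource string, storage interface{}) {
	getter, isGetter := storage.(Getter)
	creater, isCreater := storage.(Creater)
	mux.HandleFunc(prefix+"/"+resource, func(w http.ResponseWriter, req *http.Request) {
		name := req.URL.Query().Get("name")
		switch {
		case req.Method == http.MethodGet && isGetter:
			v, ok := getter.Get(name)
			if !ok {
				http.NotFound(w, req)
				return
			}
			fmt.Fprint(w, v)
		case req.Method == http.MethodPost && isCreater:
			creater.Create(name, req.URL.Query().Get("value"))
			w.WriteHeader(http.StatusCreated)
		default:
			http.Error(w, "verb not supported", http.StatusMethodNotAllowed)
		}
	})
}

// newDemoMux registers one read-write and one read-only resource.
func newDemoMux() http.Handler {
	mux := http.NewServeMux()
	registerResourceHandlers(mux, "/api/v1", "pods", &memStore{items: map[string]string{}})
	registerResourceHandlers(mux, "/api/v1", "nodes", &readOnlyStore{items: map[string]string{"n1": "ready"}})
	return mux
}

// status sends one in-process request and returns the response status code.
func status(h http.Handler, method, target string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, target, nil))
	return rec.Code
}

func main() {
	h := newDemoMux()
	fmt.Println(status(h, http.MethodPost, "/api/v1/pods?name=a&value=b"))
	fmt.Println(status(h, http.MethodGet, "/api/v1/pods?name=a"))
	fmt.Println(status(h, http.MethodPost, "/api/v1/nodes?name=n2&value=x"))
}
```

In this sketch the read-only resource answers POST with 405 because its storage never satisfied Creater. The real code goes further and does not register a route for an unsupported verb at all.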

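3.4 Creating the KubeAPIServer service

Creating KubeAPIServer follows the same flow and the same principle as creating APIExtensionsServer. The steps are:

  • Map each resource to its resource storage object and store the mapping in the APIGroupInfo map
  • Use the installer.Install installer to register the handlers for each resource (that is, the ResourceStorage of the resource storage object)
  • Bind the resources to their handlers and build Routes that are added to the WebService
  • Finally, add the WebService to the container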
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) {
  kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
  ...
}

func (c *Config) Complete() CompletedConfig {
  ...
  // Calls createEndpointReconciler internally
  if cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil {
    cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
  }

  return CompletedConfig{&cfg}
}

// createEndpointReconciler picks the reconciler implementation by its configured type
func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
  switch c.ExtraConfig.EndpointReconcilerType {
  // there are numerous test dependencies that depend on a default controller
  case "", reconcilers.MasterCountReconcilerType:
    return c.createMasterCountReconciler()
  case reconcilers.LeaseEndpointReconcilerType:
    return c.createLeaseReconciler()
  case reconcilers.NoneEndpointReconcilerType:
    return c.createNoneReconciler()
  default:
    klog.Fatalf("Reconciler not implemented: %v", c.ExtraConfig.EndpointReconcilerType)
  }
  return nil
}

func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
  ...
  // Initialize the Storage
  leaseStorage, _, err := storagefactory.Create(*config)
  ...
  return reconcilers.NewLeaseEndpointReconciler(endpointsAdapter, masterLeases)
}

func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
  switch c.Type {
  ...
  case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
    // Initialize the etcd3-backed Storage
    return newETCD3Storage(c)
  ...
  }
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
  // Create the GenericAPIServer named kube-apiserver
  s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
  // Initialize Master; the core Kubernetes services are managed through the Master object
  // Resources under KubeAPIServer can only be registered once this object exists
  m := &Master{
    GenericAPIServer:          s,
    ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
  }

  if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
    // Register the resource group that has no group name; its path prefix is "/api"
    if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
      return nil, err
    }
  }

  // Register the named resource groups; their path prefix is "/apis"
  if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
    return nil, err
  }

  m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
    kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
    // Create the authentication controller
    controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
    ...
    // Start the controller
    go controller.Run(1, hookContext.StopCh)
    return nil
  })

  return m, nil
}
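
The two path prefixes mentioned in the comments follow directly from the path.Join call in InstallREST: the legacy (core) group has an empty group name, and path.Join ignores empty elements. The snippet below is a small sketch of just that behaviour; groupVersionPrefix is a hypothetical helper name, and the group and version values are illustrative.

```go
package main

import (
	"fmt"
	"path"
)

// groupVersionPrefix mirrors the shape of the path.Join call in InstallREST:
// <apiPrefix>/<group>/<version>. The function name is hypothetical.
func groupVersionPrefix(root, group, version string) string {
	return path.Join(root, group, version)
}

func main() {
	// Legacy group: the group name is empty, so it drops out of the path.
	fmt.Println(groupVersionPrefix("/api", "", "v1")) // prints "/api/v1"
	// Named group: the group name becomes a path segment.
	fmt.Println(groupVersionPrefix("/apis", "apps", "v1")) // prints "/apis/apps/v1"
}
```
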
3.4.1 InstallLegacyAPI
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
  // Instantiate the APIGroupInfo
  // and create the storage for each resource
  legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)

  ...
  // Create the bootstrapController
  bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
  // Register the API
  if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
    return fmt.Errorf("error in registering group versions: %v", err)
  }
  return nil
}

// Create the storage for each resource via NewStorage, NewREST and so on, and put it into a map
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
  ...
  restStorage := LegacyRESTStorage{}
  podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
  podStorage, err := podstore.NewStorage(...)
  ...
  // NewStorage talks to etcd internally through the etcd client
  controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
  ...
  restStorageMap := map[string]rest.Storage{
    "pods":             podStorage.Pod,
    ...
    "replicationControllers":        controllerStorage.Controller,
    ...
  }
  ...
  return restStorage, apiGroupInfo, nil
}
3.4.2 InstallLegacyAPIGroup
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
  ...
  // This also calls installAPIResources internally, which was covered above
  if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
    return err
  }

  s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
  return nil
}
3.4.3 InstallAPIs
func (m *Master) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
  apiGroupsInfo := []*genericapiserver.APIGroupInfo{}

  // Iterate over all providers
  for _, restStorageBuilder := range restStorageProviders {
    ...
    // Get the Storage for the resources of this group
    apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
    ...
  }

  // InstallAPIGroups was analyzed above
  if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
    return fmt.Errorf("error in registering group versions: %v", err)
  }
  return nil
}

// RESTStorageProvider interface: every API group implements it with its own logic
// The implementations are all much alike and follow the pattern described above: they call NewStorage or NewREST to operate on etcd
type RESTStorageProvider interface {
  GroupName() string
  NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error)
}
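
The provider loop in InstallAPIs can be pictured with a reduced model. The sketch below assumes simplified types: groupInfo, storageProvider, staticProvider and collectGroups are hypothetical names, and a plain map of enabled groups replaces APIResourceConfigSource. It only illustrates how the enabled flag returned by each provider decides whether a group reaches the install step; the elided body of the real loop is not reproduced.

```go
package main

import "fmt"

// groupInfo is a hypothetical, reduced stand-in for genericapiserver.APIGroupInfo.
type groupInfo struct {
	Group     string
	Resources []string
}

// storageProvider is a simplified stand-in for RESTStorageProvider.
type storageProvider interface {
	GroupName() string
	NewRESTStorage(enabled map[string]bool) (groupInfo, bool)
}

// staticProvider serves a fixed list of resources for one group.
type staticProvider struct {
	group     string
	resources []string
}

func (p staticProvider) GroupName() string { return p.group }

// NewRESTStorage reports false when the group is not enabled.
func (p staticProvider) NewRESTStorage(enabled map[string]bool) (groupInfo, bool) {
	if !enabled[p.group] {
		return groupInfo{}, false
	}
	return groupInfo{Group: p.group, Resources: p.resources}, true
}

// collectGroups walks the providers and keeps only the enabled groups,
// which would then be handed to the install step.
func collectGroups(enabled map[string]bool, providers ...storageProvider) []groupInfo {
	var groups []groupInfo
	for _, p := range providers {
		info, ok := p.NewRESTStorage(enabled)
		if !ok {
			continue // group disabled: nothing to install
		}
		groups = append(groups, info)
	}
	return groups
}

func main() {
	groups := collectGroups(
		map[string]bool{"apps": true},
		staticProvider{group: "apps", resources: []string{"deployments"}},
		staticProvider{group: "batch", resources: []string{"jobs"}},
	)
	for _, g := range groups {
		fmt.Println(g.Group, g.Resources)
	}
}
```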

3.5 Creating the AggregatorServer configuration

func createAggregatorConfig(...) (*aggregatorapiserver.Config, error) {
  ...
  aggregatorConfig := &aggregatorapiserver.Config{
    GenericConfig: &genericapiserver.RecommendedConfig{
      Config:                genericConfig,
      SharedInformerFactory: externalInformers,
    },
    ExtraConfig: aggregatorapiserver.ExtraConfig{
      ProxyClientCertFile: commandOptions.ProxyClientCertFile,
      ProxyClientKeyFile:  commandOptions.ProxyClientKeyFile,
      ServiceResolver:     serviceResolver,
      ProxyTransport:      proxyTransport,
    },
  }

  return aggregatorConfig, nil
}

3.6 Creating the AggregatorServer service

Creating AggregatorServer follows a flow similar to creating APIExtensionsServer.

func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
  // Build the aggregator server on top of the delegate
  aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
  ...
  // Create the autoRegistrationController
  autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
  apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
  // Create the crdRegistrationController
  crdRegistrationController := crdregistration.NewCRDRegistrationController(
    apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
    autoRegistrationController)

  err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
    // Start the crdRegistrationController
    go crdRegistrationController.Run(5, context.StopCh)
    go func() {
      if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
        crdRegistrationController.WaitForInitialSync()
      }
      // Start the autoRegistrationController
      autoRegistrationController.Run(5, context.StopCh)
    }()
    return nil
  })
  ...
}

func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
  // Create the GenericAPIServer named kube-aggregator
  genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
  // Initialize the APIAggregator
  s := &APIAggregator{
    GenericAPIServer:           genericServer,
    delegateHandler:            delegationTarget.UnprotectedHandler(),
    proxyTransport:             c.ExtraConfig.ProxyTransport,
    proxyHandlers:              map[string]*proxyHandler{},
    handledGroups:              sets.String{},
    lister:                     informerFactory.Apiregistration().V1().APIServices().Lister(),
    APIRegistrationInformers:   informerFactory,
    serviceResolver:            c.ExtraConfig.ServiceResolver,
    openAPIConfig:              openAPIConfig,
    egressSelector:             c.GenericConfig.EgressSelector,
    proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
  }
  // Initialize the Storage; the logic is the same as before
  apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
  // Install the API; again the same as before
  if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
    return nil, err
  }
  ...
}
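
The post-start hook used above to launch the registration controllers follows a simple pattern: hooks are registered by name before the server starts, then each one is invoked with a stop channel and is expected to spawn its long-running work in a goroutine. The sketch below models only that pattern. hookServer, postStartHook and demoHooks are hypothetical names, and the rule that a duplicate name is rejected is an assumption of this sketch rather than something shown in the listing.

```go
package main

import "fmt"

// postStartHook is a reduced stand-in for a post-start hook function;
// the real one receives a PostStartHookContext.
type postStartHook func(stopCh <-chan struct{}) error

// hookServer keeps hooks by name and remembers registration order.
type hookServer struct {
	hooks map[string]postStartHook
	order []string
}

func newHookServer() *hookServer {
	return &hookServer{hooks: map[string]postStartHook{}}
}

// AddPostStartHook registers a hook; a second hook with the same name is refused.
func (s *hookServer) AddPostStartHook(name string, hook postStartHook) error {
	if _, exists := s.hooks[name]; exists {
		return fmt.Errorf("post-start hook %q is already registered", name)
	}
	s.hooks[name] = hook
	s.order = append(s.order, name)
	return nil
}

// RunPostStartHooks invokes every hook once, passing the shared stop channel.
func (s *hookServer) RunPostStartHooks(stopCh <-chan struct{}) error {
	for _, name := range s.order {
		if err := s.hooks[name](stopCh); err != nil {
			return fmt.Errorf("hook %q failed: %v", name, err)
		}
	}
	return nil
}

// demoHooks registers one hook whose worker goroutine exits when stopCh closes.
func demoHooks() (dupRejected, workerStopped bool) {
	s := newHookServer()
	done := make(chan struct{})
	hook := func(stopCh <-chan struct{}) error {
		go func() { // stand-in for controller.Run(workers, stopCh)
			<-stopCh
			close(done)
		}()
		return nil
	}
	if err := s.AddPostStartHook("kube-apiserver-autoregistration", hook); err != nil {
		return false, false
	}
	dupRejected = s.AddPostStartHook("kube-apiserver-autoregistration", hook) != nil

	stopCh := make(chan struct{})
	if err := s.RunPostStartHooks(stopCh); err != nil {
		return dupRejected, false
	}
	close(stopCh) // server shutdown
	<-done        // worker observed the stop signal
	return dupRejected, true
}

func main() {
	fmt.Println(demoHooks())
}
```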

3.7 Creating the GenericAPIServer

All three services above are built on top of GenericAPIServer, which maps Kubernetes resources to REST APIs.

3.7.1 Instantiating genericConfig
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
  ...
  // Build the handler chain
  handlerChainBuilder := func(handler http.Handler) http.Handler {
    return c.BuildHandlerChainFunc(handler, c.Config)
  }
  apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

  s := &GenericAPIServer{
    ...
  }
  ...
  installAPI(s, c.Config)
  ...
}

// NewAPIServerHandler
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
  ...
  // Create the go-restful container object
  gorestfulContainer := restful.NewContainer()
  gorestfulContainer.ServeMux = http.NewServeMux()
  gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
  gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
    logStackOnRecover(s, panicReason, httpWriter)
  })
  gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
    serviceErrorHandler(s, serviceErr, request, response)
  })

  director := director{
    name:               name,
    goRestfulContainer: gorestfulContainer,
    nonGoRestfulMux:    nonGoRestfulMux,
  }
  // Create the handler
  return &APIServerHandler{
    FullHandlerChain:   handlerChainBuilder(director),
    GoRestfulContainer: gorestfulContainer,
    NonGoRestfulMux:    nonGoRestfulMux,
    Director:           director,
  }
}
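
The delegation wiring in NewAPIServerHandler can be illustrated with plain net/http handlers. The sketch below is a simplified model under stated assumptions: newDirector, withAuth, buildChain and serve are hypothetical names, the served paths are made up for illustration, and a single header check stands in for the whole filter chain built by BuildHandlerChainFunc. It shows the two ideas visible in the listing: each server passes requests it does not own to the delegate's unprotected handler, and only the outermost server is wrapped by the handler chain.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newDirector serves the paths it owns and hands everything else to the
// delegate, loosely modelling the director with its notFoundHandler.
func newDirector(name string, owned []string, delegate http.Handler) http.Handler {
	own := map[string]bool{}
	for _, p := range owned {
		own[p] = true
	}
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		if own[req.URL.Path] {
			fmt.Fprint(w, name)
			return
		}
		delegate.ServeHTTP(w, req)
	})
}

// withAuth is a hypothetical one-filter stand-in for the handler chain.
func withAuth(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		if req.Header.Get("Authorization") == "" {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		next.ServeHTTP(w, req)
	})
}

// buildChain wires aggregator -> kube-apiserver -> apiextensions -> 404.
func buildChain() http.Handler {
	ext := newDirector("apiextensions", []string{"/apis/example.com/v1/widgets"}, http.NotFoundHandler())
	kube := newDirector("kube-apiserver", []string{"/api/v1/pods"}, ext)
	agg := newDirector("aggregator", []string{"/apis/metrics.k8s.io/v1beta1/nodes"}, kube)
	// The filters wrap only the outermost server, so they run once per request.
	return withAuth(agg)
}

// serve sends one in-process GET and returns the status code and body.
func serve(h http.Handler, target string, withToken bool) (int, string) {
	req := httptest.NewRequest(http.MethodGet, target, nil)
	if withToken {
		req.Header.Set("Authorization", "Bearer demo")
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	chain := buildChain()
	fmt.Println(serve(chain, "/api/v1/pods", true))
	fmt.Println(serve(chain, "/apis/example.com/v1/widgets", true))
	fmt.Println(serve(chain, "/api/v1/pods", false))
}
```

A request for a path owned by the innermost server still passes the filter exactly once, because the delegates are reached through their bare directors rather than through their own full handler chains.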

3.8 Starting the service

The last step of CreateServerChain is to start the service through the insecureServingInfo.Serve function.

func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) error {
  // Initialize the HTTP server
  insecureServer := &http.Server{
    Addr:           s.Listener.Addr().String(),
    Handler:        handler,
    MaxHeaderBytes: 1 << 20,
  }

  ...
  // Start the server; internally this calls server.Serve(listener)
  _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
  return err
}

func RunServer(
  server *http.Server,
  ln net.Listener,
  shutDownTimeout time.Duration,
  stopCh <-chan struct{},
) (<-chan struct{}, error) {
  ...
  go func() {
    ...
    // Accept connections on the listener through the Go standard library's server.Serve
    // While running, it starts a goroutine per connection; the goroutine reads requests,
    // calls the handler to process them, and writes the responses
    err := server.Serve(listener)
    ...
  }()

  return stoppedCh, nil
}
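
The serve-and-stop pattern in RunServer can be reproduced with the standard library alone. The following is a minimal sketch, not the real function: runServer and demo are hypothetical names, the elided parts of the original are not reconstructed, and closing stoppedCh as soon as Serve returns is a simplification chosen for this example. It uses only http.Server.Serve and http.Server.Shutdown, both standard library calls.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// runServer serves on ln in the background and shuts the server down
// gracefully once stopCh is closed. The returned channel is closed when
// Serve has returned.
func runServer(server *http.Server, ln net.Listener, shutdownTimeout time.Duration, stopCh <-chan struct{}) <-chan struct{} {
	stoppedCh := make(chan struct{})

	// Wait for the stop signal, then give in-flight requests a deadline.
	go func() {
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancel()
		_ = server.Shutdown(ctx)
	}()

	// Serve blocks until the listener fails or Shutdown is called.
	go func() {
		defer close(stoppedCh)
		if err := server.Serve(ln); err != nil && err != http.ErrServerClosed {
			fmt.Println("server stopped unexpectedly:", err)
		}
	}()

	return stoppedCh
}

// demo starts a server on a free loopback port, sends one request,
// then stops the server and waits for it to exit.
func demo() (string, bool) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", false
	}
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "ok")
	})
	server := &http.Server{Handler: mux, MaxHeaderBytes: 1 << 20}

	stopCh := make(chan struct{})
	stoppedCh := runServer(server, ln, 2*time.Second, stopCh)

	resp, err := http.Get("http://" + ln.Addr().String() + "/healthz")
	if err != nil {
		close(stopCh)
		return "", false
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()

	close(stopCh)
	select {
	case <-stoppedCh:
		return string(body), true
	case <-time.After(5 * time.Second):
		return string(body), false
	}
}

func main() {
	body, stopped := demo()
	fmt.Println(body, stopped)
}
```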

References

  • 《Kubernetes源码剖析》 (Kubernetes Source Code Analysis)