k8s源码分析- kube-apiserver
| 阅读 | 共 9977 字,阅读约
Overview
前言
最近加入云原生社区组织的k8s源码研习社,开始学习k8s底层源码,并整理成笔记。欢迎感兴趣的同学一起加入,共同学习进步。群里和社区里有各种大佬,随时可以帮你答疑解惑。https://github.com/cloudnativeto/sig-k8s-source-code
上一篇整理了client-go框架的Informer机制,informer源码分析, 同时api-server用到了go-restful这个web框架,go-restful的原理和源码参考go-restful 源码分析
先放一张kube-apiserver代码调用关系图
概述
kube-apiserver作为k8s最核心的组件,是各个组件之间沟通的桥梁,各个组件不会直接通信,而是都要经过api-server做中转。详见之前的另一篇博客,本文从源码角度分析api-server
kube-apiserver主要职责
- 提供整个集群的api接口管理,提供api注册、发现 — 通过go-restful框架实现
- 资源操作的唯一入口 — 操作etcd资源
- 集群内部各个组件的枢纽
- 提供请求认证、授权、访问控制等安全控制
三大服务
kube-apiserver提供了3种http server服务,用于将庞大的kube-apiserver组件功能进行解耦,这三种Http Server分别是:
服务名 | 概述 | 对象管理 | 资源注册表
- | - | - | - | KubeAPIServer | 核心服务,提供k8s内置核心资源服务,不允许开发者随意修改,如:Pod,Service等 | Master | Legacyscheme.Scheme APIExtensionsServer | API扩展服务,该服务提供了CRD自定义资源服务 | CustomResourceDefinitions | extensionsapiserver.Scheme AggregatorServer | API聚合服务,提供了聚合服务 | APIAggregator | aggregatorscheme.Scheme
三种服务底层都依赖GenericAPIServer,通过GenericAPIServer可以将k8s资源与rest api进行映射
kube-apiserver的启动流程概述
kube-apiserver是所有资源控制的入口,启动流程也略复杂,启动的代码逻辑可以分为9个步骤:
- 资源注册
- Cobra命令行参数解析
- 创建apiserver通用配置
- 创建APIExtensionsServer
- 创建KubeAPIServer
- 创建AggregatorServer
- 创建GenericAPIServer
- 启动http服务
- 启动https服务
其中,三个sever是通过委托模式连接在一起的,初始化的过程都是类似的,包括:
- 首先为每个server创建对应的config
- 然后初始化http server,具体包括:
- 初始化GoRestfulContainer
- 安装server所包含的api,细节有:
- 为每个api-resource创建对应的后端存储RESTStorage
- 为每个api-resource所支持的verbs添加对应的handler
- 将handler注册到router中
- 将router注册到webservice
0. 入口函数
kube-apiserver组件是一个单独的进程,启动的入口函数如下:
源码位置:cmd/kube-apiserver/apiserver.go
1import (
2 ...
3 // 引入legacyscheme,内部的init方法实现资源注册表的注册
4 "k8s.io/kubernetes/pkg/api/legacyscheme"
5 // 引入master,内部的init方法实现k8s所有资源的注册
6 "k8s.io/kubernetes/pkg/master"
7 ...
8)
9func main() {
10 rand.Seed(time.Now().UnixNano())
11
12 // 创建一个带有默认参数的Cobra Command对象
13 command := app.NewAPIServerCommand()
14
15 logs.InitLogs()
16 defer logs.FlushLogs()
17
18 if err := command.Execute(); err != nil {
19 os.Exit(1)
20 }
21}
1. 资源注册
kube-apiserver组件启动后的第一件事情是将k8s所支持的资源注册到Scheme资源注册表中,这样后面的启动逻辑才能拿到资源信息,并启动和运行前面介绍的三个服务
资源的注册过程不是函数调用触发的,而是通过import和init机制触发的。前面第0步提到过的import "k8s.io/kubernetes/pkg/api/legacyscheme"
资源注册包括两步:
- 初始化Scheme资源注册表
- 注册k8s所支持的资源
初始化Scheme资源注册表
代码路径:pkg/api/legacyscheme/scheme.go
定义了三个全局变量服务于kube-apiserver,组件在任何地方都可以调用
1package legacyscheme
2
3import (
4 "k8s.io/apimachinery/pkg/runtime"
5 "k8s.io/apimachinery/pkg/runtime/serializer"
6)
7
8// Scheme资源注册表
9var Scheme = runtime.NewScheme()
10// Codec编解码器
11var Codecs = serializer.NewCodecFactory(Scheme)
12// 参数编解码器
13var ParameterCodec = runtime.NewParameterCodec(Scheme)
注册k8s所支持的资源
apiserver启动时导入了master包,前面第0步介绍的import "k8s.io/kubernetes/pkg/master"
master包中的import_known_versions.go调用了k8s资源下的install包,通过导入包触发初始化函数。 每种资源下都定义install包,被引用时触发init函数完成资源注册过程
源码位置:pkg/master/import_known_versions.go
1import (
2 // These imports are the API groups the API server will support.
3 _ "k8s.io/kubernetes/pkg/apis/admission/install"
4 _ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
5 _ "k8s.io/kubernetes/pkg/apis/apps/install"
6 _ "k8s.io/kubernetes/pkg/apis/authentication/install"
7 _ "k8s.io/kubernetes/pkg/apis/authorization/install"
8 _ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
9 _ "k8s.io/kubernetes/pkg/apis/batch/install"
10 _ "k8s.io/kubernetes/pkg/apis/certificates/install"
11 _ "k8s.io/kubernetes/pkg/apis/coordination/install"
12 _ "k8s.io/kubernetes/pkg/apis/core/install"
13 _ "k8s.io/kubernetes/pkg/apis/discovery/install"
14 _ "k8s.io/kubernetes/pkg/apis/events/install"
15 _ "k8s.io/kubernetes/pkg/apis/extensions/install"
16 _ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
17 _ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
18 _ "k8s.io/kubernetes/pkg/apis/networking/install"
19 _ "k8s.io/kubernetes/pkg/apis/node/install"
20 _ "k8s.io/kubernetes/pkg/apis/policy/install"
21 _ "k8s.io/kubernetes/pkg/apis/rbac/install"
22 _ "k8s.io/kubernetes/pkg/apis/scheduling/install"
23 _ "k8s.io/kubernetes/pkg/apis/settings/install"
24 _ "k8s.io/kubernetes/pkg/apis/storage/install"
25)
以Core资源为例,查看install。 源码位置:pkg/apis/core/install/install.go
1func init() {
2 // legacyscheme.Scheme是前面介绍的全局资源注册中心
3 Install(legacyscheme.Scheme)
4}
5
6// Install registers the API group and adds types to a scheme
7func Install(scheme *runtime.Scheme) {
8 // 注册资源组
9 utilruntime.Must(core.AddToScheme(scheme))
10 utilruntime.Must(v1.AddToScheme(scheme))
11 // 注册版本顺序
12 utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion))
13}
2. Cobra命令行参数解析
k8s中所有组件统一使用cobra来解析命令行参数。kube-apiserver组件通过Cobra填充配置参数默认值并验证参数,前面第0步介绍的app.NewAPIServerCommand()
源码位置:cmd/kube-apiserver/app/server.go
1func NewAPIServerCommand() *cobra.Command {
2 // 初始化各个模块的默认配置,内部调用了各个模块各自的默认配置
3 s := options.NewServerRunOptions()
4 cmd := &cobra.Command{
5 ...
6 RunE: func(cmd *cobra.Command, args []string) error {
7 ...
8 // 设置默认参数配置
9 completedOptions, err := Complete(s)
10 // 验证参数合法性
11 if errs := completedOptions.Validate(); len(errs) != 0 {
12 return utilerrors.NewAggregate(errs)
13 }
14 // 启动运行,常驻进程
15 // Run函数后面专门介绍
16 return Run(completedOptions, genericapiserver.SetupSignalHandler())
17 },
18 ...
19 }
20 ...
21}
22
23// genericapiserver.SetupSignalHandler()
24func SetupSignalHandler() <-chan struct{} {
25 return SetupSignalContext().Done()
26}
27
28func SetupSignalContext() context.Context {
29 ...
30 // 监听操作系统信号os.Interrupt和syscall.SIGTERM
31 // 并将监听的信号与stopChan绑定,确保进程终止时,groutine优雅退出
32 signal.Notify(shutdownHandler, shutdownSignals...)
33 ...
34}
3. 创建服务链
核心流程是前面介绍的kube-apiserver组件中三大服务的配置和创建,具体包括:
- 创建kubeapi-server通用配置
- 创建kubeapi-extension-server配置
- 创建kubeapi-extension服务
- 创建kubeapi-server服务
- 创建aggregator-server配置
- 创建aggregator-server服务
- 启动服务
apiserver通用配置是kube-apiserver不同模块实例化所需的配置,具体包括:
- genericConfig实例化
- OpenAPI、Swagger配置
- StorageFactory(Etcd)配置
- Authentication认证配置
- Authorization授权配置
- Admission准控制器配置
1func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
2 // 创建服务链
3 server, err := CreateServerChain(completeOptions, stopCh)
4 if err != nil {
5 return err
6 }
7 // 预运行
8 prepared, err := server.PrepareRun()
9 if err != nil {
10 return err
11 }
12 // 正式运行
13 return prepared.Run(stopCh)
14}
15
16// 创建服务链
17func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
18 ...
19 // 创建 kubeapi-server 配置
20 kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
21 if err != nil {
22 return nil, err
23 }
24
25 // 创建 kubeapi-extension-server 配置
26 apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
27 serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
28
29 // 创建 kubeapi-extension-server 服务
30 apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
31
32 // 创建 kubeapi-server 服务
33 kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
34
35 // 创建 aggregator-server 配置
36 aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
37
38 // 创建 aggregator-server 服务
39 aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
40
41 if insecureServingInfo != nil {
42 insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
43 // 启动服务
44 if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
45 return nil, err
46 }
47 }
48 return aggregatorServer, nil
49}
3.1 创建kubeapi-server通用配置
1func CreateKubeAPIServerConfig(...) (...) {
2 // 构建通用配置
3 genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
4 ...
5 // 设置端口范围
6 serviceIPRange, apiServerServiceIP, err := master.ServiceIPRange(s.PrimaryServiceClusterIPRange)
7 ...
8 // 构造master.Config
9 config := &master.Config{
10 GenericConfig: genericConfig,
11 ExtraConfig: master.ExtraConfig{
12 APIResourceConfigSource: storageFactory.APIResourceConfigSource,
13 ...
14 },
15 }
16 ...
17}
3.1.1 buildGenericConfig
1func buildGenericConfig(...) (...) {
2 genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
3 // 配置启动、禁用GV
4 genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()
5 ...
6 // openapi/swagger配置
7 // OpenAPIConfig用于生成OpenAPI规范
8 genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
9 genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
10 genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
11 sets.NewString("watch", "proxy"),
12 sets.NewString("attach", "exec", "proxy", "log", "portforward"),
13 )
14
15 kubeVersion := version.Get()
16 genericConfig.Version = &kubeVersion
17 // etcd配置
18 // storageFactoryConfig对象定义了kube-apiserver与etcd的交互方式,如:etcd认证、地址、存储前缀等
19 // 该对象也定义了资源存储方式,如:资源信息、资源编码信息、资源状态等
20 storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
21 storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
22 completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
23 if err != nil {
24 lastErr = err
25 return
26 }
27 storageFactory, lastErr = completedStorageFactoryConfig.New()
28 if lastErr != nil {
29 return
30 }
31 if genericConfig.EgressSelector != nil {
32 storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
33 }
34 if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
35 return
36 }
37
38 // NewSharedInformerFactory初始化
39 versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
40
41 // 认证配置
42 // 内部调用 authenticatorConfig.New()
43 // k8s提供9种认证机制,每种认证机制被实例化后都成为认证器
44 if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
45 return
46 }
47
48 // 授权配置
49 // k8e提供6种授权机制,每种授权机制被实例化后都成为授权器
50 genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
51
52 // 准入器admission配置
53 // k8s资源在认证和授权通过,被持久化到etcd之前进入准入控制逻辑
54 // 准入控制包括:对请求的资源进行自定义操作(校验、修改、拒绝)
55 // k8s支持31种准入控制
56 // 准入控制器通过Plugins数据结构统一注册、存放、管理
57 admissionConfig := &kubeapiserveradmission.Config{
58 ExternalInformers: versionedInformers,
59 LoopbackClientConfig: genericConfig.LoopbackClientConfig,
60 CloudConfigFile: s.CloudProvider.CloudConfigFile,
61 }
62 serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
63 pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
64 if err != nil {
65 lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
66 return
67 }
68
69 err = s.Admission.ApplyTo(
70 genericConfig,
71 versionedInformers,
72 kubeClientConfig,
73 feature.DefaultFeatureGate,
74 pluginInitializers...)
75 if err != nil {
76 lastErr = fmt.Errorf("failed to initialize admission: %v", err)
77 }
78
79 if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
80 genericConfig.FlowControl = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
81 }
82
83 return
84}
85
86// 认证初始化
87func (o *BuiltInAuthenticationOptions) ApplyTo(...) error {
88 ...
89 authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
90 ...
91}
92
93// 根据配置决定9种认证器的初始化
94func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
95 // 定义认证器列表
96 var authenticators []authenticator.Request
97
98 // 下面根据不同的开关,决定是否配置某种认证器
99
100 // RequestHeader认证器
101 if config.RequestHeaderConfig != nil {
102 requestHeaderAuthenticator := headerrequest.NewDynamicVerifyOptionsSecure(
103 config.RequestHeaderConfig.CAContentProvider.VerifyOptions,
104 config.RequestHeaderConfig.AllowedClientNames,
105 config.RequestHeaderConfig.UsernameHeaders,
106 config.RequestHeaderConfig.GroupHeaders,
107 config.RequestHeaderConfig.ExtraHeaderPrefixes,
108 )
109 authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
110 }
111 // X509 methods
112 if config.ClientCAContentProvider != nil {
113 certAuth := x509.NewDynamic(config.ClientCAContentProvider.VerifyOptions, x509.CommonNameUserConversion)
114 authenticators = append(authenticators, certAuth)
115 }
116
117 // TokenAuth认证器
118 if len(config.TokenAuthFile) > 0 {
119 tokenAuth, err := newAuthenticatorFromTokenFile(config.TokenAuthFile)
120 if err != nil {
121 return nil, nil, err
122 }
123 tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, tokenAuth))
124 }
125 // ServiceAccountAuth认证器
126 if len(config.ServiceAccountKeyFiles) > 0 {
127 serviceAccountAuth, err := newLegacyServiceAccountAuthenticator(config.ServiceAccountKeyFiles, config.ServiceAccountLookup, config.APIAudiences, config.ServiceAccountTokenGetter)
128 if err != nil {
129 return nil, nil, err
130 }
131 tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
132 }
133 if utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) && config.ServiceAccountIssuer != "" {
134 serviceAccountAuth, err := newServiceAccountAuthenticator(config.ServiceAccountIssuer, config.ServiceAccountKeyFiles, config.APIAudiences, config.ServiceAccountTokenGetter)
135 if err != nil {
136 return nil, nil, err
137 }
138 tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
139 }
140 // BootstrapToken认证器
141 if config.BootstrapToken {
142 if config.BootstrapTokenAuthenticator != nil {
143 // TODO: This can sometimes be nil because of
144 tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, config.BootstrapTokenAuthenticator))
145 }
146 }
147 // NOTE(ericchiang): Keep the OpenID Connect after Service Accounts.
148 //
149 // Because both plugins verify JWTs whichever comes first in the union experiences
150 // cache misses for all requests using the other. While the service account plugin
151 // simply returns an error, the OpenID Connect plugin may query the provider to
152 // update the keys, causing performance hits.
153 if len(config.OIDCIssuerURL) > 0 && len(config.OIDCClientID) > 0 {
154 oidcAuth, err := newAuthenticatorFromOIDCIssuerURL(oidc.Options{
155 IssuerURL: config.OIDCIssuerURL,
156 ClientID: config.OIDCClientID,
157 CAFile: config.OIDCCAFile,
158 UsernameClaim: config.OIDCUsernameClaim,
159 UsernamePrefix: config.OIDCUsernamePrefix,
160 GroupsClaim: config.OIDCGroupsClaim,
161 GroupsPrefix: config.OIDCGroupsPrefix,
162 SupportedSigningAlgs: config.OIDCSigningAlgs,
163 RequiredClaims: config.OIDCRequiredClaims,
164 })
165 if err != nil {
166 return nil, nil, err
167 }
168 tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, oidcAuth))
169 }
170 // WebhookTokenAuth认证器
171 if len(config.WebhookTokenAuthnConfigFile) > 0 {
172 webhookTokenAuth, err := newWebhookTokenAuthenticator(config)
173 if err != nil {
174 return nil, nil, err
175 }
176
177 tokenAuthenticators = append(tokenAuthenticators, webhookTokenAuth)
178 }
179
180 if len(tokenAuthenticators) > 0 {
181 // Union the token authenticators
182 tokenAuth := tokenunion.New(tokenAuthenticators...)
183 // Optionally cache authentication results
184 if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
185 tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
186 }
187 authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
188 securityDefinitions["BearerToken"] = &spec.SecurityScheme{
189 SecuritySchemeProps: spec.SecuritySchemeProps{
190 Type: "apiKey",
191 Name: "authorization",
192 In: "header",
193 Description: "Bearer Token authentication",
194 },
195 }
196 }
197 // 匿名认证器
198 if len(authenticators) == 0 {
199 if config.Anonymous {
200 return anonymous.NewAuthenticator(), &securityDefinitions, nil
201 }
202 return nil, &securityDefinitions, nil
203 }
204 ...
205 // 将多个认证器合并
206 authenticator := union.New(authenticators...)
207 ...
208}
209
210// 授权初始化
211func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
212 ...
213 return authorizationConfig.New()
214}
215
216// 6种授权器配置
217func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
218 ...
219 // 声明认证器Authorizer列表
220 var (
221 authorizers []authorizer.Authorizer
222 ruleResolvers []authorizer.RuleResolver
223 )
224
225 for _, authorizationMode := range config.AuthorizationModes {
226 switch authorizationMode {
227 // Node授权器
228 case modes.ModeNode:
229 graph := node.NewGraph()
230 node.AddGraphEventHandlers(
231 graph,
232 config.VersionedInformerFactory.Core().V1().Nodes(),
233 config.VersionedInformerFactory.Core().V1().Pods(),
234 config.VersionedInformerFactory.Core().V1().PersistentVolumes(),
235 config.VersionedInformerFactory.Storage().V1().VolumeAttachments(),
236 )
237 nodeAuthorizer := node.NewAuthorizer(graph, nodeidentifier.NewDefaultNodeIdentifier(), bootstrappolicy.NodeRules())
238 authorizers = append(authorizers, nodeAuthorizer)
239 ruleResolvers = append(ruleResolvers, nodeAuthorizer)
240 // AlwaysAllow授权器
241 case modes.ModeAlwaysAllow:
242 alwaysAllowAuthorizer := authorizerfactory.NewAlwaysAllowAuthorizer()
243 authorizers = append(authorizers, alwaysAllowAuthorizer)
244 ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)
245 // AlwaysDeny授权器
246 case modes.ModeAlwaysDeny:
247 alwaysDenyAuthorizer := authorizerfactory.NewAlwaysDenyAuthorizer()
248 authorizers = append(authorizers, alwaysDenyAuthorizer)
249 ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)
250 // ABAC授权器
251 case modes.ModeABAC:
252 abacAuthorizer, err := abac.NewFromFile(config.PolicyFile)
253 if err != nil {
254 return nil, nil, err
255 }
256 authorizers = append(authorizers, abacAuthorizer)
257 ruleResolvers = append(ruleResolvers, abacAuthorizer)
258 // Webhook授权器
259 case modes.ModeWebhook:
260 webhookAuthorizer, err := webhook.New(config.WebhookConfigFile,
261 config.WebhookVersion,
262 config.WebhookCacheAuthorizedTTL,
263 config.WebhookCacheUnauthorizedTTL,
264 config.CustomDial)
265 if err != nil {
266 return nil, nil, err
267 }
268 authorizers = append(authorizers, webhookAuthorizer)
269 ruleResolvers = append(ruleResolvers, webhookAuthorizer)
270 // RBAC授权器
271 case modes.ModeRBAC:
272 rbacAuthorizer := rbac.New(
273 &rbac.RoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().Roles().Lister()},
274 &rbac.RoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().RoleBindings().Lister()},
275 &rbac.ClusterRoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoles().Lister()},
276 &rbac.ClusterRoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoleBindings().Lister()},
277 )
278 authorizers = append(authorizers, rbacAuthorizer)
279 ruleResolvers = append(ruleResolvers, rbacAuthorizer)
280 default:
281 return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
282 }
283 }
284 // 将已启用的认证器合并到列表中
285 // 请求到来时,kube-apiserver会遍历认证器列表,当有一个返回True时,表明认证成功
286 return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
287}
3.2 创建kubeapi-extension-server配置
1func createAPIExtensionsConfig(...)(...) {
2 ...
3 apiextensionsConfig := &apiextensionsapiserver.Config{
4 GenericConfig: &genericapiserver.RecommendedConfig{
5 Config: genericConfig,
6 SharedInformerFactory: externalInformers,
7 },
8 ExtraConfig: apiextensionsapiserver.ExtraConfig{
9 CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions),
10 MasterCount: masterCount,
11 AuthResolverWrapper: authResolverWrapper,
12 ServiceResolver: serviceResolver,
13 },
14 }
15 ...
16}
3.3 创建kubeapi-extension服务
1func createAPIExtensionsServer(...) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
2 return apiextensionsConfig.Complete().New(delegateAPIServer)
3}
4
5func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
6 // APIExtensionsServer依赖GenericAPIServer
7 // 通过GenericConfig创建一个名为apiextensions-apiserver的服务
8 genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
9
10 // APIExtensionsServer通过CustomResourceDefinitions对象进行管理
11 // 实例化该对象后才能注册APIExtensionsServer下的资源
12 s := &CustomResourceDefinitions{
13 GenericAPIServer: genericServer,
14 }
15
16 apiResourceConfig := c.GenericConfig.MergedResourceConfig
17
18 // 实例化APIGroupInfo,该对象用于描述资源组信息
19 apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
20
21 // 完成资源与资源存储对象的映射
22 // 如果开启了v1beta1资源版本,将资源版本、资源、资源存储存放到APIGroupInfo的map中
23 if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
24 storage := map[string]rest.Storage{}
25 // 通过NewRest创建资源存储对象
26 customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
27 if err != nil {
28 return nil, err
29 }
30 storage["customresourcedefinitions"] = customResourceDefinitionStorage
31 storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
32
33 apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
34 }
35 // 如果开启了v1资源版本,将资源版本、资源、资源存储存放到APIGroupInfo的map中
36 if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
37 storage := map[string]rest.Storage{}
38 // customresourcedefinitions
39 customResourceDefintionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
40 if err != nil {
41 return nil, err
42 }
43 storage["customresourcedefinitions"] = customResourceDefintionStorage
44 storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)
45
46 apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
47 }
48 // 注册api,这个函数后面单独介绍
49 if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
50 return nil, err
51 }
52
53 crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
54
55 s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
56 // 初始化主controller
57 establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
58 // 申明handler
59 crdHandler, err := NewCustomResourceDefinitionHandler(
60 versionDiscoveryHandler,
61 groupDiscoveryHandler,
62 s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
63 delegateHandler,
64 c.ExtraConfig.CRDRESTOptionsGetter,
65 c.GenericConfig.AdmissionControl,
66 establishingController,
67 c.ExtraConfig.ServiceResolver,
68 c.ExtraConfig.AuthResolverWrapper,
69 c.ExtraConfig.MasterCount,
70 s.GenericAPIServer.Authorizer,
71 c.GenericConfig.MaxRequestBodyBytes,
72 )
73 if err != nil {
74 return nil, err
75 }
76 // 添加handler函数
77 s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
78 s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
79
80 // 初始化crdController
81 crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
82 // 初始化namingController
83 namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
84 // 初始化finalizingController
85 finalizingController := finalizer.NewCRDFinalizer(
86 s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
87 crdClient.Apiextensions(),
88 crdHandler,
89 )
90 // 初始化openapiController
91 var openapiController *openapicontroller.Controller
92 if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
93 openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions())
94 }
95
96 // 注册hook函数:监听informer
97 s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
98 s.Informers.Start(context.StopCh)
99 return nil
100 })
101 // 注册hook函数:启动controller
102 s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
103 if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
104 go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
105 }
106 // 启动前面初始化的各种controller
107 go crdController.Run(context.StopCh)
108 go namingController.Run(context.StopCh)
109 go establishingController.Run(context.StopCh)
110 go finalizingController.Run(5, context.StopCh)
111 return nil
112 })
113 // 注册hook函数:同步crd资源
114 s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
115 return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
116 return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil
117 }, context.StopCh)
118 })
119
120 return s, nil
121}
3.3.1 实例化APIGroupInfo
APIGroupInfo用于描述资源组信息,一个资源对应一个APIGroupInfo对象,每个资源对应一个资源存储对象
1func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
2 return APIGroupInfo{
3 PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group),
4 // 这个map用于存储资源、资源存储对象的映射关系
5 // 格式:资源版本/资源/资源存储对象
6 // 资源存储对象RESTStorage,负责资源的增删改查
7 // 后续会将RESTStorage转换为http的handler函数
8 VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
9 // TODO unhardcode this. It was hardcoded before, but we need to re-evaluate
10 OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
11 Scheme: scheme,
12 ParameterCodec: parameterCodec,
13 NegotiatedSerializer: codecs,
14 }
15}
3.3.2 注册api函数:InstallAPIGroup
注册APIGroupInfo的函数非常重要,将APIGroupInfo中的资源对象注册到APIExtensionServerHandler函数。其过程是:
- 遍历APIGroupInfo
- 将资源组、资源版本、资源名称映射到http path请求路径
- 通过InstallREST函数将资源存储对象作为资源的handlers方法
- 最后用go-restful的ws.Route将定义好的请求路径和handlers方法添加路由到go-restful
1func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
2 return s.InstallAPIGroups(apiGroupInfo)
3}
4
5// InstallAPIGroups
6func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
7 ...
8 // 遍历所有的资源信息,一次安装资源版本处理器
9 for _, apiGroupInfo := range apiGroupInfos {
10 if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
11 return fmt.Errorf("unable to install api resources: %v", err)
12 }
13 ...
14 apiGroup := metav1.APIGroup{
15 Name: apiGroupInfo.PrioritizedVersions[0].Group,
16 Versions: apiVersionsForDiscovery,
17 PreferredVersion: preferredVersionForDiscovery,
18 }
19
20 s.DiscoveryGroupManager.AddGroup(apiGroup)
21 s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
22 }
23 return nil
24}
25
26// installAPIResources
27func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
28 for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
29 ...
30 // 调用InstallREST
31 // 参数为go-restful的container对象
32 if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
33 return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
34 }
35 }
36
37 return nil
38}
39
40// InstallREST
41func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
42 // 定义http path请求路径
43 // 格式:<apiPrefix>/<group>/<version>
44 // apiPrefix是api或者apis
45 prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
46 // 实例化APIInstaller实例化器
47 installer := &APIInstaller{
48 group: g,
49 prefix: prefix,
50 minRequestTimeout: g.MinRequestTimeout,
51 }
52 // 注册api,返回go-restful的WebService对象
53 apiResources, ws, registrationErrors := installer.Install()
54 versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
55 versionDiscoveryHandler.AddToWebService(ws)
56 // 这里用到go-restful框架的知识:将WebService添加到Container中
57 container.Add(ws)
58 return utilerrors.NewAggregate(registrationErrors)
59}
60
61// installer.Install
62func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
63 // 构造WebService对象
64 ws := a.newWebService()
65 ...
66 // 遍历所有的路径
67 for _, path := range paths {
68 // 实现Storage到Router的转换,将路由注册到webservice
69 apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
70 ...
71 if apiResource != nil {
72 // 添加到列表中
73 apiResources = append(apiResources, *apiResource)
74 }
75 }
76 return apiResources, ws, errors
77}
78
79// 这个方法很长,核心功能是根据storage构造handler,再将handler和path构造成go-restful框架的Route对象,最后Route添加到webservice
80func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (...) {
81 ...
82 // 判断storage实现了哪些Rest接口
83 creater, isCreater := storage.(rest.Creater)
84 namedCreater, isNamedCreater := storage.(rest.NamedCreater)
85 ...
86 // 构造action列表
87 actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
88 ...
89 for _, action := range actions {
90 ...
91 // 构造go-restful的RouteBuilder对象
92 routes := []*restful.RouteBuilder{}
93 // 根据action的不同Verb,注册不同的handler
94 switch action.Verb {
95 case "GET":
96 ...
97 // 初始化handler
98 handler = restfulGetResource(getter, exporter, reqScope)
99 ...
100 // 构造route
101 route := ws.GET(action.Path).To(handler).xxx
102 ...
103 // route追加到routes
104 routes = append(routes, route)
105 ...
106 case "POST":
107 ...
108 // handler初始化,后面专门介绍
109 handler = restfulCreateResource(creater, reqScope, admit)
110 route := ws.POST(action.Path).To(handler).xxx
111 routes = append(routes, route)
112 ...
113
114 // 遍历所有的route
115 for _, route := range routes {
116 route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
117 Group: reqScope.Kind.Group,
118 Version: reqScope.Kind.Version,
119 Kind: reqScope.Kind.Kind,
120 })
121 // 添加自定义扩展属性(k8s所有的扩展属性以x-打头)
122 route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
123 // 将route加入到WebService中
124 ws.Route(route)
125 }
126 }
127}
128
129// handler初始化
130func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
131 return func(req *restful.Request, res *restful.Response) {
132 handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
133 }
134}
135
136// 返回一个处理资源的handler
137func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
138 return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
139}
140
141// 返回一个http标准库handler函数,处理对应的路由请求
142func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
143 // http库标准的handler写法
144 return func(w http.ResponseWriter, req *http.Request) {
145 ...
146 // 找到合适的SerializeInfo
147 s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
148 ...
149 // 寻找合适的编解码器
150 decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)
151
152 // 解码
153 obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
154 // 处理请求
155 result, err := finishRequest(timeout, func() (runtime.Object, error) {
156 ...
157 })
158 ...
159 }
160}
3.4 创建kubeapi-server服务
创建KubeAPIServer的流程与创建KubeAPIExtensionServer的流程类似,原理一样。包括:
- 将//与资源存储对象进行映射并存储到APIGroupInfo的map中
- 通过installer.install安装器为资源注册对应的handlers方法(即资源存储对象的ResourceStorage)
- 完成资源与handlers方法的绑定,并构造Route添加到WebService
- 最后将WebService添加到container中
1func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) {
2 kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
3 ...
4}
5
6func (c *Config) Complete() CompletedConfig {
7 ...
8 // 内部调用createEndpointReconciler
9 if cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil {
10 cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
11 }
12
13 return CompletedConfig{&cfg}
14}
15
16// createEndpointReconciler
17func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
18 switch c.ExtraConfig.EndpointReconcilerType {
19 // there are numerous test dependencies that depend on a default controller
20 case "", reconcilers.MasterCountReconcilerType:
21 return c.createMasterCountReconciler()
22 case reconcilers.LeaseEndpointReconcilerType:
23 return c.createLeaseReconciler()
24 case reconcilers.NoneEndpointReconcilerType:
25 return c.createNoneReconciler()
26 default:
27 klog.Fatalf("Reconciler not implemented: %v", c.ExtraConfig.EndpointReconcilerType)
28 }
29 return nil
30}
31
32func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
33 ...
34 // 初始化Storage
35 leaseStorage, _, err := storagefactory.Create(*config)
36 ...
37 return reconcilers.NewLeaseEndpointReconciler(endpointsAdapter, masterLeases)
38}
39
40func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
41 switch c.Type {
42 ...
43 case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
44 // 初始化Storage
45 return newETCD3Storage(c)
46 ...
47 }
48}
49
50func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
51 // 初始化kube-apiserver
52 s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
53 // 初始化Master,k8s的核心服务通过Master对象进行管理
54 // 实例化后的对象才能注册KubeAPIServer下的资源
55 m := &Master{
56 GenericAPIServer: s,
57 ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
58 }
59
60 if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
61 // 注册没有组名的资源组,路径前缀为"/api"
62 if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
63 return nil, err
64 }
65 }
66
67 // 注册有组名的资源组,路径前缀为"/apis"
68 if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
69 return nil, err
70 }
71
72 m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
73 kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
74 // 创建认证controller
75 controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
76 ...
77 // 启动controller
78 go controller.Run(1, hookContext.StopCh)
79 return nil
80 })
81
82 return m, nil
83}
3.4.1 InstallLegacyAPI
1func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
2 // 实例化APIGroupInfo
3 // 生成各种资源对应的storage
4 legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
5
6 ...
7 // 创建bootstrapController
8 bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
9 // 注册api
10 if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
11 return fmt.Errorf("error in registering group versions: %v", err)
12 }
13 return nil
14}
15
16// 通过NewStorage、NewRest等创建各种资源的存储,存放到map中
17func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
18 ...
19 restStorage := LegacyRESTStorage{}
20 podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
21 podStorage, err := podstore.NewStorage(...)
22 ...
23 // NewStorage内部通过etcd客户端去操作etcd
24 controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
25 ...
26 restStorageMap := map[string]rest.Storage{
27 "pods": podStorage.Pod,
28 ...
29 "replicationControllers": controllerStorage.Controller,
30 ...
31 }
32 ...
33 return restStorage, apiGroupInfo, nil
34}
3.4.2 InstallLegacyAPIGroup
1func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
2 ...
3 // 内部也是调用installAPIResources,这个函数前面介绍过
4 if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
5 return err
6 }
7
8 s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
9 return nil
10}
3.4.2 InstallAPIs
1func (m *Master) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
2 apiGroupsInfo := []*genericapiserver.APIGroupInfo{}
3
4 // 遍历所有的provider
5 for _, restStorageBuilder := range restStorageProviders {
6 ...
7 // 获取资源对应的Storage
8 apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
9 ...
10 }
11
12 // InstallAPIGroups这个函数在前面已经分析过
13 if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
14 return fmt.Errorf("error in registering group versions: %v", err)
15 }
16 return nil
17}
18
19// RESTStorageProvider接口,每种资源都实现了该接口,并实现自己的业务逻辑
20// 实现逻辑都大同小异,跟前面介绍的一样。都调用NewStorage、NewREST操作etcd
21type RESTStorageProvider interface {
22 GroupName() string
23 NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error)
24}
3.5 创建aggregator-server配置
1func createAggregatorConfig(...) (*aggregatorapiserver.Config, error) {
2 ...
3 aggregatorConfig := &aggregatorapiserver.Config{
4 GenericConfig: &genericapiserver.RecommendedConfig{
5 Config: genericConfig,
6 SharedInformerFactory: externalInformers,
7 },
8 ExtraConfig: aggregatorapiserver.ExtraConfig{
9 ProxyClientCertFile: commandOptions.ProxyClientCertFile,
10 ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
11 ServiceResolver: serviceResolver,
12 ProxyTransport: proxyTransport,
13 },
14 }
15
16 return aggregatorConfig, nil
17}
3.6 创建aggregator-server服务
创建AggregatorServer的流程与创建KubeAPIExtensionServer的流程类似。
1func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
2 // 初始化delegate
3 aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
4 ...
5 // 创建autoRegistrationController
6 autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
7 apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
8 // 创建crdRegistrationController
9 crdRegistrationController := crdregistration.NewCRDRegistrationController(
10 apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
11 autoRegistrationController)
12
13 err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
14 // 启动crdRegistrationController
15 go crdRegistrationController.Run(5, context.StopCh)
16 go func() {
17 if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
18 crdRegistrationController.WaitForInitialSync()
19 }
20 // 启动autoRegistrationController
21 autoRegistrationController.Run(5, context.StopCh)
22 }()
23 return nil
24 })
25 ...
26}
27
28func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
29 // 创建kube-aggregator
30 genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
31 // 初始化APIAggregator
32 s := &APIAggregator{
33 GenericAPIServer: genericServer,
34 delegateHandler: delegationTarget.UnprotectedHandler(),
35 proxyTransport: c.ExtraConfig.ProxyTransport,
36 proxyHandlers: map[string]*proxyHandler{},
37 handledGroups: sets.String{},
38 lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
39 APIRegistrationInformers: informerFactory,
40 serviceResolver: c.ExtraConfig.ServiceResolver,
41 openAPIConfig: openAPIConfig,
42 egressSelector: c.GenericConfig.EgressSelector,
43 proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
44 }
45 // 初始化Storage,逻辑和前面是一样的
46 apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
47 // 安装api,和前面也一样
48 if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
49 return nil, err
50 }
51 ...
52}
3.7 创建GenericAPIServer-server
前面三个服务的创建过程,都依赖GenericAPIServer。通过genericapiserver将k8s资源与RestAPI进行映射
3.7.1 genericConfig实例化
1func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
2 ...
3 // 构造handler链
4 handlerChainBuilder := func(handler http.Handler) http.Handler {
5 return c.BuildHandlerChainFunc(handler, c.Config)
6 }
7 apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
8
9 s := &GenericAPIServer{
10 ...
11 }
12 ...
13 installAPI(s, c.Config)
14 ...
15}
16
17// NewAPIServerHandler
18func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
19 ...
20 // 创建go-restful的container对象
21 gorestfulContainer := restful.NewContainer()
22 gorestfulContainer.ServeMux = http.NewServeMux()
23 gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
24 gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
25 logStackOnRecover(s, panicReason, httpWriter)
26 })
27 gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
28 serviceErrorHandler(s, serviceErr, request, response)
29 })
30
31 director := director{
32 name: name,
33 goRestfulContainer: gorestfulContainer,
34 nonGoRestfulMux: nonGoRestfulMux,
35 }
36 // 创建handler
37 return &APIServerHandler{
38 FullHandlerChain: handlerChainBuilder(director),
39 GoRestfulContainer: gorestfulContainer,
40 NonGoRestfulMux: nonGoRestfulMux,
41 Director: director,
42 }
43}
3.8 启动服务
CreateServerChain的最后一步便是启动服务insecureServingInfo.Serve
函数
1func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) error {
2 // 初始化http服务
3 insecureServer := &http.Server{
4 Addr: s.Listener.Addr().String(),
5 Handler: handler,
6 MaxHeaderBytes: 1 << 20,
7 }
8
9 ...
10 // 启动服务,内部调用server.Serve(listener)
11 _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
12 return err
13}
14
15func RunServer(
16 server *http.Server,
17 ln net.Listener,
18 shutDownTimeout time.Duration,
19 stopCh <-chan struct{},
20) (<-chan struct{}, error) {
21 ...
22 go func() {
23 ...
24 // 通过go语言标准库server.Serve监听listener
25 // 并在运行过程中为每个连接建立groutine,groutine读取请求,调用handler函数来处理并响应请求
26 err := server.Serve(listener)
27 ...
28 }()
29
30 return stoppedCh, nil
31}
参考
- 《kubernetes源码剖析》