kubeapps源码分析-kubeops
| 阅读 | 共 1849 字,阅读约
Overview
kubeapps源码分析-kubeops
概述
这个组件主要是调用helm sdk执行相关release操作,相当于将helm的install、uninstall、upgrade、rollback等命令行操作封装成RESTful接口,暴露给dashboard页面调用
源码分析
代码入口
1func main() {
2 pflag.Parse()
3 settings.Init(pflag.CommandLine)
4
5 kubeappsNamespace := os.Getenv("POD_NAMESPACE")
6 if kubeappsNamespace == "" {
7 log.Fatal("POD_NAMESPACE should be defined")
8 }
9
10 // If there is no clusters config, we default to the previous behaviour of a "default" cluster.
11 clustersConfig := kube.ClustersConfig{KubeappsClusterName: "default"}
12 if clustersConfigPath != "" {
13 var err error
14 var cleanupCAFiles func()
15 clustersConfig, cleanupCAFiles, err = parseClusterConfig(clustersConfigPath, clustersCAFilesPrefix)
16 if err != nil {
17 log.Fatalf("unable to parse additional clusters config: %+v", err)
18 }
19 defer cleanupCAFiles()
20 }
21
22 options := handler.Options{
23 ListLimit: listLimit,
24 Timeout: timeout,
25 KubeappsNamespace: kubeappsNamespace,
26 ClustersConfig: clustersConfig,
27 }
28
29 // 默认使用secret作为helm底层的存储驱动(可通过helmDriverArg参数覆盖)
30 // helm底层驱动支持secret、configmap、postgresql、memory等
31 // 具体可参考helm源码分析系列文章
32 storageForDriver := agent.StorageForSecrets
33 if helmDriverArg != "" {
34 var err error
35 storageForDriver, err = agent.ParseDriverType(helmDriverArg)
36 if err != nil {
37 panic(err)
38 }
39 }
40 withHandlerConfig := handler.WithHandlerConfig(storageForDriver, options)
41 r := mux.NewRouter()
42
43 // Healthcheck
44 // TODO: add app specific health and readiness checks as per https://github.com/heptiolabs/healthcheck
45 health := healthcheck.NewHandler()
46 r.Handle("/live", health)
47 r.Handle("/ready", health)
48
49 // Routes
50 // Auth not necessary here with Helm 3 because it's done by Kubernetes.
51 addRoute := handler.AddRouteWith(r.PathPrefix("/v1").Subrouter(), withHandlerConfig)
52 addRoute("GET", "/clusters/{cluster}/releases", handler.ListAllReleases)
53 addRoute("GET", "/clusters/{cluster}/namespaces/{namespace}/releases", handler.ListReleases)
54 addRoute("POST", "/clusters/{cluster}/namespaces/{namespace}/releases", handler.CreateRelease)
55 addRoute("GET", "/clusters/{cluster}/namespaces/{namespace}/releases/{releaseName}", handler.GetRelease)
56 addRoute("PUT", "/clusters/{cluster}/namespaces/{namespace}/releases/{releaseName}", handler.OperateRelease)
57 addRoute("DELETE", "/clusters/{cluster}/namespaces/{namespace}/releases/{releaseName}", handler.DeleteRelease)
58
59 // Backend routes unrelated to kubeops functionality.
60 err := backendHandlers.SetupDefaultRoutes(r.PathPrefix("/backend/v1").Subrouter(), clustersConfig)
61 if err != nil {
62 log.Fatalf("Unable to setup backend routes: %+v", err)
63 }
64
65 // assetsvc reverse proxy
66 // TODO(mnelson) remove this reverse proxy once the haproxy frontend
67 // proxies requests directly to the assetsvc. Move the authz to the
68 // assetsvc itself.
69 authGate := auth.AuthGate(clustersConfig, kubeappsNamespace)
70 parsedAssetsvcURL, err := url.Parse(assetsvcURL)
71 if err != nil {
72 log.Fatalf("Unable to parse the assetsvc URL: %v", err)
73 }
74 assetsvcProxy := httputil.NewSingleHostReverseProxy(parsedAssetsvcURL)
75 assetsvcPrefix := "/assetsvc"
76 assetsvcRouter := r.PathPrefix(assetsvcPrefix).Subrouter()
77 // Logos don't require authentication so bypass that step. Nor are they cluster-aware as they're
78 // embedded as links in the stored chart data.
79 assetsvcRouter.Methods("GET").Path("/v1/ns/{namespace}/assets/{repo}/{id}/logo").Handler(negroni.New(
80 negroni.Wrap(http.StripPrefix(assetsvcPrefix, assetsvcProxy)),
81 ))
82 assetsvcRouter.PathPrefix("/v1/clusters/{cluster}/namespaces/{namespace}/").Handler(negroni.New(
83 authGate,
84 negroni.Wrap(http.StripPrefix(assetsvcPrefix, assetsvcProxy)),
85 ))
86
87 n := negroni.Classic()
88 n.UseHandler(r)
89
90 port := os.Getenv("PORT")
91 if port == "" {
92 port = "8080"
93 }
94 addr := ":" + port
95
96 srv := &http.Server{
97 Addr: addr,
98 Handler: n,
99 }
100
101 go func() {
102 log.WithFields(log.Fields{"addr": addr}).Info("Started Kubeops")
103 err := srv.ListenAndServe()
104 if err != nil {
105 log.Info(err)
106 }
107 }()
108
109 // Catch SIGINT and SIGTERM
110 // Set up channel on which to send signal notifications.
111 c := make(chan os.Signal, 1)
112 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
113 log.Debug("Set system to get notified on signals")
114 s := <-c
115 log.Infof("Received signal: %v. Waiting for existing requests to finish", s)
116 // Set a timeout value high enough to let k8s terminationGracePeriodSeconds to act
117 // accordingly and send a SIGKILL if needed
118 ctx, cancel := context.WithTimeout(context.Background(), time.Second*3600)
119 defer cancel()
120 // Doesn't block if no connections, but will otherwise wait
121 // until the timeout deadline.
122 srv.Shutdown(ctx)
123 log.Info("All requests have been served. Exiting")
124 os.Exit(0)
125}
ListAllReleases
- 功能:查询所有的release资源,底层调用的是helm sdk中的action模块
- 入参:无(namespace为空,表示查询所有命名空间下的release)
1func ListAllReleases(cfg Config, w http.ResponseWriter, req *http.Request, _ handlerutil.Params) {
2 ListReleases(cfg, w, req, make(map[string]string))
3}
4
5func ListReleases(cfg Config, w http.ResponseWriter, req *http.Request, params handlerutil.Params) {
6 // 操作委托给agent模块实现
7 apps, err := agent.ListReleases(cfg.ActionConfig, params[namespaceParam], cfg.Options.ListLimit, req.URL.Query().Get("statuses"))
8 if err != nil {
9 returnErrMessage(err, w)
10 return
11 }
12 // 处理返回结果
13 response.NewDataResponse(apps).Write(w)
14}
ListReleases
- 功能:查询指定命名空间下的release资源
- 入参:namespace
1func ListReleases(actionConfig *action.Configuration, namespace string, listLimit int, status string) ([]proxy.AppOverview, error) {
2 allNamespaces := namespace == ""
3
4 // 最终调用helm的sdk
5 // helm 中主要用到action这个模块的代码,具体实现请参考helm相关源码分析
6 cmd := action.NewList(actionConfig)
7 ...
8 // action中的 Run 方法,返回Release对象
9 releases, err := cmd.Run()
10 ...
11 // 将helm的Release对象转换为AppOverview对象展示
12 appOverviews := make([]proxy.AppOverview, 0)
13 for _, r := range releases {
14 if allNamespaces || r.Namespace == namespace {
15 appOverviews = append(appOverviews, appOverviewFromRelease(r))
16 }
17 }
18 return appOverviews, nil
19}
CreateRelease
- 功能:创建release,即通过chart包部署一个应用。类似helm命令行:
1helm install --namespace {namespace} {releaseName} {repoName}/{chartName} -f values(.yaml)
- 入参:Details对象
- repoName:注册的仓库名
- namespace:
- chart name:
- releaseName:
- version:
- values
1func CreateRelease(cfg Config, w http.ResponseWriter, req *http.Request, params handlerutil.Params) {
2 // 处理http请求,转换为Details对象
3 chartDetails, err := handlerutil.ParseRequest(req)
4 ...
5 // 获取helm中的Chart对象
6 // 内部通过获取repo的index.yaml,从中查找到待创建的chart包的下载地址
7 // 通过http请求下载文件,并调用helm 的 sdk,将二进制流加载为 Chart 对象
8 ch, err := handlerutil.GetChart(
9 chartDetails,
10 appRepo,
11 caCertSecret, authSecret,
12 cfg.Resolver.New(appRepo.Spec.Type, cfg.Options.UserAgent),
13 )
14 ...
15 // 设置releaseName、namespace、values
16 releaseName := chartDetails.ReleaseName
17 namespace := params[namespaceParam]
18 valuesString := chartDetails.Values
19 ..
20 // 根据传入的Chart对象,调用agent模块创建Release
21 // 创建Release时,内部调用helm sdk的action模块实现
22 release, err := agent.CreateRelease(cfg.ActionConfig, releaseName, namespace, valuesString, ch, registrySecrets)
23 ...
24 response.NewDataResponse(release).Write(w)
25}
GetRelease
- 功能:获取release资源
- 入参:releaseName
1func GetRelease(cfg Config, w http.ResponseWriter, req *http.Request, params handlerutil.Params) {
2 // 获取releaseName
3 releaseName := params[nameParam]
4 // 调用action模块,实现release的获取
5 // agent模块调用的是helm 的action模块
6 release, err := agent.GetRelease(cfg.ActionConfig, releaseName)
7 ...
8 response.NewDataResponse(compatRelease).Write(w)
9}
OperateRelease
- 功能:升级或者回滚release
- 入参:操作类型(upgrade、rollback)、ReleaseName、Details
1func OperateRelease(cfg Config, w http.ResponseWriter, req *http.Request, params handlerutil.Params) {
2 switch req.FormValue("action") {
3 case "upgrade":
4 upgradeRelease(cfg, w, req, params)
5 case "rollback":
6 rollbackRelease(cfg, w, req, params)
7 // TODO: Add "test" case here.
8 default:
9 // By default, for maintaining compatibility, we call upgrade.
10 upgradeRelease(cfg, w, req, params)
11 }
12}
DeleteRelease
- 功能:删除已安装的某个release
- 入参:releaseName
1func DeleteRelease(cfg Config, w http.ResponseWriter, req *http.Request, params handlerutil.Params) {
2 // 获取releaseName
3 releaseName := params[nameParam]
4 ...
5 // 调用agent的删除方法
6 // 内部同样调用action的删除模块
7 err := agent.DeleteRelease(cfg.ActionConfig, releaseName, keepHistory)
8 ...
9}