kubeapps源码分析-kubeops


| 阅读 |,阅读约 4 分钟
| 复制链接:

Overview

kubeapps源码分析-kubeops

概述

这个组件主要是调用helm sdk执行release相关操作,相当于将helm的install、uninstall、upgrade、rollback等命令行操作封装成RESTful接口,暴露给dashboard页面调用

源码分析

代码入口

  // main is the kubeops entry point: it parses flags, builds the handler options,
  // registers the release, backend and assetsvc-proxy routes, then serves HTTP until
  // SIGINT/SIGTERM triggers a graceful shutdown.
  1func main() {
  2	pflag.Parse()
  3	settings.Init(pflag.CommandLine)
  4
  5	kubeappsNamespace := os.Getenv("POD_NAMESPACE")
  6	if kubeappsNamespace == "" {
  7		log.Fatal("POD_NAMESPACE should be defined")
  8	}
  9
 10	// If there is no clusters config, we default to the previous behaviour of a "default" cluster.
 11	clustersConfig := kube.ClustersConfig{KubeappsClusterName: "default"}
 12	if clustersConfigPath != "" {
 13		var err error
 14		var cleanupCAFiles func()
 15		clustersConfig, cleanupCAFiles, err = parseClusterConfig(clustersConfigPath, clustersCAFilesPrefix)
 16		if err != nil {
 17			log.Fatalf("unable to parse additional clusters config: %+v", err)
 18		}
 19		defer cleanupCAFiles()
 20	}
 21
 22	options := handler.Options{
 23		ListLimit:         listLimit,
 24		Timeout:           timeout,
 25		KubeappsNamespace: kubeappsNamespace,
 26		ClustersConfig:    clustersConfig,
 27	}
 28
 29  // Default Helm's storage driver to Secrets (replaced below when helmDriverArg is set).
 30  // Helm's storage layer supports secret, configmap, postgresql, memory, etc.
 31  // See the Helm source-analysis article series for details.
 32	storageForDriver := agent.StorageForSecrets
 33	if helmDriverArg != "" {
 34		var err error
 35		storageForDriver, err = agent.ParseDriverType(helmDriverArg)
 36		if err != nil {
 37			panic(err)
 38		}
 39	}
 40	withHandlerConfig := handler.WithHandlerConfig(storageForDriver, options)
 41	r := mux.NewRouter()
 42
 43	// Healthcheck
 44	// TODO: add app specific health and readiness checks as per https://github.com/heptiolabs/healthcheck
 45	health := healthcheck.NewHandler()
 46	r.Handle("/live", health)
 47	r.Handle("/ready", health)
 48
 49	// Routes
 50	// Auth not necessary here with Helm 3 because it's done by Kubernetes.
 51	addRoute := handler.AddRouteWith(r.PathPrefix("/v1").Subrouter(), withHandlerConfig)
 52	addRoute("GET", "/clusters/{cluster}/releases", handler.ListAllReleases)
 53	addRoute("GET", "/clusters/{cluster}/namespaces/{namespace}/releases", handler.ListReleases)
 54	addRoute("POST", "/clusters/{cluster}/namespaces/{namespace}/releases", handler.CreateRelease)
 55	addRoute("GET", "/clusters/{cluster}/namespaces/{namespace}/releases/{releaseName}", handler.GetRelease)
 56	addRoute("PUT", "/clusters/{cluster}/namespaces/{namespace}/releases/{releaseName}", handler.OperateRelease)
 57	addRoute("DELETE", "/clusters/{cluster}/namespaces/{namespace}/releases/{releaseName}", handler.DeleteRelease)
 58
 59	// Backend routes unrelated to kubeops functionality.
 60	err := backendHandlers.SetupDefaultRoutes(r.PathPrefix("/backend/v1").Subrouter(), clustersConfig)
 61	if err != nil {
 62		log.Fatalf("Unable to setup backend routes: %+v", err)
 63	}
 64
 65	// assetsvc reverse proxy
 66	// TODO(mnelson) remove this reverse proxy once the haproxy frontend
 67	// proxies requests directly to the assetsvc. Move the authz to the
 68	// assetsvc itself.
 69	authGate := auth.AuthGate(clustersConfig, kubeappsNamespace)
 70	parsedAssetsvcURL, err := url.Parse(assetsvcURL)
 71	if err != nil {
 72		log.Fatalf("Unable to parse the assetsvc URL: %v", err)
 73	}
 74	assetsvcProxy := httputil.NewSingleHostReverseProxy(parsedAssetsvcURL)
 75	assetsvcPrefix := "/assetsvc"
 76	assetsvcRouter := r.PathPrefix(assetsvcPrefix).Subrouter()
 77	// Logos don't require authentication so bypass that step. Nor are they cluster-aware as they're
 78	// embedded as links in the stored chart data.
 79	assetsvcRouter.Methods("GET").Path("/v1/ns/{namespace}/assets/{repo}/{id}/logo").Handler(negroni.New(
 80		negroni.Wrap(http.StripPrefix(assetsvcPrefix, assetsvcProxy)),
 81	))
 82	assetsvcRouter.PathPrefix("/v1/clusters/{cluster}/namespaces/{namespace}/").Handler(negroni.New(
 83		authGate,
 84		negroni.Wrap(http.StripPrefix(assetsvcPrefix, assetsvcProxy)),
 85	))
 86
 87	n := negroni.Classic()
 88	n.UseHandler(r)
 89
 90	port := os.Getenv("PORT")
 91	if port == "" {
 92		port = "8080"
 93	}
 94	addr := ":" + port
 95
 96	srv := &http.Server{
 97		Addr:    addr,
 98		Handler: n,
 99	}
100
101	go func() {
102		log.WithFields(log.Fields{"addr": addr}).Info("Started Kubeops")
103		err := srv.ListenAndServe()
104		if err != nil {
105			log.Info(err)
106		}
107	}()
108
109	// Catch SIGINT and SIGTERM
110	// Set up channel on which to send signal notifications.
111	c := make(chan os.Signal, 1)
112	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
113	log.Debug("Set system to get notified on signals")
114	s := <-c
115	log.Infof("Received signal: %v. Waiting for existing requests to finish", s)
116	// Set a timeout value high enough to let k8s terminationGracePeriodSeconds to act
117	// accordingly and send a SIGKILL if needed
118	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3600)
119	defer cancel()
120	// Doesn't block if no connections, but will otherwise wait
121	// until the timeout deadline.
122	srv.Shutdown(ctx)
123	log.Info("All requests have been served. Exiting")
   	// NOTE(review): os.Exit does not run deferred calls, so the deferred
   	// cleanupCAFiles and cancel above never execute on this path.
124	os.Exit(0)
125}

ListAllReleases

  • 功能:查询所有命名空间下的release资源,底层调用的是helm sdk中的action模块
  • 入参:无必填参数(可通过查询参数 statuses 按状态过滤)
 // ListAllReleases lists releases without a namespace filter by delegating to
 // ListReleases with an empty params map; the incoming params are ignored.
 1func ListAllReleases(cfg Config, w http.ResponseWriter, req *http.Request, _ handlerutil.Params) {
 2	ListReleases(cfg, w, req, make(map[string]string))
 3}
 4
 // ListReleases is the HTTP handler that lists releases for the namespace found in
 // params, passing the configured list limit and the "statuses" query parameter
 // through to the agent, and writes either the result or an error message to w.
 5func ListReleases(cfg Config, w http.ResponseWriter, req *http.Request, params handlerutil.Params) {
 6  // Delegate the actual listing to the agent package.
 7	apps, err := agent.ListReleases(cfg.ActionConfig, params[namespaceParam], cfg.Options.ListLimit, req.URL.Query().Get("statuses"))
 8	if err != nil {
 9		returnErrMessage(err, w)
10		return
11	}
12  // Write the result as the response payload.
13	response.NewDataResponse(apps).Write(w)
14}

ListReleases

  • 功能:查询指定命名空间下的release资源(namespace为空时返回所有命名空间下的release)
  • 入参:namespace、listLimit、status
 // ListReleases returns an AppOverview for each Helm release in namespace, or for
 // releases in every namespace when namespace is empty. Parts of the body are
 // elided ("...") in this listing.
 1func ListReleases(actionConfig *action.Configuration, namespace string, listLimit int, status string) ([]proxy.AppOverview, error) {
 2	allNamespaces := namespace == ""
 3  
 4  // Ultimately this calls into the Helm SDK.
 5  // It mainly uses Helm's action package; see the Helm source analysis for the implementation.
 6	cmd := action.NewList(actionConfig)
 7	...
 8  // Run is the action's method that returns the Release objects.
 9	releases, err := cmd.Run()
10	...
11  // Convert Helm Release objects into AppOverview objects for display.
12	appOverviews := make([]proxy.AppOverview, 0)
13	for _, r := range releases {
14		if allNamespaces || r.Namespace == namespace {
15			appOverviews = append(appOverviews, appOverviewFromRelease(r))
16		}
17	}
18	return appOverviews, nil
19}

CreateRelease

  • 功能:创建release,即通过chart包部署一个应用。类似helm命令行:

    1helm install --namespace {namespace} {releaseName} {repoName}/{chartName} -f values.yaml
    
  • 入参:Details对象

    • repoName:注册的仓库名
    • namespace:release部署的目标命名空间(代码中取自路由参数 {namespace})
    • chart name:待部署的chart包名称
    • releaseName:要创建的release名称
    • version:chart包的版本
    • values:自定义的values配置内容(字符串)
 // CreateRelease is the HTTP handler that installs a chart as a new release and
 // writes the resulting release to the response. Parts of the body are elided
 // ("...") in this listing.
 1func CreateRelease(cfg Config, w http.ResponseWriter, req *http.Request, params handlerutil.Params) {
 2  // Parse the HTTP request into a Details object.
 3	chartDetails, err := handlerutil.ParseRequest(req)
 4	...
 5  // Obtain the Helm Chart object.
 6  // Internally this fetches the repo's index.yaml and looks up the download URL of the chart to install,
 7  // downloads the file over HTTP and uses the Helm SDK to load the byte stream into a Chart object.
 8	ch, err := handlerutil.GetChart(
 9		chartDetails,
10		appRepo,
11		caCertSecret, authSecret,
12		cfg.Resolver.New(appRepo.Spec.Type, cfg.Options.UserAgent),
13	)
14	...
15  // Collect releaseName, namespace and values.
16	releaseName := chartDetails.ReleaseName
17	namespace := params[namespaceParam]
18	valuesString := chartDetails.Values
19	..
20  // Create the Release through the agent package using the loaded Chart.
21  // Internally the Release is produced by the Helm SDK's action implementation.
22	release, err := agent.CreateRelease(cfg.ActionConfig, releaseName, namespace, valuesString, ch, registrySecrets)
23	...
24	response.NewDataResponse(release).Write(w)
25}

GetRelease

  • 功能:获取release资源
  • 入参:releaseName
// GetRelease is the HTTP handler that looks up a release by the name given in the
// route params and writes the result to the response. Part of the body is elided
// ("...") in this listing.
1func GetRelease(cfg Config, w http.ResponseWriter, req *http.Request, params handlerutil.Params) {
2	// Read the release name from the route params.
3	releaseName := params[nameParam]
4  // Fetch the release through the agent package,
5  // which in turn calls Helm's action package.
6	release, err := agent.GetRelease(cfg.ActionConfig, releaseName)
7	...
8	response.NewDataResponse(compatRelease).Write(w)
9}

OperateRelease

  • 功能:升级或者回滚release
  • 入参:操作类型(upgrade、rollback)、ReleaseName、Details
 // OperateRelease dispatches on the "action" form value of the request:
 // "rollback" rolls the release back, while "upgrade" and any other (or missing)
 // value upgrade it.
 1func OperateRelease(cfg Config, w http.ResponseWriter, req *http.Request, params handlerutil.Params) {
 2	switch req.FormValue("action") {
 3	case "upgrade":
 4		upgradeRelease(cfg, w, req, params)
 5	case "rollback":
 6		rollbackRelease(cfg, w, req, params)
 7	// TODO: Add "test" case here.
 8	default:
 9		// By default, for maintaining compatibility, we call upgrade.
10		upgradeRelease(cfg, w, req, params)
11	}
12}

DeleteRelease

  • 功能:删除已安装的某个release
  • 入参:releaseName
// DeleteRelease is the HTTP handler that deletes the release named in the route
// params via the agent package. keepHistory comes from code elided ("...") in
// this listing.
1func DeleteRelease(cfg Config, w http.ResponseWriter, req *http.Request, params handlerutil.Params) {
2	// Read the release name from the route params.
3  releaseName := params[nameParam]
4	...
5  // Call the agent's delete function,
6  // which internally also relies on the action package's delete logic.
7	err := agent.DeleteRelease(cfg.ActionConfig, releaseName, keepHistory)
8	...
9}