k8s | Kubernetes controller-manager source code analysis -- startup flow

This article analyzes the source code of the Kubernetes controller-manager. The analysis was done on August 19, 2017 against the master branch, at commit 2ab7ad14b4fad378a4a69a64c587497d77e60f44.
Startup flow - initialization. The entry function is located in kubernetes\cmd\kube-controller-manager\controller-manager.go:

```go
func main() {
	s := options.NewCMServer()
	s.AddFlags(pflag.CommandLine, app.KnownControllers(), app.ControllersDisabledByDefault.List())

	flag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	verflag.PrintAndExitIfRequested()

	if err := app.Run(s); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
```

The main steps are:
1. Create the CMServer object
2. Process the command-line flags via AddFlags
3. Initialize logging
4. Call the main Run function to start the program
When the CMServer is created, every controller-manager option is assigned a default value. AddFlags then binds the command-line flags, so parsing the actual startup arguments overrides those defaults with the values passed by the user. A minimal sketch of this pattern is shown below.
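The default-then-override pattern is easy to see in a small self-contained sketch. The fakeCMServer struct and the flag set below are purely illustrative (they are not the real CMServer), although the flag names mirror real controller-manager options:

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// fakeCMServer is a stand-in for options.CMServer, illustrative only.
type fakeCMServer struct {
	ConcurrentRCSyncs int
	Port              int
}

func main() {
	// Step 1: a NewCMServer-style constructor assigns defaults to every field.
	s := &fakeCMServer{ConcurrentRCSyncs: 5, Port: 10252}

	// Step 2: AddFlags-style binding registers flags whose default is the
	// current field value.
	fs := pflag.NewFlagSet("kube-controller-manager", pflag.ExitOnError)
	fs.IntVar(&s.ConcurrentRCSyncs, "concurrent-rc-syncs", s.ConcurrentRCSyncs,
		"Number of replication controllers allowed to sync concurrently")
	fs.IntVar(&s.Port, "port", s.Port, "Port on which to serve")

	// Step 3: parsing the real command line overrides the defaults.
	fs.Parse([]string{"--concurrent-rc-syncs=10"})

	fmt.Println(s.ConcurrentRCSyncs, s.Port) // prints: 10 10252
}
```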
Startup flow - the Run function. The Run function is located in kubernetes\cmd\kube-controller-manager\app\controllermanager.go.
Run is fairly long and roughly breaks down into four parts: (1) basic checks and configuration handling; (2) starting the monitoring server; (3) defining how the various controllers are started; (4) leader election.
(1) Basic checks and configuration handling:
```go
// Run runs the CMServer. This should never exit.
func Run(s *options.CMServer) error {
	// To help debugging, immediately log version
	glog.Infof("Version: %+v", version.Get())

	// Validate the names of the configured controllers
	if err := s.Validate(KnownControllers(), ControllersDisabledByDefault.List()); err != nil {
		return err
	}

	// Create the configz object and register the configuration
	if c, err := configz.New("componentconfig"); err == nil {
		c.Set(s.KubeControllerManagerConfiguration)
	} else {
		glog.Errorf("unable to register configz: %s", err)
	}

	kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
	if err != nil {
		return err
	}

	kubeconfig.ContentConfig.ContentType = s.ContentType
	// Override kubeconfig qps/burst settings from flags
	kubeconfig.QPS = s.KubeAPIQPS
	kubeconfig.Burst = int(s.KubeAPIBurst)
	kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "controller-manager"))
	if err != nil {
		glog.Fatalf("Invalid API configuration: %v", err)
	}
	leaderElectionClient := kubernetes.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
```

(2) Starting the monitoring server:
```go
	// Start the HTTP server that exposes the monitoring endpoints,
	// e.g. /metrics, which records API call counts
	go func() {
		mux := http.NewServeMux()
		healthz.InstallHandler(mux)
		if s.EnableProfiling {
			mux.HandleFunc("/debug/pprof/", pprof.Index)
			mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
			mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
			mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
			if s.EnableContentionProfiling {
				goruntime.SetBlockProfileRate(1)
			}
		}
		configz.InstallHandler(mux)
		mux.Handle("/metrics", prometheus.Handler())

		server := &http.Server{
			Addr:    net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))),
			Handler: mux,
		}
		glog.Fatal(server.ListenAndServe())
	}()
```
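Once the controller-manager is running, these endpoints can be probed directly. The snippet below assumes the default insecure address and port of this era (127.0.0.1:10252); adjust the base URL to whatever --address/--port you actually use:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// /healthz, /metrics and /configz are registered by the server above;
	// the base URL is an assumption (default insecure port in this version).
	base := "http://127.0.0.1:10252"
	for _, path := range []string{"/healthz", "/metrics", "/configz"} {
		resp, err := http.Get(base + path)
		if err != nil {
			fmt.Println(path, "error:", err)
			continue
		}
		body, _ := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Printf("%s -> %d (%d bytes)\n", path, resp.StatusCode, len(body))
	}
}
```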

(3) Define how the controllers are started. During startup the serviceAccountToken controller is started first; the remaining controllers are then started by iterating over the map returned by NewControllerInitializers (a sketch of that map's shape follows the code block below).
```go
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
	recorder := eventBroadcaster.NewRecorder(api.Scheme, v1.EventSource{Component: "controller-manager"})

	run := func(stop <-chan struct{}) {
		rootClientBuilder := controller.SimpleControllerClientBuilder{
			ClientConfig: kubeconfig,
		}
		var clientBuilder controller.ControllerClientBuilder
		if s.UseServiceAccountCredentials {
			if len(s.ServiceAccountKeyFile) == 0 {
				// It's possible another controller process is creating the tokens for us.
				// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
				glog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
			}
			clientBuilder = controller.SAControllerClientBuilder{
				ClientConfig:         restclient.AnonymousClientConfig(kubeconfig),
				CoreClient:           kubeClient.CoreV1(),
				AuthenticationClient: kubeClient.Authentication(),
				Namespace:            "kube-system",
			}
		} else {
			clientBuilder = rootClientBuilder
		}
		ctx, err := CreateControllerContext(s, rootClientBuilder, clientBuilder, stop)
		if err != nil {
			glog.Fatalf("error building controller context: %v", err)
		}
		saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

		if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers()); err != nil {
			glog.Fatalf("error starting controllers: %v", err)
		}

		ctx.InformerFactory.Start(ctx.Stop)
		select {}
	}
```
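For reference, NewControllerInitializers is simply a map from controller name to start function. The sketch below paraphrases its shape rather than quoting it verbatim; only a handful of the real entries are listed:

```go
// InitFunc is the signature shared by all controller start functions.
type InitFunc func(ctx ControllerContext) (bool, error)

// Paraphrased shape of NewControllerInitializers (not the verbatim source).
func NewControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["namespace"] = startNamespaceController
	controllers["garbagecollector"] = startGarbageCollectorController
	// ... many more entries, one per controller ...
	return controllers
}
```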

(4) Leader election. If LeaderElect is false, no election is performed and run is called directly. If leader election is enabled, the controllers are only started once this instance has been elected leader.
leaderelection.RunOrDie is the generic leader-election mechanism: every controller-manager instance races to register itself as the leader, only one succeeds, and the others keep watching; if the leader dies, one of the remaining instances takes over the leadership and starts its own controller-manager.
```go
	if !s.LeaderElection.LeaderElect {
		run(nil)
		panic("unreachable")
	}

	id, err := os.Hostname()
	if err != nil {
		return err
	}

	rl, err := resourcelock.New(s.LeaderElection.ResourceLock,
		"kube-system",
		"kube-controller-manager",
		leaderElectionClient.CoreV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: recorder,
		})
	if err != nil {
		glog.Fatalf("error creating lock: %v", err)
	}

	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   s.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				glog.Fatalf("leaderelection lost")
			},
		},
	})
	panic("unreachable")
}
```
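The three durations passed to LeaderElectionConfig come from s.LeaderElection. Their defaults in this era were, to my recollection, 15s / 10s / 2s; treat the exact values as an assumption and check your build. What matters is the relationship between them:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed defaults for this version; the leaderelection package expects
	// LeaseDuration > RenewDeadline > RetryPeriod.
	leaseDuration := 15 * time.Second // how long an acquired lease stays valid
	renewDeadline := 10 * time.Second // the leader must renew within this window or step down
	retryPeriod := 2 * time.Second    // how often candidates retry acquiring/renewing the lock

	fmt.Println(leaseDuration > renewDeadline && renewDeadline > retryPeriod) // true

	// Practical consequence: standby controller-managers retry every retryPeriod,
	// and a dead leader is replaced after at most roughly leaseDuration, at which
	// point OnStartedLeading invokes run on the new leader.
}
```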

Startup flow - starting the controllers. The StartControllers function starts all controllers in turn; its code is at kubernetes/cmd/kube-controller-manager/app/controllermanager.go:435.
```go
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
	// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
	// If this fails, just return here and fail since other controllers won't be able to get credentials.
	if _, err := startSATokenController(ctx); err != nil {
		return err
	}

	for controllerName, initFn := range controllers {
		if !ctx.IsControllerEnabled(controllerName) {
			glog.Warningf("%q is disabled", controllerName)
			continue
		}

		time.Sleep(wait.Jitter(ctx.Options.ControllerStartInterval.Duration, ControllerStartJitter))

		glog.V(1).Infof("Starting %q", controllerName)
		started, err := initFn(ctx)
		if err != nil {
			glog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			glog.Warningf("Skipping %q", controllerName)
			continue
		}
		glog.Infof("Started %q", controllerName)
	}

	return nil
}
```

In StartControllers, the serviceAccountToken controller is started first; every enabled controller in the NewControllerInitializers map is then started.
Each start function returns two values (bool, error): the first indicates whether the controller was actually started (a controller that does not apply to the current deployment returns false and is skipped), and the second indicates whether an error occurred during startup. A hypothetical starter illustrating this contract is sketched below.
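To make that contract concrete, here is a hypothetical starter written in the same shape as the real ones; startFooController, newFooController and the example.io/v1 "foos" resource are all made up, but the AvailableResources check mirrors a pattern several real starters use:

```go
func startFooController(ctx ControllerContext) (bool, error) {
	// If the API resource this controller manages is not served by the
	// apiserver, report "not started" without an error; StartControllers
	// will then just log "Skipping" and continue.
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "example.io", Version: "v1", Resource: "foos"}] {
		return false, nil
	}
	fc, err := newFooController(ctx.ClientBuilder.ClientOrDie("foo-controller"))
	if err != nil {
		// A genuine failure: StartControllers aborts the whole startup.
		return false, err
	}
	go fc.Run(ctx.Stop)
	// The controller's goroutines are now running.
	return true, nil
}
```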
Startup flow - starting a concrete controller. Taking the replication controller (RC) as an example, its start function is at kubernetes/cmd/kube-controller-manager/app/core.go:205.
```go
func startReplicationController(ctx ControllerContext) (bool, error) {
	// Create the controller instance and start its control loop via Run
	// in a separate goroutine
	go replicationcontroller.NewReplicationManager(
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Core().V1().ReplicationControllers(),
		ctx.ClientBuilder.ClientOrDie("replication-controller"),
		replicationcontroller.BurstReplicas,
	).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
	return true, nil
}
```

The corresponding Run function is at kubernetes/pkg/controller/replication/replication_controller.go:146.
```go
// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rm.queue.ShutDown()

	glog.Infof("Starting RC controller")
	defer glog.Infof("Shutting down RC controller")

	// Wait until the API objects have been synced into the local caches
	if !controller.WaitForCacheSync("RC", stopCh, rm.podListerSynced, rm.rcListerSynced) {
		return
	}

	// Start the requested number of workers
	for i := 0; i < workers; i++ {
		go wait.Until(rm.worker, time.Second, stopCh)
	}

	<-stopCh
}
```
Run consists of two main steps: (1) wait for the API objects to finish syncing into the informer caches; (2) start the worker goroutines. Each worker repeatedly executes rm.worker via wait.Until (see the aside below), and rm.worker in turn loops over processNextWorkItem.
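As a brief aside, wait.Until (from k8s.io/apimachinery/pkg/util/wait) re-invokes the given function every period until the stop channel is closed, which is what keeps the workers alive even though rm.worker itself can return. A minimal, self-contained sketch of that behavior (the timings here are arbitrary):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	go func() {
		// Simulate the controller being told to stop after ~3.5 seconds.
		time.Sleep(3500 * time.Millisecond)
		close(stopCh)
	}()

	passes := 0
	// Like the workers above: the function runs, and is re-run one second
	// after it returns, until stopCh is closed.
	wait.Until(func() {
		passes++
		fmt.Println("worker pass", passes)
	}, time.Second, stopCh)
	fmt.Println("stopped after", passes, "passes") // roughly 4 passes
}
```

Back in the controller, the worker function itself is just a small loop: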

```go
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
	for rm.processNextWorkItem() {
	}
	glog.Infof("replication controller worker shutting down")
}
```
processNextWorkItem works as follows:

```go
func (rm *ReplicationManager) processNextWorkItem() bool {
	key, quit := rm.queue.Get()
	if quit {
		return false
	}
	defer rm.queue.Done(key)

	// syncHandler is the actual processing function
	err := rm.syncHandler(key.(string))
	if err == nil {
		rm.queue.Forget(key)
		return true
	}

	rm.queue.AddRateLimited(key)
	utilruntime.HandleError(err)
	return true
}
```
The actual processing in processNextWorkItem is done by rm.syncHandler, which is assigned in NewReplicationManager; its concrete value is rm.syncReplicationController, so the real RC reconciliation happens in syncReplicationController. A later article will look at that function in depth, so it is not analyzed further here.
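To connect the dots between the informer, the queue and syncHandler, here is a condensed paraphrase (not the verbatim source) of the wiring in replication_controller.go:

```go
// Inside NewReplicationManager the informer's event handlers all funnel into
// rm.enqueueController, and the sync handler is assigned:
//
//   rcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{...})
//   rm.syncHandler = rm.syncReplicationController

// enqueueController turns an object into a "<namespace>/<name>" key and puts
// it on the work queue, where processNextWorkItem picks it up.
func (rm *ReplicationManager) enqueueController(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}
	rm.queue.Add(key)
}

// syncReplicationController resolves the key back into an RC via the lister
// and reconciles it (the reconciliation details are left for a later article).
func (rm *ReplicationManager) syncReplicationController(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	rc, err := rm.rcLister.ReplicationControllers(namespace).Get(name)
	if err != nil {
		return err
	}
	// ... compare rc.Spec.Replicas with the pods matching rc's selector and
	//     create or delete pods as needed ...
	_ = rc
	return nil
}
```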
The RC example shows the general startup pattern of a controller: construct the controller object, then call its Run function. Run first waits for the relevant API objects to finish syncing, then starts the worker goroutines, and the actual processing happens inside those workers.
