Kubernetes Deployment Source Code Analysis (Part 2)

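Contents
  • Overview
  • startDeploymentController entry logic
  • The DeploymentController object
  • DeploymentController type definition
  • DeploymentController initialization
  • ResourceEventHandler
  • Deployment change events
  • ReplicaSet change events
  • Starting the DeploymentController
  • Run()
  • syncDeployment
  • Summary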
Overview

Source version: kubernetes-v1.22.3 / commit-id: c92036
Deployment is one of the most commonly used native Kubernetes workload resources. When we first try out Kubernetes, we most likely start by running a Deployment workload.
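In the previous article, "Kubernetes Deployment Source Code Analysis (Part 1)", we went through all of Deployment's features, focusing on the main ones such as "rolling update" and "rollback", so that we have a clear picture of where Deployment's capabilities begin and end. Building on that, today we look at how Deployment is implemented in the source code.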
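Note: reading the Deployment source requires some background on how custom controllers work. It involves the Informer mechanism, workqueues (delaying and rate-limiting queues), ResourceEventHandlers, and so on. Without that background this article may be challenging, so I suggest first reading the related articles listed in the table of contents of the "In-Depth Understanding of K8S Principles and Implementation" series.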
"Kubernetes Deployment Controller Source Code Analysis" is split into two parts:
  • "Kubernetes Deployment Source Code Analysis (Part 1)" - features
  • "Kubernetes Deployment Source Code Analysis (Part 2)" - source code flow
startDeploymentController entry logic
The entry point for initializing and starting the DeploymentController is the startDeploymentController() function:
  • cmd/kube-controller-manager/app/apps.go:72
```go
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}
```

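startDeploymentController() first creates a DeploymentController instance with NewDeploymentController(). The arguments passed in are a DeploymentInformer, a ReplicaSetInformer, a PodInformer and a Clientset, so the DeploymentController can receive change events for three kinds of resources (Deployment, ReplicaSet and Pod) and can perform CRUD operations on resources through the apiserver. The function then starts the DeploymentController by calling its Run() method in a new goroutine. The ConcurrentDeploymentSyncs argument defaults to 5, which means that by default at most 5 Deployments are reconciled concurrently.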
The DeploymentController object
Next, let's look at how the DeploymentController object is defined and initialized.
DeploymentController type definition
First, the definition of the DeploymentController type:
  • pkg/controller/deployment/deployment_controller.go:68
```go
type DeploymentController struct {
	// used to manipulate ReplicaSets
	rsControl     controller.RSControlInterface
	client        clientset.Interface
	eventRecorder record.EventRecorder

	syncHandler func(dKey string) error
	// used for testing
	enqueueDeployment func(deployment *apps.Deployment)

	// gets/lists Deployments from the cache
	dLister appslisters.DeploymentLister
	// gets/lists ReplicaSets from the cache
	rsLister appslisters.ReplicaSetLister
	// gets/lists Pods from the cache
	podLister corelisters.PodLister

	dListerSynced   cache.InformerSynced
	rsListerSynced  cache.InformerSynced
	podListerSynced cache.InformerSynced

	// work queue, implemented as a rate-limiting queue
	queue workqueue.RateLimitingInterface
}
```

DeploymentController initialization
  • pkg/controller/deployment/deployment_controller.go:101
```go
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	// event-related setup
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartStructuredLogging(0)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	// ...

	// new dc
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	// essentially a wrapper around the clientset
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}
	// ResourceEventHandler registration, analyzed below
	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		DeleteFunc: dc.deleteDeployment,
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addReplicaSet,
		UpdateFunc: dc.updateReplicaSet,
		DeleteFunc: dc.deleteReplicaSet,
	})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: dc.deletePod,
	})

	// the main logic lives here; covered later
	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue
	// listers and cache-sync checks
	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}
```
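`cache.ResourceEventHandlerFuncs` lets a controller register only the callbacks it cares about; to my understanding, a nil callback is simply skipped. The hypothetical `eventHandlerFuncs` type below re-creates that behavior with the standard library to show the consequence for the Pod informer above: only DeleteFunc is set, so Pod add and update events never reach this controller.

```go
package main

import "fmt"

// eventHandlerFuncs is a hypothetical stand-in for client-go's
// cache.ResourceEventHandlerFuncs: any callback left nil is skipped.
type eventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

func (h eventHandlerFuncs) OnAdd(obj interface{}) {
	if h.AddFunc != nil {
		h.AddFunc(obj)
	}
}

func (h eventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if h.UpdateFunc != nil {
		h.UpdateFunc(oldObj, newObj)
	}
}

func (h eventHandlerFuncs) OnDelete(obj interface{}) {
	if h.DeleteFunc != nil {
		h.DeleteFunc(obj)
	}
}

func main() {
	var events []string
	// Like the Pod informer in NewDeploymentController, only DeleteFunc is registered.
	podHandler := eventHandlerFuncs{
		DeleteFunc: func(obj interface{}) { events = append(events, fmt.Sprint("delete ", obj)) },
	}
	podHandler.OnAdd("default/web-1")    // ignored: no AddFunc
	podHandler.OnUpdate("old", "new")    // ignored: no UpdateFunc
	podHandler.OnDelete("default/web-1") // recorded
	fmt.Println(events)                  // [delete default/web-1]
}
```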

ResourceEventHandler
Several ResourceEventHandler callbacks were registered above:
  • addDeployment
  • updateDeployment
  • deleteDeployment
  • addReplicaSet
  • updateReplicaSet
  • deleteReplicaSet
  • deletePod
Let's go through them one by one.
Deployment change events
The logic here is simple, so let's look at all three methods together:
  • pkg/controller/deployment/deployment_controller.go:167
```go
func (dc *DeploymentController) addDeployment(obj interface{}) {
	d := obj.(*apps.Deployment)
	klog.V(4).InfoS("Adding deployment", "deployment", klog.KObj(d))
	// a newly added Deployment is enqueued directly
	dc.enqueueDeployment(d)
}

func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
	oldD := old.(*apps.Deployment)
	curD := cur.(*apps.Deployment)
	klog.V(4).InfoS("Updating deployment", "deployment", klog.KObj(oldD))
	// oldD is only used for logging; curD is what gets enqueued
	dc.enqueueDeployment(curD)
}

func (dc *DeploymentController) deleteDeployment(obj interface{}) {
	d, ok := obj.(*apps.Deployment)
	if !ok {
		// handle the DeletedFinalStateUnknown (tombstone) case
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		d, ok = tombstone.Obj.(*apps.Deployment)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj))
			return
		}
	}
	klog.V(4).InfoS("Deleting deployment", "deployment", klog.KObj(d))
	// enqueue
	dc.enqueueDeployment(d)
}
```
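The tombstone branch exists because an informer can miss the actual delete (for example, after a watch disconnects and the informer relists); it then delivers a `cache.DeletedFinalStateUnknown` wrapping the object's last known state. The sketch below models that unwrapping with hypothetical stand-in types (`deployment`, `deletedFinalStateUnknown`) instead of the real `apps.Deployment` and client-go types.

```go
package main

import "fmt"

// Hypothetical stand-ins for *apps.Deployment and cache.DeletedFinalStateUnknown.
type deployment struct{ Namespace, Name string }

type deletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// deploymentFromDeleteEvent unwraps what a DeleteFunc receives: either the
// deleted object itself, or a tombstone wrapping its last known state.
func deploymentFromDeleteEvent(obj interface{}) (*deployment, error) {
	if d, ok := obj.(*deployment); ok {
		return d, nil
	}
	tombstone, ok := obj.(deletedFinalStateUnknown)
	if !ok {
		return nil, fmt.Errorf("couldn't get object from tombstone %#v", obj)
	}
	d, ok := tombstone.Obj.(*deployment)
	if !ok {
		return nil, fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj)
	}
	return d, nil
}

func main() {
	direct := &deployment{Namespace: "default", Name: "nginx"}
	wrapped := deletedFinalStateUnknown{Key: "default/nginx", Obj: direct}
	for _, obj := range []interface{}{direct, wrapped, "not-an-object"} {
		d, err := deploymentFromDeleteEvent(obj)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("enqueue", d.Namespace+"/"+d.Name)
	}
}
```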

ReplicaSet change events
Next, the implementation of the ReplicaSet callbacks.
1. Added
  • pkg/controller/deployment/deployment_controller.go:199
```go
func (dc *DeploymentController) addReplicaSet(obj interface{}) {
	rs := obj.(*apps.ReplicaSet)
	// After a controller-manager restart, an RS that is already pending deletion
	// can show up as an Added event; treat it as a delete
	if rs.DeletionTimestamp != nil {
		dc.deleteReplicaSet(rs)
		return
	}
	// if the RS has a ControllerRef, look up the owning Deployment
	if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
		d := dc.resolveControllerRef(rs.Namespace, controllerRef)
		if d == nil {
			return
		}
		klog.V(4).InfoS("ReplicaSet added", "replicaSet", klog.KObj(rs))
		// add that Deployment to the work queue
		dc.enqueueDeployment(d)
		return
	}

	// otherwise it is an orphan ReplicaSet: see whether any Deployment can adopt it
	ds := dc.getDeploymentsForReplicaSet(rs)
	if len(ds) == 0 {
		return
	}
	klog.V(4).InfoS("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs))
	// usually only one Deployment matches, but multiple matches can't be ruled out,
	// so ds is a list and every entry is enqueued
	for _, d := range ds {
		dc.enqueueDeployment(d)
	}
}
```
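addReplicaSet relies on `metav1.GetControllerOf` and `dc.resolveControllerRef`, whose bodies are not shown here. As I understand it, resolveControllerRef checks the Kind, looks the Deployment up by name in the lister, and then compares UIDs, since a Deployment that was deleted and recreated under the same name must not be treated as the owner. A sketch with hypothetical stand-in types and a map in place of the lister:

```go
package main

import "fmt"

// Hypothetical, trimmed-down stand-ins for metav1.OwnerReference and apps.Deployment.
type ownerReference struct {
	Kind       string
	Name       string
	UID        string
	Controller bool
}

type deployment struct {
	Namespace, Name, UID string
}

// getControllerOf returns the owner reference marked as controller, if any.
func getControllerOf(refs []ownerReference) *ownerReference {
	for i := range refs {
		if refs[i].Controller {
			return &refs[i]
		}
	}
	return nil
}

// resolveControllerRef looks the owner up by name (keyed "namespace/name"),
// then checks Kind and UID so a same-named but different Deployment is ignored.
func resolveControllerRef(cache map[string]*deployment, namespace string, ref *ownerReference) *deployment {
	if ref.Kind != "Deployment" {
		return nil
	}
	d, ok := cache[namespace+"/"+ref.Name]
	if !ok || d.UID != ref.UID {
		return nil
	}
	return d
}

func main() {
	cache := map[string]*deployment{
		"default/nginx": {Namespace: "default", Name: "nginx", UID: "uid-2"},
	}
	refs := []ownerReference{{Kind: "Deployment", Name: "nginx", UID: "uid-1", Controller: true}}
	// The ReplicaSet still points at an older "nginx" (uid-1), so nothing resolves.
	fmt.Println(resolveControllerRef(cache, "default", getControllerOf(refs)) == nil) // true
}
```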

2. Updated
  • pkg/controller/deployment/deployment_controller.go:256
```go
func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
	curRS := cur.(*apps.ReplicaSet)
	oldRS := old.(*apps.ReplicaSet)
	if curRS.ResourceVersion == oldRS.ResourceVersion {
		// a periodic resync delivers the same ResourceVersion; nothing to do
		return
	}

	curControllerRef := metav1.GetControllerOf(curRS)
	oldControllerRef := metav1.GetControllerOf(oldRS)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// the RS's ControllerRef changed, so notify the Deployment the old ref pointed to
		if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
			dc.enqueueDeployment(d)
		}
	}

	if curControllerRef != nil {
		d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
		if d == nil {
			return
		}
		klog.V(4).InfoS("ReplicaSet updated", "replicaSet", klog.KObj(curRS))
		// enqueue the Deployment that currently owns this RS
		dc.enqueueDeployment(d)
		return
	}

	// orphan RS: same handling as on Added, but only if labels or the ControllerRef changed
	labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
	if labelChanged || controllerRefChanged {
		ds := dc.getDeploymentsForReplicaSet(curRS)
		if len(ds) == 0 {
			return
		}
		klog.V(4).InfoS("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS))
		for _, d := range ds {
			dc.enqueueDeployment(d)
		}
	}
}
```

3. Deleted
  • pkg/controller/deployment/deployment_controller.go:304
```go
func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
	rs, ok := obj.(*apps.ReplicaSet)

	// deletes must also handle the DeletedFinalStateUnknown (tombstone) case
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		rs, ok = tombstone.Obj.(*apps.ReplicaSet)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
			return
		}
	}

	// no Deployment cares about an orphan RS being deleted
	controllerRef := metav1.GetControllerOf(rs)
	if controllerRef == nil {
		return
	}
	d := dc.resolveControllerRef(rs.Namespace, controllerRef)
	if d == nil {
		return
	}
	klog.V(4).InfoS("ReplicaSet deleted", "replicaSet", klog.KObj(rs))
	// enqueue
	dc.enqueueDeployment(d)
}
```

Starting the DeploymentController
We have seen which events add items to the workqueue; next, let's look at how those items are consumed.
Run()
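Run() itself is short: after waiting for the Deployment, ReplicaSet and Pod caches to sync, it starts `workers` goroutines running dc.worker (5 by default) and then blocks until stopCh is closed.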
  • pkg/controller/deployment/deployment_controller.go:149
```go
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.InfoS("Starting controller", "controller", "deployment")
	defer klog.InfoS("Shutting down controller", "controller", "deployment")

	if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}
```

Next, the worker:
  • pkg/controller/deployment/deployment_controller.go:460
```go
func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DeploymentController) processNextWorkItem() bool {
	key, quit := dc.queue.Get() // take one item from the workqueue
	if quit {
		return false
	}
	defer dc.queue.Done(key)
	// main logic
	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key)

	return true
}
```

After a key is taken from the workqueue, it is processed by calling syncHandler(). Recall this line highlighted earlier:
  • dc.syncHandler = dc.syncDeployment
So next we follow the implementation of dc.syncDeployment.
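The items in the queue are `namespace/name` strings rather than Deployment objects, so repeated events for a Deployment that is still waiting in the queue collapse into one key, and the worker always reads the latest copy from the lister. Below is a stdlib sketch of building such a key and splitting it back, modeled on (not copied from) client-go's `cache.SplitMetaNamespaceKey`; both function names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// metaNamespaceKey builds a "namespace/name" workqueue key
// (just "name" for cluster-scoped objects).
func metaNamespaceKey(namespace, name string) string {
	if namespace == "" {
		return name
	}
	return namespace + "/" + name
}

// splitMetaNamespaceKey reverses metaNamespaceKey, as syncDeployment does
// with cache.SplitMetaNamespaceKey before reading from the lister.
func splitMetaNamespaceKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	key := metaNamespaceKey("default", "nginx")
	ns, name, err := splitMetaNamespaceKey(key)
	fmt.Println(key, ns, name, err) // default/nginx default nginx <nil>
}
```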
syncDeployment
syncDeployment() takes a key dequeued from the workqueue and syncs the Deployment that the key refers to. Let's look at the details.
  • pkg/controller/deployment/deployment_controller.go
```go
func (dc *DeploymentController) syncDeployment(key string) error {
	// split the key into namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
		return err
	}

	startTime := time.Now()
	klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
	defer func() {
		klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
	}()
	// fetch the Deployment from the cache by namespace and name
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	// deep-copy so the shared cache (a ThreadSafeStore) is never mutated
	d := deployment.DeepCopy()

	// an empty LabelSelector matches every Pod: emit a Warning event,
	// update .Status.ObservedGeneration, and return
	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
		}
		return nil
	}

	// get all ReplicaSets owned by this Deployment; this also updates
	// the ControllerRef of those ReplicaSets (adopting or releasing them)
	rsList, err := dc.getReplicaSetsForDeployment(d)
	if err != nil {
		return err
	}
	// podMap is a map[types.UID][]*v1.Pod: the key is a ReplicaSet's UID,
	// the value is the list of Pods that ReplicaSet manages
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	// already marked for deletion: only update the status
	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(d, rsList)
	}

	// update the Deployment's conditions according to .Spec.Paused
	if err = dc.checkPausedConditions(d); err != nil {
		return err
	}

	if d.Spec.Paused {
		// reconcile logic for the paused or scaling case
		return dc.sync(d, rsList)
	}

	// legacy rollback driven by the deprecated "deprecated.deployment.rollback.to" annotation
	if getRollbackTo(d) != nil {
		// roll back to an earlier revision
		return dc.rollback(d, rsList)
	}
	// check whether this is a scaling event
	scalingEvent, err := dc.isScalingEvent(d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		// reconcile logic for the paused or scaling case
		return dc.sync(d, rsList)
	}

	switch d.Spec.Strategy.Type {
	// Recreate strategy
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d, rsList, podMap)
	// RollingUpdate strategy
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
```
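The order of the checks in syncDeployment matters: for example, a paused Deployment goes to dc.sync() even if it also carries the legacy rollback annotation. The hypothetical `nextAction` function below condenses that decision order into a testable sketch; `deploymentState` is an assumed summary of the facts the real code derives from the Deployment and its ReplicaSets.

```go
package main

import "fmt"

// deploymentState is a hypothetical summary of the facts syncDeployment branches on.
type deploymentState struct {
	SelectsEverything bool
	Deleting          bool
	Paused            bool
	HasRollbackTo     bool
	ScalingEvent      bool
	Strategy          string // "Recreate" or "RollingUpdate"
}

// nextAction names the helper syncDeployment would call, checking in the same order.
func nextAction(s deploymentState) (string, error) {
	switch {
	case s.SelectsEverything:
		return "warn and return", nil
	case s.Deleting:
		return "syncStatusOnly", nil
	case s.Paused:
		return "sync", nil
	case s.HasRollbackTo:
		return "rollback", nil
	case s.ScalingEvent:
		return "sync", nil
	}
	switch s.Strategy {
	case "Recreate":
		return "rolloutRecreate", nil
	case "RollingUpdate":
		return "rolloutRolling", nil
	}
	return "", fmt.Errorf("unexpected deployment strategy type: %s", s.Strategy)
}

func main() {
	action, _ := nextAction(deploymentState{Paused: true, HasRollbackTo: true, Strategy: "RollingUpdate"})
	fmt.Println(action) // sync: a paused Deployment is neither rolled back nor rolled out
}
```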

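Summary
With syncDeployment() covered, we have walked through the Deployment controller's logic once. The smaller helper methods it calls are only described briefly here rather than analyzed in full, but their logic is not difficult, so I won't go into them.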
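I analyzed the Deployment Controller after client-go and the Job Controller, so this article may feel simpler and some descriptions are less detailed. If you have read my earlier related articles, following the logic here should not be hard; otherwise, I'd recommend going back to them first. The latest versions are available on my blog, Daniel Hu's Blog.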
If you repost this article, please keep the original link: https://www.danielhu.cn