Kubernetes source code analysis of client-go (II): the Informer mechanism

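Kubernetes uses the informer mechanism to keep messages real-time, reliable, and ordered without depending on any middleware. Other Kubernetes components communicate with the API Server through client-go's informer mechanism. The core components of an Informer are: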

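  • Reflector
    Watches a specified Kubernetes resource.
  • DeltaFIFO
    A first-in, first-out queue of Deltas. The Reflector is the producer and the Controller is the consumer.
  • Indexer
    A local store with built-in indexing, used to store resource objects.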
How Informers work

Code example
```go
package main

import (
	"log"
	"time"

	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "D:\\coding\\config")
	if err != nil {
		panic(err)
	}
	// The Informer talks to the API Server through the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// stopCh notifies the Informer to exit before the process exits
	stopCh := make(chan struct{})
	defer close(stopCh)

	// Create the SharedInformerFactory. clientset is used to talk to the API Server;
	// time.Minute sets the resync period (0 disables resync).
	// Informers are shared through a map (informers map[reflect.Type]cache.SharedIndexInformer),
	// so the Informer for a given resource is never instantiated twice.
	sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
	// Get the Informer for Pod resources.
	// Every Kubernetes resource implements the Informer mechanism, and each
	// implementation provides Informer() and Lister() methods.
	informer := sharedInformers.Core().V1().Pods().Informer()

	// Register the resource event callbacks
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		// Called when a resource is created
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("New Pod Added to Store: %s", mObj.GetName())
		},
		// Called when a resource is updated
		UpdateFunc: func(oldObj, newObj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
		},
		// Called when a resource is deleted
		DeleteFunc: func(obj interface{}) {
			// If the final delete event was missed, obj is a
			// cache.DeletedFinalStateUnknown tombstone instead of the Pod itself
			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				obj = tombstone.Obj
			}
			mObj, ok := obj.(v1.Object)
			if !ok {
				return
			}
			log.Printf("Pod Deleted from Store: %s", mObj.GetName())
		},
	})

	// Run blocks until stopCh is closed
	informer.Run(stopCh)
}
```
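The comment about sharing Informers through a map keyed by reflect.Type can be illustrated with a small stdlib-only sketch. factory, informer, Pod, and Service are hypothetical placeholders, not client-go types. The only idea taken from the text is that one shared instance is kept per resource type. A second request for the same type then returns the cached instance instead of building a new one.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// Pod and Service are hypothetical placeholder resource types.
type Pod struct{}
type Service struct{}

// informer is a hypothetical placeholder for cache.SharedIndexInformer.
type informer struct{ resource string }

// factory keeps at most one informer per resource type, keyed by reflect.Type.
type factory struct {
	lock      sync.Mutex
	informers map[reflect.Type]*informer
	created   int // how many informers were actually constructed
}

func newFactory() *factory { return &factory{informers: map[reflect.Type]*informer{}} }

// InformerFor returns the cached informer for obj's type, creating it only on first use.
func (f *factory) InformerFor(obj interface{}, newFunc func() *informer) *informer {
	f.lock.Lock()
	defer f.lock.Unlock()
	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.informers[t] = inf
	f.created++
	return inf
}

func main() {
	f := newFactory()
	a := f.InformerFor(&Pod{}, func() *informer { return &informer{"pods"} })
	b := f.InformerFor(&Pod{}, func() *informer { return &informer{"pods"} })
	c := f.InformerFor(&Service{}, func() *informer { return &informer{"services"} })
	fmt.Println(a == b, a == c, f.created) // true false 2
}
```

Because every caller asking for Pods shares one instance, only one List/Watch connection and one local cache exist per resource type.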

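Reflector
The Informer watches resources on the Kubernetes API Server, and its most central piece is the Reflector. The Reflector watches a specified Kubernetes resource. When that resource changes, the Reflector produces the corresponding change event and stores the resource object in the local buffer, the DeltaFIFO.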
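A Reflector is instantiated with NewReflector, which must be given a ListerWatcher interface object. A ListerWatcher has List and Watch methods, used to fetch and to watch the resource list. Any object that implements List and Watch can serve as a ListerWatcher.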
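Because ListerWatcher is a Go interface, any type whose method set matches it satisfies it. The sketch below uses a simplified, hypothetical version of the interface: object names are strings and resource versions are integers, instead of runtime.Object and metav1.ListOptions. It also adds a function-field adapter in the spirit of client-go's cache.ListWatch, which likewise has ListFunc and WatchFunc fields.

```go
package main

import "fmt"

// Event is a hypothetical, simplified watch event.
type Event struct {
	Type string // "ADDED", "MODIFIED" or "DELETED"
	Name string
}

// ListerWatcher is a simplified stand-in for client-go's interface of the same name:
// List returns every object plus the resource version of the snapshot,
// Watch streams the events that happen after a given resource version.
type ListerWatcher interface {
	List() (names []string, resourceVersion int, err error)
	Watch(resourceVersion int) (<-chan Event, error)
}

// ListWatch adapts two plain functions into a ListerWatcher.
type ListWatch struct {
	ListFunc  func() ([]string, int, error)
	WatchFunc func(resourceVersion int) (<-chan Event, error)
}

func (lw *ListWatch) List() ([]string, int, error)      { return lw.ListFunc() }
func (lw *ListWatch) Watch(rv int) (<-chan Event, error) { return lw.WatchFunc(rv) }

// Compile-time proof that *ListWatch satisfies ListerWatcher.
var _ ListerWatcher = &ListWatch{}

func main() {
	var lw ListerWatcher = &ListWatch{
		ListFunc: func() ([]string, int, error) { return []string{"pod-a", "pod-b"}, 7, nil },
		WatchFunc: func(rv int) (<-chan Event, error) {
			ch := make(chan Event, 1)
			ch <- Event{Type: "ADDED", Name: fmt.Sprintf("pod-after-rv-%d", rv)}
			close(ch)
			return ch, nil
		},
	}
	names, rv, _ := lw.List()
	fmt.Println(names, rv) // [pod-a pod-b] 7
	events, _ := lw.Watch(rv)
	for e := range events {
		fmt.Println(e.Type, e.Name) // ADDED pod-after-rv-7
	}
}
```

Hiding the data source behind this interface lets the Reflector logic be exercised against a fake source, as here, or against the real API Server.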
```go
// Source path: vendor\k8s.io\client-go\tools\cache\reflector.go

// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
```

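The Reflector starts its watch loop with the Run function and handles the watched events. The most important part is ListAndWatch, which lists and watches the specified resource on the Kubernetes API Server. Run wraps ListAndWatch in wait.Until, so whenever ListAndWatch returns, it is called again after r.period until stopCh is closed.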
```go
// Source path: vendor\k8s.io\client-go\tools\cache\reflector.go

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.Until(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.period, stopCh)
}
```

The ListAndWatch function
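On its first run, ListAndWatch uses List to fetch every object of the resource together with the resource version. After that, it watches for changes starting from that version.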
```go
// Source path: vendor\k8s.io\client-go\tools\cache\reflector.go

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	// Explicitly set "0" as resource version - it's fine for the List()
	// to be served from cache and potentially be delayed relative to
	// etcd contents. Reflector framework will catch up via Watch() eventually.
	// The initial List uses resource version "0"
	options := metav1.ListOptions{ResourceVersion: "0"}

	if err := func() error {
		...
			// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
			// list request will return the full response.
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				// List every object of the resource. This is always a full list;
				// ResourceVersion "0" only allows the API Server to serve it from its
				// cache. Incremental changes arrive later through Watch.
				return r.listerWatcher.List(opts)
			}))
			if r.WatchListPageSize != 0 {
				pager.PageSize = r.WatchListPageSize
			}
			// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
			list, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		......
		listMetaInterface, err := meta.ListAccessor(list)
		if err != nil {
			return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
		}
		// Get the resource version of the list
		resourceVersion = listMetaInterface.GetResourceVersion()
		initTrace.Step("Resource version extracted")
		// Convert the list result into a slice of resource objects
		items, err := meta.ExtractList(list)
		if err != nil {
			return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
		}
		initTrace.Step("Objects extracted")
		// Store the resource objects and the resource version in the DeltaFIFO
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}
		initTrace.Step("SyncWith done")
		// Record the latest resource version
		r.setLastSyncResourceVersion(resourceVersion)
		initTrace.Step("Resource version updated")
		return nil
	}(); err != nil {
		return err
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()

	for {
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		// Watch options, carrying the resource version
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any wachers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
		start := r.clock.Now()
		// Call Watch to watch the resource objects
		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
			default:
				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
			}
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case wait and resend watch request.
			if utilnet.IsConnectionRefused(err) {
				time.Sleep(time.Second)
				continue
			}
			return nil
		}

		// Handle the resource change events
		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				switch {
				case apierrs.IsResourceExpired(err):
					klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil
		}
	}
}
```
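The list-then-watch flow can be reduced to a toy model. fakeServer, reflector, listPhase, and watchPhase are hypothetical. The real code works on runtime.Object lists, hands them to the store through syncWith, and watches over HTTP with timeouts and bookmarks. The sketch keeps the central idea: one full List returns a resource version, and each Watch session resumes from the last version seen. In the for loop above, when watchHandler returns without an error, a new Watch starts from the updated resourceVersion instead of re-listing.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a hypothetical watch event tagged with the resource version it happened at.
type event struct {
	Type            string // "ADDED", "MODIFIED" or "DELETED"
	Name            string
	ResourceVersion int
}

// fakeServer is a hypothetical in-memory stand-in for the API Server.
type fakeServer struct {
	objects map[string]bool
	history []event // every change, in resource-version order
	rv      int     // current resource version
}

func (s *fakeServer) apply(typ, name string) {
	s.rv++
	if typ == "DELETED" {
		delete(s.objects, name)
	} else {
		s.objects[name] = true
	}
	s.history = append(s.history, event{typ, name, s.rv})
}

// list returns a full snapshot plus the resource version it was taken at.
func (s *fakeServer) list() ([]string, int) { return sortedKeys(s.objects), s.rv }

// watch returns only the events newer than fromRV.
func (s *fakeServer) watch(fromRV int) []event {
	var out []event
	for _, e := range s.history {
		if e.ResourceVersion > fromRV {
			out = append(out, e)
		}
	}
	return out
}

type reflector struct {
	store  map[string]bool
	lastRV int
}

// listPhase: one full List, replace the store contents, remember the version.
func (r *reflector) listPhase(s *fakeServer) {
	names, rv := s.list()
	r.store = make(map[string]bool, len(names))
	for _, n := range names {
		r.store[n] = true
	}
	r.lastRV = rv
}

// watchPhase: one watch session starting from lastRV, advancing it per event.
func (r *reflector) watchPhase(s *fakeServer) {
	for _, e := range s.watch(r.lastRV) {
		if e.Type == "DELETED" {
			delete(r.store, e.Name)
		} else {
			r.store[e.Name] = true
		}
		r.lastRV = e.ResourceVersion
	}
}

func sortedKeys(m map[string]bool) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	s := &fakeServer{objects: map[string]bool{}}
	s.apply("ADDED", "pod-a")
	s.apply("ADDED", "pod-b")
	r := &reflector{}
	r.listPhase(s) // full list at version 2
	s.apply("DELETED", "pod-a")
	s.apply("ADDED", "pod-c")
	r.watchPhase(s) // sees versions 3 and 4
	s.apply("MODIFIED", "pod-b")
	r.watchPhase(s) // restarted watch resumes at version 4, no re-list
	fmt.Println(sortedKeys(r.store), r.lastRV) // [pod-b pod-c] 5
}
```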

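DeltaFIFO
DeltaFIFO can be understood as two parts, FIFO and Delta. FIFO is a first-in, first-out queue with the usual queue operations. Delta is the resource-object store, which also records the type of operation applied to each object (Added, Updated, Deleted, and so on). In the DeltaFIFO queue, the Reflector is the producer and the controller is the consumer. In the DeltaFIFO struct below, items maps each object key to its accumulated Deltas, and queue holds the keys in FIFO order: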
```go
// Source path: vendor\k8s.io\client-go\tools\cache\delta_fifo.go
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string
	...
}
```

Producer methods
The DeltaFIFO methods that handle Added, Updated, Deleted and other events all call queueActionLocked, which is the key to how DeltaFIFO works.
```go
// Source path: vendor\k8s.io\client-go\tools\cache\delta_fifo.go

// Add and Update methods

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
	// Take the lock before doing anything
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}

// queueActionLocked method

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	// Compute the key of the resource object
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	// Wrap actionType and the object in a Delta, append it to the object's
	// Deltas in items, and deduplicate the result with dedupDeltas
	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		// The key is enqueued only if it is not already present in items
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		// Wake up all consumers blocked in Pop via cond.Broadcast
		f.cond.Broadcast()
	} else {
		// We need to remove this from our map (extra items in the queue are
		// ignored if they are not in the map).
		delete(f.items, id)
	}
	return nil
}
```

Consumer method
```go
// Source path: vendor\k8s.io\client-go\tools\cache\delta_fifo.go

// Pop blocks until an item is added to the queue, and then returns it. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.IsClosed() {
				return nil, ErrFIFOClosed
			}
			// The queue is empty: block in f.cond.Wait() until a cond.Broadcast
			// signals that data was added (or that the queue was closed)
			f.cond.Wait()
		}
		// The queue is not empty: take the key at the head of the queue
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
		delete(f.items, id)
		// Pass the item to the process callback, where the upper-layer consumer handles it
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			// If the callback returned ErrRequeue, add the item back to the queue
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}
```
