prometheus-operator Source Code Analysis: Using the Prometheus StatefulSet as an Example

Overall architecture of the Operator:

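It consists of three main components:

  • Informer: watches resource objects for changes and turns each change into an event that is put on the WorkQueue;
  • WorkQueue: holds the change events;
  • Control Loop: consumes events from the WorkQueue and reacts to them.
Of the three, the Informer is the most complex. Internally it contains:
  • Reflector: calls the apiserver API and uses List&Watch to monitor resource objects of a given type;
  • DeltaFIFO: an incremental queue that stores the changed objects observed by the Reflector;
  • LocalStorage: the informer's local cache, used to look up resource objects of a given type so that queries do not have to hit the apiserver.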
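To make the division of labour between the three components concrete, here is a minimal sketch in plain Go. It is a toy model under stated assumptions, not client-go code: the `Event` type and the `RunPipeline` function are hypothetical names invented for illustration, and everything runs in a single goroutine.

```go
package main

import "fmt"

// Event is a simplified stand-in for a change notification from the apiserver.
type Event struct {
	Type string // "ADDED", "MODIFIED" or "DELETED"
	Key  string // "namespace/name" of the changed object
	Spec string // stand-in for the object's content
}

// RunPipeline wires the three components together in a single goroutine.
func RunPipeline(events []Event) []string {
	store := map[string]string{}            // local cache kept by the informer
	queue := make(chan string, len(events)) // WorkQueue: carries keys only

	// Informer: update the local cache first, then enqueue the key.
	for _, e := range events {
		if e.Type == "DELETED" {
			delete(store, e.Key)
		} else {
			store[e.Key] = e.Spec
		}
		queue <- e.Key
	}
	close(queue)

	// Control loop: consume keys and act on the current cached state.
	var actions []string
	for key := range queue {
		if spec, ok := store[key]; ok {
			actions = append(actions, "reconcile "+key+" to "+spec)
		} else {
			actions = append(actions, "clean up "+key)
		}
	}
	return actions
}

func main() {
	events := []Event{
		{Type: "ADDED", Key: "monitoring/k8s", Spec: "replicas=1"},
		{Type: "MODIFIED", Key: "monitoring/k8s", Spec: "replicas=2"},
		{Type: "DELETED", Key: "monitoring/old"},
	}
	for _, a := range RunPipeline(events) {
		fmt.Println(a)
	}
}
```

Because the queue carries only keys and the control loop reads the cache when it runs, both entries for `monitoring/k8s` are reconciled against the latest state (`replicas=2`) rather than against the state at the time each event was emitted.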
1. Informer source code

One Informer is created for each resource type that needs to be watched, for example the Prometheus CRD:

// Entry point
// cmd/operator/main.go
func Main() int {
	.......
	r := prometheus.NewRegistry()
	po, err := prometheuscontroller.New(cfg, log.With(logger, "component", "prometheusoperator"), r)
	if err != nil {
		fmt.Fprint(os.Stderr, "instantiating prometheus controller failed: ", err)
		return 1
	}
	.......
}

Here, prometheuscontroller.New() creates the corresponding operator, which is the familiar formula operator = CRD + Controller:

// pkg/prometheus/operator.go
// New creates a new controller.
func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
	....
	c.promInf = cache.NewSharedIndexInformer(
		c.metrics.NewInstrumentedListerWatcher(
			listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.PrometheusAllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
				return &cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						options.LabelSelector = c.config.PromSelector
						return mclient.MonitoringV1().Prometheuses(namespace).List(context.TODO(), options)
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						options.LabelSelector = c.config.PromSelector
						return mclient.MonitoringV1().Prometheuses(namespace).Watch(context.TODO(), options)
					},
				}
			}),
		),
		&monitoringv1.Prometheus{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
	.....
}

This creates the Informer for Prometheus objects, c.promInf. As the code shows, it tracks object changes through List&Watch.
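The List&Watch idea can be illustrated without any Kubernetes dependency. The sketch below is a simplified model only: `ListerWatcher`, `Delta`, `fakeSource` and `Reflect` are hypothetical names loosely inspired by client-go, not its real types. It performs one List to obtain a full snapshot, then applies the incremental changes delivered by Watch.

```go
package main

import "fmt"

// Delta is one incremental change delivered by a watch.
type Delta struct {
	Type string // "ADDED", "MODIFIED" or "DELETED"
	Key  string
	Obj  string
}

// ListerWatcher is a hypothetical, simplified analogue of a list/watch source.
type ListerWatcher interface {
	List() map[string]string // full snapshot
	Watch() <-chan Delta     // changes that happen after the snapshot
}

// fakeSource replays a fixed snapshot and a fixed series of changes.
type fakeSource struct {
	snapshot map[string]string
	changes  []Delta
}

func (f fakeSource) List() map[string]string { return f.snapshot }

func (f fakeSource) Watch() <-chan Delta {
	ch := make(chan Delta, len(f.changes))
	for _, d := range f.changes {
		ch <- d
	}
	close(ch)
	return ch
}

// Reflect performs one List followed by one Watch and returns the local cache.
func Reflect(lw ListerWatcher) map[string]string {
	cache := map[string]string{}
	for k, v := range lw.List() {
		cache[k] = v
	}
	for d := range lw.Watch() {
		switch d.Type {
		case "ADDED", "MODIFIED":
			cache[d.Key] = d.Obj
		case "DELETED":
			delete(cache, d.Key)
		}
	}
	return cache
}

func main() {
	src := fakeSource{
		snapshot: map[string]string{"monitoring/k8s": "v1", "monitoring/old": "v1"},
		changes: []Delta{
			{Type: "MODIFIED", Key: "monitoring/k8s", Obj: "v2"},
			{Type: "DELETED", Key: "monitoring/old"},
			{Type: "ADDED", Key: "monitoring/new", Obj: "v1"},
		},
	}
	fmt.Println(Reflect(src))
}
```

A real watch is a long-lived stream that can be interrupted and resumed; the sketch ignores reconnection and resource versions entirely.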
Once created, the Informer is started:

// pkg/prometheus/operator.go
// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
	......
	go c.promInf.Run(stopc)
	......
}

Handlers for Add, Delete and Update are also registered on the Informer:

// pkg/prometheus/operator.go
// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
	......
	if err := c.waitForCacheSync(stopc); err != nil {
		return err
	}
	c.addHandlers()
	......
}

// addHandlers adds the eventhandlers to the informers.
func (c *Operator) addHandlers() {
	c.promInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.handlePrometheusAdd,
		DeleteFunc: c.handlePrometheusDelete,
		UpdateFunc: c.handlePrometheusUpdate,
	})
	.......
}

2. WorkQueue source code

When the Informer detects a change to a watched object, it calls the handler, and the handler puts the key of the changed object on the WorkQueue.
Below is the handler for the Add Prometheus event:

// pkg/prometheus/operator.go
func (c *Operator) handlePrometheusAdd(obj interface{}) {
	key, ok := c.keyFunc(obj)
	if !ok {
		return
	}
	level.Debug(c.logger).Log("msg", "Prometheus added", "key", key)
	c.metrics.TriggerByCounter(monitoringv1.PrometheusesKind, "add").Inc()
	checkPrometheusSpecDeprecation(key, obj.(*monitoringv1.Prometheus), c.logger)
	c.enqueue(key)
}
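The handler enqueues a key rather than the object itself. As a rough sketch of why that is useful, the code below assumes the usual "namespace/name" key convention and models a queue that ignores a key that is already waiting. `Meta`, `KeyOf` and `DedupQueue` are hypothetical names for illustration; the operator's actual keyFunc and queue are not shown in the excerpt above.

```go
package main

import "fmt"

// Meta holds the two fields needed to identify a namespaced object.
type Meta struct{ Namespace, Name string }

// KeyOf builds a "namespace/name" key; cluster-scoped objects use the bare name.
func KeyOf(m Meta) string {
	if m.Namespace == "" {
		return m.Name
	}
	return m.Namespace + "/" + m.Name
}

// DedupQueue is a simplified FIFO that ignores a key that is already pending.
type DedupQueue struct {
	order   []string
	pending map[string]bool
}

func NewDedupQueue() *DedupQueue {
	return &DedupQueue{pending: map[string]bool{}}
}

// Add appends key unless it is already waiting to be processed.
func (q *DedupQueue) Add(key string) {
	if q.pending[key] {
		return
	}
	q.pending[key] = true
	q.order = append(q.order, key)
}

// Get removes and returns the oldest key, or false if the queue is empty.
func (q *DedupQueue) Get() (string, bool) {
	if len(q.order) == 0 {
		return "", false
	}
	key := q.order[0]
	q.order = q.order[1:]
	delete(q.pending, key)
	return key, true
}

// Len reports how many distinct keys are waiting.
func (q *DedupQueue) Len() int { return len(q.order) }

func main() {
	q := NewDedupQueue()
	p := Meta{Namespace: "monitoring", Name: "k8s"}
	q.Add(KeyOf(p)) // add event
	q.Add(KeyOf(p)) // update event arriving before the worker ran
	fmt.Println("pending keys:", q.Len())
}
```

Several rapid changes to the same object therefore collapse into a single pending work item, and the worker reads the latest state from the cache when it eventually processes the key.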

3. Control Loop source code

The operator starts one worker to consume events from the WorkQueue:

// pkg/prometheus/operator.go
// Run the controller.
func (c *Operator) Run(stopc <-chan struct{}) error {
	.......
	go c.worker()
}

// pkg/prometheus/operator.go
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. It enforces that the syncHandler is never invoked
// concurrently with the same key.
func (c *Operator) worker() {
	for c.processNextWorkItem() {
	}
}

Now look at the actual consumption logic:

// pkg/prometheus/operator.go
func (c *Operator) processNextWorkItem() bool {
	key, quit := c.queue.Get() // take the next item from the queue
	if quit {
		return false
	}
	defer c.queue.Done(key)

	err := c.sync(key.(string)) // handle the event
	if err == nil {
		c.queue.Forget(key) // handled successfully, so Forget the key
		return true
	}
	c.metrics.ReconcileErrorsCounter().Inc()
	utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("Sync %q failed", key)))
	c.queue.AddRateLimited(key) // sync failed, so re-queue the key with rate limiting
	return true
}

Events are handled in c.sync(). Taking the Prometheus StatefulSet as an example:
  • First check whether the StatefulSet exists; if it does not, create it and return;
  • Otherwise, update the StatefulSet;
  • If the apiserver rejects the update as invalid (HTTP 422), delete the StatefulSet.

// pkg/prometheus/operator.go
func (c *Operator) sync(key string) error {
	......
	ssetClient := c.kclient.AppsV1().StatefulSets(p.Namespace)
	// Ensure we have a StatefulSet running Prometheus deployed.
	obj, exists, err = c.ssetInf.GetIndexer().GetByKey(prometheusKeyToStatefulSetKey(key))

	sset, err := makeStatefulSet(*p, &c.config, ruleConfigMapNames, newSSetInputHash)

	// The StatefulSet does not exist, so create it.
	if !exists {
		level.Debug(c.logger).Log("msg", "no current Prometheus statefulset found")
		level.Debug(c.logger).Log("msg", "creating Prometheus statefulset")
		if _, err := ssetClient.Create(context.TODO(), sset, metav1.CreateOptions{}); err != nil {
			return errors.Wrap(err, "creating statefulset failed")
		}
		return nil
	}
	......
	// Otherwise, update it.
	_, err = ssetClient.Update(context.TODO(), sset, metav1.UpdateOptions{})

	// The update was rejected as invalid, so delete the StatefulSet.
	// (sErr and ok are derived from err in code omitted from this excerpt.)
	if ok && sErr.ErrStatus.Code == 422 && sErr.ErrStatus.Reason == metav1.StatusReasonInvalid {
		level.Info(c.logger).Log("msg", "resolving illegal update of Prometheus StatefulSet", "details", sErr.ErrStatus.Details)
		if err := ssetClient.Delete(context.TODO(), sset.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
			return errors.Wrap(err, "failed to delete StatefulSet to avoid forbidden action")
		}
		return nil
	}
	......
}
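The create / update / delete-on-invalid decision can be exercised against an in-memory fake. This is a sketch under assumptions, not the operator's code: `FakeClient`, `Reconcile` and `ErrInvalid` are hypothetical, and the fake treats `ServiceName` as the immutable field whose change triggers the stand-in for an HTTP 422 response.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrInvalid stands in for an apiserver response with code 422 / reason Invalid.
var ErrInvalid = errors.New("invalid update")

// StatefulSet is a heavily reduced stand-in for the real API object.
type StatefulSet struct {
	Name        string
	ServiceName string // treated as immutable in this sketch
	Replicas    int
}

// FakeClient keeps objects in memory instead of talking to an apiserver.
type FakeClient struct{ objs map[string]StatefulSet }

// Update rejects changes to the immutable field and stores everything else.
func (c *FakeClient) Update(desired StatefulSet) error {
	cur := c.objs[desired.Name]
	if cur.ServiceName != desired.ServiceName {
		return ErrInvalid
	}
	c.objs[desired.Name] = desired
	return nil
}

// Reconcile follows the same three-way decision as the sync excerpt above.
func Reconcile(c *FakeClient, desired StatefulSet) (string, error) {
	if _, exists := c.objs[desired.Name]; !exists {
		c.objs[desired.Name] = desired
		return "created", nil
	}
	err := c.Update(desired)
	if errors.Is(err, ErrInvalid) {
		// The object cannot be updated in place: delete it so that a
		// later reconcile finds nothing and creates it from scratch.
		delete(c.objs, desired.Name)
		return "deleted", nil
	}
	if err != nil {
		return "", err
	}
	return "updated", nil
}

func main() {
	c := &FakeClient{objs: map[string]StatefulSet{}}
	steps := []StatefulSet{
		{Name: "prometheus-k8s", ServiceName: "prometheus-operated", Replicas: 1},
		{Name: "prometheus-k8s", ServiceName: "prometheus-operated", Replicas: 2},
		{Name: "prometheus-k8s", ServiceName: "renamed", Replicas: 2},
		{Name: "prometheus-k8s", ServiceName: "renamed", Replicas: 2},
	}
	for _, s := range steps {
		action, err := Reconcile(c, s)
		fmt.Println(action, err)
	}
}
```

In this model the fourth call recreates the object because the third call deleted it. In a real cluster a follow-up sync would have to be triggered for the recreation to happen, presumably by the deletion event itself.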

A quick look at what building the StatefulSet involves: it simply constructs the spec object from the CRD's configuration, and sync() then submits the result with ssetClient.Create:

// pkg/prometheus/statefulset.go
func makeStatefulSet(
	p monitoringv1.Prometheus,
	config *Config,
	ruleConfigMapNames []string,
	inputHash string,
) (*appsv1.StatefulSet, error) {
	......
	spec, err := makeStatefulSetSpec(p, config, ruleConfigMapNames)
	......
}

func makeStatefulSetSpec(p monitoringv1.Prometheus, c *Config, ruleConfigMapNames []string) (*appsv1.StatefulSetSpec, error) {
	......
	promArgs := []string{
		"-web.console.templates=/etc/prometheus/consoles",
		"-web.console.libraries=/etc/prometheus/console_libraries",
	}
	switch version.Major {
	case 1:
		......
	case 2:
		retentionTimeFlag := "-storage.tsdb.retention="
		if version.Minor >= 7 {
			retentionTimeFlag = "-storage.tsdb.retention.time="
			if p.Spec.RetentionSize != "" {
				promArgs = append(promArgs,
					fmt.Sprintf("-storage.tsdb.retention.size=%s", p.Spec.RetentionSize),
				)
			}
		}
		promArgs = append(promArgs,
			fmt.Sprintf("-config.file=%s", path.Join(confOutDir, configEnvsubstFilename)),
			fmt.Sprintf("-storage.tsdb.path=%s", storageDir),
			retentionTimeFlag+p.Spec.Retention,
			"-web.enable-lifecycle",
			"-storage.tsdb.no-lockfile",
		)
	......
}
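The version-dependent retention flags are the part of this function that is easiest to get wrong, so here they are isolated in a small standalone function. `RetentionArgs` is a hypothetical helper that copies only the retention logic of the `case 2` branch shown above; the flag strings come from the excerpt, and the sample values in `main` are made up.

```go
package main

import "fmt"

// RetentionArgs reproduces the retention-flag selection for Prometheus 2.x:
// from minor version 7 on, the time flag is renamed and a size flag may be added.
func RetentionArgs(minor uint64, retention, retentionSize string) []string {
	var args []string
	flag := "-storage.tsdb.retention="
	if minor >= 7 {
		flag = "-storage.tsdb.retention.time="
		if retentionSize != "" {
			args = append(args, "-storage.tsdb.retention.size="+retentionSize)
		}
	}
	return append(args, flag+retention)
}

func main() {
	// Same spec values, two different Prometheus 2.x minor versions.
	fmt.Println(RetentionArgs(6, "24h", "10GB"))
	fmt.Println(RetentionArgs(7, "24h", "10GB"))
}
```

Note that for minor versions below 7 the size setting is silently dropped, which matches the excerpt: the size flag is only appended inside the `version.Minor >= 7` branch.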
