prometheus-operator源码分析 -- prometheus配置自动更新之rules-reloader(三)

rules-reloader的源码: https://github.com/jimmidyson/configmap-reload
rules-reloader的启动参数:

Args: --webhook-url=http://localhost:9090/-/reload --volume-dir=/etc/prometheus/rules/prometheus-k8s-rulefiles-0

operator监听到prometheusrule配置变更,会更新configmap(目录prometheus-k8s-rulefiles-0),rules-reloader监控到prometheus-k8s-rulefiles-0目录有变更,发送reload给prometheus。
1. rules-reloader的源码分析
rules-reloader的源码很简单:使用fsnotify监听--volume-dir指定的目录,发现变化就向--webhook-url发送HTTP请求:
// configmap-reload.go func main() { flag.Var(&volumeDirs, "volume-dir", "the config map volume directory to watch for updates; may be used multiple times") flag.Var(&webhook, "webhook-url", "the url to send a request to when the specified config map volume directory has been updated") flag.Parse()watcher, err := fsnotify.NewWatcher()go func() { for { select { // 监听到变化 case event := <-watcher.Events: log.Println("config map updated") for _, h := range webhook { begun := time.Now() // HTTP发送webhook req, err := http.NewRequest(*webhookMethod, h.String(), nil) for retries := *webhookRetries; retries != 0; retries-- { resp, err := http.DefaultClient.Do(req) ..... } } case err := <-watcher.Errors: } } }() // 配置监听volumeDirs for _, d := range volumeDirs { log.Printf("Watching directory: %q", d) err = watcher.Add(d) if err != nil { log.Fatal(err) } } ...... }

rules-reloader监听的volume是挂载的ConfigMap:prometheus-k8s-rulefiles-0
--volume-dir=/etc/prometheus/rules/prometheus-k8s-rulefiles-0
该目录下维护的rules文件:
/etc/prometheus/rules/prometheus-k8s-rulefiles-0 $ ls
monitoring-prometheus-k8s-rules.yaml

2. operator维护configmap的源码
configMap是由operator维护的,它对应prometheusrule CRD对象。
1) 监听prometheusrule资源对象变更
与promInf类似,有一个ruleInf专门负责处理prometheusrule:
// pkg/prometheus/operator.go // New creates a new controller. func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) { ...... c.ruleInf = cache.NewSharedIndexInformer( c.metrics.NewInstrumentedListerWatcher( listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher { return &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return mclient.MonitoringV1().PrometheusRules(namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return mclient.MonitoringV1().PrometheusRules(namespace).Watch(context.TODO(), options) }, } }), ), &monitoringv1.PrometheusRule{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) .... }

运行该ruleInf并添加handler:
// pkg/prometheus/operator.go // Run the controller. func (c *Operator) Run(stopc <-chan struct{}) error { ....... go c.ruleInf.Run(stopc) ...... } // addHandlers adds the eventhandlers to the informers. func (c *Operator) addHandlers() { ...... c.ruleInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc:c.handleRuleAdd, DeleteFunc: c.handleRuleDelete, UpdateFunc: c.handleRuleUpdate, }) ...... }

一旦有资源变更,调用AddFunc/DeleteFunc/UpdateFunc:
// AddFunc func (c *Operator) handleRuleAdd(obj interface{}) { o, ok := c.getObject(obj) if ok { level.Debug(c.logger).Log("msg", "PrometheusRule added") c.metrics.TriggerByCounter(monitoringv1.PrometheusRuleKind, "add").Inc()c.enqueueForMonitorNamespace(o.GetNamespace()) } }// UpdateFunc func (c *Operator) handleRuleUpdate(old, cur interface{}) { if old.(*monitoringv1.PrometheusRule).ResourceVersion == cur.(*monitoringv1.PrometheusRule).ResourceVersion { return } o, ok := c.getObject(cur) if ok { level.Debug(c.logger).Log("msg", "PrometheusRule updated") c.metrics.TriggerByCounter(monitoringv1.PrometheusRuleKind, "update").Inc() c.enqueueForMonitorNamespace(o.GetNamespace()) } }

看下入队到workQueue的到底是啥:
// pkg/prometheus/operator.go func (c *Operator) enqueueForMonitorNamespace(nsName string) { c.enqueueForNamespace(c.nsMonInf.GetStore(), nsName) }// enqueueForNamespace enqueues all Prometheus object keys that belong to the // given namespace or select objects in the given namespace. func (c *Operator) enqueueForNamespace(store cache.Store, nsName string) { nsObject, exists, err := store.GetByKey(nsName) ns := nsObject.(*v1.Namespace)err = cache.ListAll(c.promInf.GetStore(), labels.Everything(), func(obj interface{}) { // Check for Prometheus instances in the namespace. p := obj.(*monitoringv1.Prometheus) if p.Namespace == nsName { c.enqueue(p) return } ...... } }

可以看到,入队的是个prometheus CRD对象的key。
2) 资源对象的变更处理
直接来到prometheus workQueue的处理函数:
// pkg/prometheus/operator.go func (c *Operator) sync(key string) error { obj, exists, err := c.promInf.GetIndexer().GetByKey(key) ...... p := obj.(*monitoringv1.Prometheus) ruleConfigMapNames, err := c.createOrUpdateRuleConfigMaps(p) if err != nil { return err } ..... }

具体看下prometheusrule的处理函数:对于prometheusrule对应的ConfigMap,这里没有做原地更新(update),而是直接delete老的,然后create新的:
// pkg/prometheus/operator.go func (c *Operator) createOrUpdateRuleConfigMaps(p *monitoringv1.Prometheus) ([]string, error) { cClient := c.kclient.CoreV1().ConfigMaps(p.Namespace) ...... newRules, err := c.selectRules(p, namespaces) ...... newConfigMaps, err := makeRulesConfigMaps(p, newRules) ...... // 直接删除老的规则,然后创建新的 // Simply deleting old ConfigMaps and creating new ones for now. Could be // replaced by logic that only deletes obsolete ConfigMaps in the future. for _, cm := range currentConfigMaps { err := cClient.Delete(context.TODO(), cm.Name, metav1.DeleteOptions{}) if err != nil { return nil, errors.Wrapf(err, "failed to delete current ConfigMap '%v'", cm.Name) } } for _, cm := range newConfigMaps { _, err = cClient.Create(context.TODO(), &cm, metav1.CreateOptions{}) if err != nil { return nil, errors.Wrapf(err, "failed to create new ConfigMap '%v'", cm.Name) } } ...... }

构造rule的细节:rule文件名称={namespace}-{name}.yaml
// 构造rules func (c *Operator) selectRules(p *monitoringv1.Prometheus, namespaces []string) (map[string]string, error) { rules := map[string]string{} ..... for _, ns := range namespaces { var marshalErr error err := cache.ListAllByNamespace(c.ruleInf.GetIndexer(), ns, ruleSelector, func(obj interface{}) { promRule := obj.(*monitoringv1.PrometheusRule).DeepCopy() if err := nsLabeler.EnforceNamespaceLabel(promRule); err != nil { marshalErr = err return } content, err := generateContent(promRule.Spec) if err != nil { marshalErr = err return } rules[fmt.Sprintf("%v-%v.yaml", promRule.Namespace, promRule.Name)] = content//rule的名称:{namespace}-{name}.yaml }) if err != nil { return nil, err } if marshalErr != nil { return nil, marshalErr } } ... return rules, nil }

    推荐阅读