prometheus-operator源码分析 -- prometheus配置自动更新之rules-reloader(三)
rules-reloader的源码: https://github.com/jimmidyson/configmap-reload
rules-reloader的启动参数:
Args:
--webhook-url=http://localhost:9090/-/reload
--volume-dir=/etc/prometheus/rules/prometheus-k8s-rulefiles-0
operator监听到prometheusrule配置变更,会更新configmap(目录prometheus-k8s-rulefiles-0),rules-reloader监控到prometheus-k8s-rulefiles-0目录有变更,发送reload给prometheus。
1. rules-reloader的源码分析
rules-reloader的源码很简单:使用fsnotify监听--volume-dir指定的目录,一旦发现变化,就向--webhook-url指定的地址发送HTTP请求:
// configmap-reload.go
func main() {
flag.Var(&volumeDirs, "volume-dir", "the config map volume directory to watch for updates;
may be used multiple times")
flag.Var(&webhook, "webhook-url", "the url to send a request to when the specified config map volume directory has been updated")
flag.Parse()watcher, err := fsnotify.NewWatcher()go func() {
for {
select {
// 监听到变化
case event := <-watcher.Events:
log.Println("config map updated")
for _, h := range webhook {
begun := time.Now()
// HTTP发送webhook
req, err := http.NewRequest(*webhookMethod, h.String(), nil)
for retries := *webhookRetries;
retries != 0;
retries-- {
resp, err := http.DefaultClient.Do(req)
.....
}
}
case err := <-watcher.Errors:
}
}
}()
// 配置监听volumeDirs
for _, d := range volumeDirs {
log.Printf("Watching directory: %q", d)
err = watcher.Add(d)
if err != nil {
log.Fatal(err)
}
}
......
}
rules-reloader监听的volume是挂载的ConfigMap:prometheus-k8s-rulefiles-0
--volume-dir=/etc/prometheus/rules/prometheus-k8s-rulefiles-0
该目录下维护的rules文件:
/etc/prometheus/rules/prometheus-k8s-rulefiles-0 $ ls
monitoring-prometheus-k8s-rules.yaml
2. operator维护configmap的源码
configMap是由operator维护的,它对应prometheusrule CRD对象。
1) 监听prometheusrule资源对象变更
与promInf类似,有一个ruleInf专门负责处理prometheusrule:
// pkg/prometheus/operator.go
// New creates a new controller.
//
// This excerpt shows only the construction of c.ruleInf, the shared index
// informer for PrometheusRule objects; the rest of the function is elided.
func New(conf Config, logger log.Logger, r prometheus.Registerer) (*Operator, error) {
......
// Build the informer on a multi-namespace lister/watcher that is given the
// allow and deny lists from c.config.Namespaces.
c.ruleInf = cache.NewSharedIndexInformer(
c.metrics.NewInstrumentedListerWatcher(
listwatch.MultiNamespaceListerWatcher(c.logger, c.config.Namespaces.AllowList, c.config.Namespaces.DenyList, func(namespace string) cache.ListerWatcher {
// For each namespace, List and Watch go through the monitoring client's
// PrometheusRules interface.
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return mclient.MonitoringV1().PrometheusRules(namespace).Watch(context.TODO(), options)
},
}
}),
),
// Object type handled by the informer, resync period, and a namespace index.
&monitoringv1.PrometheusRule{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
....
}
运行该ruleInf并添加handler:
// pkg/prometheus/operator.go
// Run the controller.
//
// This excerpt shows only the start of the PrometheusRule informer.
func (c *Operator) Run(stopc <-chan struct{}) error {
.......
// Run the rule informer in its own goroutine, passing it stopc.
go c.ruleInf.Run(stopc)
......
}
// addHandlers adds the eventhandlers to the informers.
//
// This excerpt shows only the registration on the PrometheusRule informer.
func (c *Operator) addHandlers() {
......
// Route add/delete/update events on c.ruleInf to the matching handleRule* method.
c.ruleInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc:c.handleRuleAdd,
DeleteFunc: c.handleRuleDelete,
UpdateFunc: c.handleRuleUpdate,
})
......
}
一旦有资源变更,调用AddFunc/DeleteFunc/UpdateFunc:
// AddFunc
func (c *Operator) handleRuleAdd(obj interface{}) {
o, ok := c.getObject(obj)
if ok {
level.Debug(c.logger).Log("msg", "PrometheusRule added")
c.metrics.TriggerByCounter(monitoringv1.PrometheusRuleKind, "add").Inc()c.enqueueForMonitorNamespace(o.GetNamespace())
}
}// UpdateFunc
// handleRuleUpdate is registered as the UpdateFunc of the PrometheusRule
// informer. Updates whose old and current objects carry the same
// ResourceVersion are dropped; otherwise, when c.getObject accepts cur, the
// event is logged, the "update" trigger counter is incremented and the
// object's namespace is enqueued.
func (c *Operator) handleRuleUpdate(old, cur interface{}) {
	// Assert old before cur, matching the original evaluation order.
	oldRule := old.(*monitoringv1.PrometheusRule)
	curRule := cur.(*monitoringv1.PrometheusRule)
	if oldRule.ResourceVersion == curRule.ResourceVersion {
		return
	}

	o, ok := c.getObject(cur)
	if !ok {
		return
	}

	level.Debug(c.logger).Log("msg", "PrometheusRule updated")
	c.metrics.TriggerByCounter(monitoringv1.PrometheusRuleKind, "update").Inc()
	c.enqueueForMonitorNamespace(o.GetNamespace())
}
看下入队到workQueue的到底是啥:
// pkg/prometheus/operator.go
// enqueueForMonitorNamespace delegates to enqueueForNamespace, passing the
// store of the c.nsMonInf informer together with nsName.
func (c *Operator) enqueueForMonitorNamespace(nsName string) {
c.enqueueForNamespace(c.nsMonInf.GetStore(), nsName)
}
// enqueueForNamespace enqueues all Prometheus object keys that belong to the
// given namespace or select objects in the given namespace.
func (c *Operator) enqueueForNamespace(store cache.Store, nsName string) {
nsObject, exists, err := store.GetByKey(nsName)
ns := nsObject.(*v1.Namespace)err = cache.ListAll(c.promInf.GetStore(), labels.Everything(), func(obj interface{}) {
// Check for Prometheus instances in the namespace.
p := obj.(*monitoringv1.Prometheus)
if p.Namespace == nsName {
c.enqueue(p)
return
}
......
}
}
可以看到,入队的是个prometheus CRD对象的key。
2) 资源对象的变更处理
直接来到prometheus workQueue的处理函数:
// pkg/prometheus/operator.go
// sync processes a single key. This excerpt shows the lookup of the Prometheus
// object and the refresh of its rule ConfigMaps; the rest is elided.
func (c *Operator) sync(key string) error {
// Resolve the key through the Prometheus informer's indexer.
obj, exists, err := c.promInf.GetIndexer().GetByKey(key)
......
p := obj.(*monitoringv1.Prometheus)
// Create or update the rule ConfigMaps for this Prometheus; an error aborts sync.
ruleConfigMapNames, err := c.createOrUpdateRuleConfigMaps(p)
if err != nil {
return err
}
.....
}
具体看下prometheusrule的处理函数:对于prometheusrule对应的ConfigMap,这里没有做原地更新,而是直接delete老的,然后create新的:
// pkg/prometheus/operator.go
// createOrUpdateRuleConfigMaps selects the rules for p, builds the ConfigMaps
// that hold them, deletes the current ConfigMaps and creates the new ones in
// p's namespace. Parts of the original function are elided in this excerpt.
func (c *Operator) createOrUpdateRuleConfigMaps(p *monitoringv1.Prometheus) ([]string, error) {
// ConfigMap client scoped to the namespace of the Prometheus object.
cClient := c.kclient.CoreV1().ConfigMaps(p.Namespace)
......
newRules, err := c.selectRules(p, namespaces)
......
newConfigMaps, err := makeRulesConfigMaps(p, newRules)
......
// Delete the old rule ConfigMaps outright, then create the new ones.
// Simply deleting old ConfigMaps and creating new ones for now. Could be
// replaced by logic that only deletes obsolete ConfigMaps in the future.
for _, cm := range currentConfigMaps {
err := cClient.Delete(context.TODO(), cm.Name, metav1.DeleteOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to delete current ConfigMap '%v'", cm.Name)
}
}
for _, cm := range newConfigMaps {
_, err = cClient.Create(context.TODO(), &cm, metav1.CreateOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to create new ConfigMap '%v'", cm.Name)
}
}
......
}
构造rule的细节:rule文件名称={namespace}-{name}.yaml
// Build the rules.
//
// selectRules returns a map from rule file name to rule file content. For each
// namespace it lists the PrometheusRule objects matching ruleSelector from the
// rule informer's indexer, enforces the namespace label on a deep copy, and
// stores the generated content under "{namespace}-{name}.yaml". Parts of the
// original function are elided in this excerpt.
func (c *Operator) selectRules(p *monitoringv1.Prometheus, namespaces []string) (map[string]string, error) {
rules := map[string]string{}
.....
for _, ns := range namespaces {
// The list callback cannot return an error, so the first failure seen
// inside it is carried out through marshalErr.
var marshalErr error
err := cache.ListAllByNamespace(c.ruleInf.GetIndexer(), ns, ruleSelector, func(obj interface{}) {
// Work on a copy so the object held by the informer is not modified.
promRule := obj.(*monitoringv1.PrometheusRule).DeepCopy()
if err := nsLabeler.EnforceNamespaceLabel(promRule);
err != nil {
marshalErr = err
return
}
content, err := generateContent(promRule.Spec)
if err != nil {
marshalErr = err
return
}
rules[fmt.Sprintf("%v-%v.yaml", promRule.Namespace, promRule.Name)] = content // rule file name: {namespace}-{name}.yaml
})
if err != nil {
return nil, err
}
if marshalErr != nil {
return nil, marshalErr
}
}
...
return rules, nil
}
推荐阅读
- 如何寻找情感问答App的分析切入点
- D13|D13 张贇 Banner分析
- 自媒体形势分析
- 2020-12(完成事项)
- Android事件传递源码分析
- Python数据分析(一)(Matplotlib使用)
- Quartz|Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听
- 泽宇读书会——如何阅读一本书笔记
- Java内存泄漏分析系列之二(jstack生成的Thread|Java内存泄漏分析系列之二:jstack生成的Thread Dump日志结构解析)
- [源码解析]|[源码解析] NVIDIA HugeCTR,GPU版本参数服务器---(3)