Prometheus source code analysis (rules module)

Prometheus has two kinds of rules:

  • AlertingRule: an alerting rule;
  • RecordingRule: a recording (expression) rule, used to produce new metrics;
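1. Overall framework
Prometheus rule management lives mainly in the prometheus/rules/ directory:
  • At runtime, rules.Manager reads the rules/*.yaml files and loads every group (rules.Group) defined in them;
  • Each rules.Group gets its own goroutine, which periodically evaluates all rules in that group;
  • For each Rule:
    • if it is an AlertingRule, it is evaluated, and if the alert fires, sendAlerts sends it out;
    • if it is a RecordingRule, it is evaluated and the result is written to the TSDB;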
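A minimal sketch of that structure, assuming heavily simplified, hypothetical types (`Sample`, `Rule`, `RecordingRule`, `AlertingRule` and `nextStep` here are illustrative, not the real rules package API): both rule kinds satisfy one interface with an Eval method, and the group loop only branches on the concrete type for the step that follows evaluation.

```go
package main

import "fmt"

// Sample is a hypothetical, simplified stand-in for promql.Sample.
type Sample struct {
	Metric string
	Value  float64
}

// Rule sketches the idea behind rules.Rule: both rule kinds are evaluated
// through one Eval method. (The real method also takes a context, a QueryFunc
// and an external URL, and returns an error.)
type Rule interface {
	Name() string
	Eval(ts int64) []Sample
}

// RecordingRule and AlertingRule are hypothetical stand-ins that skip PromQL.
type RecordingRule struct{ name string }
type AlertingRule struct{ name string }

func (r *RecordingRule) Name() string { return r.name }
func (r *AlertingRule) Name() string  { return r.name }

// Eval returns one fake sample named after the rule.
func (r *RecordingRule) Eval(ts int64) []Sample {
	return []Sample{{Metric: r.name, Value: float64(ts)}}
}

// Eval returns one fake sample of the ALERTS series.
func (r *AlertingRule) Eval(ts int64) []Sample {
	return []Sample{{Metric: "ALERTS", Value: 1}}
}

// nextStep reports the step that distinguishes each rule kind after Eval.
// (In Prometheus the samples an alerting rule returns are written too; the
// sketch only shows the distinguishing step.)
func nextStep(r Rule) string {
	switch r.(type) {
	case *AlertingRule:
		return "send alerts: " + r.Name()
	case *RecordingRule:
		return "write to TSDB: " + r.Name()
	default:
		return "unknown: " + r.Name()
	}
}

func main() {
	rules := []Rule{
		&RecordingRule{name: "cpu_used_percent"},
		&AlertingRule{name: "Too_Many_Goroutine"},
	}
	for _, r := range rules {
		samples := r.Eval(0)
		fmt.Println(nextStep(r), len(samples))
	}
}
```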

Suppose rules/xxx.yaml contains the following:
```yaml
groups:
  - name: test
    rules:
      - expr: |
          100 - avg without (cpu, mode) (
            rate(node_cpu_seconds_total{job="node-exporter", mode="idle"}[1m])
          ) * 100
        record: instance:node_cpu_used_percent:rate1m
        labels:
          __type__: "gauge"
      - alert: Too_Many_Goroutine
        expr: |
          go_goroutines > 100
        labels:
          severity: high
        annotations:
          summary: too many goroutines
  - name: cpu_used
    rules:
      - expr: |
          node_cpu_seconds_total{mode="user", cpu="0"}
        record: cpu_used_percent
        labels:
          cpu: "0"
```

This yields 2 groups:
  • group test: 1 recordingRule and 1 alertingRule;
  • group cpu_used: 1 recordingRule;
2. Rule loading logic
manager.Update loads the rules and starts one goroutine per group to run them:
```go
// rules/manager.go
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels) error {
	// Load the rules
	groups, errs := m.LoadGroups(interval, externalLabels, files...)

	var wg sync.WaitGroup
	for _, newg := range groups {
		......
		wg.Add(1)
		go func(newg *Group) {
			...
			go func() {
				......
				// Run the rules in the group
				newg.run(m.opts.Context)
			}()
			wg.Done()
		}(newg)
	}
	......
	wg.Wait()
	m.groups = groups

	return nil
}
```
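Note the two levels of goroutines in Update: the outer goroutine starts the long-running inner one and then calls wg.Done(), so wg.Wait() returns once every group has been launched, not once the groups finish. The sketch below reproduces that lifecycle with hypothetical names (`group`, `startGroups`); the real Group.run loops on a ticker, whereas here `run` evaluates once and then blocks until the done channel is closed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// group is a hypothetical stand-in for rules.Group.
type group struct {
	name  string
	evals int64 // evaluations performed; updated atomically
}

// run evaluates once and then blocks until done is closed. The real
// Group.run loops on a ticker; this sketch keeps only the lifecycle.
func (g *group) run(done <-chan struct{}) {
	atomic.AddInt64(&g.evals, 1)
	<-done
}

// startGroups follows the shape of Manager.Update: one goroutine per group,
// and a WaitGroup so the caller returns only after every goroutine has been
// launched. The returned WaitGroup completes once every group has stopped.
func startGroups(groups []*group, done <-chan struct{}) *sync.WaitGroup {
	var launched sync.WaitGroup
	stopped := &sync.WaitGroup{}
	for _, g := range groups {
		launched.Add(1)
		stopped.Add(1)
		go func(g *group) {
			go func() {
				defer stopped.Done()
				g.run(done)
			}()
			launched.Done() // signals "started", not "finished"
		}(g)
	}
	launched.Wait()
	return stopped
}

func main() {
	groups := []*group{{name: "test"}, {name: "cpu_used"}}
	done := make(chan struct{})
	stopped := startGroups(groups, done)

	close(done)    // what a shutdown would do
	stopped.Wait() // every group goroutine has returned
	for _, g := range groups {
		fmt.Println(g.name, atomic.LoadInt64(&g.evals))
	}
}
```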

Logic for reading the rules:
  • a group may contain several rules;
  • whether a rule is an alertingRule or a recordingRule is decided by whether it sets the alert field;
```go
// rules/manager.go
// LoadGroups reads groups from a list of files.
func (m *Manager) LoadGroups(
	interval time.Duration, externalLabels labels.Labels, filenames ...string,
) (map[string]*Group, []error) {
	groups := make(map[string]*Group)

	for _, fn := range filenames {
		rgs, errs := rulefmt.ParseFile(fn)
		// ... (error handling omitted)
		for _, rg := range rgs.Groups {
			itv := interval
			if rg.Interval != 0 {
				itv = time.Duration(rg.Interval)
			}

			rules := make([]Rule, 0, len(rg.Rules))
			for _, r := range rg.Rules {
				expr, err := parser.ParseExpr(r.Expr.Value)
				// ... (error handling omitted)

				// Alerting rule
				if r.Alert.Value != "" {
					rules = append(rules, NewAlertingRule(
						r.Alert.Value,
						expr,
						time.Duration(r.For),
						labels.FromMap(r.Labels),
						labels.FromMap(r.Annotations),
						externalLabels,
						m.restored,
						log.With(m.logger, "alert", r.Alert),
					))
					continue
				}
				// Recording rule
				rules = append(rules, NewRecordingRule(
					r.Record.Value,
					expr,
					labels.FromMap(r.Labels),
				))
			}

			groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
				Name:          rg.Name,
				File:          fn,
				Interval:      itv,
				Rules:         rules,
				ShouldRestore: shouldRestore,
				Opts:          m.opts,
				done:          m.done,
			})
		}
	}

	return groups, nil
}
```

3. Rule execution logic
Each group uses a ticker to run once every interval:
```go
// rules/manager.go
func (g *Group) run(ctx context.Context) {
	// Wait an initial amount to have consistently slotted intervals.
	evalTimestamp := g.evalTimestamp().Add(g.interval)
	select {
	case <-time.After(time.Until(evalTimestamp)):
	case <-g.done:
		return
	}

	iter := func() {
		......
		g.Eval(ctx, evalTimestamp)
	}

	tick := time.NewTicker(g.interval)
	defer tick.Stop()

	iter()
	for {
		select {
		case <-g.done:
			return
		default:
			select {
			case <-g.done:
				return
			case <-tick.C:
				.....
				iter()
			}
		}
	}
}
```

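What a group does on each interval:
  • iterate over the rules in the group and evaluate each one;
  • for an alerting rule, send the alert out when it fires;
  • for a recording rule, write the new metrics to the TSDB;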
```go
// rules/manager.go
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
	for i, rule := range g.rules {
		......
		func(i int, rule Rule) {
			// Evaluate the rule
			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)

			// Alerting rule: send its alerts
			if ar, ok := rule.(*AlertingRule); ok {
				ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
			}

			app := g.opts.Appendable.Appender()
			seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
			defer func() {
				// Flush the new samples
				if err := app.Commit(); err != nil {
					level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
					return
				}
			}()

			for _, s := range vector {
				// Append the new samples
				_, err := app.Add(s.Metric, s.T, s.V)
				....
			}
		}(i, rule)
	}
}
```
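The per-interval cycle can be sketched as follows. Everything here is a simplified assumption: `Rule`, `Appender`, `brokenRule` and `evalGroup` are hypothetical, one appender is shared across rules for brevity (the excerpt above creates one per rule), and `notify` is only called for a firing alert, whereas the real sendAlerts decides internally which alerts need sending.

```go
package main

import (
	"errors"
	"fmt"
)

// Sample is a hypothetical stand-in for promql.Sample.
type Sample struct {
	Metric string
	T      int64
	V      float64
}

// Rule is a simplified version of the rule interface.
type Rule interface {
	Eval(ts int64) ([]Sample, error)
}

type RecordingRule struct {
	name  string
	value float64
}

type AlertingRule struct {
	name   string
	firing bool
}

type brokenRule struct{} // always fails, to show error handling

func (r *RecordingRule) Eval(ts int64) ([]Sample, error) {
	return []Sample{{Metric: r.name, T: ts, V: r.value}}, nil
}

// A firing alerting rule also yields a sample (Prometheus writes an ALERTS series).
func (r *AlertingRule) Eval(ts int64) ([]Sample, error) {
	if !r.firing {
		return nil, nil
	}
	return []Sample{{Metric: "ALERTS", T: ts, V: 1}}, nil
}

func (brokenRule) Eval(int64) ([]Sample, error) {
	return nil, errors.New("query failed")
}

// Appender buffers samples until Commit, loosely like storage.Appender.
type Appender struct {
	pending, committed []Sample
}

func (a *Appender) Add(s Sample) { a.pending = append(a.pending, s) }

func (a *Appender) Commit() {
	a.committed = append(a.committed, a.pending...)
	a.pending = nil
}

// evalGroup evaluates rules one after another, as Group.Eval does: alerting
// rules additionally notify, and each rule's output is appended and committed.
// It returns how many rules failed.
func evalGroup(rules []Rule, ts int64, app *Appender, notify func(string)) int {
	failed := 0
	for _, rule := range rules {
		vec, err := rule.Eval(ts)
		if err != nil {
			failed++ // Prometheus logs the failure and moves on to the next rule
			continue
		}
		if ar, ok := rule.(*AlertingRule); ok && ar.firing {
			notify(ar.name)
		}
		for _, s := range vec {
			app.Add(s)
		}
		app.Commit()
	}
	return failed
}

func main() {
	app := &Appender{}
	var sent []string
	rules := []Rule{
		&RecordingRule{name: "cpu_used_percent", value: 42},
		&AlertingRule{name: "Too_Many_Goroutine", firing: true},
		brokenRule{},
	}
	failed := evalGroup(rules, 1000, app, func(name string) { sent = append(sent, name) })
	fmt.Println(len(app.committed), sent, failed) // 2 [Too_Many_Goroutine] 1
}
```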

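4. Rule Eval logic
As shown above, both alertingRule and recordingRule are executed through rule.Eval.
For alertingRule.Eval():
  • PromQL evaluates the rule expression; if the rule's condition is met, the query returns the matching series in res (an empty result means nothing fires);
  • using the alert hold duration (the `for` field), active alerts are tracked in r.active; the alert sender only sends alerts found in r.active;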
```go
// rules/alerting.go
// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
	// Run the PromQL query
	res, err := query(ctx, r.vector.String(), ts)

	var alerts = make(map[uint64]*Alert, len(res))
	for _, smpl := range res {
		......
		alerts[h] = &Alert{
			Labels:      lbs,
			Annotations: annotations,
			ActiveAt:    ts,
			State:       StatePending,
			Value:       smpl.V,
		}
	}

	for h, a := range alerts {
		// Check whether we already have alerting state for the identifying label set.
		// Update the last value and annotations if so, create a new alert entry otherwise.
		if alert, ok := r.active[h]; ok && alert.State != StateInactive {
			alert.Value = a.Value
			alert.Annotations = a.Annotations
			continue
		}

		r.active[h] = a // alerts are kept in r.active; alerts in r.active are sent to Alertmanager
	}
	...
	// We have already acquired the lock above hence using SetHealth and
	// SetLastError will deadlock.
	r.health = HealthGood
	r.lastError = err
	return vec, nil
}
```

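For recordingRule.Eval():
  • PromQL computes the latest metric values; then each resulting sample's metric name is replaced with the rule's record name and the rule's labels are applied;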
```go
// rules/recording.go
// Eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) {
	vector, err := query(ctx, rule.vector.String(), ts)

	// Override the metric name and labels.
	for i := range vector {
		sample := &vector[i]

		lb := labels.NewBuilder(sample.Metric)
		lb.Set(labels.MetricName, rule.name)
		for _, l := range rule.labels {
			lb.Set(l.Name, l.Value)
		}
		sample.Metric = lb.Labels()
	}
	...
	return vector, nil
}
```
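The relabeling loop in RecordingRule.Eval can be mimicked with plain maps. `relabel` is a hypothetical helper, with a map standing in for labels.Labels; it shows that the `record` name replaces `__name__` and that the rule's own labels win over labels already present on the sample.

```go
package main

import (
	"fmt"
	"sort"
)

// relabel sketches what RecordingRule.Eval does to each result sample:
// __name__ is replaced with the rule's `record` name, then the rule's
// `labels:` are set, overriding any existing values. A new map is returned
// so the input is untouched (similar in spirit to labels.NewBuilder).
func relabel(metric map[string]string, record string, extra map[string]string) map[string]string {
	out := make(map[string]string, len(metric)+len(extra)+1)
	for k, v := range metric {
		out[k] = v
	}
	out["__name__"] = record
	for k, v := range extra {
		out[k] = v
	}
	return out
}

func main() {
	in := map[string]string{"__name__": "node_cpu_seconds_total", "cpu": "0", "mode": "user"}
	out := relabel(in, "cpu_used_percent", map[string]string{"cpu": "0"})

	keys := make([]string, 0, len(out))
	for k := range out {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map order is random; sort for stable output
	for _, k := range keys {
		fmt.Printf("%s=%q\n", k, out[k])
	}
}
```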

Both PromQL evaluations above go through QueryFunc, which is backed by the PromQL engine:
```go
// cmd/prometheus/main.go
// Initialization
ruleManager = rules.NewManager(&rules.ManagerOptions{
	Appendable:      fanoutStorage,
	TSDB:            localStorage,
	QueryFunc:       rules.EngineQueryFunc(queryEngine, fanoutStorage),
	NotifyFunc:      sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
	Context:         ctxRule,
	ExternalURL:     cfg.web.ExternalURL,
	Registerer:      prometheus.DefaultRegisterer,
	Logger:          log.With(logger, "component", "rule manager"),
	OutageTolerance: time.Duration(cfg.outageTolerance),
	ForGracePeriod:  time.Duration(cfg.forGracePeriod),
	ResendDelay:     time.Duration(cfg.resendDelay),
})

// rules/manager.go
// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
// It converts scalar into vector results.
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
		q, err := engine.NewInstantQuery(q, qs, t)
		if err != nil {
			return nil, err
		}
		res := q.Exec(ctx)
		if res.Err != nil {
			return nil, res.Err
		}
		switch v := res.Value.(type) {
		case promql.Vector:
			return v, nil
		case promql.Scalar:
			return promql.Vector{promql.Sample{
				Point:  promql.Point(v),
				Metric: labels.Labels{},
			}}, nil
		default:
			return nil, errors.New("rule result is not a vector or scalar")
		}
	}
}
```
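The type switch at the end of EngineQueryFunc normalizes instant-query results for the rule code. Here is a sketch with hypothetical stand-in types (`Vector`, `Scalar`, `String` and `toVector` are not the promql package): vectors pass through, a scalar becomes a one-element vector with an empty label set, and any other result type is rejected.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for promql.Sample, promql.Vector, promql.Scalar
// and promql.String.
type Sample struct {
	Labels map[string]string
	T      int64
	V      float64
}

type Vector []Sample

type Scalar struct {
	T int64
	V float64
}

type String struct {
	T int64
	V string
}

// toVector follows the type switch in EngineQueryFunc: vectors pass through,
// a scalar becomes a one-element vector with no labels, anything else is an
// error.
func toVector(v interface{}) (Vector, error) {
	switch v := v.(type) {
	case Vector:
		return v, nil
	case Scalar:
		return Vector{{Labels: map[string]string{}, T: v.T, V: v.V}}, nil
	default:
		return nil, errors.New("rule result is not a vector or scalar")
	}
}

func main() {
	vec, err := toVector(Scalar{T: 1000, V: 3})
	fmt.Println(len(vec), vec[0].V, err) // 1 3 <nil>

	_, err = toVector(String{T: 1000, V: "hi"})
	fmt.Println(err) // rule result is not a vector or scalar
}
```

One consequence, following from the code above: a recording rule whose expression yields a scalar still produces exactly one series, whose labels are just the `record` name and the rule's own labels.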

Reference: 1. Alert evaluation and alert discovery: https://segmentfault.com/a/11...
