prometheus的rule有两类:
- AlertingRule: 告警规则;
- RecordingRule: 表达式规则,用于产生新的指标;
- rules.Manager在运行时的时候,会读取rules/*.yaml文件,读取出所有的分组rules.Group;
- 为每个rules.Group分配1个goroutine,周期性的执行group下所有的rules;
- 对每个Rule:
- 若是AlertingRule,则进行eval,若触发告警,则sendAlerts出去;
- 若是RecordingRule,则进行eval,将结果写入TSDB;
文章图片
假如rules/xxx.yaml文件内容如下:
groups:
- name: test
rules:
- expr: |
100 - avg without (cpu, mode) (
rate(node_cpu_seconds_total{job="node-exporter", mode="idle"}[1m])
)*100
record: instance:node_cpu_used_percent:rate1m
labels:
__type__: "gauge"
- alert: Too_Many_Goroutine
expr: |
go_goroutines > 100
labels:
serverity: high
annotations:
summary: too many goroutines
- name: cpu_used
rules:
- expr: |
node_cpu_seconds_total{mode="user", cpu="0"}
record: cpu_used_percent
labels:
cpu: "0"
那么,它将产生2个group:
- group: test,内含1个recodingRule和1个alertingRule;
- group: cpu_used,内含1个recordingRule;
// rules/manager.go
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels) error {
// 加载规则
groups, errs := m.LoadGroups(interval, externalLabels, files...)var wg sync.WaitGroup
for _, newg := range groups {
......
wg.Add(1)
go func(newg *Group) {
...
go func() {
......
// 运行group内的规则
newg.run(m.opts.Context)
}()
wg.Done()
}(newg)
}
......
wg.Wait()
m.groups = groupsreturn nil
}
读取规则的逻辑:
- 1个group下可能有多个rules;
- 根据规则中是否配置alert字段判断,是alertingRule or recordingRule;
// rules/manager.go
// LoadGroups reads groups from a list of files.
func (m *Manager) LoadGroups(
interval time.Duration, externalLabels labels.Labels, filenames ...string,
) (map[string]*Group, []error) {
groups := make(map[string]*Group)for _, fn := range filenames {
rgs, errs := rulefmt.ParseFile(fn)
for _, rg := range rgs.Groups {
itv := interval
if rg.Interval != 0 {
itv = time.Duration(rg.Interval)
}
rules := make([]Rule, 0, len(rg.Rules))
for _, r := range rg.Rules {
expr, err := parser.ParseExpr(r.Expr.Value)
// 告警规则
if r.Alert.Value != "" {
rules = append(rules, NewAlertingRule(
r.Alert.Value,
expr,
time.Duration(r.For),
labels.FromMap(r.Labels),
labels.FromMap(r.Annotations),
externalLabels,
m.restored,
log.With(m.logger, "alert", r.Alert),
))
continue
}
// 表达式规则
rules = append(rules, NewRecordingRule(
r.Record.Value,
expr,
labels.FromMap(r.Labels),
))
}
groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name:rg.Name,
File:fn,
Interval:itv,
Rules:rules,
ShouldRestore: shouldRestore,
Opts:m.opts,
done:m.done,
})
}
}return groups, nil
}
3.规则执行逻辑 对每个group,利用定时器,每隔interval执行1次:
// rules/manager.go
func (g *Group) run(ctx context.Context) {
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.evalTimestamp().Add(g.interval)
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}iter := func() {
......
g.Eval(ctx, evalTimestamp)
}
tick := time.NewTicker(g.interval)
defer tick.Stop()iter()for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
.....
iter()
}
}
}
}
group每个interval做的事情:
- 遍历group下的每个rule,执行每个规则;
- 若是告警规则,当告警触发时,将alert发出去;
- 若是表达式规则,将新metrics写入TSDB;
// rules/manager.go
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
for i, rule := range g.rules {
......
func(i int, rule Rule) {
// 执行规则
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)// 告警规则被触发
if ar, ok := rule.(*AlertingRule);
ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}app := g.opts.Appendable.Appender()
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
defer func() {
// flush新metrics
if err := app.Commit();
err != nil {
level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
return
}
}()
for _, s := range vector {
// 添加新metrics
_, err := app.Add(s.Metric, s.T, s.V);
....
}
}(i, rule)
}
}
4.规则的eval代码逻辑 从上面看到,不管是alertingRule还是recordingRule,都是通过rule.Eval执行的。
对于alertingRule.Eval():
- 利用PromQL查询表达式的值,若规则被触发,则PromQL将输出res;
- 利用告警维持时间,将活跃告警维护在r.active[]中;告警发送模块仅发送r.active[]中的告警;
// rules/alerting.go
// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
// promQL查询出结果
res, err := query(ctx, r.vector.String(), ts)var alerts = make(map[uint64]*Alert, len(res))
for _, smpl := range res {
......
alerts[h] = &Alert{
Labels:lbs,
Annotations: annotations,
ActiveAt:ts,
State:StatePending,
Value:smpl.V,
}
}
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.active[h];
ok && alert.State != StateInactive {
alert.Value = https://www.it610.com/article/a.Value
alert.Annotations = a.Annotations
continue
}
r.active[h] = a//alert被存在r.active中,active中的alert被发送给alertmanager
}
...
// We have already acquired the lock above hence using SetHealth and
// SetLastError will deadlock.
r.health = HealthGood
r.lastError = err
return vec, nil
}
对于recordingRule.Eval()
- PromQL计算出最新的metrics值;
// rules/recording.go
// Eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) {
vector, err := query(ctx, rule.vector.String(), ts)// Override the metric name and labels.
for i := range vector {
sample := &vector[i]
lb := labels.NewBuilder(sample.Metric)
lb.Set(labels.MetricName, rule.name)
for _, l := range rule.labels {
lb.Set(l.Name, l.Value)
}
sample.Metric = lb.Labels()
}
...
return vector, nil
}
上面的PromQL计算,均使用了QueryFunc,QueryFunc使用了PromQL的引擎;
// cmd/prometheus/main.go
// 初始化
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable:fanoutStorage,
TSDB:localStorage,
QueryFunc:rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc:sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context:ctxRule,
ExternalURL:cfg.web.ExternalURL,
Registerer:prometheus.DefaultRegisterer,
Logger:log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod:time.Duration(cfg.forGracePeriod),
ResendDelay:time.Duration(cfg.resendDelay),
})// rules/manager.go
// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
// It converts scalar into vector results.
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
q, err := engine.NewInstantQuery(q, qs, t)
if err != nil {
return nil, err
}
res := q.Exec(ctx)
if res.Err != nil {
return nil, res.Err
}
switch v := res.Value.(type) {
case promql.Vector:
return v, nil
case promql.Scalar:
return promql.Vector{promql.Sample{
Point:promql.Point(v),
Metric: labels.Labels{},
}}, nil
default:
return nil, errors.New("rule result is not a vector or scalar")
}
}
}
参考 【prometheus源码分析(rules模块)】1.告警判定与告警发现:https://segmentfault.com/a/11...
推荐阅读
- prometheus源码分析(scrape模块)
- prometheus源码分析(t/v数据的压缩、写入和读取)
- prometheus源码分析(index倒排索引)
- promethues源码剖析(head block)
- k8s|k8s hpa计算
- 我的大屏监控布局资料
- PromQL之label_replace/label_join