Open-falcon graph使用RRD保存指标数据

Open-falcon的graph组件使用rrd保存指标数据,rrd由于是本地存储,单机故障容易引起数据丢失,Open-falcon提供了双写的逻辑(transfer双写2个graph节点)以增强可用性。
【Open-falcon graph使用RRD保存指标数据】graph的指标数据的存储,可选择不同的TSDB,比如InfluxDB、OpenTSDB等。
本文主要结合源码,介绍Open-falcon如何使用rrd保存和查询数据。
1. rrd文件的命名 rrd文件的命名方式:

md5=md5sum(metric, endpoint, tags) md5[0:2]_md5_dsType_step.rrd

看一下代码中的实现:
// RRDTOOL UTILS // 监控数据对应的rrd文件名称 func RrdFileName(baseDir string, md5 string, dsType string, step int) string { return baseDir + "/" + md5[0:2] + "/" + md5 + "_" + dsType + "_" + strconv.Itoa(step) + ".rrd" }

md5的计算:将endpoint/metric/tags拼成string,然后md5sum(string)
//MD5的值 func Checksum(endpoint string, metric string, tags map[string]string) string { pk := PK(endpoint, metric, tags) return Md5(pk) }func PK(endpoint, metric string, tags map[string]string) string { ret := bufferPool.Get().(*bytes.Buffer) ret.Reset() defer bufferPool.Put(ret)if tags == nil || len(tags) == 0 { ret.WriteString(endpoint) ret.WriteString("/") ret.WriteString(metric) return ret.String() } ret.WriteString(endpoint) ret.WriteString("/") ret.WriteString(metric) ret.WriteString("/") ret.WriteString(SortedTags(tags)) return ret.String() }

比如:trovedev这台机器的agent.alive的数据,该指标将被存储在be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd文件中:
# echo -n "trovedev/agent.alive" | md5sum be3cdc0fce0d498e19e2c6e9f1f338b6- # ls -alh be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd -rw-r--r-- 1 root root 70K 6月2 09:43 be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd

2. rrd文件的创建 rrd文件被创建时,除了在本地磁盘上新建了一个rrd文件,还指定了指标的归档策略,也就是指定保存多久、如何聚合:
比如:原始的数据1min1个点,保留12hour;原始的数据每隔5min被聚合(AVERAGE)成1个点,原始的数据被删掉;
// modules/graph/rrdtool/rrdtool.go const ( RRA1PointCnt= 720 RRA5PointCnt= 576 RRA20PointCnt= 504 RRA180PointCnt = 766 RRA720PointCnt = 730 )func create(filename string, item *cmodel.GraphItem) error { now := time.Now() start := now.Add(time.Duration(-24) * time.Hour) step := uint(item.Step)c := rrdlite.NewCreator(filename, start, step) c.DS("metric", item.DsType, item.Heartbeat, item.Min, item.Max)// 设置各种归档策略 // 1min1个点,存12hour c.RRA("AVERAGE", 0, 1, RRA1PointCnt)//RRA1PointCnt=720: 1min1个点,存720个点,即12hour// 5m1个点,存2d c.RRA("AVERAGE", 0, 5, RRA5PointCnt)//RRA5PointCnt=576:5min1个点, 存576个点,即2d c.RRA("MAX", 0, 5, RRA5PointCnt) c.RRA("MIN", 0, 5, RRA5PointCnt)// 20m1个点,存7d c.RRA("AVERAGE", 0, 20, RRA20PointCnt)//RRA20PointCnt=504: 20m1个点, 共504个点,即7d c.RRA("MAX", 0, 20, RRA20PointCnt) c.RRA("MIN", 0, 20, RRA20PointCnt).......return c.Create(true) }

可以使用rrdtool查询rrd文件的信息:其中保存了配置的归档策略
# rrdtool info be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd filename = "be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd" rrd_version = "0003" step = 60 last_update = 1591062120 header_size = 3080 ds[metric].index = 0 ds[metric].type = "GAUGE" ds[metric].minimal_heartbeat = 120 ds[metric].min = NaN ds[metric].max = NaN ds[metric].last_ds = "1" ds[metric].value = https://www.it610.com/article/0.0000000000e+00 ds[metric].unknown_sec = 0 rra[0].cf ="AVERAGE" rra[0].rows = 720 rra[0].cur_row = 385 rra[0].pdp_per_row = 1 rra[0].xff = 0.0000000000e+00 rra[0].cdp_prep[0].value = https://www.it610.com/article/NaN rra[0].cdp_prep[0].unknown_datapoints = 0 rra[1].cf ="AVERAGE" rra[1].rows = 576 rra[1].cur_row = 543 rra[1].pdp_per_row = 5 rra[1].xff = 0.0000000000e+00 rra[1].cdp_prep[0].value = https://www.it610.com/article/2.0000000000e+00 rra[1].cdp_prep[0].unknown_datapoints = 0 rra[2].cf ="MAX" rra[2].rows = 576 rra[2].cur_row = 24 rra[2].pdp_per_row = 5 rra[2].xff = 0.0000000000e+00 rra[2].cdp_prep[0].value = https://www.it610.com/article/1.0000000000e+00 rra[2].cdp_prep[0].unknown_datapoints = 0 rra[3].cf ="MIN" .....

3. 使用rrd文件查询数据 先看下如何直接查询rrd文件中的指标数据,比如查询2020-06-02 09:30:46 ~ 2020-06-02 09:39:46这段时间, 每60s1个点的数据:
# rrdtool fetch be/be3cdc0fce0d498e19e2c6e9f1f338b6_GAUGE_60.rrd AVERAGE -r 60 -s 1591061446 -e 1591061986 metric 1591061460: 1.0000000000e+00 1591061520: 1.0000000000e+00 1591061580: 1.0000000000e+00 1591061640: 1.0000000000e+00 1591061700: 1.0000000000e+00 1591061760: 1.0000000000e+00 1591061820: 1.0000000000e+00 1591061880: 1.0000000000e+00 1591061940: 1.0000000000e+00 1591062000: 1.0000000000e+00

再看一下代码中如何查询的:
  • filename: 指标所在的文件;
  • cf: 指标聚合方法,比如AVERAGE;
  • start,end: 查询的起始/终止时间;
  • step: 指标的间隔时间;
// modules/graph/rrdtool/rrdtool.go func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, error) { start_t := time.Unix(start, 0) end_t := time.Unix(end, 0) step_t := time.Duration(step) * time.SecondfetchRes, err := rrdlite.Fetch(filename, cf, start_t, end_t, step_t) if err != nil { return []*cmodel.RRDData{}, err }defer fetchRes.FreeValues()values := fetchRes.Values() size := len(values) ret := make([]*cmodel.RRDData, size)start_ts := fetchRes.Start.Unix() step_s := fetchRes.Step.Seconds()for i, val := range values { ts := start_ts + int64(i+1)*int64(step_s) d := &cmodel.RRDData{ Timestamp: ts, Value:cmodel.JsonFloat(val), } ret[i] = d } return ret, nil }

4. 保存数据到rrd文件 通过rrdlite.Updater来实现,先将指标数据写入cache,然后一起update到rrd文件:
// modules/graph/rrdtool/rrdtool.go func update(filename string, items []*cmodel.GraphItem) error { u := rrdlite.NewUpdater(filename)for _, item := range items { v := math.Abs(item.Value) if v > 1e+300 || (v < 1e-300 && v > 0) { continue } if item.DsType == "DERIVE" || item.DsType == "COUNTER" { u.Cache(item.Timestamp, int(item.Value)) } else { u.Cache(item.Timestamp, item.Value) } }return u.Update() }

    推荐阅读