prometheus remote-read使用与源码解读

prometheus中remote-write和remote-read的配置:

# store data to influxdb remote_write: - url: "http://10.21.1.74:8086/api/v1/prom/write?db=prometheus" # read data from influxdb remote_read: - url: "http://10.21.1.74:8086/api/v1/prom/read?db=prometheus"

remote-read可以让prometheus读取远程存储上的时序数据,扩展了本地存储。
prometheus在应对/query查询请求时,由fanoutStorage处理;
  • fanoutStorage包含localStorage(本地TSDB)和remoteStorage(远程存储),它们均实现了查询接口;
  • localStorage执行本地查询;
  • remoteStorage通过HTTP执行远程查询;
  • 将上述2个查询结果进行合并,返回给client;
prometheus remote-read使用与源码解读
文章图片

demo演示
  1. prometheus配置remote-write和remote-read;
  2. 运行一段时间后:
停止prometheus: stop prometheus;
删除本地数据:delete prometheus/data目录;
启动prometheus: start promethesu;
上述操作模拟:本地存储宕机,使用远程存储的场景。
  1. 在prometheus UI上执行查询,可以得到历史数据(远程存储);
prometheus remote-read使用与源码解读
文章图片

remote-read的代码 执行远程查询的入口代码:生成query,然后发送HTTP远程查询
// storage/remote/read.go func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { if len(q.requiredMatchers) > 0 { requiredMatchers := append([]*labels.Matcher{}, q.requiredMatchers...) for _, m := range matchers { for i, r := range requiredMatchers { if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value =https://www.it610.com/article/= r.Value { // Requirement matched. requiredMatchers = append(requiredMatchers[:i], requiredMatchers[i+1:]...) break } } } } // 加label m, added := q.addExternalLabels(matchers) // 生成查询 query, err := ToQuery(q.mint, q.maxt, m, hints) // HTTP client发起远程查询 res, err := q.client.Read(q.ctx, query) return newSeriesSetFilter(FromQueryResult(sortSeries, res), added) }

【prometheus remote-read使用与源码解读】client对象的构造:每个remote有一个client,使用其配置的URL/HttpConfig构造
//storage/remote/storage.go func (s *Storage) ApplyConfig(conf *config.Config) error { for _, rrConf := range conf.RemoteReadConfigs { c, err := newReadClient(name, &ClientConfig{ URL:rrConf.URL, Timeout:rrConf.RemoteTimeout, HTTPClientConfig: rrConf.HTTPClientConfig, }) queryables = append(queryables, NewSampleAndChunkQueryableClient( c, conf.GlobalConfig.ExternalLabels, labelsToEqualityMatchers(rrConf.RequiredMatchers), rrConf.ReadRecent, s.localStartTimeCallback, )) ...... } ...... }

发起HTTP远程查询请求:
  • HTTP request: 先用protobuf序列化,再用snappy压缩;
  • HTTP response: 先用snappy解压缩,然后再用protobuf反序列化;
//storage/remote/client.go // Read reads from a remote endpoint. func (c *client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { req := &prompb.ReadRequest{ Queries: []*prompb.Query{ query, }, } // protobuf序列化 data, err := proto.Marshal(req) // snappy压缩 compressed := snappy.Encode(nil, data) // 发送HTTP POST httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed))httpReq.Header.Add("Content-Encoding", "snappy") httpReq.Header.Add("Accept-Encoding", "snappy") httpReq.Header.Set("Content-Type", "application/x-protobuf") httpReq.Header.Set("User-Agent", userAgent) httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel()httpReq = httpReq.WithContext(ctx) // 发送request httpResp, err := c.client.Do(httpReq) compressed, err = ioutil.ReadAll(httpResp.Body)//返回的结果,先snappy解压缩 uncompressed, err := snappy.Decode(nil, compressed) var resp prompb.ReadResponse // 再protobuf反序列化 err = proto.Unmarshal(uncompressed, &resp) return resp.Results[0], nil }

参考:
1.https://yunlzheng.gitbook.io/...

    推荐阅读