prometheus|prometheus remote-write解析(一) -- 使用

prometheus没有提供远程存储,但提供了远程存储的接口:

  • 远程存储只要实现这一接口,即可存储和读取prometheus的数据;
  • 这里仅分析remote-write:
【prometheus|prometheus remote-write解析(一) -- 使用】笔者的prometheus被prometheus-operator部署在kubernetes中,kubernetes使用prometheus这个CRD管理配置,prometheus-operator监听到配置变化,将新配置apply到prometheus POD上。
prometheus CRD中的remote-write配置:
remoteWrite: - url: "https://1.2.3.4/api/monitor/v1/prom/write" tlsConfig: insecureSkipVerify: true

apply以后,prometheus生成如下的配置:
remote_write: - url: https://1.2.3.4/api/monitor/v1/prom/write remote_timeout: 30s tls_config: insecure_skip_verify: true queue_config: capacity: 500 max_shards: 1000 min_shards: 1 max_samples_per_send: 100 batch_send_deadline: 5s min_backoff: 30ms max_backoff: 100ms

可以看到,它增加了queue_config,即传输过程中的队列配置。
假设每个remoteStorage使用1个queue进行传输:
  • queue中的初始shards数=min_shards,最大shards数=max_shards;
  • 每个shard的容量=capacity个sample;
  • 通过HTTP向remoteStorage发送数据时,若发送失败,则回退min_backoff;再次失败,则回退2*min_backoff,直到max_backoff;
prometheus|prometheus remote-write解析(一) -- 使用
文章图片

prometheus的remote-write数据协议 prometheus的samples,经过protobuf的序列化,然后再经过snappy压缩,最后通过HTTP发送给remoteStorage;
prometheus|prometheus remote-write解析(一) -- 使用
文章图片

对应的源代码:
// prometheus/storage/remote/queue_manager.go func buildWriteRequest(samples []prompb.TimeSeries, buf []byte) ([]byte, int64, error) { var highest int64 for _, ts := range samples { // At the moment we only ever append a TimeSeries with a single sample in it. if ts.Samples[0].Timestamp > highest { highest = ts.Samples[0].Timestamp } } req := &prompb.WriteRequest{ Timeseries: samples, }data, err := proto.Marshal(req) if err != nil { return nil, highest, err }// snappy uses len() to see if it needs to allocate a new slice. Make the // buffer as long as possible. if buf != nil { buf = buf[0:cap(buf)] } compressed := snappy.Encode(buf, data) return compressed, highest, nil }

remoteStorage如何实现remote-write协议接口 remoteStorage要实现remoteConfigs中定义的HTTP接口,这里主要参考influxdb的实现。
HTTP接口:
// 实现如下的API Route{ "prometheus-write", // Prometheus remote write "POST", "/api/v1/prom/write", false, true, h.servePromWrite, },

HTTP接口的实现:
func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user meta.User) { ...... var bs []byte if r.ContentLength > 0 { bs = make([]byte, 0, r.ContentLength) } body := r.Body buf := bytes.NewBuffer(bs) // 读request body _, err := buf.ReadFrom(body) // snappy解压缩 reqBuf, err := snappy.Decode(nil, buf.Bytes()) if err != nil { h.httpError(w, err.Error(), http.StatusBadRequest) return } // Convert the Prometheus remote write request to Influx Points var req remote.WriteRequest // protobuf反序列化 if err := proto.Unmarshal(reqBuf, &req); err != nil { h.httpError(w, err.Error(), http.StatusBadRequest) return } ...... }

跟prometheus做的事情正好相反,这里先进行sappy的解压缩,然后再protobuf反序列化,得到真实的数据。

    推荐阅读