Prometheus remote-write explained (Part 1): usage
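Prometheus does not ship a remote storage backend of its own, but it does define interfaces for remote storage:
- any remote storage that implements these interfaces can store and read Prometheus data;
- this article covers only remote-write.
The remote-write configuration in the Prometheus CRD: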
remoteWrite:
- url: "https://1.2.3.4/api/monitor/v1/prom/write"
  tlsConfig:
    insecureSkipVerify: true
After the CRD is applied, Prometheus ends up with the following generated configuration:
remote_write:
- url: https://1.2.3.4/api/monitor/v1/prom/write
  remote_timeout: 30s
  tls_config:
    insecure_skip_verify: true
  queue_config:
    capacity: 500
    max_shards: 1000
    min_shards: 1
    max_samples_per_send: 100
    batch_send_deadline: 5s
    min_backoff: 30ms
    max_backoff: 100ms
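Note the added queue_config section: it configures the queue used while samples are in transit.
Assume each remote storage endpoint is served by one queue:
- the queue starts with min_shards shards and can grow to at most max_shards shards;
- each shard buffers up to capacity samples;
- samples are sent to the remote storage over HTTP. If a send fails, the sender waits min_backoff before retrying; if it fails again, it waits 2*min_backoff, and the wait keeps doubling until it reaches max_backoff.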
The Prometheus remote-write data protocol
Prometheus serializes its samples with protobuf, compresses the result with snappy, and then sends it to the remote storage over HTTP.
The corresponding source code:
// prometheus/storage/remote/queue_manager.go
func buildWriteRequest(samples []prompb.TimeSeries, buf []byte) ([]byte, int64, error) {
	var highest int64
	for _, ts := range samples {
		// At the moment we only ever append a TimeSeries with a single sample in it.
		if ts.Samples[0].Timestamp > highest {
			highest = ts.Samples[0].Timestamp
		}
	}
	req := &prompb.WriteRequest{
		Timeseries: samples,
	}

	data, err := proto.Marshal(req)
	if err != nil {
		return nil, highest, err
	}

	// snappy uses len() to see if it needs to allocate a new slice. Make the
	// buffer as long as possible.
	if buf != nil {
		buf = buf[0:cap(buf)]
	}
	compressed := snappy.Encode(buf, data)
	return compressed, highest, nil
}
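The buf = buf[0:cap(buf)] step is easy to misread, so here is a small sketch of why it matters. It assumes only the property stated in the source comment (the encoder looks at len(dst), not cap(dst), when deciding whether to allocate). encodeInto is a hypothetical stand-in that copies bytes instead of compressing them; it is not the snappy API.

```go
package main

import "fmt"

// encodeInto is a hypothetical stand-in for an encoder with a dst-reuse
// contract: it writes into dst when len(dst) is large enough and allocates
// a new slice otherwise. It copies src rather than compressing it.
func encodeInto(dst, src []byte) (out []byte, allocated bool) {
	if len(dst) < len(src) {
		dst = make([]byte, len(src))
		allocated = true
	}
	n := copy(dst, src)
	return dst[:n], allocated
}

func main() {
	data := []byte("sample payload")
	buf := make([]byte, 0, 64) // plenty of capacity, but length 0

	_, allocated := encodeInto(buf, data)
	fmt.Println(allocated) // true: len(buf) is 0, so the capacity goes unused

	buf = buf[0:cap(buf)] // extend the length to the full capacity
	out, allocated := encodeInto(buf, data)
	fmt.Println(allocated, string(out)) // false sample payload
}
```

Reslicing to the full capacity lets a buffer that is reused across sends actually be reused, instead of triggering a fresh allocation on every request.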
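How a remote storage implements the remote-write protocol
The remote storage has to implement the HTTP endpoint defined in remoteConfigs; this walkthrough mainly follows InfluxDB's implementation.
The HTTP endpoint: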
// Implement the following API
Route{
	"prometheus-write", // Prometheus remote write
	"POST", "/api/v1/prom/write", false, true, h.servePromWrite,
},
The implementation of the HTTP endpoint:
func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user meta.User) {
	......
	var bs []byte
	if r.ContentLength > 0 {
		bs = make([]byte, 0, r.ContentLength)
	}
	body := r.Body
	buf := bytes.NewBuffer(bs)
	// Read the request body.
	_, err := buf.ReadFrom(body)
	if err != nil {
		h.httpError(w, err.Error(), http.StatusBadRequest)
		return
	}
	// Decompress the body with snappy.
	reqBuf, err := snappy.Decode(nil, buf.Bytes())
	if err != nil {
		h.httpError(w, err.Error(), http.StatusBadRequest)
		return
	}
	// Convert the Prometheus remote write request to Influx Points
	var req remote.WriteRequest
	// Deserialize the protobuf message.
	if err := proto.Unmarshal(reqBuf, &req); err != nil {
		h.httpError(w, err.Error(), http.StatusBadRequest)
		return
	}
	......
}
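This is exactly the reverse of what Prometheus does on the sending side: the handler first decompresses the body with snappy and then deserializes the protobuf message to recover the actual data.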