prometheus中的指标t/v数据保存在block/chunks下,label数据保存在block/index下。
对于t/v数据,prometheus采用Facebook Gorilla论文的压缩方式:
- timestamp: delta-of-delta方式压缩时序点的时间值;
- value: xor方式压缩时序点的value值;
时序点t/v的压缩 1)timestamp压缩
在时序上,相邻两个点的时间戳的差值一般是固定的,若隔60s pull一次,那么timestamp差值一般都是60s,比如
- p1: 10:00:00,p2: 10:01:00,p3: 10:01:59,p4:10:03:00,p5:10:04:00,p6:10:05:00
- 时间戳的差值为:60s,59s,61s,60s,60s;
- 第一个时序点的时间戳t0,被完整存储起来;
- 第二个时序点的时间戳t1,存储delta=t1-t0;
- 对后续的时间戳tn,首先计算dod值:delta=(tn - tn-1) - (tn-1 - tn-2);
- 如果dod=0,则使用1bit=“0”存储该时间戳;
- 如果dod=[-8191, 8192],则先存入“10”作为标识,再用14bit存储该dod值;
- 如果dod=[-65535, 65536],则先存入“110”作为标识,再用17bit存储该dod值;
- 如果dod=[-524287, 524288],则先存入“1110”作为标识,再用20bit存储该dod值;
- 如果dod>524288,则先存入“1111”作为标识,再用64bit存储该dod值;
2)value压缩
Gorilla论文对时序点value的压缩基于:
- 相邻时序点的value值不会发生明显变化;
- value大多是浮点数,当两个value非常接近的时候,这两个浮点数的符号位、指数位和尾数部分的前几bit都是相同的;
- 第一个时序点的value值不压缩,直接保存;
- 从第二个点开始,将其value与上一个value进行XOR运算;
- 若XOR运算结果为“0”,则表示前后两个value相同,仅存入1bit的“0”值即可;
- 【prometheus源码分析(t/v数据的压缩、写入和读取)】否则,存入1bit值“1”;
- 若XOR结果中非0的部分包含在前一个XOR结果中,那么再写入1bit值“0”,然后存入XOR中非0的部分;
- 否则,写入1bit值“1”,用5bit存入XOR中前值0的个数,6bit存入中间非0的长度,最后再存入中间的非0位;
3)压缩示例
输入时序序列值
10:00:003.1
10:01:013.2
10:02:003.0
10:02:593.2
10:03:003.1
那么将存入
10:00:003.1
613.2 xor 3.1
-2(59-61)3.0 xor 3.2
0(59-59)3.2 xor 3.0
2(61-59)3.1 xor 3.2
写入t/v的源码分析 xorAppender负责写入t/v的值,t=int64,v=float64
// tsdb/chunkenc/xor.go
func (a *xorAppender) Append(t int64, v float64) {
var tDelta uint64
num := binary.BigEndian.Uint16(a.b.bytes())//第一个点,完整记录t1和v1的值
if num == 0 {
buf := make([]byte, binary.MaxVarintLen64)
for _, b := range buf[:binary.PutVarint(buf, t)] {
a.b.writeByte(b)//写入t1的值
}
a.b.writeBits(math.Float64bits(v), 64)//写入v1的值
} else if num == 1 {//第二个点
tDelta = uint64(t - a.t)
buf := make([]byte, binary.MaxVarintLen64)
for _, b := range buf[:binary.PutUvarint(buf, tDelta)] {
a.b.writeByte(b)//写入tDeleta=t2-t1
}
a.writeVDelta(v)//写入v2^v1的值
} else {//第三个点及以后的点
tDelta = uint64(t - a.t)
dod := int64(tDelta - a.tDelta)//计算dod// Gorilla has a max resolution of seconds, Prometheus milliseconds.
// Thus we use higher value range steps with larger bit size.
switch {
case dod == 0:
a.b.writeBit(zero)//写入0
case bitRange(dod, 14)://dod=[-8191,8192],先存入10作为标识,再用14bit存储dod的值
a.b.writeBits(0x02, 2) // '10'
a.b.writeBits(uint64(dod), 14)
case bitRange(dod, 17)://dod=[-65535,65536],先存入110作为标识,再用17bit存储该dod的值
a.b.writeBits(0x06, 3) // '110'
a.b.writeBits(uint64(dod), 17)
case bitRange(dod, 20)://dod=[-524287,524288],先存入1110作为标识,再用20bit存储该dod的值
a.b.writeBits(0x0e, 4) // '1110'
a.b.writeBits(uint64(dod), 20)
default://dod>524288,先存入1111作为标识,再用64bit存储该dod的值
a.b.writeBits(0x0f, 4) // '1111'
a.b.writeBits(uint64(dod), 64)
}
a.writeVDelta(v)//写入vn^vn-1
}
a.t = t//写入的最后一个t
a.v = v//写入的最后一个v
binary.BigEndian.PutUint16(a.b.bytes(), num+1)
a.tDelta = tDelta//写入的最后一个tDelta
}
再看一下使用xor写入VDelta的源码:
// tsdb/chunkenc/xor.go
func (a *xorAppender) writeVDelta(v float64) {
vDelta := math.Float64bits(v) ^ math.Float64bits(a.v)//当前value与上一个value进行xorif vDelta == 0 {//xor=0,存入1bit'0'即可
a.b.writeBit(zero)
return
}
a.b.writeBit(one)//先存入控制位'1'leading := uint8(bits.LeadingZeros64(vDelta))//计算vdelta前置0的个数
trailing := uint8(bits.TrailingZeros64(vDelta))//计算vdelta后置0的个数// Clamp number of leading zeros to avoid overflow when encoding.
if leading >= 32 {
leading = 31
}if a.leading != 0xff && leading >= a.leading && trailing >= a.trailing {
a.b.writeBit(zero)
a.b.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing))
} else {
a.leading, a.trailing = leading, trailinga.b.writeBit(one)
a.b.writeBits(uint64(leading), 5)// Note that if leading == trailing == 0, then sigbits == 64.But that value doesn't actually fit into the 6 bits we have.
// Luckily, we never need to encode 0 significant bits, since that would put us in the other case (vdelta == 0).
// So instead we write out a 0 and adjust it back to 64 on unpacking.
sigbits := 64 - leading - trailing
a.b.writeBits(uint64(sigbits), 6)
a.b.writeBits(vDelta>>trailing, int(sigbits))
}
}
读取t/v的源码分析 xorIterator负责t/v数据的读取:基本就是写入过程的反过程
// tsdb/chunkenc/xor.go
func (it *xorIterator) Next() bool {
if it.err != nil || it.numRead == it.numTotal {
return false
}
//读第1个点
if it.numRead == 0 {
t, err := binary.ReadVarint(&it.br)//time原值读取
if err != nil {
it.err = err
return false
}
v, err := it.br.readBits(64)//value原值读取
if err != nil {
it.err = err
return false
}
it.t = t
it.val = math.Float64frombits(v)it.numRead++//读取数量+1
return true
}
//读第2个点
if it.numRead == 1 {
tDelta, err := binary.ReadUvarint(&it.br)//读取tDelta
if err != nil {
it.err = err
return false
}
it.tDelta = tDelta
it.t = it.t + int64(it.tDelta)//计算timereturn it.readValue()//读取xor并计算出原值
}
//读第3个及以后的点
var d byte
//读前缀,最多4bit
// read delta-of-delta
for i := 0;
i < 4;
i++ {
d <<= 1
bit, err := it.br.readBit()
if err != nil {
it.err = err
return false
}
if bit == zero {
break
}
d |= 1
}
var sz uint8
var dod int64
switch d {
case 0x00:
// dod == 0//前缀=0
case 0x02:
sz = 14//前缀=10,用14bit保存dod
case 0x06://前缀=110,用17bit保存dod
sz = 17
case 0x0e://前缀=1110,用20bit保存dod
sz = 20
case 0x0f://前缀=1111,用64bit保存dod
bits, err := it.br.readBits(64)
if err != nil {
it.err = err
return false
}
dod = int64(bits)
}if sz != 0 {
bits, err := it.br.readBits(int(sz))
if err != nil {
it.err = err
return false
}
if bits > (1 << (sz - 1)) {
// or something
bits = bits - (1 << sz)
}
dod = int64(bits)//读取并计算dod的值
}it.tDelta = uint64(int64(it.tDelta) + dod)//计算tdelta
it.t = it.t + int64(it.tDelta)//计算timereturn it.readValue()//读取xor的值
}
再看一下读xor值的流程:将上一个value与xor的值进行异或
// tsdb/chunkenc/xor.go
func (it *xorIterator) readValue() bool {
bit, err := it.br.readBit()//读第1个bit
if err != nil {
it.err = err
return false
}if bit == zero {//如果第1个bit=0,value保持不变(故无需更新)
// it.val = it.val
} else {
bit, err := it.br.readBit()
if err != nil {
it.err = err
return false
}
if bit == zero {
// reuse leading/trailing zero bits
// it.leading, it.trailing = it.leading, it.trailing
} else {
bits, err := it.br.readBits(5)
if err != nil {
it.err = err
return false
}
it.leading = uint8(bits)bits, err = it.br.readBits(6)
if err != nil {
it.err = err
return false
}
mbits := uint8(bits)
// 0 significant bits here means we overflowed and we actually need 64;
see comment in encoder
if mbits == 0 {
mbits = 64
}
it.trailing = 64 - it.leading - mbits
}mbits := int(64 - it.leading - it.trailing)
bits, err := it.br.readBits(mbits)
if err != nil {
it.err = err
return false
}
vbits := math.Float64bits(it.val)//拿到上一个value
vbits ^= (bits << it.trailing)//与xor的值进行异或,得到本地的value
it.val = math.Float64frombits(vbits)// v1^v2=xor,那么v2=v1^xor
}it.numRead++
return true
}
推荐阅读
- prometheus源码分析(rules模块)
- prometheus源码分析(scrape模块)
- prometheus源码分析(index倒排索引)
- promethues源码剖析(head block)
- k8s|k8s hpa计算
- 我的大屏监控布局资料
- PromQL之label_replace/label_join