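InfluxDB -- Read and Write Operations of the TSM Storage Engine
Data writes
When data is written, the points are first partitioned by shard, and the points that belong to the same shard are written together: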
//tsdb/store.go
// WriteToShard writes a list of points to a shard identified by its ID.
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
	sh := s.shards[shardID]
	return sh.WritePoints(points)
}
//tsdb/shard.go
// WritePoints will write the raw data points and any new metadata to the index in the shard.
func (s *Shard) WritePoints(points []models.Point) error {
	.....
	// Write to the engine.
	err := engine.WritePoints(points)
	.....
}
tsm1.Engine is responsible for writing the points:
- first, build the data: from the points, construct values=map[string][]Value, where key=seriesKey+separator+fieldName and value=[]Value, a collection of {timestamp, fieldValue} pairs;
- then, write values to the cache;
- finally, write values to the WAL;
//tsdb/engine/tsm1/engine.go
// WritePoints writes metadata and point data into the engine.
// It returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(points []models.Point) error {
	values := make(map[string][]Value, len(points))
	var (
		keyBuf    []byte
		baseLen   int
		seriesErr error
	)
	for _, p := range points {
		keyBuf = append(keyBuf[:0], p.Key()...)
		keyBuf = append(keyBuf, keyFieldSeparator...)
		// Remember where the field name starts so it can be overwritten per field.
		baseLen = len(keyBuf)
		// A single point may contain multiple fields.
		iter := p.FieldIterator()
		t := p.Time().UnixNano()
		for iter.Next() {
			keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
			var v Value
			switch iter.Type() {
			case models.Float:
				fv, err := iter.FloatValue()
				if err != nil {
					return err
				}
				v = NewFloatValue(t, fv)
			......
			}
			values[string(keyBuf)] = append(values[string(keyBuf)], v)
		}
	}
	// First write to the cache.
	if err := e.Cache.WriteMulti(values); err != nil {
		return err
	}
	// Then write to the WAL.
	if e.WALEnabled {
		if _, err := e.WAL.WriteMulti(values); err != nil {
			return err
		}
	}
	return seriesErr
}
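The key construction in WritePoints can be tried in isolation. The following is a minimal sketch, not the engine's code: `point`, `value` and `groupByCompositeKey` are hypothetical simplified stand-ins for models.Point, tsm1.Value and the loop above, only float fields are handled, and the separator string is assumed to match tsm1's keyFieldSeparator.

```go
package main

import "fmt"

// keyFieldSeparator is assumed to match the separator tsm1 puts between
// the series key and the field name.
const keyFieldSeparator = "#!~#"

// value is a simplified stand-in for tsm1.Value: one timestamped field value.
type value struct {
	unixNano int64
	v        float64
}

// point is a hypothetical, simplified stand-in for models.Point.
type point struct {
	seriesKey string             // measurement plus tag set
	unixNano  int64              // timestamp in nanoseconds
	fields    map[string]float64 // field name -> field value
}

// groupByCompositeKey builds map[compositeKey][]value, where
// compositeKey = seriesKey + keyFieldSeparator + fieldName.
func groupByCompositeKey(points []point) map[string][]value {
	values := make(map[string][]value, len(points))
	for _, p := range points {
		// A single point may carry several fields; each gets its own key.
		for name, fv := range p.fields {
			key := p.seriesKey + keyFieldSeparator + name
			values[key] = append(values[key], value{p.unixNano, fv})
		}
	}
	return values
}

func main() {
	pts := []point{
		{"cpu_usage,host=server01", 1, map[string]float64{"value": 0.5, "idle": 0.4}},
		{"cpu_usage,host=server01", 2, map[string]float64{"value": 0.7}},
	}
	for k, vs := range groupByCompositeKey(pts) {
		fmt.Println(k, len(vs))
	}
}
```

Because the field name is part of the key, every field of a series ends up with its own list of values, which is what the cache and the WAL receive.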
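Data deletion
Similar to an LSM-Tree, InfluxDB deletes data by marking it; the data is physically removed only when compaction runs.
The data directory contains .tombstone files, which record the time ranges of the data to be deleted:
- at query time, the query results are checked against the .tombstone contents and the records marked for deletion are dropped;
- at compaction time, the .tombstone contents are consulted and the data is actually deleted;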
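The query-time half of this scheme can be sketched as a filter over the values that were read. This is an illustration only: the `tombstone` struct and `filterDeleted` function are hypothetical, and the real on-disk .tombstone format is not modelled here.

```go
package main

import "fmt"

// tombstone is a hypothetical in-memory form of one deletion record:
// every value of key with a timestamp in [min, max] counts as deleted.
type tombstone struct {
	key      string
	min, max int64
}

// value is a simplified timestamped field value.
type value struct {
	unixNano int64
	v        float64
}

// filterDeleted returns the values of key that no tombstone covers.
func filterDeleted(key string, vals []value, ts []tombstone) []value {
	kept := make([]value, 0, len(vals))
	for _, v := range vals {
		deleted := false
		for _, t := range ts {
			if t.key == key && v.unixNano >= t.min && v.unixNano <= t.max {
				deleted = true
				break
			}
		}
		if !deleted {
			kept = append(kept, v)
		}
	}
	return kept
}

func main() {
	vals := []value{{1, 0.1}, {2, 0.2}, {3, 0.3}, {4, 0.4}}
	ts := []tombstone{{key: "cpu#!~#value", min: 2, max: 3}}
	fmt.Println(filterDeleted("cpu#!~#value", vals, ts))
}
```

Compaction would apply the same predicate while rewriting files, after which the tombstone records are no longer needed.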
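InfluxDB has two types of index: the metadata index and the TSM File index.
Metadata index
Metadata means measurement and series information. Each database has an Index structure that stores the metadata index for that database: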
//tsdb/store.go
type Store struct {
	path string
	// shared per-database indexes, only if using "inmem".
	indexes map[string]interface{} // key=databaseName, value is actually an *Index
	....
}
The internal structure of the metadata index:
type Index struct {
	// name --> *measurement within the database
	measurements map[string]*measurement // measurement name to object and index
	// seriesKey --> *series within the database
	series map[string]*series // map series key to the Series object
	// database name
	database string
}
type measurement struct {
	Database   string
	Name       string `json:"name,omitempty"`
	fieldNames map[string]struct{}
	// in-memory index fields
	// seriesId --> *series
	seriesByID map[uint64]*series // lookup table for series by their id
	// tagKey --> tagValue --> []seriesId
	// At query time, the tagKey leads to the seriesIds, which in turn lead to the series.
	seriesByTagKeyValue map[string]*tagKeyValue // map from tag key to value to sorted set of series ids
	sortedSeriesIDs     seriesIDs               // sorted list of series IDs in this measurement
}
type tagKeyValue struct {
	mu      sync.RWMutex
	entries map[string]*tagKeyValueEntry
}
type tagKeyValueEntry struct {
	m map[uint64]struct{} // series id set
}
For a metadata query such as:
show tag values from "cpu_usage" with key="host"
the query proceeds as follows:
- find the measurement object by "cpu_usage";
- within the measurement object, use tagKey="host" to find its tagValues and their []seriesId;
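The two lookup steps map directly onto nested map accesses. Below is a minimal sketch under simplifying assumptions: the `index` and `measurement` types are reduced, hypothetical versions of the structures shown earlier (no locking, plain nested maps instead of tagKeyValue), and `tagValues` is an illustrative helper rather than an InfluxDB function.

```go
package main

import (
	"fmt"
	"sort"
)

// measurement is a reduced stand-in: tag key -> tag value -> set of series IDs.
type measurement struct {
	seriesByTagKeyValue map[string]map[string]map[uint64]struct{}
}

// index maps measurement name -> measurement for one database.
type index struct {
	measurements map[string]*measurement
}

// tagValues returns the sorted tag values stored under tagKey for the named
// measurement, or nil if the measurement does not exist.
func (idx *index) tagValues(name, tagKey string) []string {
	m, ok := idx.measurements[name]
	if !ok {
		return nil
	}
	byValue := m.seriesByTagKeyValue[tagKey]
	out := make([]string, 0, len(byValue))
	for v := range byValue {
		out = append(out, v)
	}
	sort.Strings(out)
	return out
}

func main() {
	idx := &index{measurements: map[string]*measurement{
		"cpu_usage": {seriesByTagKeyValue: map[string]map[string]map[uint64]struct{}{
			"host": {
				"server02": {2: {}},
				"server01": {1: {}, 3: {}},
			},
		}},
	}}
	fmt.Println(idx.tagValues("cpu_usage", "host"))
}
```

No TSM file is touched for this kind of query; the answer comes entirely from the in-memory metadata index.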
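For a regular data query such as:
select value from "cpu_usage" where host='server01' and time > now() - 1h
the query proceeds as follows:
- use the time condition, time > now() - 1h, to determine the data shards;
- within the shard, find the measurement object by "cpu_usage";
- within the measurement object, use tagKey="host" and tagValue="server01" to find the matching []seriesId;
- iterate over []seriesId to obtain the []series objects, then use the TSM File index to locate the TSM File and read its blocks to get the result;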
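The first three steps can be sketched as follows. This is a simplified illustration, not InfluxDB's planner: the `shard` struct, `shardsFor` and `seriesFor` are hypothetical names, each shard is assumed to cover a half-open time range, and the tag index is flattened into nested maps attached to the shard.

```go
package main

import "fmt"

// shard is a hypothetical, simplified shard covering the time range [start, end).
type shard struct {
	id         uint64
	start, end int64
	// measurement -> tag key -> tag value -> series IDs
	seriesIDs map[string]map[string]map[string][]uint64
}

// shardsFor returns the shards whose time range overlaps [min, max].
func shardsFor(shards []shard, min, max int64) []shard {
	var out []shard
	for _, sh := range shards {
		if sh.start <= max && min < sh.end {
			out = append(out, sh)
		}
	}
	return out
}

// seriesFor returns the series IDs in this shard that match the measurement
// name and the tag key/value pair. Reading from a nil map is safe in Go, so
// missing levels simply yield an empty result.
func (sh shard) seriesFor(name, tagKey, tagValue string) []uint64 {
	return sh.seriesIDs[name][tagKey][tagValue]
}

func main() {
	shards := []shard{
		{id: 1, start: 0, end: 100, seriesIDs: map[string]map[string]map[string][]uint64{
			"cpu_usage": {"host": {"server01": {1, 3}}},
		}},
		{id: 2, start: 100, end: 200},
	}
	for _, sh := range shardsFor(shards, 50, 80) {
		fmt.Println(sh.id, sh.seriesFor("cpu_usage", "host", "server01"))
	}
}
```

The resulting series IDs are then resolved to series keys, which serve as the lookup keys for the TSM File index.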
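In a TSM File, the Blocks store the compressed timestamps/values.
The Index stores the index entries for the Blocks; it is kept in memory as an indirect index so that lookups are fast.
The data structure of the indirect index: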
//tsdb/engine/tsm1/reader.go
type indirectIndex struct {
	b              []byte // contents of the Index
	offsets        []byte
	minKey, maxKey []byte // smallest/largest key
	minTime, maxTime int64 // smallest/largest time
}
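Lookup in a TSM File:
- using the seriesKey, binary-search []offset, comparing against the key stored in the Index at each offset, to obtain the matching offset;
- read the []byte content at that offset to obtain the indexEntries;
- from the indexEntries, obtain the offset into the TSM File, then read the file content there to get the result;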
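The binary search over offsets can be sketched as below. The layout used here is an assumption made for illustration (2-byte key length, key, 1-byte type, 2-byte entry count, then 28-byte entries, all big-endian), offsets are held as []int32 rather than raw bytes, and `add`, `keyAt` and `entries` are hypothetical helpers, not the reader's actual methods.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"sort"
)

const entrySize = 28 // minTime(8) + maxTime(8) + offset(8) + size(4)

type indexEntry struct {
	minTime, maxTime int64
	offset           int64
	size             uint32
}

// indirectIndex keeps the raw index bytes plus the start offset of every key.
type indirectIndex struct {
	b       []byte
	offsets []int32
}

// add appends one key with its entries; keys must be added in sorted order.
func (d *indirectIndex) add(key string, typ byte, entries []indexEntry) {
	d.offsets = append(d.offsets, int32(len(d.b)))
	d.b = binary.BigEndian.AppendUint16(d.b, uint16(len(key)))
	d.b = append(d.b, key...)
	d.b = append(d.b, typ)
	d.b = binary.BigEndian.AppendUint16(d.b, uint16(len(entries)))
	for _, e := range entries {
		d.b = binary.BigEndian.AppendUint64(d.b, uint64(e.minTime))
		d.b = binary.BigEndian.AppendUint64(d.b, uint64(e.maxTime))
		d.b = binary.BigEndian.AppendUint64(d.b, uint64(e.offset))
		d.b = binary.BigEndian.AppendUint32(d.b, e.size)
	}
}

// keyAt returns the key stored at the i-th offset.
func (d *indirectIndex) keyAt(i int) []byte {
	off := int(d.offsets[i])
	n := int(binary.BigEndian.Uint16(d.b[off : off+2]))
	return d.b[off+2 : off+2+n]
}

// entries binary-searches the offsets for key and decodes its index entries.
// It returns nil if the key is not present.
func (d *indirectIndex) entries(key string) []indexEntry {
	k := []byte(key)
	i := sort.Search(len(d.offsets), func(i int) bool {
		return bytes.Compare(d.keyAt(i), k) >= 0
	})
	if i == len(d.offsets) || !bytes.Equal(d.keyAt(i), k) {
		return nil
	}
	p := int(d.offsets[i]) + 2 + len(k) + 1 // skip key length, key and type byte
	count := int(binary.BigEndian.Uint16(d.b[p : p+2]))
	p += 2
	out := make([]indexEntry, 0, count)
	for j := 0; j < count; j++ {
		e := d.b[p : p+entrySize]
		out = append(out, indexEntry{
			minTime: int64(binary.BigEndian.Uint64(e[0:8])),
			maxTime: int64(binary.BigEndian.Uint64(e[8:16])),
			offset:  int64(binary.BigEndian.Uint64(e[16:24])),
			size:    binary.BigEndian.Uint32(e[24:28]),
		})
		p += entrySize
	}
	return out
}

func main() {
	var idx indirectIndex
	idx.add("cpu_usage,host=server01#!~#idle", 0, []indexEntry{{0, 9, 5, 40}})
	idx.add("cpu_usage,host=server01#!~#value", 0, []indexEntry{{0, 9, 45, 40}, {10, 19, 85, 38}})
	fmt.Println(idx.entries("cpu_usage,host=server01#!~#value"))
}
```

Storing only offsets alongside the raw bytes keeps the in-memory footprint small: keys and entries are decoded on demand instead of being materialised as Go objects for every series.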
//tsdb/engine/tsm1/reader.go
type indexEntries struct {
	Type    byte
	entries []IndexEntry
}
// IndexEntry is the index information for a given block in a TSM file.
type IndexEntry struct {
	// The min and max time of all points stored in the block.
	MinTime, MaxTime int64
	// The absolute position in the file where this block is located.
	Offset int64 // offset into the TSM file
	// The size in bytes of the block in the file.
	Size uint32
}
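Once the index entries for a key are known, the last step is to pick the blocks that overlap the queried time range and read them from the file. A minimal sketch, assuming a local copy of the IndexEntry struct and two hypothetical helpers, `overlapping` and `readBlock`; decompression of the block contents is left out.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// IndexEntry is a local copy of the struct above, for a self-contained example.
type IndexEntry struct {
	MinTime, MaxTime int64
	Offset           int64
	Size             uint32
}

// overlapping returns the entries whose [MinTime, MaxTime] intersects [min, max].
func overlapping(entries []IndexEntry, min, max int64) []IndexEntry {
	var out []IndexEntry
	for _, e := range entries {
		if e.MinTime <= max && e.MaxTime >= min {
			out = append(out, e)
		}
	}
	return out
}

// readBlock reads the raw (still compressed) bytes of one block from the file.
func readBlock(r io.ReaderAt, e IndexEntry) ([]byte, error) {
	buf := make([]byte, e.Size)
	section := io.NewSectionReader(r, e.Offset, int64(e.Size))
	if _, err := io.ReadFull(section, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	// A fake "file": a 6-byte header followed by two 6-byte blocks.
	file := bytes.NewReader([]byte("HEADERblockAblockB"))
	entries := []IndexEntry{
		{MinTime: 0, MaxTime: 9, Offset: 6, Size: 6},
		{MinTime: 10, MaxTime: 19, Offset: 12, Size: 6},
	}
	for _, e := range overlapping(entries, 12, 15) {
		b, err := readBlock(file, e)
		if err != nil {
			fmt.Println("read error:", err)
			return
		}
		fmt.Printf("%s\n", b)
	}
}
```

Because each entry carries the block's min and max time, blocks outside the queried range are skipped without any disk read.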
References
1. http://blog.fatedier.com/2016...