This article walks through the source code of Druid's IndexGeneratorJob.
Druid IndexGeneratorJob Source Code
In a hadoop_index task, IndexGeneratorJob launches a MapReduce job that turns the raw input data into Segments and loads them into Druid's deep storage. Its main logic is therefore concentrated in the IndexGeneratorMapper and IndexGeneratorReducer classes.
IndexGeneratorMapper processes input row by row and uses hadoopShardSpecLookup to decide which Bucket each row belongs to. hadoopShardSpecLookup is computed during the determine_partitions phase and stores the data-partitioning rules for the current hadoop_index task; it is produced in DeterminePartitionsJob or DetermineHashedPartitionsJob.
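As a rough illustration of hash-based bucketing (the names below are hypothetical; Druid's actual lookup goes through the shard specs produced by DetermineHashedPartitionsJob), a row's bucket can be pictured as a hash of its truncated timestamp and dimension values taken modulo the shard count:

```java
import java.util.List;
import java.util.Objects;

// Hypothetical sketch, NOT Druid's actual code: hash the row's group key
// (truncated timestamp + dimension values) and map it onto a shard.
public class BucketLookup {
    static int bucketFor(long truncatedTimestamp, List<String> dimensionValues, int numShards) {
        // floorMod keeps the result in [0, numShards) even when the hash is negative
        return Math.floorMod(Objects.hash(truncatedTimestamp, dimensionValues), numShards);
    }

    public static void main(String[] args) {
        int bucket = bucketFor(1_600_000_000_000L, List.of("a.example.com"), 4);
        System.out.println(bucket >= 0 && bucket < 4); // true: always within the shard range
    }
}
```

The real lookup also has to respect the time chunk (segmentGranularity interval) the row falls into; this sketch only shows the modulo step.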
The Mapper's output is a key/value pair of the form <SortableBytes(groupKey, sortedKey), SerializedRow>.
groupKey contains shardNum, time, and partitionNum. The MapReduce job uses shardNum in the partition phase to decide which reducer a record goes to; time (a DateTime) marks the start of the time range the bucket covers; partitionNum identifies the partition within that bucket.
sortedKey contains the truncated timestamp and the hashed dimensions. What is the reducer's spill process, and why does this sortedKey reduce spilling? A plausible reading of the code comment below: because rows sharing a truncated timestamp and dimension values arrive at the reducer adjacent to one another, they roll up immediately into a single aggregated row, so the reducer's in-memory index grows more slowly and has to spill (persist intermediate indexes to disk) less often.
SerializedRow is the binary serialization of an InputRow.
context.write(
    new SortableBytes(
        bucket.get().toGroupKey(),
        // sort rows by truncated timestamp and hashed dimensions to help reduce spilling on the reducer side
        ByteBuffer.allocate(Long.BYTES + hashedDimensions.length)
                  .putLong(truncatedTimestamp)
                  .put(hashedDimensions)
                  .array()
    ).toBytesWritable(),
    new BytesWritable(serializeResult.getSerializedRow())
);
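The effect of this composite key can be seen in a standalone sketch (SortKeyDemo is illustrative, not Druid code): two rows with the same truncated timestamp and the same hashed dimensions produce byte-identical sort keys, so the shuffle places them next to each other for the reducer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustration only: builds the sort-key portion the same way the Mapper does —
// 8 bytes of truncated timestamp followed by the hashed-dimensions bytes.
public class SortKeyDemo {
    static byte[] sortKey(long truncatedTimestamp, byte[] hashedDimensions) {
        return ByteBuffer.allocate(Long.BYTES + hashedDimensions.length)
                         .putLong(truncatedTimestamp)
                         .put(hashedDimensions)
                         .array();
    }

    public static void main(String[] args) {
        byte[] dims = new byte[]{1, 2, 3, 4};
        byte[] k1 = sortKey(1_600_000_000_000L, dims);
        byte[] k2 = sortKey(1_600_000_000_000L, dims);
        // identical rows yield identical keys, so they sort adjacently in the shuffle
        System.out.println(Arrays.equals(k1, k2)); // true
    }
}
```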
IndexGeneratorReducer
The most important logic of IndexGeneratorJob is concentrated in IndexGeneratorReducer:
1. A PersistExecutor is created to persist index files.
2. Rows are processed one at a time, accumulating the total row count and the in-memory data size, which are checked against the maxRowCount and maxBytesInMemory thresholds.
When either threshold is reached, the in-memory index is persisted to a file.
The persist(index, interval, mergedBase, progressIndicator) call invokes IndexMergerV9.persist(), producing files such as version.bin, factory.json, 00000.smoosh, and meta.smoosh.
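The threshold check can be sketched as follows (a simplified, hypothetical loop; names such as persistCount are not Druid's actual API — in Druid the reducer persists the in-memory IncrementalIndex and later merges all persisted indexes into one segment):

```java
// Hypothetical sketch of the reducer's incremental-persist loop:
// rows accumulate in an in-memory index, and the index is flushed to disk
// whenever the row count or the in-memory byte estimate crosses its limit.
public class FlushLoop {
    static int persistCount(int[] rowSizes, int maxRowCount, long maxBytesInMemory) {
        int persists = 0, rows = 0;
        long bytes = 0;
        for (int size : rowSizes) {
            rows++;
            bytes += size;
            if (rows >= maxRowCount || bytes >= maxBytesInMemory) {
                persists++;           // persist(index, interval, file, progressIndicator)
                rows = 0;
                bytes = 0;            // start a fresh in-memory index
            }
        }
        return persists;  // intermediate persists; leftover rows are persisted and merged at the end
    }

    public static void main(String[] args) {
        int[] rowSizes = new int[10];
        java.util.Arrays.fill(rowSizes, 100);
        System.out.println(persistCount(rowSizes, 4, Long.MAX_VALUE)); // 2
    }
}
```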
Question: what exactly are version.bin, factory.json, 00000.smoosh, and meta.smoosh for? Roughly: version.bin stores the segment's binary format version (v9 here); factory.json tells Druid which SegmentizerFactory to use when loading the segment (memory-mapped here); 00000.smoosh is a container that concatenates ("smooshes") all the column files together to keep the number of open file descriptors small; and meta.smoosh is the index that maps each internal file name to its byte range inside the smoosh chunks.
[gh@20:47:40]~$ ll /var/folders/pv/dkqbk3wj51b7yb4r5wd9qgdw0000gn/T/base-flush4512362737889488592/merged
total 32
-rw-r--r--  1 gh  staff     4  9  3 20:47 version.bin
-rw-r--r--  1 gh  staff    29  9  3 20:47 factory.json
-rw-r--r--  1 gh  staff  1306  9  3 20:47 00000.smoosh
-rw-r--r--  1 gh  staff   135  9  3 20:47 meta.smoosh
File contents:
[gh@20:51:24]/var/folders/pv/dkqbk3wj51b7yb4r5wd9qgdw0000gn/T/base-flush4512362737889488592/merged$ cat version.bin
[gh@20:51:26]/var/folders/pv/dkqbk3wj51b7yb4r5wd9qgdw0000gn/T/base-flush4512362737889488592/merged$ cat factory.json
{"type":"mMapSegmentFactory"}
[gh@20:51:29]/var/folders/pv/dkqbk3wj51b7yb4r5wd9qgdw0000gn/T/base-flush4512362737889488592/merged$ cat 00000.smoosh
d{"valueType":"LONG","hasMultipleValues":false,"parts":[{"type":"long","byteOrder":"LITTLE_ENDIAN"}]} ?,\'5I,\'5Ig{"valueType":"COMPLEX","hasMultipleValues":false,"parts":[{"type":"complex","typeName":"hyperUnique"}]}(?d{"valueType":"LONG","hasMultipleValues":false,"parts":[{"type":"long","byteOrder":"LITTLE_ENDIAN"}]} "d?2?{"valueType":"STRING","hasMultipleValues":false,"parts":[{"type":"stringDictionary","bitmapSerdeFactory":{"type":"roaring","compressRunOnSerialization":true},"byteOrder":"LITTLE_ENDIAN"}]}."a.example.comb.exmaple.com 8,:0:07\'unique_hostsvisited_numhoshostI5\',I:M?4{"type":"roaring","compressRunOnSerialization":true}{"container":{},"aggregators":[{"type":"hyperUnique","name":"unique_hosts","fieldName":"unique_hosts","isInputHyperUnique":false,"round":false},{"type":"longSum","name":"visited_num","fieldName":"visited_num","expression":null}],"timestampSpec":{"column":"timestamp","format":"yyyyMMddHH","missingValue":null},"queryGranularity":{"type":"none"},"rollup":true}
[gh@20:51:32]/var/folders/pv/dkqbk3wj51b7yb4r5wd9qgdw0000gn/T/base-flush4512362737889488592/merged$ cat meta.smoosh
v1,2147483647,1
__time,0,0,150
host,0,449,792
index.drd,0,792,947
metadata.drd,0,947,1306
unique_hosts,0,150,303
visited_num,0,303,449
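In meta.smoosh, the header line appears to be v1,<maxChunkSize>,<chunkCount>, and each subsequent line maps one internal file to name,chunkNum,startOffset,endOffset within the corresponding NNNNN.smoosh chunk. A minimal parser for one entry line (illustrative only; this is not Druid's SmooshedFileMapper):

```java
// Parses a meta.smoosh entry line of the form "name,chunk,start,end",
// where [start, end) is the file's byte range inside NNNNN.smoosh.
public class SmooshEntry {
    final String name;
    final int chunk;
    final int start;
    final int end;

    SmooshEntry(String line) {
        String[] f = line.split(",");
        name = f[0];
        chunk = Integer.parseInt(f[1]);
        start = Integer.parseInt(f[2]);
        end = Integer.parseInt(f[3]);
    }

    int size() {
        return end - start;
    }

    public static void main(String[] args) {
        SmooshEntry host = new SmooshEntry("host,0,449,792");
        System.out.println(host.size()); // 343: the host column occupies 343 bytes of 00000.smoosh
    }
}
```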
3. Whether to use extendable shardSpecs. What is this for? Roughly: an extendable shard spec (e.g. NumberedShardSpec, visible in the descriptor below with its partitionNum/partitions fields) allows additional partitions to be appended to the same interval later, whereas a non-extendable spec fixes the partition set at creation time.
4. Generate the DataSegment.
5. Package the DataSegment into index.zip.
6. Generate the SegmentDescriptor. What is it for? It is a JSON metadata file written to the segmentDescriptorInfo directory; after the MapReduce job finishes, the indexing task reads these descriptors to publish the segments (their identifier, interval, loadSpec, shardSpec, size, and version) to Druid's metadata store. Its contents:
[gh@13:35:43]/private/var/folders/pv/dkqbk3wj51b7yb4r5wd9qgdw0000gn/T/junit2870916712364956185/junit7070039265224578711/website/2021-09-03T124308.104Z_4bd0315d3edf460daeb97264534a14a5/segmentDescriptorInfo$ ll
total 8
-rw-r--r--  1 gh  staff  612  9  4 13:34 website_2014-10-22T000000.000Z_2014-10-23T000000.000Z_2021-09-03T124308.104Z.json
[gh@13:35:43]/private/var/folders/pv/dkqbk3wj51b7yb4r5wd9qgdw0000gn/T/junit2870916712364956185/junit7070039265224578711/website/2021-09-03T124308.104Z_4bd0315d3edf460daeb97264534a14a5/segmentDescriptorInfo$ cat website_2014-10-22T000000.000Z_2014-10-23T000000.000Z_2021-09-03T124308.104Z.json
{
  "binaryVersion": 9,
  "dataSource": "website",
  "dimensions": "host",
  "identifier": "website_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2021-09-03T12:43:08.104Z",
  "interval": "2014-10-22T00:00:00.000Z/2014-10-23T00:00:00.000Z",
  "loadSpec": {
    "path": "/private/var/folders/pv/dkqbk3wj51b7yb4r5wd9qgdw0000gn/T/junit2870916712364956185/junit7070039265224578711/website/2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z/2021-09-03T12:43:08.104Z/0/index.zip",
    "type": "local"
  },
  "metrics": "visited_num,unique_hosts",
  "shardSpec": {
    "partitionNum": 0,
    "partitions": 5,
    "type": "numbered"
  },
  "size": 1474,
  "version": "2021-09-03T12:43:08.104Z"
}
numReducers logic. Formula: # of reducers = # of partitions by segmentGranularity × numShards.
For example, with segmentGranularity set to MONTH, "intervals": ["2020-01-01/2021-01-01"], and numShards = 2:
reduceNum = 12 (months in a year) × 2 = 24
Note: because the reducer count is derived from this formula, setting mapreduce.job.reduces (e.g. to 0) in jobProperties has no effect.
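The formula can be checked with a few lines of Java (illustrative; ReducerCount is not a Druid class):

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Computes reducers = (# of segmentGranularity buckets in the interval) x numShards,
// here for MONTH granularity.
public class ReducerCount {
    static long reducers(LocalDate intervalStart, LocalDate intervalEnd, int numShards) {
        long monthBuckets = ChronoUnit.MONTHS.between(intervalStart, intervalEnd);
        return monthBuckets * numShards;
    }

    public static void main(String[] args) {
        // intervals: 2020-01-01/2021-01-01, segmentGranularity MONTH, numShards 2
        System.out.println(reducers(LocalDate.of(2020, 1, 1), LocalDate.of(2021, 1, 1), 2)); // 24
    }
}
```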
In this processing logic, the result of the determine_partitions task is converted into Bucket objects via getAllBuckets, and reduceNum is derived from them.