ClickHouse 源码泛读
前言
首先从最整体的视角看下ClickHouse的处理流程:
入口函数TCP/HTTP/RPCHandler::runImpl
- 构建pipeline
state.io = executeQuery()
- 调度执行pipeline, reply to client
if(state.io.pipeline.pushing()) {
processInsertQuery();
} else if (state.io.pipeline.pulling()) {
processOrdinaryQueryWithProcessors();
} else if ... {
...
}
整体分为两大块:
- 解析sql,构建pipeline。
- 然后根据pipeline的特点(insert or other)选择对应的调度器执行pipeline,拿到结果返回给客户端。
关于第二部分可以参考我之前写的文章:ClickHouse之Pipeline执行引擎,这篇文章主要分析第一部分。
转发到executeQueryImpl。
executeQueryImpl 位置:src/Interpreters/executeQuery.cpp 358
解析SQL,并根据sql类型构造对应的Interpreter,调用Interpreter的execute()函数,获得pipeline,本文以Select语句为例进行分析。
InterpreterSelectQuery::execute 位置:src/Interpreters/InterpreterSelectQuery.cpp 684
- 构造QueryPlan
- 根据QueryPlan构造QueryPipelineBuilder
- 根据builder构造pipeline
InterpreterSelectQuery::buildQueryPlan 位置:src/Interpreters/InterpreterSelectQuery.cpp 656
主要工作转发到executeImpl
InterpreterSelectQuery::executeImpl 位置:src/Interpreters/InterpreterSelectQuery.cpp 1105
/// Read the data from Storage. from_stage - to what stage the request was completed in Storage.
executeFetchColumns(from_stage, query_plan);
/// 根据解析后的ast以及其他信息向query_plan中不断添加各种类型的QueryPlanStep,注:QueryPlan实际上是一个树状结构,树节点类型为QueryPlanStep。
这里将executeFetchColumns单独列出来,因为这里涉及到构建从存储引擎读取数据的QueryPlanStep,本文着重分析这里。
InterpreterSelectQuery::executeFetchColumns 位置:src/Interpreters/InterpreterSelectQuery.cpp 1926
函数前半部分设计很多优化相关以及各种参数的获取,在刚开始阅读源码的时候这些内容可以暂且跳过,首先梳理清楚整个项目的枝干,由粗到细慢慢分析,否则很容易迷失在繁杂的细节中。关注2159行这里:
storage->read(query_plan, required_columns, storage_snapshot, query_info, context, processing_stage, max_block_size, max_streams);
StorageMergeTree::read 位置:src/Storages/StorageMergeTree.cpp 215
关注这里:
if (auto plan = reader.read(column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage, nullptr, enable_parallel_reading))
query_plan = std::move(*plan);
MergeTreeDataSelectExecutor::read 位置:src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp 135
这里对于查询是否使用projection进行分情况处理,我们暂且关注不使用projection的分支。
MergeTreeDataSelectExecutor::readFromParts 位置:src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp 1282
关注这部分代码:
auto read_from_merge_tree = std::make_unique(
std::move(parts),
real_column_names,
virt_column_names,
data,
query_info,
storage_snapshot,
context,
max_block_size,
num_streams,
sample_factor_column_queried,
max_block_numbers_to_read,
log,
merge_tree_select_result_ptr,
enable_parallel_reading
);
QueryPlanPtr plan = std::make_unique();
plan->addStep(std::move(read_from_merge_tree));
return plan;
分析到这里可知,在构造QueryPlan阶段我们实际上只往QueryPlan中添加了一个QueryPlanStep,它的类型是ReadFromMergeTree,读者可以看下这个类的继承关系验证,它确实是QueryPlanStep子类型。接下来的重点就是分析ReadFromMergeTree这个类型。
在分析之前我们有必要知道以下信息:
在 根据QueryPlan构造QueryPipelineBuilder阶段,我们实际上依赖于QueryPlanStep的虚函数:
/// Add processors from current step to QueryPipeline.
/// Calling this method, we assume and don't check that:
///* pipelines.size() == getInputStreams.size()
///* header from each pipeline is the same as header from corresponding input_streams
/// Result pipeline must contain any number of streams with compatible output header is hasOutputStream(),
///or pipeline should be completed otherwise.
virtual QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) = 0;
但是我们发现ReadFromMergeTree并没有重写这个函数,原因如下:
ReadFromMergeTree的继承链为 ReadFromMergeTree -> ISourceStep -> QueryPlanStep。
在ISourceStep中已经重写了这个函数,因此我们只需要关注initializePipeline这个虚函数即可。
QueryPipelineBuilderPtr ISourceStep::updatePipeline(QueryPipelineBuilders, const BuildQueryPipelineSettings & settings)
{
auto pipeline = std::make_unique();
QueryPipelineProcessorsCollector collector(*pipeline, this);
initializePipeline(*pipeline, settings);
auto added_processors = collector.detachProcessors();
processors.insert(processors.end(), added_processors.begin(), added_processors.end());
return pipeline;
}
ReadFromMergeTree::initializePipeline 关注:
auto result = getAnalysisResult();
...
pipe = spreadMarkRangesAmongStreams(
std::move(result.parts_with_ranges),
column_names_to_read);
...
pipeline.init(std::move(pipe));
可以看到,我们是通过一个pipe初始化了pipeline(type : QueryPipelineBuilder),然后在ISourceStep::updatePipeline中返回并参与构建pipeline,因此我们的重点转移到了如何构建这个pipe。注:关于QueryPipelineBuilder和Pipe的关系,大家可以跳转看看,其实只是一层很浅的封装。
ReadFromMergeTree::spreadMarkRangesAmongStreams 位置:src/Processors/QueryPlan/ReadFromMergeTree.cpp 375
转发到read函数
ReadFromMergeTree::read 位置:src/Processors/QueryPlan/ReadFromMergeTree.cpp 287
代码如下:
Pipe ReadFromMergeTree::read(
RangesInDataParts parts_with_range, Names required_columns, ReadType read_type,
size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache)
{
if (read_type == ReadType::Default && max_streams > 1)
return readFromPool(parts_with_range, required_columns, max_streams,
min_marks_for_concurrent_read, use_uncompressed_cache);
auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache, 0);
/// Use ConcatProcessor to concat sources together.
/// It is needed to read in parts order (and so in PK order) if single thread is used.
if (read_type == ReadType::Default && pipe.numOutputPorts() > 1)
pipe.addTransform(std::make_shared(pipe.getHeader(), pipe.numOutputPorts()));
return pipe;
}
如果max_streams > 1,则转发到readFromPool,并且在pipe中添加一个ConcatProcessor,将多个source合并为一个。否则转发到readInOrder。
todo 【ClickHouse 源码泛读】整个系统的链路实在太长了,后面的内容有时间再分析吧,之后的内容可以看下这篇文章。
推荐阅读
- clickhouse 20.x 三分片两副本部署与本地表的压力测试
- Clickhouse重复数据处理
- Prometheus监控clickhouse服务
- ClickHouse镜像在阿里云镜像站首发上线
- ClickHouse在大数据领域应用实践
- ClickHouse 在 UBA 系统中的字典编码优化实践
- Spark+ES+ClickHouse 构建DMP用户画像
- 解决clickhouse问题库中无这个张表创建时已经存在的问题
- 基于Clickhouse的日志体系