

Version  Date      Notes
1.0      2022.3.8  First published
This article is based on an analysis of the Flink 1.14 code.
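0. Preface
A while ago I ran into a strange phenomenon in production: a full-load job could not run normally, and the logs were full of java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container xxxx(HOSTNAME:PORT) timed out errors.
1. Troubleshooting approach
By the time this problem was reported to me, a colleague had already done one round of investigation. A web search will tell you that the cause may be excessive pressure on YARN, brief network instability and so on, and that raising heartbeat.timeout can mitigate it. We made that adjustment, and the problem was still not solved.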
/**
 * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
 *
 * For each incoming element in the {@code StreamingFileSink}, the user-specified {@link
 * BucketAssigner} is queried to see in which bucket this element should be written to.
 */
@Internal
public class Bucket<IN, BucketID> {

|-- HiveTableSink
  \-- createStreamSink
|-- StreamingFileSink
  \-- initializeState
|-- StreamingFileSinkHelper
  \-- constructor
|-- HadoopPathBasedBulkFormatBuilder
  \-- createBuckets
|-- Buckets
  \-- onElement
  \-- getOrCreateBucketForBucketId
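
The last hop of this call chain can be modelled in a few lines. The sketch below is a toy stand-in, not Flink's code: BucketRegistrySketch and ToyBucket are hypothetical names, and the only behaviour it assumes is that getOrCreateBucketForBucketId looks a bucket up by its id and registers a new one on a miss. Under that assumption, a job that touches many distinct partition directories ends up holding one bucket object per directory.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (hypothetical names): one bucket object is kept per distinct bucket id. */
class BucketRegistrySketch {

    /** Hypothetical stand-in for Flink's Bucket; it only remembers its id. */
    static final class ToyBucket {
        final String bucketId;

        ToyBucket(String bucketId) {
            this.bucketId = bucketId;
        }
    }

    private final Map<String, ToyBucket> activeBuckets = new HashMap<>();

    /** Returns the bucket for this id, creating and registering it on first use. */
    ToyBucket getOrCreateBucketForBucketId(String bucketId) {
        return activeBuckets.computeIfAbsent(bucketId, ToyBucket::new);
    }

    int activeBucketCount() {
        return activeBuckets.size();
    }

    public static void main(String[] args) {
        BucketRegistrySketch buckets = new BucketRegistrySketch();
        // A full-load job writing 10 records into each of 1000 partition directories.
        for (int day = 0; day < 1000; day++) {
            for (int record = 0; record < 10; record++) {
                buckets.getOrCreateBucketForBucketId("dt=" + day);
            }
        }
        System.out.println(buckets.activeBucketCount()); // prints 1000
    }
}
```

The number of live bucket objects tracks the number of distinct directories, not the number of records.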

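2. Curiosity after solving the problem
If every directory produces a Bucket, wouldn't a streaming job hit the same problem sooner or later? A problem this obvious must have occurred to the community's experts long ago. Curiosity drove me to look for the answer, until I saw this code: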
public void commitUpToCheckpoint(final long checkpointId) throws IOException {
    final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
            activeBuckets.entrySet().iterator();

    LOG.info(
            "Subtask {} received completion notification for checkpoint with id={}.",
            subtaskIndex,
            checkpointId);

    while (activeBucketIt.hasNext()) {
        final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
        bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

        if (!bucket.isActive()) {
            // We've dealt with all the pending files and the writer for this bucket is not
            // currently open.
            // Therefore this bucket is currently inactive and we can remove it from our state.
            activeBucketIt.remove();
            notifyBucketInactive(bucket);
        }
    }
}

boolean isActive() {
    return inProgressPart != null
            || !pendingFileRecoverablesForCurrentCheckpoint.isEmpty()
            || !pendingFileRecoverablesPerCheckpoint.isEmpty();
}
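
To see the effect of these two methods together, here is a minimal, self-contained model. It is a sketch under simplifying assumptions, not the Flink implementation: ToyBucket is hypothetical, the open writer is reduced to a boolean, both pending-file collections are collapsed into a single counter, and the inactive notification is modelled as a returned list of bucket ids.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical names) of dropping inactive buckets when a checkpoint completes. */
class InactiveBucketCleanupSketch {

    static final class ToyBucket {
        boolean writerOpen;   // stands in for inProgressPart != null
        int uncommittedFiles; // stands in for the two pending-file collections

        ToyBucket(boolean writerOpen, int uncommittedFiles) {
            this.writerOpen = writerOpen;
            this.uncommittedFiles = uncommittedFiles;
        }

        boolean isActive() {
            return writerOpen || uncommittedFiles > 0;
        }

        void onSuccessfulCompletionOfCheckpoint() {
            uncommittedFiles = 0; // simplification: every pending file gets committed
        }
    }

    /** Commits every bucket, removes the inactive ones, and returns the ids that were removed. */
    static List<String> commitUpToCheckpoint(Map<String, ToyBucket> activeBuckets) {
        List<String> notifiedInactive = new ArrayList<>();
        Iterator<Map.Entry<String, ToyBucket>> it = activeBuckets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ToyBucket> entry = it.next();
            entry.getValue().onSuccessfulCompletionOfCheckpoint();
            if (!entry.getValue().isActive()) {
                it.remove();
                notifiedInactive.add(entry.getKey());
            }
        }
        return notifiedInactive;
    }

    public static void main(String[] args) {
        Map<String, ToyBucket> activeBuckets = new LinkedHashMap<>();
        activeBuckets.put("dt=2022-03-07", new ToyBucket(false, 2)); // writer already closed
        activeBuckets.put("dt=2022-03-08", new ToyBucket(true, 1));  // still being written
        System.out.println(commitUpToCheckpoint(activeBuckets)); // prints [dt=2022-03-07]
        System.out.println(activeBuckets.keySet());              // prints [dt=2022-03-08]
    }
}
```

A bucket whose writer is still open survives the checkpoint, while one that has only committed files left is dropped from the map.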

2.1 inProgressPart == null
This object is of type InProgressFileWriter. The conditions under which it becomes null are closely tied to the FileSystem rolling policy.
/**
 * The policy based on which a {@code Bucket} in the {@code Filesystem Sink} rolls its currently
 * open part file and opens a new one.
 */
@PublicEvolving
public interface RollingPolicy<IN, BucketID> extends Serializable {

    /**
     * Determines if the in-progress part file for a bucket should roll on every checkpoint.
     *
     * @param partFileState the state of the currently open part file of the bucket.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) throws IOException;

    /**
     * Determines if the in-progress part file for a bucket should roll based on its current state,
     * e.g. its size.
     *
     * @param element the element being processed.
     * @param partFileState the state of the currently open part file of the bucket.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element)
            throws IOException;

    /**
     * Determines if the in-progress part file for a bucket should roll based on a time condition.
     *
     * @param partFileState the state of the currently open part file of the bucket.
     * @param currentTime the current processing time.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnProcessingTime(
            final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException;
}

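  • shouldRollOnCheckpoint: checked before a Checkpoint is taken.
  • shouldRollOnEvent: checks, based on the current state, whether the part file should be closed, for example whether the current buffer size has exceeded the limit.
  • shouldRollOnProcessingTime: checks whether the part file has been open for too long, to decide whether it meets the condition for being closed.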
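
As an illustration of the three hooks, here is a minimal size-and-age policy. It is a sketch with hypothetical names (SizeAndAgeRollingPolicySketch, ToyPartFileInfo) and deliberately does not implement Flink's RollingPolicy interface, so that it compiles with the JDK alone. The thresholds and the choice to roll on every checkpoint are assumptions of this example.

```java
/** Toy rolling policy (hypothetical names): roll by size, by age, and on every checkpoint. */
class SizeAndAgeRollingPolicySketch {

    /** Hypothetical, trimmed-down stand-in for the state of an open part file. */
    static final class ToyPartFileInfo {
        final long sizeBytes;
        final long creationTimeMs;

        ToyPartFileInfo(long sizeBytes, long creationTimeMs) {
            this.sizeBytes = sizeBytes;
            this.creationTimeMs = creationTimeMs;
        }
    }

    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;

    SizeAndAgeRollingPolicySketch(long maxPartSizeBytes, long rolloverIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverIntervalMs;
    }

    /** This sketch chooses to close the part file at every checkpoint. */
    boolean shouldRollOnCheckpoint(ToyPartFileInfo partFile) {
        return true;
    }

    /** Called per element: roll once the part file has reached the size limit. */
    boolean shouldRollOnEvent(ToyPartFileInfo partFile) {
        return partFile.sizeBytes >= maxPartSizeBytes;
    }

    /** Called from a timer: roll once the part file has been open for too long. */
    boolean shouldRollOnProcessingTime(ToyPartFileInfo partFile, long currentTimeMs) {
        return currentTimeMs - partFile.creationTimeMs >= rolloverIntervalMs;
    }

    public static void main(String[] args) {
        SizeAndAgeRollingPolicySketch policy = new SizeAndAgeRollingPolicySketch(1024, 60_000);
        ToyPartFileInfo small = new ToyPartFileInfo(10, 0);
        System.out.println(policy.shouldRollOnEvent(small));                  // prints false
        System.out.println(policy.shouldRollOnProcessingTime(small, 60_000)); // prints true
    }
}
```

Whenever one of these checks returns true the open part file is closed, which is what moves a bucket towards inProgressPart == null.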
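2.2 pendingFileRecoverablesForCurrentCheckpoint isNotEmpty
Its elements are also added as a result of the RollingPolicy, so I will not explain it further.
2.3 pendingFileRecoverablesPerCheckpoint isNotEmpty
This builds on pendingFileRecoverablesForCurrentCheckpoint isNotEmpty. A map is used to store the relationship between a CheckpointId and a List of pending files.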
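
The relationship between the two collections can be sketched as follows. This is a toy model under stated assumptions, not Flink's code: the class name is hypothetical, pending files are plain strings, and "committing" a file just means appending it to a list. The method names are borrowed from Bucket only to make the mapping easier to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model (hypothetical names) of per-checkpoint bookkeeping for closed part files. */
class PendingFilesPerCheckpointSketch {

    private List<String> pendingForCurrentCheckpoint = new ArrayList<>();
    private final NavigableMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    final List<String> committed = new ArrayList<>();

    /** A part file was rolled: it now waits for the next checkpoint. */
    void closePartFile(String fileName) {
        pendingForCurrentCheckpoint.add(fileName);
    }

    /** A checkpoint is taken: file everything closed so far under its id. */
    void onReceptionOfCheckpoint(long checkpointId) {
        if (!pendingForCurrentCheckpoint.isEmpty()) {
            pendingPerCheckpoint.put(checkpointId, pendingForCurrentCheckpoint);
            pendingForCurrentCheckpoint = new ArrayList<>();
        }
    }

    /** A checkpoint completed: commit all files filed under ids up to and including it. */
    void onSuccessfulCompletionOfCheckpoint(long checkpointId) {
        NavigableMap<Long, List<String>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : done.values()) {
            committed.addAll(files);
        }
        done.clear(); // headMap is a view, so this also removes the entries from the backing map
    }

    boolean hasPendingFiles() {
        return !pendingForCurrentCheckpoint.isEmpty() || !pendingPerCheckpoint.isEmpty();
    }

    public static void main(String[] args) {
        PendingFilesPerCheckpointSketch bucket = new PendingFilesPerCheckpointSketch();
        bucket.closePartFile("part-0");
        bucket.onReceptionOfCheckpoint(1L);
        bucket.closePartFile("part-1");
        bucket.onReceptionOfCheckpoint(2L);

        bucket.onSuccessfulCompletionOfCheckpoint(1L);
        System.out.println(bucket.committed + " " + bucket.hasPendingFiles()); // prints [part-0] true

        bucket.onSuccessfulCompletionOfCheckpoint(2L);
        System.out.println(bucket.committed + " " + bucket.hasPendingFiles()); // prints [part-0, part-1] false
    }
}
```

Only after the last checkpoint that references a closed file has completed do both collections become empty.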
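2.4 Inactive Buckets
Putting the previous conditions together, an inactive Bucket is simply a directory whose part file has been closed and for which all Checkpoints have completed. The check usually happens at these points:
  1. When a Task is restored: the previous state is read from the StateBackend and checked.
  2. After a Checkpoint completes, a check is performed.
When a Bucket becomes inactive, an Inactive notification is sent, telling downstream that the data of this partition has been committed and is now readable. See issue: Partition commit is delayed when records keep coming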
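3. Clean architecture in the FileSystemConnector
After learning the points above, I noticed this proposal: FLIP-115: Filesystem connector in Table. Following the proposal, I skimmed the related source code and found that its implementation is also an embodiment of clean architecture.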
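|-- HiveTableSink  # Table-level API; outward-facing, users can call it directly
|-- StreamingFileSink  # Streaming-level API; can also be used externally, sits below the Table API
|-- StreamingFileSinkHelper  # Integrates the TimeService logic so that Buckets can be closed periodically, and dispatches data to Buckets. This class is also used by AbstractStreamingWriter, and its comments recommend reusing it in a RichSinkFunction or StreamOperator
|-- BucketsBuilder  # The concrete class called in this scenario is HadoopPathBasedBulkFormatBuilder; it is concerned with the concrete implementation of Buckets and of BucketWriter
|-- Buckets  # A class that manages the lifecycle of Buckets. It has several key member objects:
  |-- BucketWriter  # Corresponds to the concrete FileSystem implementation and the Format being written
  |-- RollingPolicy  # The rolling policy; covered earlier, not discussed further
  |-- BucketAssigner  # Decides which Bucket each element is written to, for example by key or by date
  |-- BucketFactory  # Responsible for creating each Bucket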

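  1. If we want to drive Hive writes from our own DSL, we only need to write a HiveDSLSink similar to HiveTableSink.
  2. If a data warehouse (data lake) keeps adding support for new underlying file systems, then once the first version of the code is in place, later work only needs to implement the corresponding BucketWriterFileSystem.
  3. If a data warehouse (data lake) keeps adding support for new Formats, then once the first version of the code is in place, later work only needs to implement the corresponding BucketWriter.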
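
The extension points listed above can be illustrated with a stripped-down model. All types here (ToyBucketAssigner, ToyBucketWriter, ToyBuckets) are hypothetical and much simpler than their Flink counterparts; for instance, the real BucketAssigner also receives a context argument. The point of the sketch is only that the dispatching class depends on two small interfaces, so a new partitioning rule or output format is a new implementation rather than a change to the dispatcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (hypothetical names) of a sink whose partitioning and format are pluggable. */
class PluggableSinkSketch {

    /** Decides which bucket an element goes to. */
    interface ToyBucketAssigner<IN> {
        String getBucketId(IN element);
    }

    /** Decides how an element is encoded; stands in for format and file system specifics. */
    interface ToyBucketWriter<IN> {
        String encode(IN element);
    }

    /** Dispatches elements to buckets; knows nothing about the concrete assigner or writer. */
    static final class ToyBuckets<IN> {
        private final ToyBucketAssigner<IN> assigner;
        private final ToyBucketWriter<IN> writer;
        final Map<String, List<String>> output = new TreeMap<>();

        ToyBuckets(ToyBucketAssigner<IN> assigner, ToyBucketWriter<IN> writer) {
            this.assigner = assigner;
            this.writer = writer;
        }

        void onElement(IN element) {
            output.computeIfAbsent(assigner.getBucketId(element), id -> new ArrayList<>())
                    .add(writer.encode(element));
        }
    }

    public static void main(String[] args) {
        // Records look like "<date>,<user>"; bucket by date, write only the user name.
        ToyBuckets<String> buckets =
                new ToyBuckets<>(
                        record -> "dt=" + record.substring(0, 10),
                        record -> record.substring(11));
        buckets.onElement("2022-03-08,alice");
        buckets.onElement("2022-03-07,bob");
        buckets.onElement("2022-03-08,carol");
        System.out.println(buckets.output);
    }
}
```

Swapping the first lambda changes how data is partitioned, and swapping the second changes how it is written, without touching ToyBuckets.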
