Flink-使用checkpoint和savepoint进行快照恢复
- 使用checkpoint(自动,由flink本身来管理)
- 准备测试代码
- 测试步骤
- flink run的常见参数
- flink-checkpoint的配置文件配置(不需要代码设置)
- flink-checkpoint的代码层面配置
- 使用Savepoint(用户手动操作)
- Savepoint需要注意的点:
- 手动savepoint的步骤
- 删除savepoint
使用checkpoint(自动,由flink本身来管理) 准备测试代码
public class Demo1 {
public static void main(String[] args) throws Exception {
// 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
// 一次保证性
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 保存的文件系统的地址
env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
// 设置需要手动输入的参数(这种写法比较好,这样规定了flink-web页面输入的参数名称)
// 用args[]的方式可能出现一些格式问题,用ParameterTool工具很方便
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String ip = parameterTool.get("hostname");
Integer port = Integer.valueOf(parameterTool.get("port"));
// 创建第一个dataStream
DataStreamSource> dataStream = env.socketTextStream(ip, port);
SingleOutputStreamOperator> dataStream2 = dataStream.flatMap((String line, Collector> out) -> {
Arrays.stream(line.split(" ")).forEach(word -> {
out.collect(Tuple2.of(word, 1));
});
})// 如果使用了lambda表达式,必须使用returns来返回一个规定的类型
.returns(Types.TUPLE(Types.STRING, Types.INT));
SingleOutputStreamOperator> sum = dataStream2.keyBy(0)
.sum(1);
sum.print();
env.execute("LambdaStreamWordCount");
}
}
测试步骤 打成jar包
mvn clean package
放到flink上执行(对应的ip地址上,需要先打开8888端口,可以使用nc命令):
nc -lk 8888
文章图片
submit后可以发现,程序正常执行
文章图片
可以查看checkpoint的详细信息,包括他存储在磁盘中的地址:
文章图片
到对应环境上检验,可以发现这个chk目录名称在不停的变化,因为代码里设置的checkpoint的时间间隔为1s:
文章图片
如何进行checkpoint呢?
注意:
不可以在web页面将任务进行cancel,而是需要停止集群,这样checkpoint目录的数据还会保留着。
执行命令:
bin/stop-cluster.sh
停止后可以发现对应的chk目录不再改变,并且保留了数据。
文章图片
恢复操作(上文已经把flink集群关闭了,这里记得要开启一下):
执行命令(我执行了-n参数,参数意义在下文,否则会报错),报错信息部分如下:
If you want to allow to skip this,
you can set the --allowNonRestoredState option on the CLI.
意思是有一些操作你无法还原,你需要进行skip操作。
bin/flink run -n -s /tmp/flink/checkpoints/294069f9b37727c70c4b8c07436f87bf/chk-547/_metadata -c com.pro.flink.sink.Demo1 /opt/modules/flink-1.0-SNAPSHOT.jar --hostname 192.168.135.237 --port 8888
flink run的常见参数 这里说明下flink的参数:(flink run)
-c,--class Flink应用程序的入口
-C,--classpath 指定所有节点都可以访问到的url,可用于多个应用程序都需要的工具类加载
-d,--detached 是否使用分离模式,就是提交任务,cli是否退出,加了-d参数,cli会退出
-n,--allowNonRestoredState 允许跳过无法还原的savepoint。比如删除了代码中的部分operator
-p,--parallelism 执行并行度
-s,--fromSavepoint从savepoint恢复任务
-sae,--shutdownOnAttachedExit 以attached模式提交,客户端退出的时候关闭集群
可以看到,任务正常启动,并且可以查看本次恢复是从哪一个地方恢复的,也就是图例的latest Restore
文章图片
flink-checkpoint的配置文件配置(不需要代码设置)
# 用于指定checkpoint state存储的backend,默认为none
state.backend: filesystem
# 用于指定backend是否使用异步snapshot(默认为true),
# 有些不支持async或者只支持async的state backend可能会忽略这个参数
state.backend.async# 默认为1024,用于指定存储于files的state大小阈值
# 如果小于该值则会存储在root checkpoint metadata file
state.backend.fs.memory-threshold# 默认为none,用于指定checkpoint的data files和meta data存储的目录
# 该目录必须对所有参与的TaskManagers及JobManagers可见
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
# 默认为none,用于指定savepoints的默认目录
state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# 默认为false,用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend会忽略该配置
state.backend.incremental: false# 默认为1,用于指定保留的已完成的checkpoints个数
state.checkpoints.num-retained
flink-checkpoint的代码层面配置
// 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定了checkpoint的时间间隔以及配置Mode为保持State的一致性
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
// 也可以这么配置
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 配置Checkpoint彼此之间的停顿时间(即限制在某段时间内,只能有一个Checkpoint进行)单位毫秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);
// 配置Checkpoint的并发量(比如某些程序的Checkpoint生产需要很长时间,可以通过这种方式加大效率)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);
// 配置Checkpoint的超时时间(避免Checkpoint生产时间过长)默认10分钟
env.getCheckpointConfig().setCheckpointTimeout(5 * 1000);
// 配置Checkpoint失败的最大容忍次数,默认0次,如果超过了这个次数,则认为Checkpoint失败
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// 配置Checkpoint的时间间隔,单位毫秒
env.getCheckpointConfig().setCheckpointInterval(1000);
// 配置Checkpoint的存放的文件路径
env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
// Checkpoint默认的配置是失败了,就重启恢复。因此当一个Flink失败/人为取消的时候,Checkpoint会被人为清除
// 配置Checkpoint开启 外化功能 。即应用程序停止时候,保存Checkpoint
// 支持2种外化:DELETE_ON_CANCELLATION:当应用程序完全失败或者明确地取消时,保存 Checkpoint。
//RETAIN_ON_CANCELLATION:当应用程序完全失败时,保存 Checkpoint。如果应用程序是明确地取消时,Checkpoint 被删除。
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
使用Savepoint(用户手动操作) Savepoint需要注意的点: 触发 Savepoint 时,会创建一个新的 Savepoint 目录,其中将存储数据和元数据。可以通过配置默认 targetDirectory 或指定自定义 targetDirectory。如果没有自定义或者配置,那么使用savepoint会失败。
手动savepoint的步骤
- 在web页面查看job的Id,并复制,如下:
文章图片
- 在终端上使用命令:格式:bin/flink savepoint [jobId] [存放的快照地址]
bin/flink savepoint 93e589184e0e78d57077178438807889 /tmp/flink/checkpoints/
成功后的信息:生成了一个savepoint-93e589-957052f01c84的目录文件。
文章图片
- 执行恢复命令
bin/flink run -n -s /tmp/flink/checkpoints/savepoint-93e589-957052f01c84/_metadata -c com.pro.flink.sink.Demo1 /opt/modules/flink-1.0-SNAPSHOT.jar --hostname 192.168.135.237 --port 8888
- 结果:
文章图片
bin/flink savepoint -d /tmp/flink/checkpoints/savepoint-93e589-957052f01c84/
【Flink-使用checkpoint和savepoint进行快照恢复】结果
文章图片
如果删除flink通过checkpoint机制生成的快照文件,是会报错的,报错部分信息如下:
文章图片
推荐阅读
- 读Flink源码谈设计(图的抽象与分层)
- 一种基于Flink Window的实时指标统计方法
- Flink-State/Checkpoint和Savepoint的详解
- Flink-OperatorChain源码详解
- Flink-sink的种类和基本使用
- Flink-dataStream的种类和基本使用
- flink|Flink的State与Rescale
- flink|flink on yarn启动流程分析
- flink|Flink heartbeat逻辑梳理