Flink-使用checkpoint和savepoint进行快照恢复


Flink-使用checkpoint和savepoint进行快照恢复

  • 使用checkpoint(自动,由flink本身来管理)
    • 准备测试代码
    • 测试步骤
    • flink run的常见参数
    • flink-checkpoint的配置文件配置(不需要代码设置)
    • flink-checkpoint的代码层面配置
  • 使用Savepoint(用户手动操作)
    • Savepoint需要注意的点:
    • 手动savepoint的步骤
    • 删除savepoint

使用checkpoint(自动,由flink本身来管理) 准备测试代码
public class Demo1 { public static void main(String[] args) throws Exception { // 创建环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); // 一次保证性 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 保存的文件系统的地址 env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints")); // 设置需要手动输入的参数(这种写法比较好,这样规定了flink-web页面输入的参数名称) // 用args[]的方式可能出现一些格式问题,用ParameterTool工具很方便 ParameterTool parameterTool = ParameterTool.fromArgs(args); String ip = parameterTool.get("hostname"); Integer port = Integer.valueOf(parameterTool.get("port")); // 创建第一个dataStream DataStreamSource> dataStream = env.socketTextStream(ip, port); SingleOutputStreamOperator> dataStream2 = dataStream.flatMap((String line, Collector> out) -> { Arrays.stream(line.split(" ")).forEach(word -> { out.collect(Tuple2.of(word, 1)); }); })// 如果使用了lambda表达式,必须使用returns来返回一个规定的类型 .returns(Types.TUPLE(Types.STRING, Types.INT)); SingleOutputStreamOperator> sum = dataStream2.keyBy(0) .sum(1); sum.print(); env.execute("LambdaStreamWordCount"); } }

测试步骤 打成jar包
mvn clean package

放到flink上执行(对应的ip地址上,需要先打开8888端口,可以使用nc命令):
nc -lk 8888

Flink-使用checkpoint和savepoint进行快照恢复
文章图片

submit后可以发现,程序正常执行
Flink-使用checkpoint和savepoint进行快照恢复
文章图片

可以查看checkpoint的详细信息,包括他存储在磁盘中的地址:
Flink-使用checkpoint和savepoint进行快照恢复
文章图片

到对应环境上检验,可以发现这个chk目录名称在不停的变化,因为代码里设置的checkpoint的时间间隔为1s:
Flink-使用checkpoint和savepoint进行快照恢复
文章图片

如何进行checkpoint呢?
注意:
不可以在web页面将任务进行cancel,而是需要停止集群,这样checkpoint目录的数据还会保留着。
执行命令:
bin/stop-cluster.sh

停止后可以发现对应的chk目录不再改变,并且保留了数据。
Flink-使用checkpoint和savepoint进行快照恢复
文章图片

恢复操作(上文已经把flink集群关闭了,这里记得要开启一下):
执行命令(我执行了-n参数,参数意义在下文,否则会报错),报错信息部分如下:
If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

意思是有一些操作你无法还原,你需要进行skip操作。
bin/flink run -n -s /tmp/flink/checkpoints/294069f9b37727c70c4b8c07436f87bf/chk-547/_metadata -c com.pro.flink.sink.Demo1 /opt/modules/flink-1.0-SNAPSHOT.jar --hostname 192.168.135.237 --port 8888

flink run的常见参数 这里说明下flink的参数:(flink run)
-c,--class Flink应用程序的入口 -C,--classpath 指定所有节点都可以访问到的url,可用于多个应用程序都需要的工具类加载 -d,--detached 是否使用分离模式,就是提交任务,cli是否退出,加了-d参数,cli会退出 -n,--allowNonRestoredState 允许跳过无法还原的savepoint。比如删除了代码中的部分operator -p,--parallelism 执行并行度 -s,--fromSavepoint从savepoint恢复任务 -sae,--shutdownOnAttachedExit 以attached模式提交,客户端退出的时候关闭集群

可以看到,任务正常启动,并且可以查看本次恢复是从哪一个地方恢复的,也就是图例的latest Restore
Flink-使用checkpoint和savepoint进行快照恢复
文章图片

flink-checkpoint的配置文件配置(不需要代码设置)
# 用于指定checkpoint state存储的backend,默认为none state.backend: filesystem # 用于指定backend是否使用异步snapshot(默认为true), # 有些不支持async或者只支持async的state backend可能会忽略这个参数 state.backend.async# 默认为1024,用于指定存储于files的state大小阈值 # 如果小于该值则会存储在root checkpoint metadata file state.backend.fs.memory-threshold# 默认为none,用于指定checkpoint的data files和meta data存储的目录 # 该目录必须对所有参与的TaskManagers及JobManagers可见 state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints # Default target directory for savepoints, optional. # 默认为none,用于指定savepoints的默认目录 state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints # 默认为false,用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend会忽略该配置 state.backend.incremental: false# 默认为1,用于指定保留的已完成的checkpoints个数 state.checkpoints.num-retained

flink-checkpoint的代码层面配置
// 创建环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 指定了checkpoint的时间间隔以及配置Mode为保持State的一致性 env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); // 也可以这么配置 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 配置Checkpoint彼此之间的停顿时间(即限制在某段时间内,只能有一个Checkpoint进行)单位毫秒 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000); // 配置Checkpoint的并发量(比如某些程序的Checkpoint生产需要很长时间,可以通过这种方式加大效率) env.getCheckpointConfig().setMaxConcurrentCheckpoints(3); // 配置Checkpoint的超时时间(避免Checkpoint生产时间过长)默认10分钟 env.getCheckpointConfig().setCheckpointTimeout(5 * 1000); // 配置Checkpoint失败的最大容忍次数,默认0次,如果超过了这个次数,则认为Checkpoint失败 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // 配置Checkpoint的时间间隔,单位毫秒 env.getCheckpointConfig().setCheckpointInterval(1000); // 配置Checkpoint的存放的文件路径 env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints")); // Checkpoint默认的配置是失败了,就重启恢复。因此当一个Flink失败/人为取消的时候,Checkpoint会被人为清除 // 配置Checkpoint开启 外化功能 。即应用程序停止时候,保存Checkpoint // 支持2种外化:DELETE_ON_CANCELLATION:当应用程序完全失败或者明确地取消时,保存 Checkpoint。 //RETAIN_ON_CANCELLATION:当应用程序完全失败时,保存 Checkpoint。如果应用程序是明确地取消时,Checkpoint 被删除。 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

使用Savepoint(用户手动操作) Savepoint需要注意的点: 触发 Savepoint 时,会创建一个新的 Savepoint 目录,其中将存储数据和元数据。可以通过配置默认 targetDirectory 或指定自定义 targetDirectory。如果没有自定义或者配置,那么使用savepoint会失败。
手动savepoint的步骤
  1. 在web页面查看job的Id,并复制,如下:
    Flink-使用checkpoint和savepoint进行快照恢复
    文章图片
  2. 在终端上使用命令:格式:bin/flink savepoint [jobId] [存放的快照地址]
bin/flink savepoint 93e589184e0e78d57077178438807889 /tmp/flink/checkpoints/

成功后的信息:生成了一个savepoint-93e589-957052f01c84的目录文件。
Flink-使用checkpoint和savepoint进行快照恢复
文章图片

  1. 执行恢复命令
bin/flink run -n -s /tmp/flink/checkpoints/savepoint-93e589-957052f01c84/_metadata -c com.pro.flink.sink.Demo1 /opt/modules/flink-1.0-SNAPSHOT.jar --hostname 192.168.135.237 --port 8888

  1. 结果:
    Flink-使用checkpoint和savepoint进行快照恢复
    文章图片
删除savepoint 注意:一定是要删除通过savepoint命令生成的目录(前缀为savepoint)
bin/flink savepoint -d /tmp/flink/checkpoints/savepoint-93e589-957052f01c84/

【Flink-使用checkpoint和savepoint进行快照恢复】结果
Flink-使用checkpoint和savepoint进行快照恢复
文章图片

如果删除flink通过checkpoint机制生成的快照文件,是会报错的,报错部分信息如下:
Flink-使用checkpoint和savepoint进行快照恢复
文章图片

    推荐阅读