Flink State Management and Recovery: A State Backend Example
Author: wx62be9d88ce294 | Category: Big Data
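© Copyright belongs to the author. This is an original work by wx62be9d88ce294 on the 51CTO blog. Contact the author for permission before reposting; otherwise legal liability will be pursued.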
This example sets an HDFS file system as the state backend, cancels the Job, and then restores the Job from the retained checkpoint.
package state

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object TestCheckPointByHDFS {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Trigger a checkpoint every 5000 ms
    environment.enableCheckpointing(5000)
    // Write checkpoint data to HDFS
    environment.setStateBackend(new FsStateBackend("hdfs://mycluster/checkpoint/cp1"))
    environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Abort a checkpoint that takes longer than 5000 ms
    environment.getCheckpointConfig.setCheckpointTimeout(5000)
    environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Keep the checkpoint files when the Job is cancelled so it can be restored later
    environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    environment.setParallelism(1)

    import org.apache.flink.streaming.api.scala._

    // Word count over lines read from a socket
    val stream: DataStream[String] = environment.socketTextStream("node1", 8888)
    val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // "结果" means "result"; it is only the prefix printed before each record
    result.print("结果")
    environment.execute("wordCount")
  }
}
Package the job and run it on the server.
Check the output: the word flink has appeared three times.
Cancel the Job; you can see that the Job has stopped.
Check the state files in the HDFS directory.
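For orientation, Flink writes retained checkpoints under the configured directory as `<job-id>/chk-<n>/_metadata`, next to `shared` and `taskowned` directories. The sketch below is plain Scala with no Flink dependency. It shows one way to pick the newest `chk-<n>` entry from a directory listing and build the `_metadata` path that could be passed to `flink run -s` when restoring. The object name, the helper names, the sample listing, and the job ID are all hypothetical, chosen only for illustration.

```scala
object CheckpointPaths {
  // Matches checkpoint directory names such as "chk-12" and captures the number.
  private val ChkDir = """chk-(\d+)""".r

  /** Returns the highest-numbered chk-N entry, if the listing contains one. */
  def latestCheckpoint(entries: Seq[String]): Option[String] =
    entries
      .collect { case name @ ChkDir(n) => (n.toLong, name) }
      .sortBy(_._1) // numeric sort, so chk-10 ranks after chk-9
      .lastOption
      .map(_._2)

  /** Builds the path of the _metadata file for one retained checkpoint. */
  def metadataPath(baseDir: String, jobId: String, chkDir: String): String =
    s"${baseDir.stripSuffix("/")}/$jobId/$chkDir/_metadata"

  def main(args: Array[String]): Unit = {
    // Hypothetical listing of hdfs://mycluster/checkpoint/cp1/<job-id>/
    val entries = Seq("chk-9", "chk-10", "shared", "taskowned")
    // Placeholder job ID; the real one is shown by Flink when the Job is submitted.
    val jobId = "0123456789abcdef0123456789abcdef"

    latestCheckpoint(entries).foreach { chk =>
      println(metadataPath("hdfs://mycluster/checkpoint/cp1", jobId, chk))
    }
  }
}
```

The numeric sort matters only when more than one checkpoint is retained; a plain string sort would rank `chk-9` after `chk-10`.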
Restart the job and enter the word flink two more times.
Check the result: the count for flink is now 5, because the restored state already held a count of 3.
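The arithmetic behind this result can be sketched with a toy model in plain Scala. This is not how Flink implements keyed state or checkpoints; `WordCounter`, `process`, and `snapshot` are hypothetical names that only imitate the running count kept by `keyBy(0).sum(1)` and a copy of it saved at checkpoint time.

```scala
object RestoreSketch {
  /** Toy stand-in for keyed state: one running count per word. */
  final class WordCounter(initial: Map[String, Int] = Map.empty) {
    private var counts: Map[String, Int] = initial

    /** Processes one input line and returns the updated (word, count) pairs. */
    def process(line: String): Seq[(String, Int)] =
      line.split(" ").toSeq.map { word =>
        val updated = counts.getOrElse(word, 0) + 1
        counts = counts.updated(word, updated)
        (word, updated)
      }

    /** Immutable copy of the current state, standing in for a checkpoint. */
    def snapshot: Map[String, Int] = counts
  }

  def main(args: Array[String]): Unit = {
    // First run: "flink" is entered three times, then the Job is cancelled.
    val firstRun = new WordCounter()
    firstRun.process("flink flink flink")
    val checkpoint = firstRun.snapshot

    // Restart from the retained checkpoint and enter "flink" twice more.
    val restored = new WordCounter(checkpoint)
    println(restored.process("flink flink").last) // (flink,5)

    // Restart without the checkpoint: counting begins again from zero.
    val fresh = new WordCounter()
    println(fresh.process("flink flink").last) // (flink,2)
  }
}
```

The contrast between the two restarts is the point of the example: a count of 5 rather than 2 shows that the Job resumed from the state saved on HDFS.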
Original: https://blog.51cto.com/u_15704423/5434855
Author: wx62be9d88ce294
Title: Flink State Management and Recovery: A State Backend Example