1、到官网查询所在版本的依赖,导入pom.xml(在此用Flink1.13) 官网->教程->connectors->datastream->kafka
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka_2.11artifactId>
<version>1.13.6version>
dependency>
2.在此页面找到Kafka source 示例代码,将此代码填充至类中并将其具体参数修改即可
需注意!
所以,上述代码无论运行多少次消费的都是一样的内容,想要达到这次消费起始位置是上次消费的最后一条的情况
需要手动设置,把offsets提交到kafka一份
2、Job重启时,如果开启了Checjpoint,默认从哪Checkpoint中获取之前提交的offsets
获取不到时,才会从kafka的_consumer_offsets中获取
Original: https://www.cnblogs.com/CYan521/p/16816514.html
Author: 再美不及姑娘你
Title: Flink如何使用DataStreamAPI消费Kafka
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/713568/
转载文章受原作者版权保护。转载请注明原作者出处!