Flink常用API之Kafka的Source

Flink常用API之Kafka的Source

原创

wx62be9d88ce294博主文章分类:大数据 ©著作权

文章标签 kafka flink scala apache 文章分类 Hadoop 大数据

©著作权归作者所有:来自51CTO博客作者wx62be9d88ce294的原创作品,请联系作者获取转载授权,否则将追究法律责任

基于 Kafka 的 Source
首 先 需 要 配 置 Kafka 连 接 器 的 依 赖 , 另 外 更 多 的 连 接 器 可 以 查 看 官 网 : https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/connectors/

<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka_2.11artifactId>
<version>1.9.1version>
dependency>

第一种:读取 Kafka 中的普通数据(String)

package sourceimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport org.apache.kafka.common.serialization.StringDeserializerimport java.util.Properties    object KafkaSource {  def main(args: Array[String]): Unit = {    val environment = StreamExecutionEnvironment.getExecutionEnvironment    environment.setParallelism(1)    import org.apache.flink.streaming.api.scala._        val properties = new Properties()    properties.setProperty("bootstrap.servers","node1:9092,node2:9092,node3:9092")    properties.setProperty("group.id","flink01")    properties.setProperty("key.deserializer",classOf[StringDeserializer].getName)    properties.setProperty("value.deserializer",classOf[StringDeserializer].getName)    properties.setProperty("auto.offset.reset","latest")    environment.addSource(new FlinkKafkaConsumer[String]("topic_01",new SimpleStringSchema(),properties))      .print()    environment.execute()  }}

Flink常用API之Kafka的Source
Flink常用API之Kafka的Source
第二种:读取 Kafka 中的KeyValue数据
定义消费者
package sourceimport org.apache.flink.api.common.typeinfo.TypeInformationimport org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}import org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.serialization.StringDeserializerimport java.util.Properties    import org.apache.flink.streaming.api.scala._object KafkaSourceKeyValue {      def main(args: Array[String]): Unit = {            val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment            streamEnv.setParallelism(1)             val props = new Properties()      props.setProperty("bootstrap.servers","hadoop101:9092,hadoop102:9092,hadoop103:9092")      props.setProperty("group.id","flink01")      props.setProperty("key.deserializer",classOf[StringDeserializer].getName)      props.setProperty("value.deserializer",classOf[StringDeserializer].getName)      props.setProperty("auto.offset.reset","latest")            val stream: DataStream[(String, String)] = streamEnv.addSource(new FlinkKafkaConsumer[(String,String)]("t_bjsxt",new MyKafkaReader,props))      stream.print()      streamEnv.execute()    }        class MyKafkaReader extends  KafkaDeserializationSchema[(String,String)]{            override def isEndOfStream(nextElement: (String, String)): Boolean = {        false      }            override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {        if(record!=null){          var key="null"          var value="null"          if(record.key()!=null){            key =new String(record.key(),"UTF-8")          }          if(record.value()!=null){             value =new String(record.value(),"UTF-8")          }          (key,value)        }else{          ("null","nulll")        }      }            override def getProducedType: TypeInformation[(String, String)] ={        createTuple2TypeInformation(createTypeInformation[String],createTypeInformation[String])      }    }}

定义生产者

package sourceimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}import java.util.{Properties, Random}    object MykafkaProducer {  def main(args: Array[String]): Unit = {        val props = new Properties()    props.setProperty("bootstrap.servers","node1:9092,node2:9092,node3:9092")    props.setProperty("key.serializer",classOf[StringSerializer].getName)    props.setProperty("value.serializer",classOf[StringSerializer].getName)    var producer =new KafkaProducer[String,String](props)    var r =new Random()    while(true){       val data = new ProducerRecord[String,String]("topic_01","key"+r.nextInt(10),"value"+r.nextInt(100))      producer.send(data)      Thread.sleep(1000)    }    producer.close()  }}

Flink常用API之Kafka的Source
  • 收藏
  • 评论
  • *举报

上一篇:Flink常用API之转换算子的Split算子

下一篇:Flink常用API之转换算子->Map、reduce、Filter、KeyBy、Aggregations

Original: https://blog.51cto.com/u_15704423/5434848
Author: wx62be9d88ce294
Title: Flink常用API之Kafka的Source

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/65973/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 数据库SQL笔试题目

    大学生春季运动会的数据库,保存了比赛信息的三个表如下:运动员 sporter(运动员编号 sporterid,姓名name,性别 sex,所属系号 department),项目 i…

    大数据 2023年2月17日
    013
  • JVM GC算法 CMS 详解(转)

    CMS,全称Concurrent Low Pause Collector,是jdk1.4后期版本开始引入的新gc算法,在jdk5和jdk6中得到了进一步改进,它的主要适合场景是对响…

    大数据 2022年9月19日
    090
  • MySQL高可用之主备同步:MySQL是如何保证主备一致的

    🏆今日学习目标: 🍀MySql是如何保证主备一致的✅创作者:林在闪闪发光⏰预计时间:30分钟🎉个人主页:林在闪闪发光的个人主页 🍁林在闪闪发光的个人社区,欢迎你的加入: 林在闪闪发…

    大数据 2023年2月20日
    097
  • Labview_使用SQL语句操作数据库(Access)

    SQL 对大小写不敏感:SELECT 与 select 是相同的 空格一定要控制好,且在筛选某些列时,列名不需要用单引号,列名所对应数据类型为(字符串时),需要加单引号。在创建数据…

    大数据 2023年2月8日
    023
  • 如何找到redis中的dump.rdb文件?

    如何找到redis.conf文件? 因为你用brew安装的redis brew list redis 之后 cd /opt/homebrew/Cellar/redis/7.0.0 …

    大数据 2023年2月13日
    016
  • QT连接Sqlite数据库并且对数据进行操作

    一、前言 这篇文章的目的主要是为了总结我的课设项目在编程过程中所用的数据库连接方面的知识,由于之前在数据库连接方面我就写过一个学生管理的项目,连接的是SQL Server2012,…

    大数据 2023年2月23日
    011
  • Redis热点数据处理

    热点数据就是访问量特别大的数据。 流量集中,达到物理网卡上限。 请求过多,缓存分片服务被打垮。redis作为一个单线程的结构,所有的请求到来后都会去排队,当请求量远大于自身处理能力…

    大数据 2023年2月8日
    015
  • spark 通过kerberos认证连接impala 获取ResultSet集合转为DataFrame 并写入hive

    def main(args: Array[String]): Unit = { val impala_db = args(2) // 查询impala库 val impala_ta…

    大数据 2023年2月19日
    09
  • Redis之跳跃表

    一、概述 跳跃表(skiplist)是一种有序数据结构,它 通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的。 如下,对于单个链表来讲,即便链表中存储的数据是…

    大数据 2023年2月9日
    021
  • 字节跳动青训营–前端day8

    文章目录 前言 一、CSR,SSR,SSG * 1. CSR 2. SSR 3. SSG 4. SSR,SSG的优势 – 利于SEO 更短的首屏时间 二、什么是Next…

    大数据 2023年2月19日
    013
  • 关于zookeeper启动不了,需要配置admin.serverPort的记录

    最近学习的微服务需要用到zookeeper,索性今天重新安装了zookeeper-3.8.0想再找找感觉。以前3.6.X版本无脑安装完成,设定好zoo.cfg就可启动,谁知道3.8…

    大数据 2023年2月28日
    08
  • Kettle的下载、安装和简单的使用

    对接数据时,双方可能使用不同的数据库;或者把数据接过来的同时,对数据进行一定的处理,包括多表查询、多表合并等复杂情况。 通常来说,这种操作都是一次性的,即只需要导入数据一次。因此专…

    大数据 2023年3月6日
    08
  • Spark基础之:Spark SQL介绍

    Spark基础之:Spark SQL介绍 一.Spark SQL的概述 * 1、Spark SQL来源 2、从代码看Spark SQL的特点 3、从代码运行速度看来看Spark S…

    大数据 2023年2月16日
    014
  • 干货 | 携程Dynamo风格存储的落地实践

    作者简介 根泰,携程高级后端开发工程师,关注数据存储和数据库领域。 遐龄,携程研发总监,关注大数据存储、性能调优。 Dynamo风格数据库来源于亚马逊的Dynamo: Amazon…

    大数据 2023年2月13日
    010
  • 大数据Hive(一):​​​​​​​Hive基本概念

    抵扣说明: 1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。 Original: https:…

    大数据 2023年2月7日
    016
  • golang学习之go连接Kafka

    1、Kafka 本质上是⼀个消息队列,一个高吞吐量、持久性、分布式的消息系统。2、包含生产者(producer)和消费者(consumer),每个consumer属于一个特定的消费…

    大数据 2023年2月26日
    09
最近整理资源【免费获取】:   👉 程序员最新必读书单  | 👏 互联网各方向面试题下载 | ✌️计算机核心资源汇总