Common Flink APIs: the Kafka Source

Kafka-based Source
First, add the Kafka connector dependency. More connectors are listed on the official site: https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/connectors/

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
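
If the project is built with sbt instead of Maven, a roughly equivalent declaration is sketched below. The `%%` operator appends the project's Scala binary version, so it is assumed here that `scalaVersion` is a 2.11.x release to match the `_2.11` suffix above.

// build.sbt (sketch, assuming scalaVersion := "2.11.12")
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.1"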

Method 1: reading plain String data from Kafka

package source

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

import java.util.Properties

object KafkaSource {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setParallelism(1)
    import org.apache.flink.streaming.api.scala._

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    properties.setProperty("group.id", "flink01")
    properties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    properties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    properties.setProperty("auto.offset.reset", "latest")

    // Read each record's value as a String and print it
    environment
      .addSource(new FlinkKafkaConsumer[String]("topic_01", new SimpleStringSchema(), properties))
      .print()

    environment.execute()
  }
}
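
Note that auto.offset.reset only applies when the consumer group has no committed offset; the Flink consumer also exposes explicit start-position methods inherited from FlinkKafkaConsumerBase. A minimal sketch, reusing the `environment` and `properties` values from the example above:

    // Sketch: choose where the FlinkKafkaConsumer starts reading,
    // independent of the group's committed offsets.
    val consumer = new FlinkKafkaConsumer[String]("topic_01", new SimpleStringSchema(), properties)
    consumer.setStartFromEarliest()        // read the topic from the beginning
    // consumer.setStartFromLatest()       // only records produced after the job starts
    // consumer.setStartFromGroupOffsets() // default: resume from committed group offsets
    environment.addSource(consumer).print()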

Method 2: reading key/value data from Kafka
Define the consumer
package source

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

import java.util.Properties

import org.apache.flink.streaming.api.scala._

object KafkaSourceKeyValue {

  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Kafka consumer configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092")
    props.setProperty("group.id", "flink01")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("auto.offset.reset", "latest")

    // Deserialize each record into a (key, value) tuple with the custom schema below
    val stream: DataStream[(String, String)] =
      streamEnv.addSource(new FlinkKafkaConsumer[(String, String)]("t_bjsxt", new MyKafkaReader, props))
    stream.print()
    streamEnv.execute()
  }

  // Custom deserialization schema that exposes both the record key and the value
  class MyKafkaReader extends KafkaDeserializationSchema[(String, String)] {

    // The stream is unbounded, so never signal end-of-stream
    override def isEndOfStream(nextElement: (String, String)): Boolean = false

    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
      if (record != null) {
        var key = "null"
        var value = "null"
        if (record.key() != null) {
          key = new String(record.key(), "UTF-8")
        }
        if (record.value() != null) {
          value = new String(record.value(), "UTF-8")
        }
        (key, value)
      } else {
        ("null", "null")
      }
    }

    // Tell Flink the produced element type: Tuple2[String, String]
    override def getProducedType: TypeInformation[(String, String)] =
      createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
  }
}
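
Having the record key available makes key-based operators straightforward. A minimal sketch, reusing the `stream` value from the main method above, that keeps a running count of records per Kafka key:

    // Sketch: group the (key, value) tuples by the Kafka record key
    // and maintain a running count per key.
    stream
      .map(kv => (kv._1, 1))   // (key, 1) for every record
      .keyBy(_._1)             // partition the stream by the Kafka key
      .sum(1)                  // running count per key
      .print()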

Define the producer

package source

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.{Properties, Random}

object MykafkaProducer {
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    val r = new Random()

    // Send one random key/value record per second
    while (true) {
      val data = new ProducerRecord[String, String]("topic_01", "key" + r.nextInt(10), "value" + r.nextInt(100))
      producer.send(data)
      Thread.sleep(1000)
    }
    // Unreachable while the loop above runs forever; kept from the original
    producer.close()
  }
}
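
Note that this producer writes to topic_01, which the first (String) consumer reads; the key/value consumer above reads t_bjsxt, so the topic name must match whichever consumer you test. To confirm that records actually reach the broker before wiring up a Flink job, the send can be made synchronous; a small sketch, reusing `producer` and `data` from the loop above:

    // Sketch: block on the send and inspect where the record landed.
    // RecordMetadata is returned with the broker acknowledgement.
    val metadata = producer.send(data).get()
    println(s"wrote to ${metadata.topic()} partition ${metadata.partition()} offset ${metadata.offset()}")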

Original: https://blog.51cto.com/u_15704423/5434848
Author: wx62be9d88ce294
Title: Flink常用API之Kafka的Source
