Common Flink APIs: Kafka Source
Kafka-based Source
First, add the Kafka connector dependency to the project. The full list of connectors is documented on the official site: https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/connectors/
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
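If the project is built with sbt instead of Maven, the same connector can be declared as in the minimal sketch below; it assumes scalaVersion is set to a 2.11.x release so that %% resolves to the _2.11 artifact.

// build.sbt -- equivalent of the Maven dependency above
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.1"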
Case 1: Reading plain String data from Kafka
package source

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

import java.util.Properties

object KafkaSource {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setParallelism(1)
    // Implicit TypeInformation for the Scala DataStream API
    import org.apache.flink.streaming.api.scala._

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    properties.setProperty("group.id", "flink01")
    properties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    properties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    properties.setProperty("auto.offset.reset", "latest")

    // Read each Kafka record value as a plain String and print it
    environment
      .addSource(new FlinkKafkaConsumer[String]("topic_01", new SimpleStringSchema(), properties))
      .print()

    environment.execute()
  }
}
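Besides auto.offset.reset, the start position can also be controlled on the Flink side through the consumer itself. The following is a minimal sketch that reuses the environment, properties, and topic name from the example above:

// Variant of the source above: choose the start position explicitly on the consumer
val kafkaConsumer = new FlinkKafkaConsumer[String]("topic_01", new SimpleStringSchema(), properties)
kafkaConsumer.setStartFromEarliest()        // read the topic from the beginning, ignoring committed offsets
// kafkaConsumer.setStartFromLatest()       // only read records that arrive after the job starts
// kafkaConsumer.setStartFromGroupOffsets() // default: resume from the offsets committed for group.id
environment.addSource(kafkaConsumer).print()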


Case 2: Reading key/value data from Kafka
Define the consumer
package source

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

import java.util.Properties

object KafkaSourceKeyValue {
  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Kafka consumer configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092")
    props.setProperty("group.id", "flink01")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("auto.offset.reset", "latest")

    // Use a custom KafkaDeserializationSchema so that both the key and the value are kept
    val stream: DataStream[(String, String)] =
      streamEnv.addSource(new FlinkKafkaConsumer[(String, String)]("t_bjsxt", new MyKafkaReader, props))
    stream.print()
    streamEnv.execute()
  }

  // Deserializes each Kafka record into a (key, value) tuple of Strings
  class MyKafkaReader extends KafkaDeserializationSchema[(String, String)] {
    // The stream is unbounded, so never signal end-of-stream
    override def isEndOfStream(nextElement: (String, String)): Boolean = false

    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
      if (record != null) {
        var key = "null"
        var value = "null"
        if (record.key() != null) {
          key = new String(record.key(), "UTF-8")
        }
        if (record.value() != null) {
          value = new String(record.value(), "UTF-8")
        }
        (key, value)
      } else {
        ("null", "null")
      }
    }

    // Tell Flink the produced element type: a Tuple2 of Strings
    override def getProducedType: TypeInformation[(String, String)] =
      createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
  }
}
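Once the records arrive as (key, value) tuples, they can be processed like any other DataStream. The sketch below is a hypothetical continuation of the stream value above, counting how many records have been seen per key:

// Hypothetical continuation of `stream`: count records per key
stream
  .map(kv => (kv._1, 1))
  .keyBy(_._1)
  .sum(1)
  .print()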
Define the producer
package source

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.{Properties, Random}

object MykafkaProducer {
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    val r = new Random()

    // Send one random key/value record per second.
    // NOTE: this writes to "topic_01" on node1-3, while the consumer above reads "t_bjsxt"
    // from hadoop101-103; adjust the topic and brokers so the two sides match.
    while (true) {
      val data = new ProducerRecord[String, String]("topic_01", "key" + r.nextInt(10), "value" + r.nextInt(100))
      producer.send(data)
      Thread.sleep(1000)
    }
    producer.close() // never reached, because the loop above runs forever
  }
}
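Because the loop above never exits, the final close() call is never reached. If a bounded test run is preferred, a variant like the following sketch (the record count of 100 is arbitrary) sends a fixed number of messages and then shuts the producer down cleanly:

// Bounded variant: send 100 random records, then flush and close the producer
for (_ <- 0 until 100) {
  producer.send(new ProducerRecord[String, String]("topic_01", "key" + r.nextInt(10), "value" + r.nextInt(100)))
  Thread.sleep(1000)
}
producer.flush()
producer.close()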
