1.需求:创建一个独立消费者,消费 first 主题中数据。
2.在IDEA创建包名:com.kafka.consumer
3.新建一个CustomConsumer类
package com.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
//配置
Properties properties = new Properties();
//连接Kafka集群
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
//反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//GroupId(必须配置)
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
//创建消费者对象
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);
//topic数组
ArrayList topics = new ArrayList<>();
topics.add("first");
//订阅topic
kafkaConsumer.subscribe(topics);
//处理数据
while (true){
ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
4.在 IDEA 中执行消费者程序。
5.在 Kafka 集群控制台,创建 Kafka 生产者,并输入数据。
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
同时IDEA收到消息
sumerRecord(topic = first, partition = 1, leaderEpoch = 9, offset = 23, CreateTime = 1653112344357, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)
Original: https://www.cnblogs.com/hz-Master/p/16294975.html
Author: hz15968199950
Title: Kafka消费者 API(1)
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/563038/
转载文章受原作者版权保护。转载请注明原作者出处!