Kafka的安装与使用(转)

9.1 Kafka 基础知识

Kafka的安装与使用(转)

9.1.1 消息系统

Kafka的安装与使用(转)
点对点消息系统:生产者发送一条消息到queue,一个queue可以有很多消费者,但是一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者,所以Queue实现了一个可靠的负载均衡。

发布订阅消息系统:发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。

9.1.2 kafka术语

Kafka的安装与使用(转)
消息由producer产生,消息按照topic归类,并发送到broker中,broker中保存了一个或多个topic的消息,consumer通过订阅一组topic的消息,通过持续的poll操作从broker获取消息,并进行后续的消息处理。

Producer :消息生产者,就是向broker发指定topic消息的客户端。

Consumer :消息消费者,通过订阅一组topic的消息,从broker读取消息的客户端。

Broker :一个kafka集群包含一个或多个服务器,一台kafka服务器就是一个broker,用于保存producer发送的消息。一个broker可以容纳多个topic。

Topic :每条发送到broker的消息都有一个类别,可以理解为一个队列或者数据库的一张表。

Partition:一个topic的消息由多个partition队列存储的,一个partition队列在kafka上称为一个分区。每个partition是一个有序的队列,多个partition间则是无序的。partition中的每条消息都会被分配一个有序的id(offset)。

Offset:偏移量。kafka为每条在分区的消息保存一个偏移量offset,这也是消费者在分区的位置。kafka的存储文件都是按照offset.kafka来命名,位于2049位置的即为2048.kafka的文件。比如一个偏移量是5的消费者,表示已经消费了从0-4偏移量的消息,下一个要消费的消息的偏移量是5。

Consumer Group (CG):若干个Consumer组成的集合。这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。

假如一个消费者组有两个消费者,订阅了一个具有4个分区的topic的消息,那么这个消费者组的每一个消费者都会消费两个分区的消息。消费者组的成员是动态维护的,如果新增或者减少了消费者组中的消费者,那么每个消费者消费的分区的消息也会动态变化。比如原来一个消费者组有两个消费者,其中一个消费者因为故障而不能继续消费消息了,那么剩下一个消费者将会消费全部4个分区的消息。

9.1.3 kafka安装和使用
在Windows安装运行Kafka:https://blog.csdn.net/weixin_38004638/article/details/91893910

Kafka的安装与使用(转)

9.1.4 kafka运行

Kafka的安装与使用(转)

Kafka的安装与使用(转)

一次写入,支持多个应用读取,读取信息是相同的

Kafka的安装与使用(转)

kafka-study.pom

Producer生产者

发送消息的方式,只管发送,不管结果:只调用接口发送消息到 Kafka 服务器,但不管成功写入与否。由于 Kafka 是高可用的,因此大部分情况下消息都会写入,但在异常情况下会丢消息
同步发送:调用 send() 方法返回一个 Future 对象,我们可以使用它的 get() 方法来判断消息发送成功与否
异步发送:调用 send() 时提供一个回调方法,当接收到 broker 结果后回调此方法

public class MyProducer {
private static KafkaProducer

/*
* 创建topic:.\bin\windows\kafka-topics.bat –create –zookeeper localhost:2181
* –replication-factor 1 –partitions 1 –topic kafka-study
* 创建消费者:.\bin\windows\kafka-console-consumer.bat –bootstrap-server localhost:9092
* –topic imooc-kafka-study –from-beginning
/
//发送消息,发送完后不做处理
private static void sendMessageForgetResult() {
ProducerRecord

public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
//获取topic所有分区
List

Kafka的安装与使用(转)

Kafka消费者(组)

  • 自动提交位移 * 手动同步提交当前位移 * 手动异步提交当前位移 * 手动异步提交当前位移带回调 * 混合同步与异步提交位移

public class MyConsumer {
private static KafkaConsumer

//自动提交位移:由consume自动管理提交
private static void generalConsumeMessageAutoCommit() {
//配置
properties.put(“enable.auto.commit”, true);
consumer = new KafkaConsumer<>(properties);
//指定topic
consumer.subscribe(Collections.singleton(“kafka-study-x”));
try {
while (true) {
boolean flag = true;
//拉取信息,超时时间100ms
ConsumerRecords

//手动同步提交当前位移,根据需求提交,但容易发送阻塞,提交失败会进行重试直到抛出异常
private static void generalConsumeMessageSyncCommit() {
properties.put(“auto.commit.offset”, false);
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(“kafka-study-x”));
while (true) {
boolean flag = true;
ConsumerRecords

//手动异步提交当前位移,提交速度快,但失败不会记录
private static void generalConsumeMessageAsyncCommit() {
properties.put(“auto.commit.offset”, false);
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(“kafka-study-x”));
while (true) {
boolean flag = true;
ConsumerRecords

//手动异步提交当前位移带回调
private static void generalConsumeMessageAsyncCommitWithCallback() {
properties.put(“auto.commit.offset”, false);
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(“kafka-study-x”));
while (true) {
boolean flag = true;
ConsumerRecords

//混合同步与异步提交位移
@SuppressWarnings(“all”)
private static void mixSyncAndAsyncCommit() {
properties.put(“auto.commit.offset”, false);
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(“kafka-study-x”));
try {
while (true) {
//boolean flag = true;
ConsumerRecords

public static void main(String[] args) {
//自动提交位移
generalConsumeMessageAutoCommit();
//手动同步提交当前位移
//generalConsumeMessageSyncCommit();
//手动异步提交当前位移
//generalConsumeMessageAsyncCommit();
//手动异步提交当前位移带回调
//generalConsumeMessageAsyncCommitWithCallback()
//混合同步与异步提交位移
//mixSyncAndAsyncCommit();
}
}
先启动消费者等待接收消息,再启动生产者发送消息,进行消费消息

Kafka的安装与使用(转)

————————————————
版权声明:本文为CSDN博主「陈晨辰~」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_38004638/article/details/91975123

Original: https://www.cnblogs.com/sandea/p/12078399.html
Author: sandea
Title: Kafka的安装与使用(转)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/533857/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球