Kafka 是如何做到消息不丢或不重复的

  • *消息重复。 相同的消息重复发送会造成消费者消费两次同样的消息,这同样会造成系统间数据的不一致。比如,订单支付成功后会通过消息队列给支付系统发送需要扣款的金额,如果消息发送两次一样的扣款消息,而订单只支付了一次,就会给用户带来余额多扣款的问题。

总结来说,这两个问题直接影响到业务系统间的数据一致性。

那到底该如何避免这两个问题的发生呢?

Kafka 针对这两个问题有系统的解决方案,需要服务端、客户端做相应的配置以及采取一些补偿方案。

先介绍下 三种消息语义及场景

介绍一下”消息语义”的概念,这是理论基础,会有利于你更好地抓住下面解决方案的要点。

消息语义有三种,分别是:消息最多传递一次、消息最少传递一次、消息有且仅有一次传递 ,这三种语义分别对应:消息不重复、消息不丢失、消息既不丢失也不重复。

这里的”消息传递一次”是指生产者生产消息成功,Broker 接收和保存消息成功,消费者消费消息成功。对一个消息来说,这三个要同时满足才算是”消息传递一次”。上面所说的那三种消息语义可梳理为如下。

  • *最多一次(At most once):对应消息不重复。消息最多传递一次,消息有可能会丢,但不会重复。一般运用于高并发量、高吞吐,但是对于消息的丢失不是很敏感的场景。

  • *最少一次(At least once):对应消息不丢失。消息最少传递一次,消息不会丢,但有可能重复。一般用于并发量一般,对于消息重复传递不敏感的场景。

  • *有且仅有一次(Exactly once):每条消息只会被传递一次,消息不会丢失,也不会重复。 用于对消息可靠性要求高,且对吞吐量要求不高的场景。

Kafka 如何做到消息不丢失?

我们先来讨论一下 Kafka 是如何做到消息不丢失的,也就是:生产者不少生产消息,服务端不丢失消息,消费者也不能少消费消息。

那具体要怎么来实现呢?下面我们就来详细讲解下。

生产端:不少生产消息

以下是为了保证消息不丢失,生产端需要配置的参数和相关使用方法。

acks=0,表示生产者不等待任何服务器节点的响应,只要发送消息就认为成功。

acks=1,表示生产者收到 leader 分区的响应就认为发送成功。

acks=-1,表示只有当 ISR(ISR 的含义后面我会详细介绍)中的副本全部收到消息时,生产者才会认为消息生产成功了。这种配置是最安全的,因为如果 leader 副本挂了,当 follower 副本被选为 leader 副本时,消息也不会丢失。但是系统吞吐量会降低,因为生产者要等待所有副本都收到消息后才能再次发送消息。

只要上面这四个要点配置对了,就可以保证生产端的生产者不少生产消息了。

服务端:不丢失消息

为了保证不丢失消息,消费者就不能少消费消息,该如何去实现呢?消费端需要做好如下的配置。

第一个,设置 enable.auto.commit=false。enable.auto.commit 这个参数表示是否自动提交,如果是自动提交会导致什么问题出现呢?

消费者消费消息是有两个步骤的,首先拉取消息,然后再处理消息。向服务端提交消息偏移量可以手动提交也可以自动提交。

如果把参数 enable.auto.commit 设置为 true 就表示消息偏移量是由消费端自动提交,由异步线程去完成的,业务线程无法控制。如果刚拉取了消息之后,业务处理还没进行完,这时提交了消息偏移量但是消费者却挂了,这就造成还没进行完业务处理的消息的位移被提交了,下次再消费就消费不到这些消息,造成消息的丢失。因此,一定要设置 enable.auto.commit=false,也就是手动提交消息偏移量。

第二个,要有手动提交偏移量的正确步骤。enable.auto.commit=false 并不能完全满足消费端消息不丢的条件,还要有正确的手动提交偏移量的过程。具体如何操作呢?

业务逻辑先对消息进行处理,再提交 offset,这样是能够保证不少消费消息的。但是你可以想象这样一个场景:如果消费者在处理完消息后、提交 offset 前出现宕机,待消费者再上线时,还会处理未提交的那部分消息,但是这部分已经被消费者处理过了,也就是说这样做虽然避免了丢消息,但是会有重复消费的情况出现。(这种情况比较少,一般特殊情况特殊处理就好)

具体代码需要这么写:

List

Kafka 如何做到消息不重复?

生产端不重复生产消息,服务端不重复存储消息,消费端也不能重复消费消息。

相较上面”消息不丢失”的场景,”消息不重复”的服务端无须做特别的配置,因为服务端不会重复存储消息,如果有重复消息也应该是由生产端重复发送造成的。也就是说,下面我们只需要分析生产端和消费端就行。

生产端:不重复生产消息

生产端发送消息后,服务端已经收到消息了,但是假如遇到网络问题,无法获得响应,生产端就无法判断该消息是否成功提交到了 Kafka,而我们一般会配置重试次数,但这样会引发生产端重新发送同一条消息,从而造成消息重复的发送。

对于这个问题,Kafka 0.11.0 的版本之前并没有什么解决方案,不过从 0.11.0 的版本开始,Kafka 给每个生产端生成一个 唯一的 ID ,并且在每条消息中生成一个 sequence num,sequence num 是递增且唯一的,这样就能对消息去重,达到一个生产端不重复发送一条消息的目的。

但是这个方法是有局限性的,只对在一个生产端内生产的消息有效,如果一个消息分别在两个生产端发送就不行了,还是会造成消息的重复发送。好在这种可能性比较小,因为消息的重试一般会在一个生产端内进行。当然,对应一个消息分别在两个生产端发送的请求我们也有方案,只是要多做一些补偿的工作,比如,我们可以为每一个消息分 配一个全局 ID,并把全局 ID 存放在远程缓存或关系型数据库里 ,这样在发送前可以判断一下是否已经发送过了。

消费端:不能重复消费消息

为了保证消息不重复,消费端就不能重复消费消息,该如何去实现呢?消费端需要做好如下配置。

第一步,设置 enable.auto.commit=false。跟前面一样,这里同样要避免自动提交偏移量。你可以想象这样一种情况,消费端拉取消息和处理消息都完成了,但是自动提交偏移量还没提交消费端却挂了,这时候 Kafka 消费组开始重新平衡并把分区分给另一个消费者,由于偏移量没提交新的消费者会重复拉取消息,这就最终造成重复消费消息。

第二步,单纯配成手动提交同样不能避免重复消费,还需要消费端使用正确的消费”姿势”。

消费者拉取消息后,先提交 offset 后再处理消息,这样就不会出现重复消费消息的可能。但是你可以想象这样一个场景:在提交 offset 之后、业务逻辑处理消息之前出现了宕机,待消费者重新上线时,就无法读到刚刚已经提交而未处理的这部分消息,还是会有少消费消息的情况。这种情况也是少数,可以根据业务做补偿

具体代码如下:

java;gutter:true;
List messages = consumer.poll();
consumer.commitOffset(); processMsg(messages);

总结一下:

Kafka 中消息不丢失、不重复很重要,就我个人经验来讲,业务人员除了担忧消息队列服务端宕机外,对消息的丢失和消息的重复会非常敏感,因为这直接影响到了业务本身。

总体来讲,要保证消息不丢失和不重复,你要从生产端、服务端和消费端三个部分全盘考虑才可行,只是单独考虑某一端是远远不够的。同时,我也希望你搞懂消息语义的含义,因为所有的消息队列都会有相应的涉及。

Original: https://www.cnblogs.com/daohangtaiqian/p/15398724.html
Author: 道行太浅
Title: Kafka 是如何做到消息不丢或不重复的

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/594430/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • mybatis学习

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    Java 2023年5月30日
    071
  • Feign调用,get请求,参数为对象, 解决请求对象以及参数值为null

    请求参数过多,所以包装成一个请求对象 服务端: @GetMapping(value = "/readInfos") public List readHotels…

    Java 2023年6月8日
    070
  • 识别一个文件的真实格式

    识别一个文件的真实格式 import java.io.File; import java.io.FileInputStream; import java.io.FileNotFou…

    Java 2023年6月7日
    067
  • Java基础 awt Frame 窗体的大小不可调

    JDK :OpenJDK-11 OS :CentOS 7.6.1810 IDE :Eclipse 2019‑03 typesetting :Markdown code packag…

    Java 2023年5月29日
    078
  • 面试准备 — 大数据Hive相关

    1、拉链表 好文 需要查看一件事物从开始到现在的全部状态,比如用户从注册到今天改了几次手机号? 实际应用场景:1、用户数据量很大 2、某些字段经常被更新,其他字段基本不变 3、需求…

    Java 2023年6月7日
    073
  • 解决IDEA 读取properties配置文件中文乱码问题

    方法一查看文件编码类型是不是 utf-8 如果不是 修改为uft-8 然后就设置读取时的编码类型 InputStream resourceAsStream = equalsDemo…

    Java 2023年6月7日
    078
  • NotePad++的基本使用方法

    第一步:下载完成后的基本设置 设置>>首选项 进行如下操作 这个设置主要是为了在NotePad++的页面中可以输入汉字 第二步:在文件夹中新建文本文档 将后面的后缀改成…

    Java 2023年6月16日
    082
  • Jquery $(this).attr和$(this).val用法示例

    以下是个人心得整理,有兴趣朋友可以参考参考 $(this).attr(key); 获取节点属性名为key的值,相当于getAttribute(key)方法 $(this).attr…

    Java 2023年6月5日
    082
  • 本地项目推送到远程仓库(原来可以这么玩)

    前言:请各大网友尊重本人原创知识分享,谨记本人博客: 南国以南i 方式一:在idea中将项目推送至远程仓库 注:此处远程仓库以码云为例 第一步:登录码云,进入个人主页 点击个人头像…

    Java 2023年6月5日
    095
  • 描述性统计

    Part2 描述性统计 一、直方图 直方图是用面积而不是用高度来表示数,所以其不同于条形图 左边的刻度表示该块 每单位所占总面积的百分比,可以称其为 密度尺度。例如以每50元为一个…

    Java 2023年6月7日
    088
  • 坑爹!Quartz 重复调度问题,你遇到过么?

    作者:Lavender来源:https://segmentfault.com/a/1190000015492260 1. 引子 公司前期改用quartz做任务调度,一日的调度量均在…

    Java 2023年6月15日
    088
  • 二叉树的遍历

    二叉树的遍历应用实例 前序遍历,中序遍历,后序遍历步骤 前序遍历 1.先输出当前节点2.如果当前节点的左子节点不为空,则递归前序遍历3.如果当前节点的右子节点不为空,则递归前序遍历…

    Java 2023年6月15日
    068
  • NoteOfMySQL-09-存储过程与函数

    常用的SQL语句在执行时需要先编译,然后执行;而存储过程(Store Procedure)是经编译后存储在数据库中的SQL语句集,在数据库中创建和保存。 一、存储过程与函数的区别 …

    Java 2023年6月5日
    099
  • java处理集合工具

    public static Map parseListToMap(Collection list, Function mapper) { if (list == null || l…

    Java 2023年6月16日
    080
  • 真·Android Zxing 扫码中文乱码解决

    Zxing3.2.1 之前遇到过Android Zxing扫码乱码的问题,网上搜了下解决了下。 今天又遇到了问题。 依然会乱码。 研究总结如下: Zxing 可以在Hints中添加…

    Java 2023年6月15日
    070
  • Java基础问题

    基础问题 谈谈你对面向对象的理解 — 结合场景 为何要使用对象编程? 可重复利用,方便拓展 面向对象有三大特征:封装、继承和多态 封装:为什么要封装?可以使类的 成员(…

    Java 2023年6月7日
    064
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球