Kafka 是如何做到消息不丢或不重复的

  • *消息重复。 相同的消息重复发送会造成消费者消费两次同样的消息,这同样会造成系统间数据的不一致。比如,订单支付成功后会通过消息队列给支付系统发送需要扣款的金额,如果消息发送两次一样的扣款消息,而订单只支付了一次,就会给用户带来余额多扣款的问题。

总结来说,这两个问题直接影响到业务系统间的数据一致性。

那到底该如何避免这两个问题的发生呢?

Kafka 针对这两个问题有系统的解决方案,需要服务端、客户端做相应的配置以及采取一些补偿方案。

先介绍下 三种消息语义及场景

介绍一下”消息语义”的概念,这是理论基础,会有利于你更好地抓住下面解决方案的要点。

消息语义有三种,分别是:消息最多传递一次、消息最少传递一次、消息有且仅有一次传递 ,这三种语义分别对应:消息不重复、消息不丢失、消息既不丢失也不重复。

这里的”消息传递一次”是指生产者生产消息成功,Broker 接收和保存消息成功,消费者消费消息成功。对一个消息来说,这三个要同时满足才算是”消息传递一次”。上面所说的那三种消息语义可梳理为如下。

  • *最多一次(At most once):对应消息不重复。消息最多传递一次,消息有可能会丢,但不会重复。一般运用于高并发量、高吞吐,但是对于消息的丢失不是很敏感的场景。

  • *最少一次(At least once):对应消息不丢失。消息最少传递一次,消息不会丢,但有可能重复。一般用于并发量一般,对于消息重复传递不敏感的场景。

  • *有且仅有一次(Exactly once):每条消息只会被传递一次,消息不会丢失,也不会重复。 用于对消息可靠性要求高,且对吞吐量要求不高的场景。

Kafka 如何做到消息不丢失?

我们先来讨论一下 Kafka 是如何做到消息不丢失的,也就是:生产者不少生产消息,服务端不丢失消息,消费者也不能少消费消息。

那具体要怎么来实现呢?下面我们就来详细讲解下。

生产端:不少生产消息

以下是为了保证消息不丢失,生产端需要配置的参数和相关使用方法。

acks=0,表示生产者不等待任何服务器节点的响应,只要发送消息就认为成功。

acks=1,表示生产者收到 leader 分区的响应就认为发送成功。

acks=-1,表示只有当 ISR(ISR 的含义后面我会详细介绍)中的副本全部收到消息时,生产者才会认为消息生产成功了。这种配置是最安全的,因为如果 leader 副本挂了,当 follower 副本被选为 leader 副本时,消息也不会丢失。但是系统吞吐量会降低,因为生产者要等待所有副本都收到消息后才能再次发送消息。

只要上面这四个要点配置对了,就可以保证生产端的生产者不少生产消息了。

服务端:不丢失消息

为了保证不丢失消息,消费者就不能少消费消息,该如何去实现呢?消费端需要做好如下的配置。

第一个,设置 enable.auto.commit=false。enable.auto.commit 这个参数表示是否自动提交,如果是自动提交会导致什么问题出现呢?

消费者消费消息是有两个步骤的,首先拉取消息,然后再处理消息。向服务端提交消息偏移量可以手动提交也可以自动提交。

如果把参数 enable.auto.commit 设置为 true 就表示消息偏移量是由消费端自动提交,由异步线程去完成的,业务线程无法控制。如果刚拉取了消息之后,业务处理还没进行完,这时提交了消息偏移量但是消费者却挂了,这就造成还没进行完业务处理的消息的位移被提交了,下次再消费就消费不到这些消息,造成消息的丢失。因此,一定要设置 enable.auto.commit=false,也就是手动提交消息偏移量。

第二个,要有手动提交偏移量的正确步骤。enable.auto.commit=false 并不能完全满足消费端消息不丢的条件,还要有正确的手动提交偏移量的过程。具体如何操作呢?

业务逻辑先对消息进行处理,再提交 offset,这样是能够保证不少消费消息的。但是你可以想象这样一个场景:如果消费者在处理完消息后、提交 offset 前出现宕机,待消费者再上线时,还会处理未提交的那部分消息,但是这部分已经被消费者处理过了,也就是说这样做虽然避免了丢消息,但是会有重复消费的情况出现。(这种情况比较少,一般特殊情况特殊处理就好)

具体代码需要这么写:

List

Kafka 如何做到消息不重复?

生产端不重复生产消息,服务端不重复存储消息,消费端也不能重复消费消息。

相较上面”消息不丢失”的场景,”消息不重复”的服务端无须做特别的配置,因为服务端不会重复存储消息,如果有重复消息也应该是由生产端重复发送造成的。也就是说,下面我们只需要分析生产端和消费端就行。

生产端:不重复生产消息

生产端发送消息后,服务端已经收到消息了,但是假如遇到网络问题,无法获得响应,生产端就无法判断该消息是否成功提交到了 Kafka,而我们一般会配置重试次数,但这样会引发生产端重新发送同一条消息,从而造成消息重复的发送。

对于这个问题,Kafka 0.11.0 的版本之前并没有什么解决方案,不过从 0.11.0 的版本开始,Kafka 给每个生产端生成一个 唯一的 ID ,并且在每条消息中生成一个 sequence num,sequence num 是递增且唯一的,这样就能对消息去重,达到一个生产端不重复发送一条消息的目的。

但是这个方法是有局限性的,只对在一个生产端内生产的消息有效,如果一个消息分别在两个生产端发送就不行了,还是会造成消息的重复发送。好在这种可能性比较小,因为消息的重试一般会在一个生产端内进行。当然,对应一个消息分别在两个生产端发送的请求我们也有方案,只是要多做一些补偿的工作,比如,我们可以为每一个消息分 配一个全局 ID,并把全局 ID 存放在远程缓存或关系型数据库里 ,这样在发送前可以判断一下是否已经发送过了。

消费端:不能重复消费消息

为了保证消息不重复,消费端就不能重复消费消息,该如何去实现呢?消费端需要做好如下配置。

第一步,设置 enable.auto.commit=false。跟前面一样,这里同样要避免自动提交偏移量。你可以想象这样一种情况,消费端拉取消息和处理消息都完成了,但是自动提交偏移量还没提交消费端却挂了,这时候 Kafka 消费组开始重新平衡并把分区分给另一个消费者,由于偏移量没提交新的消费者会重复拉取消息,这就最终造成重复消费消息。

第二步,单纯配成手动提交同样不能避免重复消费,还需要消费端使用正确的消费”姿势”。

消费者拉取消息后,先提交 offset 后再处理消息,这样就不会出现重复消费消息的可能。但是你可以想象这样一个场景:在提交 offset 之后、业务逻辑处理消息之前出现了宕机,待消费者重新上线时,就无法读到刚刚已经提交而未处理的这部分消息,还是会有少消费消息的情况。这种情况也是少数,可以根据业务做补偿

具体代码如下:

java;gutter:true;
List messages = consumer.poll();
consumer.commitOffset(); processMsg(messages);

总结一下:

Kafka 中消息不丢失、不重复很重要,就我个人经验来讲,业务人员除了担忧消息队列服务端宕机外,对消息的丢失和消息的重复会非常敏感,因为这直接影响到了业务本身。

总体来讲,要保证消息不丢失和不重复,你要从生产端、服务端和消费端三个部分全盘考虑才可行,只是单独考虑某一端是远远不够的。同时,我也希望你搞懂消息语义的含义,因为所有的消息队列都会有相应的涉及。

Original: https://www.cnblogs.com/daohangtaiqian/p/15398724.html
Author: 道行太浅
Title: Kafka 是如何做到消息不丢或不重复的

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/594430/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Spring5框架新功能

    一、 1、整个Spring5框架基于java8,运行时兼容java9,许多不建议使用的方法在代码库中被删除 2、Spring5自带通用的日志封装 (1)Spring5已经移除了lo…

    Java 2023年6月7日
    099
  • 如何搭建一个WEB服务器项目(二)—— 对数据库表进行基本的增删改查操作

    使用HibernateTemplate进行增删改查操作 观前提示:本系列文章有关服务器以及后端程序这些概念,我写的全是自己的理解,并不一定正确,希望不要误人子弟。欢迎各位大佬来评论…

    Java 2023年6月8日
    093
  • JWT详解与基本使用(保姆教程)

    前言: 最近准备写一篇关于security前后端分离场景下的认证与授权文章,里面使用到了jwt,所以就写了一篇jwt的文章,作为开头小菜😋 概述 讲jwt之前,先讲一下什么是tok…

    Java 2023年6月15日
    0101
  • java实现简易的局域网对话系统

    先说一下 写的确实比较一般,别喷 为什么呢,疫情原因,学校提前两周期末考试,时间也不太充足,将就一下 服务器代码: package xcvcvcx; import java.io….

    Java 2023年6月5日
    085
  • 03-Eureka注册中心

    1、介绍 2、快速开始 2.1 pom文件依赖

    Java 2023年6月7日
    082
  • 浅谈JWT

    JWT 常见的认证机制 HTTP Basic Auth HTTP Basic Auth简单点说明就是每次请求API时都提供用户的username和password,简言之,Basi…

    Java 2023年6月5日
    0106
  • windows media play javascript 全屏 单击事件

    上面代码放在HTML页面中, 倒数三行的设置,是对应如果你要做JAVASCRIPT里是否要获取到,0是false,只是不明白为什么-1是true, 然后在HTML里面加入 docu…

    Java 2023年6月14日
    078
  • docker

    一、docker安装 VMware centos7 卸载原有docker yum remove docker docker-common docker-selinux docker…

    Java 2023年6月9日
    0105
  • Spring Boot 入门(六)使用MySQL

    用MySQL客户端,新建测试数据库 客户端:MySQL Workbench 根pom添加依赖 org.springframework.boot spring-boot-starte…

    Java 2023年6月5日
    097
  • 绘制几何图形

    《零基础学Java》 绘制几何图形Java可以 分别使用 Graphics 和 Graphics2D 绘制图形, Graphics类 使用不同的方法绘制不同的图形(drawLine…

    Java 2023年6月9日
    082
  • 深入理解mysql锁与事务隔离级别

    一、锁 1、锁的定义 锁即是一种用来协调多线程或进程并发使用同一共享资源的机制 2、锁的分类 从性能上分类:乐观锁和悲观锁 从数据库操作类型上分类:读锁和写锁 从操作粒度上分类:表…

    Java 2023年6月13日
    0106
  • 一台“厉害”的打印机

    摘要:面向教育模式的转变,南京功夫豆携手华为云IoT,给打印机配上与云端互通的智能盒子,开启云端打印新模式 后疫情时代 生活的各方面都在往线上模式延展 而在不被人注意的角落里 打印…

    Java 2023年6月15日
    0105
  • 重启rabbitmq服务

    重启rabbitmq服务通过两个命令来实现: rabbitmqctl stop :停止rabbitmq rabbitmq-server restart : 重启rabbitmq 因…

    Java 2023年5月30日
    0149
  • 设计模式之策略模式

    策略模式属于行为型模式,是使用最多的设计模式之一;其作用是针对一组算法,将每一个算法封装到具体共同接口的独立的类种,从而使得他们可以相互转化。策略模式使得算法可以在不影响到客户端得…

    Java 2023年6月5日
    097
  • Java(12)静态字段与静态方法

    之前我们都定义的 main方法都被标记了 static修饰符,那到底是什么意思?下面我们来看看 静态字段 如果将一个字段定义为 static,每个类只有一个这样的字段。而对于非静态…

    Java 2023年6月9日
    081
  • (十二)springboot中shiro的使用

    一、引入maven配置 xml;toolbar:false org.apache.shiro     shiro-spring     1.4.0</p> <pr…

    Java 2023年5月29日
    080
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球