如何保证消息的可靠性的投递

如何保证消息的可靠性的投递

在本项目中,添加员工会发送入职邮件,利用RabbitMQ的队列发送入职邮件。这部分只是实现发送邮件的功能,RabbitMQ它有它的优点就是异步、解耦、流量削峰。RabbitMQ在我们的邮件发送中扮演的角色,就相当于一个中转站。

就好比,我们平时的驿站,取快递的这个操作,大家都很熟悉了吧。生产者,就是我们的商家,驿站,相当于我们的队列,而我们就是消费者。

详细的RabbitMQ的概念:RabbitMQ(一) – BeaBrick0 – 博客园 (cnblogs.com)

但是,它也有缺点。在实际的开发中,需要考虑到 如何确保消息的可靠性投递

我们一个个来解决,之前面试的过程中,面试官,对我说的一句话,记忆犹新,看一个知识点或者问题,要知道为什么是这样、为什么会这样、怎么可以避免。接下来,我们一个一个来。

在什么场景下消息可能会丢失

如何保证消息的可靠性的投递

如图所示,RabbitMQ丢失消息的情况可以发生在任何一个节点上。

消息投递的可靠性方案

针对上面可能会造成消息丢失的节点,我们来做分析:

生产者没有成功把消息发送到MQ

丢失的原因:因为网络传输的不稳定性,当生产者在向MQ发送消息的过程中,MQ没有成功接收到消息,但是生产者却以为MQ成功接收到了消息,不会再次重复发送该消息,从而导致消息的丢失。

解决办法: 有两个解决办法:事务机制和confirm机制,最常用的是confirm机制。

事务机制

RabbitMQ 提供了事务功能,生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

//伪代码
//开启事务
channel.txSelect

try{
  //在这里发送消息

  .........

}catch(Exception e){
  //如果捕获到异常

  channel.txRollBack;
}

//提交事务
channel.txCommit;

Confirm机制

RabbitMQ可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,生产者每次写的消息都会分配一个唯一的 id,如果消息成功写入 RabbitMQ 中,RabbitMQ 会给生产者回传一个 ack 消息,告诉你说这个消息 ok 了。

如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,生产者可以发送。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以重发。

注意:RabbitMQ的事务机制是同步的,很耗型能,会降低RabbitMQ的吞吐量。confirm机制是异步的,生成者发送完一个消息之后,不需要等待RabbitMQ的回调,就可以发送下一个消息,当RabbitMQ成功接收到消息之后会自动异步的回调生产者的一个接口返回成功与否的消息。

RabbitMQ接收到消息之后丢失了消息

丢失的原因:RabbitMQ接收到生产者发送过来的消息,是存在内存中的,如果没有被消费完,此时RabbitMQ宕机了,那么再次启动的时候,原来内存中的那些消息都丢失了。

解决办法:开启RabbitMQ的持久化。当生产者把消息成功写入RabbitMQ之后,RabbitMQ就把消息持久化到磁盘。结合上面的说到的confirm机制,只有当消息成功持久化磁盘之后,才会回调生产者的接口返回ack消息,否则都算失败,生产者会重新发送。存入磁盘的消息不会丢失,就算RabbitMQ挂掉了,重启之后,他会读取磁盘中的消息,不会导致消息的丢失。

持久化的配置

第一点是创建 queue 的时候将其设置为持久化,这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。

第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

注意:持久化要起作用必须同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

消费者弄丢了消息

丢失的原因:如果RabbitMQ成功的把消息发送给了消费者,那么RabbitMQ的ack机制会自动的返回成功,表明发送消息成功,下次就不会发送这个消息。但如果就在此时,消费者还没处理完该消息,然后宕机了,那么这个消息就丢失了。

b、解决的办法:简单来说,就是必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次在自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

本项目的针对消息的可靠性的解决方案

消息发送

第一种方案:消息落库,对消息进行状态打标。

如何保证消息的可靠性的投递

如何保证消息的可靠性的投递
  • Step1:进行订单数据入业务库,同时插入一条消息数据入消息库,此时数据状态status为0
  • Step2:发送消息
  • Step3:服务器发送消息确认
  • Step4:生产者监听器监听到了消息确认,这时候更改消息库数据状态status为1
  • Step5:分布式定时任务会定时查询消息库状态status为0的数据(说明数据未可靠发送,发送失败或者接收确认失败)
  • Step6:将状态status为0的数据进行二次重试发送,每重试一次次数加1
  • *Step7:如果重试次数超过3次,则将消息库数据状态status改为2,数据状态status为2的数据只能通过人工去排查问题

当然,这种要一直去数据库查询,我们的消息的状态和重试次数。

其实很明显,高并发场景下是有性能瓶颈的,每次消息的推送都需要多一次的入库(数据入消息库), 带来多一次的磁盘IO消耗。由于有性能的限制,我们来看第二种方案,消息的延迟投递,做二次确认,回调检查。

第二种方案:消息延迟投递,做二次确认,回调检查。

如何保证消息的可靠性的投递
  • Step1:首先对业务订单数据落库,再发送消息到消息服务器
  • Step2:第一次发送消息几分钟之后重新发送该消息,目的是做confirm消息确认检查(延迟投递)
  • Step3:下游服务监听消息,进行消费
  • Step4:接收到消息之后会发送一个confirm确认消息到服务器
  • Step5:回调服务会监听下游服务发送的确认消息,监听到之后会落入消息库
  • *Step6:几分钟之后,Step2发送了消息确认数据,回调服务接收到了该消息,会和消息库中的消息进行一个比对检查,假如消息库中没有该消息的确认,回调服务就会发起一个ReSend Command命令,通知上游服务重新发送一次消息。

在本项目中,我们选择第一种方案,来取保消息的可靠性投递。

项目的实现流程:

  1. 发送消息时,将当前消息数据存入数据库中,投递状态改为正在投递中
  2. 开启消息的回调机制,确认成功,更新投递状态为消息的投递状态
  3. 开启定时任务,重新投递失败。重试超过3次,更新投递状态为投递失败。

Original: https://www.cnblogs.com/bearbrick0/p/16147949.html
Author: BearBrick0
Title: 如何保证消息的可靠性的投递

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/572468/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Spring Boot 入门(八)部署RabbitMQ

    RabbitMQ Erlang 版本对照表:https://www.rabbitmq.com/which-erlang.html 安装erlang 由于rabbitmq是基于erl…

    Java 2023年6月5日
    0120
  • Smack 3.3.1 发布,Java 的 XMPP 开发包

    Smack 3.3.1 发布了,这是一个小更新版本,主要更新包括: [SMACK-441] – Memory leak in KeepAliveManager [SMA…

    Java 2023年5月29日
    081
  • 设计模式——单例模式

    引言 今天来谈谈设计模式中的单例模式,温故知新,以免生疏。 软件设计领域的四位世界级大师Gang Of Four (GoF):Erich Gamma,Richard Helm,Ra…

    Java 2023年6月8日
    0111
  • Spring原型bean-prototype不能销毁? 转载

    一个原型bean的例子: 结果是 注解的@PreDestroy的方法close()没有执行,而如果是单例bean 的singleton则会执行 那若想销毁Spring的原型bean…

    Java 2023年5月30日
    0103
  • t-io 学习笔记(一)

    基础介绍理解篇 序:本文也是在t-io官网学习的基础上写的理解学习笔记;1.什么是t-io? t-io是基于JVM的网络编程框架,和netty属同类,所以netty能做的t-io都…

    Java 2023年6月16日
    096
  • mac(m1)配置my.cnf

    今天开始学习了数据库,在安装MySQL之后启动一直报错,然后在网上找了很多解决方法,最后用以下方法解决 对于习惯了windows的小伙伴来说,直接去安装目录里边修改my.ini就可…

    Java 2023年6月14日
    073
  • 解决Watt Toolkit(原steam++)的host代理443端口被占用的问题(电脑有虚拟机进!!)

    解决Watt Toolkit(原steam++) 的host代理443端口被占用的问题 写在前面 本来没法打算更改,但是公司为了上线微软商店还是将名字改了,所以我也修改一下。 一、…

    Java 2023年6月5日
    091
  • mybatis-plus

    mybatis-plus执行 SQL 分析打印 https://gitee.com/baomidou/mybatis-plus-samples/blob/master/mybati…

    Java 2023年5月30日
    091
  • 群晖 docker 下nexus3 创建docker私有仓库

    登录后创建Blob Stores(docker MyDockerBlobStores) 创建docker host(可以push/pull) docker 代理仓库,可以是dock…

    Java 2023年6月6日
    069
  • Effective Java 第三版——76. 争取保持失败原子性

    Tips书中的源代码地址:https://github.com/jbloch/effective-java-3e-source-code注意,书中的有些代码里方法是基于Java 9…

    Java 2023年5月29日
    0115
  • RPM 与 YUM

    RPM 与 YUM rpm 用于互联网下载包的打包及安装工具,它包含在某些 Linux 分发版中。它生成具有.RPM 扩展名的文件。RPM是 RedHat Package Mana…

    Java 2023年6月5日
    0119
  • [学习笔记] Java重写和重载

    重写(Override) 重写是子类对允许访问的父类方法的方法体重新进行编写,返回值和形参不发生改变; 通过重写,子类可以根据需要定义特定于自身的行为,根据需要实现父类的方法; 方…

    Java 2023年6月5日
    083
  • Java随谈(三)如何创建好一个对象?

    本文推荐阅读时间30分钟 (注意: 编写这篇文章的意义在于,希望自己能够在今后的编程生涯中,每一个实现,都是根据需求去思考,选择较优的实现方式(不是最优而是较优是考虑时间、实现成本…

    Java 2023年6月8日
    058
  • 26.服务端单线程模式下性能瓶颈测试

    VS2015 提供的性能探查器,可以看到程序的哪部分代码占用了多少的cpu 在Release版本下,使用,性能探查器———开始 运行一段时间之后…

    Java 2023年5月29日
    084
  • JPA作持久层操作

    JPA(Hibernate是jpa的实现) jpa是对实体类操作,从而通过封装好的接口直接设置数据库的表结构。虽然jpa可以直接通过编写java代码来操作数据库表结构,避免了sql…

    Java 2023年6月13日
    077
  • 分享实用小工具:JAVA版本位运算工具类

    将二进制数中的每位数字1或0代表着某种开关标记,1为是,0为否,则一个数字可以代表N位的开关标记值,可有效减少过多的变量定义 或 过多的表字段,同时也能在一些复杂的组合判断场景下利…

    Java 2023年5月29日
    078
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球