RabbitMQ 发布订阅-实现延时重试队列

RabbitMQ消息处理失败,我们会让失败消息进入重试队列等待执行,因为在重试队列距离真正执行还需要定义的时间间隔,因此,我们可以将重试队列设置成延时处理。今天参考网上其他人的实现,简单梳理下消息延时重试执行的思路。

消费失败后,自动延时将消息重新投递,当达到一定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。在这里我们一步一步实现一个带有失败重试功能的发布订阅组件,使用该组件后可以非常简单的实现消息的发布订阅。

业务背景

  1. 结合RabbitMQ的Topic模式和Work Queue模式实现生产方产生消息,消费方按需订阅,消息投递到消费方的队列之后,多个worker同时对消息进行消费
  2. 结合RabbitMQ的Message TTLDead Letter Exchange 实现消息的延时重试功能
  3. 消息达到最大重试次数之后,将其投递到失败队列,等待人工介入处理bug后,重新将其加入队列消费

执行流程图

  1. 生产者发布消息到主Exchange
  2. 主Exchange根据Routing Key将消息分发到对应的消息队列
  3. 多个消费者的worker进程同时对队列中的消息进行消费,因此它们之间采用”竞争”的方式来争取消息的消费
  4. 消息消费后,不管成功失败,都要返回ACK消费确认消息给队列,避免消息消费确认机制导致重复投递,同时,如果消息处理成功,则结束流程,否则进入重试阶段
  5. 如果重试次数小于设定的最大重试次数(默认为3次),则将消息重新投递到Retry Exchange的重试队列
  6. 重试队列不需要消费者直接订阅,它会等待消息的有效时间过期之后,重新将消息投递给Dead Letter Exchange,我们在这里将其设置为主Exchange,实现延时后重新投递消息,这样消费者就可以重新消费消息
  7. 如果三次以上都是消费失败,则认为消息无法被处理,直接将消息投递给Failed Exchange的Failed Queue,这时候应用可以触发报警机制,以通知相关责任人处理
  8. 等待人工介入处理(解决bug)之后,重新将消息投递到主Exchange,这样就可以重新消费了

技术实现:

创建Exchange

为了实现消息的延时重试和失败存储,我们需要创建三个Exchange来处理消息。

  • master 主Exchange,发布消息时发布到该Exchange
  • master.retry 重试Exchange,消息处理失败时(3次以内),将消息重新投递给该Exchange
  • master.failed 失败Exchange,超过三次重试失败后,消息投递到该Exchange

所有的Exchange声明(declare)必须使用以下参数

参数值说明 exchange – Exchange名称 type topic Exchange 类型 passive false 如果Exchange已经存在,则返回成功,不存在则创建 durable true 持久化存储Exchange,这里仅仅是Exchange本身持久化,消息和队列需要单独指定其持久化 no-wait false 该方法需要应答确认

在RabbitMQ的管理界面中,我们可以看到创建的三个Exchange

RabbitMQ 发布订阅-实现延时重试队列

消息发布

消息发布时,使用 basic_publish方法,参数如下

参数值说明 message – 发布的消息对象 exchange master 消息发布到的Exchange routing-key – 路由KEY,用于标识消息类型 mandatory false 是否强制路由,指定了该选项后,如果没有订阅该消息,则会返回路由不可达错误 immediate false 指定了当消息无法直接路由给消费者时如何处理

发布消息时,对于 message对象,其内容使用json编码后的字符串,同时消息进行持久化

消息订阅

消息订阅的实现相对复杂一些,需要完成队列的声明以及队列和Exchange的绑定

Declare Queue

对于每一个订阅消息的服务,都必须创建一个该服务对应的队列,将该队列绑定到关注的路由规则,这样之后,消息生产者将消息投递给Exchange之后,就会按照路由规则将消息分发到对应的队列供消费者消费了。

消费服务需要declare三个队列

  • [queue_name] 队列名称,格式符合 [服务名称]@订阅服务标识
  • [queue_name]@retry 重试队列
  • [queue_name]@failed 失败队列

Declare队列时,参数规定规则如下

参数值说明 queue – 队列名称 passive false 队列不存在则创建,存在则直接成功 durable true 队列持久化 exclusive false 排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除 no-wait false 该方法需要应答确认 auto-delete false 当不再使用时,是否自动删除

对于 @retry重试队列,需要指定额外参数

<span class="hljs-string">'x-dead-letter-exchange'    => <span class="hljs-string">'master'
<span class="hljs-string">'x-dead-letter-routing-key' => [queue_name],
<span class="hljs-string">'x-message-ttl'              => <span class="hljs-number">30 * <span class="hljs-number">1000 </span></span></span></span></span></span>

查看队列的详细信息,我们可以看到 queueName@retry 队列与其它两个队列的不同

RabbitMQ 发布订阅-实现延时重试队列
<br><strong>&#x961F;&#x5217;&#x548C;Exchange&#x7ED1;&#x5B9A;</strong><br>&#x521B;&#x5EFA;&#x5B8C;&#x961F;&#x5217;&#x4E4B;&#x540E;&#xFF0C;&#x9700;&#x8981;&#x5C06;&#x961F;&#x5217;&#x4E0E;Exchange&#x7ED1;&#x5B9A;&#xFF08;<code><span class="hljs-built_in">bind</span></code>&#xFF09;&#xFF0C;&#x4E0D;&#x540C;&#x961F;&#x5217;&#x9700;&#x8981;&#x7ED1;&#x5B9A;&#x5230;&#x4E4B;&#x524D;&#x521B;&#x5EFA;&#x7684;&#x5BF9;&#x5E94;&#x7684;Exchange&#x4E0A;&#x9762;

QueueExchange [queue_name] master [queue_name]@retry master.retry [queue_name]@failed master.failed

绑定时,需要提供订阅的路由KEY,该路由KEY与消息发布时的路由KEY对应,区别是这里可以使用通配符同时订阅多种类型的消息。

参数值说明 queue – 绑定的队列 exchange – 绑定的Exchange routing-key – 订阅的消息路由规则 no-wait false 该方法需要应答确认

<br><br>&#x5728;RabbitMQ&#x7684;&#x7BA1;&#x7406;&#x754C;&#x9762;&#x4E2D;&#xFF0C;&#x6211;&#x4EEC;&#x53EF;&#x4EE5;&#x770B;&#x5230;&#x8BE5;&#x961F;&#x5217;&#x4E0E;Exchange&#x548C;routing-<span class="hljs-keyword">key&#x7684;&#x7ED1;&#x5B9A;&#x5173;&#x7CFB;<br><img src="https://img2018.cnblogs.com/blog/280403/201812/280403-20181228145537377-1269201897.png"></span>
<img src="https://img2018.cnblogs.com/blog/280403/201812/280403-20181228145559005-750279050.png">

RabbitMQ 发布订阅-实现延时重试队列

消息消费实现

使用 basic_consume 对消息进行消费的时候,需要注意下面参数

参数值说明 queue – 消费的队列名称 consumer-tag – 消费者标识,留空即可 no_local false 如果设置了该字段,服务器将不会发布消息到 发布它的客户端 no_ack false 需要消费确认应答 exclusive false 排他访问,设置后只允许当前消费者访问该队列 nowait false 该方法需要应答确认

消费端在消费消息时,需要从消息中获取消息被消费的次数,以此判断该消息处理失败时重试还是发送到失败队列。

在消息发送到重试队列和失败队列时,我们在消息的headers中添加了一个名为 x-orig-routing-key的字段,该字段是实现消息重试的关键字段,由于我们的消息需要在不同的Exchange,Queue之间流转,为了避免消息在重新投递到主Exchange时,被所有的消费者队列重新消费,在重试过程中,我们将消息的routing-key修改为队列名称,直接投递给原始消费消息的队列。 x-orig-routing-key用于在之后能够重新获取到最开始的routing-key。

这里的重复消费是指 某个消息被两个消费方A和B消费了,其中A消费失败,B成功,这时候,消息由A消费者重新投递到主Exchange后,B消费队列也会获取到该消息,因此就会导致B消费者重复消费已经消费国的消息

本文实现延时重试,使用了三个重试Exchange,Exchange如果订阅特别多的话,Exchange的压力会非常大,因此在非常极端的情况下,消息大批量失败,且消息收发非常快,那么Exchange的性能可能会有问题。

本文是使用发布订阅实现延时重试的消息执行,也会有其他思路。

Original: https://www.cnblogs.com/liliuguang/p/15247947.html
Author: 李留广
Title: RabbitMQ 发布订阅-实现延时重试队列

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/540189/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • MyBatis复杂映射开发之一对多查询

    一对多查询模型 用户和订单表的关系为,一个用户有多个订单,一个订单只能属于一个用户。 一对多查询需求:查询多有用户,与此同时查询用户具有的订单信息。 一对多查询语句 对应的sql语…

    Java 2023年6月5日
    0189
  • 鸿蒙(HarmonyOS)开发笔记一:系统简介

    1. HarmonyOS是什么? 根据华为官方的说明,harmonyOS是分布式,提供新交互,新服务,万物互联的一款操作系统。下面简单介绍一下何为:新服务,新交互 基于harmon…

    Java 2023年6月16日
    099
  • spring cloud 服务链路追踪 skywalking 6.1

    随着微服务架构的流行,服务按照不同的维度进行拆分,一次请求往往需要涉及到多个服务。互联网应用构建在不同的软件模块集上,这些软件模块,有可能是由不同的团队开发、可能使用不同的编程语言…

    Java 2023年6月16日
    0177
  • 重构聚合支付案例教你如何写出高扩展性易读的代码

    人间清醒 以下代码逻辑为:按照不同的支付方式调用不同支付方式的逻辑流程。痛点: /** * 旧的支付 * * @param mode 模式 * @param payVO 支付签证官…

    Java 2023年6月5日
    085
  • 西门子PLC数据读取 Observer设计模式

    当我听到这个需求的时候,我差点爆粗口(实际上可能已经爆了,不过我忘了)。 需求刚开始是: C#连接PLC Modbus读取值。 我用C#写完了,觉得太简单了,还弄了个窗体。 接着是…

    Java 2023年6月9日
    072
  • ExceptionHandler配合RestControllerAdvice全局处理异常

    Java全局处理异常 引言 对于controller中的代码,为了保证其稳定性,我们总会对每一个controller中的代码进行try-catch,但是由于接口太多,try-cat…

    Java 2023年6月8日
    080
  • 并发编程之:线程

    大家好,我是小黑,一个在互联网苟且偷生的农民工。前段时间公司面试招人,发现好多小伙伴虽然已经有两三年的工作经验,但是对于一些Java基础的知识掌握的都不是很扎实,所以小黑决定开始跟…

    Java 2023年6月7日
    068
  • Linux 日志管理

    Linux 日志管理 日志文件是重要的系统信息文件,其中记录了许多重要的系统事件,包括用户的登录信息、系统的启动信息、系统的安全信息、邮件相关信息、各种服务相关信息等。 日志对于安…

    Java 2023年6月5日
    0108
  • Spring知识点总结3 Spring框架

    什么是 Spring 框架? Spring 是一款开源的轻量级 Java 开发框架,旨在提高开发人员的开发效率以及系统的可维护性。 我们一般说 Spring 框架指的都是 Spri…

    Java 2023年6月6日
    070
  • Seatunnel超高性能分布式数据集成平台使用体会

    @ 概述 定义 使用场景 特点 工作流程 连接器 转换 为何选择SeaTunnel 安装 下载 配置文件 部署模式 入门示例 启动脚本 配置文件使用参数示例 Kafka进Kafka…

    Java 2023年6月5日
    0125
  • NGINX中root和alias的区别

    nginx指定文件路径有两种方式root和alias。主要区别在于nginx如何解释location后面的uri,这会使两者分别以不同的方式将请求映射到服务器文件上。 locati…

    Java 2023年5月30日
    068
  • 正则 捕获组之反向引用

    之前写正则的时候,经常用到 (.*?) 之类的用法.一般在替换的时候会用 $1 来引用括号里面匹配到的内容比如, 1.1.1.1 aaaa 2.2.2.2 bbbb 3.3.2.3…

    Java 2023年6月16日
    096
  • Mysql学习笔记-临键锁实验

    前言昨天同事跟我聊到一个问题:InnoDB里面间隙锁锁住的数据可以update么?我们经常都说间隙锁是InnoDB在RR隔离级别下防止幻读的一种处理手段。它可以防止数据在间隙范围中…

    Java 2023年6月5日
    085
  • Django数据库–事务及事务回滚

    数据库的读写操作中,事务在保证数据的安全性和一致性方面起着关键的作用,而回滚正是这里面的核心操作。Django的ORM在事务方面也提供了不少的API。有事务出错的整体回滚操作,也有…

    Java 2023年6月13日
    069
  • opencv-python函数

    opencv-python读取、展示和存储图像 cv2.imshow(windows_name, image) 函数参数一: 窗口名称(字符串)函数参数二: 图像对象,类型是num…

    Java 2023年6月9日
    064
  • Effective Java 第三版——79. 避免过度同步

    Tips书中的源代码地址:https://github.com/jbloch/effective-java-3e-source-code注意,书中的有些代码里方法是基于Java 9…

    Java 2023年5月29日
    073
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球