RabbitMQ—–死信队列

1.什么是TTL?

a. time to live 消息存活时间

b. 如果消息在存活时间内未被消费,则会被清除

c. RabbitMQ支持两种ttl设置
  -单独消息进行配置ttl
  -整个队列进行配置ttl(居多)

2.什么是rabbitmq的死信队列?

没有被及时消费的消息存放的队列

3.什么是rabbitmq的死信交换机?

Dead Letter Exchange(死信交换机,缩写:DLX)当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是DLX死信交换机

4.消息有哪几种情况成为死信?

a. 消费者拒收消息(basic.reject/ basic.nack),并且没有重新入队 requeue=false
b. 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
c. 队列的消息长度达到极限
结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

5.RabbitMQ管控台消息TTL测试

a. 队列过期时间使用参数,对整个队列消息统一过期
x-message-ttl
单位ms(毫秒)
b. 消息过期时间使用参数(如果队列头部消息未过期,队列中级消息已经过期,则消息会还在队列里面)
expiration
单位ms(毫秒)
c. 两者都配置的话,时间短的先触发

6.如图

RabbitMQ-----死信队列

7.什么是延迟队列?

种带有延迟功能的消息队列,Producer 将消息发送到消息队列 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息

8.使用场景

1. 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
b. 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送;
c. 消息生产和消费有时间窗口要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略

9.业界的一些实现方式

a. 定时任务高精度轮训

b. 采用RocketMQ自带延迟消息功能

c. RabbitMQ本身是不支持延迟队列的,怎么办?
结合死信队列的特性,就可以做到延迟消息

10.代码

场景:

客户提交商品订单后,需要在30分钟内完成支付,如未完成,则发送消息提醒订单失败

a. rabbitmq配置类代码,配置普通/死信队列和交换机

RabbitMQ-----死信队列RabbitMQ-----死信队列
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Rabbitmq配置类
 * 这里普通队列也可以叫它延时队列,是没有配置消费者(Listener)去监听的
 *
 * */
@Configuration
public class RabbitmqConfig {

    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
    public static final String LOCK_MERCHANT_DEAD_ROUTING_KEY = "lock_merchant_dead_routing_key";
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
    public static final String NEW_MERCHANT_QUEUE= "new_merchant_queue";
    public static final String NEW_MERCHANT_ROUTING_KEY = "new_merchant.#";

    /**
     * 死信交换机(topic模式)
     *
     * */
    @Bean
    public Exchange lockMerchantDeadExchange(){
        //durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,
        // 如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,
        // 当rabbitmq重启之后会读取该数据库
        return ExchangeBuilder.topicExchange(LOCK_MERCHANT_DEAD_EXCHANGE).durable(true).build();
    }

    /**
     * 死信队列
     *
     * */
    @Bean
    public Queue lockMerchantDeadQueue() {
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }

    /**
     * 绑定死信交换机和死信队列
     * 这里不加@Qualifier的话会报错:there is more than one bean of "xxx" type
     * 因为死信交换机和普通交换机都配置了Exchange, 无法区分哪种作为参数
     * Queue同理
     *
     * */
    @Bean
    public Binding lockMerchantDeadBinding(@Qualifier("lockMerchantDeadExchange") Exchange exchange, @Qualifier("lockMerchantDeadQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(LOCK_MERCHANT_DEAD_ROUTING_KEY).noargs();
    }

    /**
     * 普通交换机(topic模式)
     *
     * */
    @Bean
    public Exchange newMerchantExchange(){
        //durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,
        // 如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,
        // 当rabbitmq重启之后会读取该数据库
        return ExchangeBuilder.topicExchange(NEW_MERCHANT_EXCHANGE).durable(true).build();
    }

    /**
     * 普通队列
     *
     * */
    @Bean
    public Queue newMerchantQueue() {
        Map args = new HashMap<>();
        //消息过期后,进入死信交换机
        args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
        //消息过期后,进入死信交换机的路由键
        args.put("x-dead-letter-routing-key", LOCK_MERCHANT_DEAD_ROUTING_KEY);
        //消息过期时间 单位:毫秒 消息过期后,会从普通队列转入死信队列
        //这里方便测试设置10秒后消息过期
        args.put("x-message-ttl",10000);
        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }

    /**
     * 绑定普通交换机和普通队列
     *
     * */
    @Bean
    public Binding newMerchantBinding(){
        return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE,
                NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTING_KEY, null);
    }
}

View Code

b. 监听死信队列代码

RabbitMQ-----死信队列RabbitMQ-----死信队列
import com.rabbitmq.client.Channel;
import com.theng.shopuser.config.RabbitmqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 消费者监听死信队列
 *
 */
@Component
@RabbitListener(queues = RabbitmqConfig.LOCK_MERCHANT_DEAD_QUEUE)
public class OrderMQListener {

    /**
     * body: 接收convertAndSend(String exchange, String routingKey, Object object)的object消息
     *
     * */
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {

        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("body: " + body);
        System.out.println("msgTag: " + msgTag);
        System.out.println("message: " + message.toString());

        //30分钟后,从body中获取买家信息再从数据库查询抢购到的商品订单是否处理 TODO
        //如果没有处理,则向商家发送提醒消息 TODO

        //告诉broker(消息队列服务器实体),消息已经被确认
        channel.basicAck(msgTag, false);
        //告诉broker,消息拒绝确认(可以拒绝多条,把比当前msgTag值小的也拒绝)
//        channel.basicNack(msgTag, false, true);
        //告诉broker,消息拒绝确认(只能拒绝当前msgTag的这条)
//        channel.basicReject(msgTag, true);
    }
}

View Code

c. application.ym配置文件

RabbitMQ-----死信队列RabbitMQ-----死信队列
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: 123456
    #虚拟主机 可在http://localhost:15672管理平台进行配置
    virtual-host: /dev
    #开启消息二次确认ConfirmCallback配置
    publisher-confirms: true
    #开启ReturnCallback配置
    publisher-returns: true
    #修改交换机改投消息递到队列失败策略
    #true:交换机处理消息到队列失败,则返回给生产者
    #和publisher-returns配合使用
    template:
      mandatory: true
    #消息手工确认ack
    listener:
      simple:
        acknowledge-mode: manual

View Code

d. 控制器代码

RabbitMQ-----死信队列RabbitMQ-----死信队列
@RestController
@RequestMapping("/user-info")
public class UserInfoController {

    @Autowired
    public RedisTemplate redisTemplate;

    //消息生产者
    @GetMapping("/send")
    public Object testSend(){
        //object可存储买家信息
        rabbitTemplate.convertAndSend(RabbitmqConfig.NEW_MERCHANT_EXCHANGE, "new_merchant.create", "买家抢购成功,请及时处理订单!");

        Map map = new HashMap<>();
        map.put("code", 0);
        map.put("msg", "买家抢购成功,请在30分钟内提交订单!");
        return "success";
    }
}

View Code

结果:

生产者发送消息10秒后,消息会进入死信交换机,通过死信队列将订单过期消息发送给消费者

Original: https://www.cnblogs.com/tianhengblogs/p/15384506.html
Author: 玉天恒
Title: RabbitMQ—–死信队列

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/540241/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 为什么重写equals必须重写hashCode

    equals常见面试题 为什么要重写equals 重写equals不重写hashCode会存在什么问题 总结 equals常见面试题 在开始聊之前,我们先看几个常见的面试题,看看你…

    Java 2023年6月5日
    080
  • SpringBoot笔记(-):入门

    Spring Boot 是由 Pivotal 团队提供的全新框架,其设计目的为了简化Spring的配置,核心思想是约定大于配置。在开发一个Web项目中只需要引入相关的依赖,而不需要…

    Java 2023年6月7日
    073
  • 小数点的几种精确方法

    小数点的几种精确方法 1.直接用格式化输出String.format() double b=123.4; System.out.println(String.format(&quo…

    Java 2023年6月5日
    072
  • 【设计模式】Java设计模式-建造者模式

    【设计模式】Java设计模式 – 建造者模式 😄 不断学习才是王道🔥 继续踏上学习之路,学之分享笔记👊 总有一天我也能像各位大佬一样🏆 @一个有梦有戏的人 @怒放吧德德…

    Java 2023年6月16日
    079
  • java实现文件加解密方案

    0、前序 上传加密:客户端上传文件,后台加密,将加密后的文件存储到文件服务器 下载解密:客户端请求文件,后台从文件服务器获取密文,解密返回客户端 注:对文件服务器上存储的文件需要做…

    Java 2023年6月7日
    093
  • java并发和排序的简单例子(Runnable+TreeSet)

    很多时候并发需要考虑线程安全,但也有很多时候和线程安全毛关系都没有,因为并发最大的作用是并行,线程安全仅仅是并发的一个子话题。 例如常常会用于并发运算,并发i/o。 下文是一个练习…

    Java 2023年6月9日
    083
  • Javafx-【直方图】文本频次统计工具 中文/英文单词统计

    上周倒腾了下 javafx,本来是做平时成绩系统。跟老师提了一下 javafx,他突然兴起,发了个统计中文和英文单词并以直方图显示的实验……只给两三天的期限…

    Java 2023年6月5日
    0108
  • 在ASP.NET 中调用RSACryptoServiceProvider失败,提示未找到文件

    CspParameters RSAParams = new CspParameters(); RSAParams.Flags = CspProviderFlags.UseMachi…

    Java 2023年6月14日
    056
  • Leetcode栈&队列

    Leetcode栈&队列 题干: 思路: 栈是 FILO,队列是 FIFO,所以如果要用栈实现队列,目的就是要栈实现一个 FIFO的特性。 具体实现方法可以理解为,准备两个…

    Java 2023年6月7日
    086
  • Spring Boot 请求返回字符串中文乱码

    新搭建的spring boot的项目,后台返回的String数据到浏览器时中文乱码 1.在返回有乱码的controller 的RequestMapping里增加 produces …

    Java 2023年5月30日
    081
  • java中的定时任务

    java中的定时任务, 使用java实现有3种方式: 1, 使用普通thread实现 2, 使用timer实现: 可控制启动或取消任务, 可指定第一次执行的延迟 线程安全, 但只会…

    Java 2023年5月29日
    073
  • 15、lock锁

    1、lock是显示锁,手动开启和关闭,synchronized是隐式锁,出了作用域自动释放。 2、lock只有代码块锁,synchronized有代码锁和方法锁 3、使用lock锁…

    Java 2023年6月8日
    086
  • Java学习-第一部分-第三阶段-第一节:网络编程

    网络编程 笔记目录:(https://www.cnblogs.com/wenjie2000/p/16378441.html) 网络基础 网络通信 概念:两台设备之间通过网络实现数据…

    Java 2023年6月15日
    060
  • MySQL性能优化方法和实践

    前置准备 硬件优化 查询缓存 查询缓存不命中的情况 查询缓存的代价 表结构设计 数据类型 整数类型 实数类型 字符串类型 (**)BLOB和TEXT —— 存储大的数据 DATET…

    Java 2023年6月9日
    0102
  • Nginx $remote_addr和$proxy_add_x_forwarded_for变量详解

    $remote_addr 代表客户端IP。注意,这里的客户端指的是直接请求Nginx的客户端,非间接请求的客户端。假设用户请求过程如下: &#x7528;&#x62…

    Java 2023年6月16日
    077
  • springboot的定时任务使用

    二、动态:基于接口 基于接口(SchedulingConfigurer) 1、导入依赖包: org.springframework.boot spring-boot-starter…

    Java 2023年5月29日
    0154
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球