RabbitMQ——死信队列(消息超时、达到最大长度、消费拒绝)(六)

RabbitMQ——死信队列(六)

死信的概念

死信:无法被消费的消息,一般情况下:生产者将消息投递到broker或者直接到queue中,消费者从queue取出消息进行消费,但是某些时候,由于特定原因导致queue中的某些消息无法被消费,这样的消息如果没有后续处理,就会成为死信消息,有了死信消息就产生了死信队列。
当消息消费发生异常时,将消息投入死信队列中。比如:用户在商城下单成功并点击去支付后,在指定时间未支付时自动失效。

死信的来源

1.消息TTL(存活时间)过期
2.队列达到最大长度(队列满了,无法再添加数据到mq中)
3.消息被拒绝(basic.reject或basic.nack)并且requeue = false(不放回队列中)

RabbitMQ------死信队列(消息超时、达到最大长度、消费拒绝)(六)
示例代码:
消费者1是最为重要的,需要声明交换机1,队列1,以及消费者2,队列2,并且需要当消息异常时,将死信消息转发到交换机2,再由交换机2转发到队列2.

消费者2只需要消费队列2的消息。
生产者需要向交换机1发送消息。
消费者1:
和以前不一样,这里需要在队列1,就是普通队列中,添加对死信的操作,因此为arguments参数进行了定义。

/**
 * 死信
 * 消费者01
 */
public class Consumer01 {
    //正常交换机
    public static  final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机
    public static  final String DEAD_EXCHANGE = "dead_exchange";
    //正常队列
    public static  final String NORMAL_QUEUE = "normal_queue";
    //死信队列
    public static  final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtiles.getChannel();
        //声明普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明普通队列
        /**
         * 普通队列需要指定对应的参数
         * 才能够当消息成为死信后
         * 转发到死信交换机
         * 再发送到死信队列
         * arguments
         */
        Map<String, Object> arguments = new HashMap<>();
        //过期时间 10s  单位毫秒
        //也可以在生产者发送时,指定过期时间,一般使用生产者发送时指定
//        arguments.put("x-message-ttl",10000);
        //正常队列过期后设置死信交换机 x-dead-letter-exchange 固定格式
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信交换机的routingkey
        arguments.put("x-dead-letter-routing-key","roukey2");
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //绑定普通交换机与队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"roukey1");

        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //绑定死信交换机与死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"roukey2");

        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            //消息有消息头、消息体
            System.out.println("消费者01接收到的消息"+new String(message.getBody(),"UTF-8"));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被终断");
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
    }
}

生产者:
生产者同样做了变动,设置了消息变为死信的过期时间,并且作为参数传递了进去。

public class ProducerLog {
    public static  final String EXCHANGE_NAME = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtiles.getChannel();
        //设置过期时间,变为死信 10s   单位毫秒
        AMQP.BasicProperties  properties = new AMQP.BasicProperties().

                builder().expiration("10000").

                build();
        for (int i = 0; i < 10; i++) {
            String message = "info:"+i;
            channel.basicPublish(EXCHANGE_NAME,"roukey1",properties,message.getBytes("UTF-8"));
        }

    }
}

消费者2
只消费死信队列中的消息

public class Consumer02 {
    //死信队列
    public static  final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtiles.getChannel();
        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            //消息有消息头、消息体
            System.out.println("消费者02接收到的消息"+new String(message.getBody(),"UTF-8"));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被终断");
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
    }
}

消息达到过期时间未被消费,变为死信,再被消费者2消费的场景

1.先启动消费者1,创建正常队列、死信队列、正常交换机、死信交换机
2.再关闭消费者1,交换机和队列创建后,依然存活,模拟消费者1,未能及时消费
3.启动生产者,生产者发送消息值正常交换机
4.现象:正常队列内信息条数为10,过了10秒后,变为0,死信队列消息条数由0变为10.

5.启动消费者2,死信队列消息被消费。

达到最大队列长度场景

需要在消费1,声明正常队列时,加入如下参数即可
设置最大长度为6,超过6个,多余的就为死信,进入死信队列。
生产者删除,最大存活时间的设置。
注,之前队列已经创建,属性改变,之前队列需要删除

        //设置正常队列长度的限制
        arguments.put("x-max—letter",6);

消息被拒绝场景

需要在消费者1接收消息的回调函数中,进行改造,增加拒绝操作

        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            String mes = new String(message.getBody(), "UTF-8");
            if (mes.equals("info:5")){
                System.out.println("消费者01拒绝消息"+mes);
                /**
                 * 拒绝消息
                 * 消息的标签
                 * 是否放回原队列
                 */
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else {
                /**
                 * 应答消息
                 * 消息的标签
                 * 是否批量应答
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                //消息有消息头、消息体
                System.out.println("消费者01接收到的消息"+mes);
            }
        };

Original: https://blog.csdn.net/cz_chen_zhuo/article/details/127823306
Author: 诗与猿方
Title: RabbitMQ——死信队列(消息超时、达到最大长度、消费拒绝)(六)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/660000/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球