打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
15-RabbitMQ高级特性-死信队列

死信队列

死信队列,英文缩写:DLX  。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况:

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机:

给队列设置参数:x-dead-letter-exchange 和 x-dead-letter-routing-key

案例

1. 需求

下面我们编写生产者、消费者,根据以下三种情况进行演示。

消息成为死信的三种情况:

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

2.【生产者】配置正常的交换机、队列和死信交换机、队列

 <!--
     死信队列:
         1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
         2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
         3. 正常队列绑定死信交换机
             设置两个参数:
                 * x-dead-letter-exchange:死信交换机名称
                 * x-dead-letter-routing-key:发送给死信交换机的routingkey
 -->


 <!--
     1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
 -->

 <rabbit:queue name='test_queue_dlx' id='test_queue_dlx'>
     <!--3. 正常队列绑定死信交换机-->
     <rabbit:queue-arguments>
         <!--3.1 x-dead-letter-exchange:死信交换机名称-->
         <entry key='x-dead-letter-exchange' value='exchange_dlx' />

         <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey, value只要符合死信交换机的routingkey规则-->
         <entry key='x-dead-letter-routing-key' value='dlx.hehe' />

         <!--4.1 设置队列的过期时间 ttl-->
         <entry key='x-message-ttl' value='10000' value-type='java.lang.Integer' />
         <!--4.2 设置队列的长度限制 max-length -->
         <entry key='x-max-length' value='10' value-type='java.lang.Integer' />
     </rabbit:queue-arguments>
 </rabbit:queue>
 <rabbit:topic-exchange name='test_exchange_dlx'>
     <rabbit:bindings>
         <rabbit:binding pattern='test.dlx.#' queue='test_queue_dlx'></rabbit:binding>
     </rabbit:bindings>
 </rabbit:topic-exchange>


 <!--
    2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
-->

 <rabbit:queue name='queue_dlx' id='queue_dlx'></rabbit:queue>
 <rabbit:topic-exchange name='exchange_dlx'>
     <rabbit:bindings>
         <rabbit:binding pattern='dlx.#' queue='queue_dlx'></rabbit:binding>
     </rabbit:bindings>
 </rabbit:topic-exchange>

3.【生产者】编写发送消息至正常队列,验证:待过期未被消费,自动进入死信队列中

/**
 * 发送测试死信消息:
 *  1. 过期时间
 *  2. 长度限制
 *  3. 消息拒收
 */

@Test
public void testDLX(){
    //1. 测试过期时间:当消息在正常队列 test_queue_dlx 时间过期未被消费 --自动进入--> 死信队列 queue_dlx
    rabbitTemplate.convertAndSend('test_exchange_dlx','test.dlx.hahahah''这将会成为一条死信消息嘛?');
}

执行之后,从控制面板可以看到消息已经进入了正常队列

等待过期时间过后,消息未消费则自动进入死信队列:

死信队列中查看消息,确认消息内容:

4.【生产者】编写发送20条消息至正常队列,验证:当消息超出正常队列限制,自动进入死信队列中

    /**
     * 发送测试死信消息:
     *  1. 过期时间
     *  2. 长度限制
     *  3. 消息拒收
     */

    @Test
    public void testDLX(){
        //2. 测试长度限制:当消息超出正常队列的最大长度10,那么后续的消息则自动转入 --> 死信队列 queue_dlx
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend('test_exchange_dlx','test.dlx.hahahah''这将会成为一条死信消息嘛?');
        }
    }

执行之后,查询管理页面可以发现,正常队列只有10条消息,超出正常队列长度限制的10条消息自动进入死信队列:

等待正常队列的消息过期后,全部进入死信队列:

5.验证消费者拒绝签收消息,并且设置不重回队列的情况:该情况的消息自动进入死信队列

5.1【消费者】编写监听器类,拒绝签收消息

package com.lijw.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


/**
 * @author Aron.li
 * @date 2022/3/4 23:36
 */

@Component
public class DlxListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //1. 获取传递的标签,用于消息确认
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //2. 接收消息
            System.out.println(new String(message.getBody()));

            //3. 处理业务逻辑
            System.out.println('处理业务逻辑...');
            int i = 3 / 0// 产生异常

            //4. 手动签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //5. 拒绝签收, 设置不重复队列
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */

            channel.basicNack(deliveryTag, truefalse);
        }
    }
}

5.2 【消费者】配置监听器 监听正常队列,进行消息拒绝签收处理

    <!--  定义监听器与队列的绑定  -->
    <rabbit:listener-container connection-factory='connectionFactory' acknowledge='manual' prefetch='1'>
        <rabbit:listener ref='dlxListener' queue-names='test_queue_dlx'/>
    </rabbit:listener-container>

5.3【消费者】启动测试代码,启动Spring框架,开启监听器

package com.lijw;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * @author Aron.li
 * @date 2022/3/4 23:41
 */

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations 
'classpath:spring-rabbitmq-consumer.xml')
public class ConsumerTest {

    @Test
    public void test01() {
        while (true) {

        }
    }

}

5.4 【生产者】发送一条消息,验证:消息 --> 正常队列 --> 消费者拒签 --> 死信队列

    /**
     * 发送测试死信消息:
     *  1. 过期时间
     *  2. 长度限制
     *  3. 消息拒收
     */

    @Test
    public void testDLX(){
        //3. 测试消息被消费者拒收:自动进入 --> 死信队列 queue_dlx
        rabbitTemplate.convertAndSend('test_exchange_dlx','test.dlx.hahahah''这将会成为一条死信消息嘛?');

    }

执行发送消息之后,消费者监听器获取到消息如下:

此时消费者拒绝签收消息,消息直接进入死信队列:

小结

  1. 死信交换机和死信队列和普通的没有区别

  2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

  3. 消息成为死信的三种情况:

  4. 队列消息长度到达限制;

  5. 消费者拒接消费消息,并且不重回队列;

  6. 原队列存在消息过期设置,消息到达超时时间未被消费;

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
非常强悍的 RabbitMQ 总结,写得真好!
MQ在高并发环境下,如果队列满了,如何防止消息丢失?
RabbitMQ 实现延时队列
RabbitMQ如何实现延迟队列
面试官:说说RabbitMQ 消费端限流、TTL、死信队列
RabbitMQ 入门教程(四)
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服