死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:
队列消息长度到达限制;
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
原队列存在消息过期设置,消息到达超时时间未被消费;
队列绑定死信交换机:
给队列设置参数:x-dead-letter-exchange 和 x-dead-letter-routing-key
下面我们编写生产者、消费者,根据以下三种情况进行演示。
消息成为死信的三种情况:
队列消息长度到达限制;
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
原队列存在消息过期设置,消息到达超时时间未被消费;
<!--
死信队列:
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
3. 正常队列绑定死信交换机
设置两个参数:
* x-dead-letter-exchange:死信交换机名称
* x-dead-letter-routing-key:发送给死信交换机的routingkey
-->
<!--
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
-->
<rabbit:queue name='test_queue_dlx' id='test_queue_dlx'>
<!--3. 正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!--3.1 x-dead-letter-exchange:死信交换机名称-->
<entry key='x-dead-letter-exchange' value='exchange_dlx' />
<!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey, value只要符合死信交换机的routingkey规则-->
<entry key='x-dead-letter-routing-key' value='dlx.hehe' />
<!--4.1 设置队列的过期时间 ttl-->
<entry key='x-message-ttl' value='10000' value-type='java.lang.Integer' />
<!--4.2 设置队列的长度限制 max-length -->
<entry key='x-max-length' value='10' value-type='java.lang.Integer' />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name='test_exchange_dlx'>
<rabbit:bindings>
<rabbit:binding pattern='test.dlx.#' queue='test_queue_dlx'></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
-->
<rabbit:queue name='queue_dlx' id='queue_dlx'></rabbit:queue>
<rabbit:topic-exchange name='exchange_dlx'>
<rabbit:bindings>
<rabbit:binding pattern='dlx.#' queue='queue_dlx'></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDLX(){
//1. 测试过期时间:当消息在正常队列 test_queue_dlx 时间过期未被消费 --自动进入--> 死信队列 queue_dlx
rabbitTemplate.convertAndSend('test_exchange_dlx','test.dlx.hahahah', '这将会成为一条死信消息嘛?');
}
执行之后,从控制面板可以看到消息已经进入了正常队列:
等待过期时间过后,消息未消费则自动进入死信队列:
从死信队列中查看消息,确认消息内容:
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDLX(){
//2. 测试长度限制:当消息超出正常队列的最大长度10,那么后续的消息则自动转入 --> 死信队列 queue_dlx
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend('test_exchange_dlx','test.dlx.hahahah', '这将会成为一条死信消息嘛?');
}
}
执行之后,查询管理页面可以发现,正常队列只有10条消息,超出正常队列长度限制的10条消息自动进入死信队列:
等待正常队列的消息过期后,全部进入死信队列:
package com.lijw.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* @author Aron.li
* @date 2022/3/4 23:36
*/
@Component
public class DlxListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//1. 获取传递的标签,用于消息确认
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//2. 接收消息
System.out.println(new String(message.getBody()));
//3. 处理业务逻辑
System.out.println('处理业务逻辑...');
int i = 3 / 0; // 产生异常
//4. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//5. 拒绝签收, 设置不重复队列
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag, true, false);
}
}
}
<!-- 定义监听器与队列的绑定 -->
<rabbit:listener-container connection-factory='connectionFactory' acknowledge='manual' prefetch='1'>
<rabbit:listener ref='dlxListener' queue-names='test_queue_dlx'/>
</rabbit:listener-container>
package com.lijw;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Aron.li
* @date 2022/3/4 23:41
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = 'classpath:spring-rabbitmq-consumer.xml')
public class ConsumerTest {
@Test
public void test01() {
while (true) {
}
}
}
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDLX(){
//3. 测试消息被消费者拒收:自动进入 --> 死信队列 queue_dlx
rabbitTemplate.convertAndSend('test_exchange_dlx','test.dlx.hahahah', '这将会成为一条死信消息嘛?');
}
执行发送消息之后,消费者监听器获取到消息如下:
此时消费者拒绝签收消息,消息直接进入死信队列:
死信交换机和死信队列和普通的没有区别
当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
消息成为死信的三种情况:
队列消息长度到达限制;
消费者拒接消费消息,并且不重回队列;
原队列存在消息过期设置,消息到达超时时间未被消费;
联系客服