延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
下单后,30分钟未支付,取消订单,回滚库存。
新用户注册成功7天后,发送短信问候。
实现方式:
对于上面两种需求,一般有两种实现方式:
定时器:设置一个轮询时间,间隔一段时间对数据库进行扫描对比,当符合定时的数据则进行处理;
缺点:
延迟队列:
通过延迟队列控制消息,不会对数据库多次扫描,只有当消息达到了一定的时间,才发送至消费端处理即可,非常优雅!
很可惜,在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
下面我们就采用 TTL+死信队列 的组合实现延迟队列的功能。
<!--
延迟队列:
1. 定义正常交换机(order_exchange)和队列(order_queue)
2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
3. 绑定,设置正常队列过期时间为30分钟
-->
<!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
<rabbit:queue id='order_queue' name='order_queue'>
<!-- 3. 绑定,设置正常队列过期时间为30分钟-->
<rabbit:queue-arguments>
<!-- 转发死信交换机 -->
<entry key='x-dead-letter-exchange' value='order_exchange_dlx' />
<!-- 转发死信的routingkey -->
<entry key='x-dead-letter-routing-key' value='dlx.order.cancel' />
<!-- 设置 TTL 10秒,模拟30分钟-->
<entry key='x-message-ttl' value='10000' value-type='java.lang.Integer' />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name='order_exchange'>
<rabbit:bindings>
<rabbit:binding pattern='order.#' queue='order_queue'></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
<rabbit:queue id='order_queue_dlx' name='order_queue_dlx'></rabbit:queue>
<rabbit:topic-exchange name='order_exchange_dlx'>
<rabbit:bindings>
<rabbit:binding pattern='dlx.order.#' queue='order_queue_dlx'></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
@Test
public void testDelay() throws InterruptedException {
//1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend('order_exchange','order.msg','订单信息:id=1,time=2022年03月06日16:41:47');
//2.打印倒计时10秒
for (int i = 0; i < 10; i++) {
System.out.println('倒计数: ' + i);
Thread.sleep(1000);
}
}
执行之后,从控制面板可以看到消息已经进入了正常队列:
等待过期时间过后,消息未消费则自动进入死信队列:
从死信队列中查看消息,确认消息内容:
package com.lijw.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* @author Aron.li
* @date 2022/3/4 23:36
*/
@Component
public class OrderListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//1. 获取传递的标签,用于消息确认
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//2. 接收消息
System.out.println(new String(message.getBody()));
//3. 处理业务逻辑
System.out.println('处理业务逻辑...');
System.out.println('根据订单id查询其状态...');
System.out.println('判断状态是否为支付成功');
System.out.println('取消订单,回滚库存....');
//4. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//e.printStackTrace();
//5. 拒绝签收, 设置不重复队列
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag, true, false);
}
}
}
<!-- 定义监听器与队列的绑定 -->
<rabbit:listener-container connection-factory='connectionFactory' acknowledge='manual' prefetch='1'>
<!--延迟队列效果实现: 一定要监听的是 死信队列!!!-->
<rabbit:listener ref='orderListener' queue-names='order_queue_dlx'/>
</rabbit:listener-container>
package com.lijw;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Aron.li
* @date 2022/3/4 23:41
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = 'classpath:spring-rabbitmq-consumer.xml')
public class ConsumerTest {
@Test
public void test01() {
while (true) {
}
}
}
@Test
public void testDelay() throws InterruptedException {
//1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend('order_exchange','order.msg','订单信息:id=1,time=2022年03月06日16:41:47');
//2.打印倒计时10秒
for (int i = 0; i < 10; i++) {
System.out.println('倒计数: ' + i);
Thread.sleep(1000);
}
}
执行发送订单消息,倒计时10秒:
10秒结束后,消费者监听器接收到订单信息:
延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
RabbitMQ没有提供延迟队列功能,但是可以使用 :TTL + DLX 来实现延迟队列效果。
联系客服