打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
16-RabbitMQ高级特性-延迟队列

延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。

  2. 新用户注册成功7天后,发送短信问候。

实现方式:

对于上面两种需求,一般有两种实现方式:

  1. 定时器
  2. 延迟队列
  • 定时器:设置一个轮询时间,间隔一段时间对数据库进行扫描对比,当符合定时的数据则进行处理;

    缺点:

    • 不优雅,因为不管设置多少间隔时间,都会对数据库产生多次扫描的执行,影响性能;
    • 而且间隔的时间范围对具体时间点存在一定的误差,可能没有扫描到,例如:间隔时间设置为1分钟,那么订单可能在29分或者31分钟几秒,那么则扫描不到,这样就会影响用户体验。
  • 延迟队列:

    通过延迟队列控制消息,不会对数据库多次扫描,只有当消息达到了一定的时间,才发送至消费端处理即可,非常优雅!

很可惜,在RabbitMQ中并未提供延迟队列功能。

但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

案例

下面我们就采用 TTL+死信队列 的组合实现延迟队列的功能。

1.【生产者】配置正常的交换机、队列和死信交换机、队列

<!--
    延迟队列:
        1. 定义正常交换机(order_exchange)和队列(order_queue)
        2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
        3. 绑定,设置正常队列过期时间为30分钟
-->

<!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
<rabbit:queue id='order_queue' name='order_queue'>
    <!-- 3. 绑定,设置正常队列过期时间为30分钟-->
    <rabbit:queue-arguments>
        <!-- 转发死信交换机 -->
        <entry key='x-dead-letter-exchange' value='order_exchange_dlx' />
        <!-- 转发死信的routingkey -->
        <entry key='x-dead-letter-routing-key' value='dlx.order.cancel' />
        <!-- 设置 TTL 10秒,模拟30分钟-->
        <entry key='x-message-ttl' value='10000' value-type='java.lang.Integer' />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name='order_exchange'>
    <rabbit:bindings>
        <rabbit:binding pattern='order.#' queue='order_queue'></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

<!--  2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
<rabbit:queue id='order_queue_dlx' name='order_queue_dlx'></rabbit:queue>
<rabbit:topic-exchange name='order_exchange_dlx'>
    <rabbit:bindings>
        <rabbit:binding pattern='dlx.order.#' queue='order_queue_dlx'></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

2.【生产者】编写发送订单消息至正常队列,验证:TTL过期后,自动进入死信队列中

    @Test
    public void testDelay() throws InterruptedException {
        //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend('order_exchange','order.msg','订单信息:id=1,time=2022年03月06日16:41:47');
        
        //2.打印倒计时10秒
        for (int i = 0; i < 10; i++) {
            System.out.println('倒计数: ' + i);
            Thread.sleep(1000);
        }
    }

执行之后,从控制面板可以看到消息已经进入了正常队列

等待过期时间过后,消息未消费则自动进入死信队列:

死信队列中查看消息,确认消息内容:

3.【消费者】编写监听器类,接收延迟的订单消息

package com.lijw.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


/**
 * @author Aron.li
 * @date 2022/3/4 23:36
 */

@Component
public class OrderListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //1. 获取传递的标签,用于消息确认
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //2. 接收消息
            System.out.println(new String(message.getBody()));

            //3. 处理业务逻辑
            System.out.println('处理业务逻辑...');
            System.out.println('根据订单id查询其状态...');
            System.out.println('判断状态是否为支付成功');
            System.out.println('取消订单,回滚库存....');

            //4. 手动签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //e.printStackTrace();

            //5. 拒绝签收, 设置不重复队列
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */

            channel.basicNack(deliveryTag, truefalse);
        }
    }
}

4.【消费者】配置监听器 监听死信队列,进行消费延迟订单信息

<!--  定义监听器与队列的绑定  -->
<rabbit:listener-container connection-factory='connectionFactory' acknowledge='manual' prefetch='1'>
    <!--延迟队列效果实现:  一定要监听的是 死信队列!!!-->
    <rabbit:listener ref='orderListener' queue-names='order_queue_dlx'/>
</rabbit:listener-container>

5.【消费者】启动测试代码,启动Spring框架,开启监听器

package com.lijw;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * @author Aron.li
 * @date 2022/3/4 23:41
 */

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations 
'classpath:spring-rabbitmq-consumer.xml')
public class ConsumerTest {

    @Test
    public void test01() {
        while (true) {

        }
    }

}

6. 【生产者】发送一条消息,验证:消息 --> 正常队列 TTL过期 --> 死信队列 --> 消费订单信息

@Test
public void testDelay() throws InterruptedException {
    //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
    rabbitTemplate.convertAndSend('order_exchange','order.msg','订单信息:id=1,time=2022年03月06日16:41:47');

    //2.打印倒计时10秒
    for (int i = 0; i < 10; i++) {
        System.out.println('倒计数: ' + i);
        Thread.sleep(1000);
    }
}

执行发送订单消息,倒计时10秒:

10秒结束后,消费者监听器接收到订单信息:

小结

  1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。

  2. RabbitMQ没有提供延迟队列功能,但是可以使用 :TTL + DLX 来实现延迟队列效果。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
非常强悍的 RabbitMQ 总结,写得真好!
RabbitMQ 实现延时队列
RabbitMQ如何实现延迟队列
面试官:说说RabbitMQ 消费端限流、TTL、死信队列
6种延时队列的实现方案
RabbitMQ的应用场景以及基本原理介绍
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服