打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
RabbitMQ 入门教程(四)

RabbitMQ 中TTL消息介绍

1. 前言

Hello,大家好。本小节会为同学们介绍 RabbitMQ 中自带的另一种机制,就是 TTL 消息,以及 TTL 消息队列。TTL 消息和 TTL 消息队列是我们实际工作中使用频率较高的一种机制,这种机制规定了在特定条件下,消息的有效时间,通过应用这种机制,我们可以很灵活地对消息进行控制。

TTL 消息以及 TTL 消息队列这种机制,一般不会单独使用,在实际工作中都会搭配死信队列一并使用,那么在本节中,会首先为同学们介绍何为 TTL 消息以及 TTL 消息队列,在下一节就会为同学们介绍什么是死信队列了。

话不多说,就让我们先来了解一下什么是 TTL 消息吧。

本节主要内容:

  • TTL 消息概述;

2. TTL 消息概述

基础概念:

TTL,全称为 Time to Live ,即生存时间。 说到这个 TTL ,想必大家都有所接触,这个 TTL 并不是 RabbitMQ 中独有的特性,我们在进行应用程序开发时,我们使用的其他工具中,也具备这个生存时间的概念,只不过描述的可能不一样,但是含义相同。那么在 RabbitMQ 中,TTL 描述的又是什么呢?

在 RabbitMQ 中,TTL 这一概念是作用于消息和消息队列上,即为消息以及消息队列规定了一个生存时间,当消息或消息队列的生存时间超过了 TTL 所规定的生存时间之后,消息就会失效,且不会被消费。

其中,对于消息来说,一旦消息的生存时间超过了 TTL 所规定的消息生存时间,那么,这条消息会立即失效,并且不会被任何消费者消费,且会变成一种死信,并最终会被 RabbitMQ 放入死信队列中(相关概念下节介绍,下同)。

而对于消息队列来说,如果消息队列的生存时间超过了 TTL 所规定的消息队列的生存时间,那么消息队列会立即失效,且该消息队列中的消息也会随着消息队列的失效而失效。 这就提醒我们,在对应用程序中的消息进行操作时,可以根据业务需要来设置专门一条消息的生存时间,也可以设置同一消息类型的消息队列的生存时间,以灵活控制消息的有效期限。

从消费者的角度来说,我们也可以这样理解:在消息被设置了 TTL 之后,如果这个消息的等待时间超过了 TTL ,则这个消息就不会被任何消费者消费;从生产者的角度来说,生产者发送了一条消息到 RabbitMQ Server 中,且已经设置了 TTL ,如果这个消息的等待时间超过了 TTL ,即使有消费者来接收消息,这个消息也不会被接收,同时,生产者也不会再次发送相同的消息。

在了解了 TTL 的基础概念之后,我们还需要了解如何通过代码,来配置 TTL 消息。

代码实现:

配置 TTL ,需要我们在生产端进行配置,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("xx");
connectionFactory.setPort("5672");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)
        .expiration("10000")
        .contentEncoding("UTF-8")
        .build();
channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());

代码解释:

第 1-5 行,我们使用 ConnectionFactory 创建了一个客户端连接 RabbitMQ Server 的连接。

第 6 行,我们使用建立好的连接,来创建了一个频道 channel 。

第 7-12 行,我们使用 AMQP.BasicProperties 来初始化了一个具体消息的 properties 参数,初始化 properties 的过程采用了 AMQP 的调用链模式,即通过 AMQP.BasicProperties.Builder() 来对 properties 进行初始化。

其中,deliveryMode 表示 properties 的投递模式,2 表示持久化投递,即 RabbitMQ Server 重启之后依然有效;contentEncoding 表示 properties 的编码模式,这里是支持中文的 UTF-8 格式;expiration 属性就是来对当前的消息设置生存时间的属性,其单位为毫秒,这里设置的 TTL 为 10 秒, 当我们对 properties 的属性设置完毕后,调用 build 方法即可完成 properties 的构建。

第 13 行,我们使用 channel 的 basicPublish 方法,将消息发送到 RabbitMQ Server 中,这里我们直接来看 basicPublish 方法的第四个参数,其他参数已经介绍很多次了,这里不再介绍。

我们都知道,在 RabbitMQ 中,一条具体的消息被分为了消息体和消息参数两个部分, 其中,消息体指的是我们应用程序中的数据,消息参数指的是针对这一应用程序中的数据进行配置的一系列参数,常见的参数有字符集、请求头、投递方式等。basicPublish 方法的第三个参数指的就是一条消息的消息参数,即我们通过调用链模式来构造出的 properties 参数。

将我们构造好的 properties 参数一同发送到 RabbitMQ Server 中,那么,我们配置的 properties 的属性就会生效,即该条消息的 TTL 为 10 秒。

上述代码是配置 TTL 消息的,那么 TTL 消息队列又该如何配置呢,我们看以下代码:

代码实现:

// 省略客户端连接 RabbitMQ Server 代码
Map<String, Object> queueArgumentMap = new HashMap<>();
queueArgumentMap.put("x-message-ttl", "10000");
channel.queueDeclare(queueName, true, true, false, queueArgumentMap);
代码块1234

代码解释:

第 2-3 行,我们声明了一个名为 queueArgumentMap 的 HashMap ,并且指定 key 为 x-message-ttl ,value 为 10000 ,且都是字符串类型。

第 4 行,我们使用 channel 的 queueDeclare 方法,来声明一个队列,这里我们重点看 queueDeclare 方法的第五个参数,这个参数是一个 Map 类型的 arguments 参数,是专门来对队列进行额外配置的参数, 其值就是我们声明的这个 queueArgumentMap 。

在 queueArgumentMap 中,key 为 RabbitMQ 官方规定的设置 TTL 的 key 值,value 即为我们要设置的 TTL 的具体时间,单位为毫秒,这里同样设置为了 10 秒。

Tips: 1.在实际工作中,给消息或者消息队列设置 TTL 是很常见的,所以需要同学们完全掌握本节内容,这样才能在工作中运用自如;\2. queueArgumentMap 中 key 的指定必须要按照 RabbitMQ 官方给出的 key 值来声明,不能自定义声明,否则,将不会起到任何作用;\3. 在为消息或消息队列设置了 TTL 之后,如果我们想要看到直观地结果,我们可以自行编写一个测试程序,并结合 RabbitMQ 的消息管控台来观察,这样效果会很明显。

3. 小结

本小节为同学们介绍了 RabbitMQ 中,如何为消息以及消息队列设置 TTL 生存时间。从 TTL 的基础概念开始,到 TTL 消息以及 TTL 消息队列的编码实现结束,详细介绍了 TTL 是什么、TTL 的作用,以及不同组件下如何配置 TTL ,TTL 在实际工作中应用较多,希望同学们可以完全掌握通过代码的方式来对消息和消息队列配置 TTL ,这样我们才能对消息更加灵活地进行配置。

RabbitMQ 死信队列基础概念与配置概述

1. 前言

Hello,大家好。本小节会为同学们介绍 RabbitMQ 中的死信队列及其基础配置。死信队列作为 RabbitMQ 中最后一个特性,其在实际工作中发挥着重要的作用。本节会从死信队列的前置概念开始,到死信队列的基础概念,最后介绍死信队列的基本使用方法和基本配置结束,详细介绍死信队列的基础概念和基本使用方法。

话不多说,让我们直入正题吧。

本节主要内容:

  • 死信队列前置概念概述;

  • 死信队列基础概念概述;

  • 死信队列基本使用概述;

2. 死信队列前置概念概述

在正式介绍死信队列的基础概念之前,需要同学们先了解一些死信队列的前置概念,这些前置概念是后续理解死信队列的基础,同学们只有对这些前置概念有一个理解之后,才能很好地理解什么是死信队列。

什么是队列:

队列并不是只存在于 RabbitMQ 中,队列是一种计算机领域中的基本数据结构,其描述了数据在计算机内存中垂直分布的特点。 我们可以把队列看做是我们日常生活中排队做核酸的场景:当有一个人去做的时候,这个时候没有队伍,直接做就行了;当有十个人去做的时候,这个时候就需要排队了,而排队的这个过程就是队列形成的过程。

当再有人需要去做的时候,这个人只能排在队尾,不能够插队,而队前的人由于比队尾的人先到,所以队前的人就比队尾的人先做核酸,以此反复这个过程,直到所有人都做完核酸为止。

而队列描述的就是这样的场景,在上述例子中,形成队伍的过程就是我们的应用数据入队的过程,先入队的数据可以先被处理,而后入队的数据只能等待前入队的数据处理完毕之后才能进行处理,这就是队列先入先出的特点。

假设我们有 6 条数据需要入队,以下是数据入队过程:

1 号数据会先入队,然后排在 1 好数据后面的 2 3 4 5 6 好数据会依次入队,入队完成后的队列如下图所示:

说白了,队列就是数据按照固定的排列方式在计算机中存储的一种表现形式,而队列中的数据处理原则就是先入队的数据先进行处理,后入队的数据后进行处理。RabbitMQ 中的队列也是一样的,只不过这些队列里面存储的都是被称为消息的数据,所以这些队列被称为消息队列, 即在 RabbitMQ 中,根据消息类型的不同,会形成很多不同类型的队列,但是这些队列归根到底,其本质依然是消息队列。

什么是死信:

我们知道,在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。死信就是消息在特定场景下的一种表现形式,这些场景包括:消息被拒绝访问,即 RabbitMQ Server 返回 nack 的信号时、消息的 TTL 过期时、消息队列达到最大长度,消息不能入队时。

经常产生死信的场景就是上述三种场景,即消息在这三种场景中时,被称为死信。

3. 死信队列基础概念概述

通过前置概念的介绍,我们知道了死信的基础的概念,那么死信队列又是什么呢?

结合上述对队列基础概念的介绍,我们不难得出:死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息,这就是死信队列。

死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。

那么,我们到底如何来使用死信队列呢?

4. 死信队列基本使用概述

在 RabbitMQ 中,死信队列的标识为 x-dead-letter-exchange ,通过观察死信队列的标识,我们不难发现,其标识最后为 exchange ,即 RabbitMQ 中的交换机,没错,RabbitMQ 中的死信队列就是由死信交换机而得出的。

要想使用死信队列,我们需要首先声明一个普通的消息队列,并将死信队列的标识绑定到这个普通的消息队列上, 这个过程需要我们在生产端进行配置,代码如下所示:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("xx");
connectionFactory.setPort("5672");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
Map<String, Object> argumentsMap = new HashMap();
argumentsMap.put("x-dead-letter-exchange", "dlx_exchange");
channel.exchangeDeclare("dlx_common_exchange", "direct", true, false, null);
channel.queueDeclare("dlx_common_queue", true, false, false, argumentsMap);
channel.queueBind("dlx_common_queue", "dlx_common_exchange", routingKey);

代码解释:

第 1-5 行,我们使用 ConnectionFactory 创建了一个客户端连接 RabbitMQ Server 的连接。

第 6 行,我们使用建立好的连接,来创建了一个频道 channel 。

第 7-8 行,我们声明了一个普通队列的额外参数的 Map ,这个 Map 的 key 就是死信队列的标识,value 就是我们后续声明的真正的死信交换机的名称。

第 9-10 行,我们依次使用 channel 的 exchangeDeclare 方法和 queueDeclare 方法,分别声明了一个名为 dlx_common_exchange 的交换机和名为 dlx_common_queue 的普通消息队列,之所以名称中有 common ,是因为要对这个交换机和队列做一个标识,表示该交换机和队列是绑定了死信队列的。

第 11 行,我们使用 channel 的 queueBind 方法来讲声明的普通交换机和消息队列进行绑定,并且制定了 routingKey ,这样消息就可以经 dlx_common_exchange 根据 routingKey 来路由到 dlx_common_queue 中。

在我们声明了要绑定死信队列的普通队列之后,最后我们需要声明真正的死信队列,代码如下所示:

// 省略客户端连接 RabbitMQ Server 的过程
channel.exchangeDeclare("dlx_exchange", "direct", true, false, null);
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", routingKey);

代码解释:

第 1 行,我们使用 chanel 的 exchangeDeclare 方法来声明了一个名为 dlx_exchange 的交换机。

第 2 行,我们使用 channel 的 queueDeclare 方法来声明了一个名为 dlx_queue 的队列。

第 3 行,我们使用 channel 的 queueBind 方法,来将 dlx_exchange 的交换机与 dlx_queue 队列进行了绑定。

当我们完成上述过程之后,死信队列就配置完成了,这也是死信队列的基本使用方法。

Tips: 1. 从声明死信队列的代码段中,我们不难看出,我们所声明的交换机和队列也都是普通的,只不过我们声明的这个交换机和队列是用来存储 dlx_common_queue 队列中的死信的;\2. 死信队列的使用在实际工作中非常重要,它可以帮助我们对那些异常的消息进行监控,并根据这些监控信息制定相应的消息补偿策略,这点同学们注意;\3. 一定要注意在声明普通队列时,我们声明的名为 argumetsMap 的变量,这个是绑定死信队列的关键。

5. 小结

本小节为同学们介绍了 RabbitMQ 中,死信队列的前置概念、死信队列的基础概念,以及死信队列的基本使用。我们只有在了解了死信队列的前置概念之后,我们才能理解死信队列的基础概念,同时,我们只有清楚的明白了应用死信队列的步骤,我们才能正确的用好死信队列。

死信队列在实际工作中使用频率非常高,希望同学们可以清楚地理解本节中的基础概念和代码实现,这些都是应用死信队列基础中的基础,望同学们注意。

RabbitAdmin 基础概念详解与配置

1. 前言

Hello,大家好。本小节作为第三章-Spring 生态链与 RabbitMQ 整合的开篇,会为同学们介绍,在 Spring 生态链中,RabbitMQ 是如何与 Spring 进行整合的,包括从最初的在 Spring 中初始化 RabbitMQ 、以及如何在 Spring 中使用 RabbitMQ 发送消息,以及后续在 Spring 中如何对消息进行监听等基础核心内容。

本小节首先会为各位同学们介绍,如何在 Spring 中初始化 RabbitMQ 。包括初始化 RabbitMQ 所使用的 Spring 组件,以及该组件的基本使用方法,快速助力同学们将 RabbitMQ 与 Spring 进行整合。

本节主要内容:

  • RabbitAdmin 基础概念概述;

  • RabbitAdmin 基础配置概述。

2. RabbitAdmin 基础概念概述

基础概念:

在 Spring 中,我们首先会接触到 RabbitAdmin 。我们都知道,RabbitMQ 是基于 AMQP 协议和 erlang 语言进行编码开发的,所以,在 Spring 中我们无法直接使用 RabbitMQ ,Spring 团队考虑到了这一点,所以做了一种名为 Spring-AMQP 的中间层依赖,我们可以把这个依赖理解成我们应用程序中的 Mapper 层,即数据库与实体间的映射关系。

Spring-AMQP 中间层依赖规定了一种映射关系,这种映射关系可以直接把 RabbitMQ 中的各种元素与 Java 程序相对应,我们只需要通过像编写普通 Java 程序那样即可在 Spring 中使用 RabbitMQ 了。

RabbitAdmin 是 Spring-AMQP 中的核心基础组件,是我们在 Spring 中对 RabbitMQ 进行初始化的必须组件, 其提供了 RabbitMQ 中声明交换机、声明队列、绑定交换机和队列,以及绑定路由 Key 等其他 API ,RabbitAdmin 正式由此得名。

在介绍完 RabbitAdmin 基础概念之后,下面让我们来看一下如何对 RabbitAdmin 进行配置吧。

3. RabbitAdmin 基础配置概述

在对 RabbitAdmin 配置进行介绍之前,我们需要先引入 RabbitAdmin 的依赖包。RabbitAdmin 的依赖包共有两个,接下来依次进行介绍。

3.1 引入 RabbitAdmin

以 Maven 引入方式为例,引入代码如下所示:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
代码块1234

上述依赖是 SpringBoot 的 Starter 中封装好的 amqp 依赖,也是 Spring 与 RabbitMQ 进行整合的基础依赖。

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>3.6.5</version>
</dependency>

上述依赖是 RabbitMQ 的客户端依赖,即 amqp-client ,我们都知道 RabbitMQ 分为 Server 端和 Client 端,其中,Server 端正是我们所启动的 RabbitMQ Server 服务,客户端就是我们所有使用 RabbitMQ 的应用程序。

本依赖正是 RabbitMQ 的 Client 端,如果我们想在应用程序中使用 RabbitMQ ,那么这个依赖是必须的。

3.2 RabbitAdmin 基础配置

在我们的应用程序中引入相应的 RabbitMQ 依赖之后,接下来我们需要在应用程序中对 RabbitMQ 进行初始化,而进行初始化的 Spring 组件就是我们的 RabbitAdmin 。

初始化 RabbitMQ 客户端连接

初始化 RabbitMQ 客户端连接的代码如下所示:

代码实现:

@Bean
public ConnectionFactory connectionFactory () {
  CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
  cachingConnectionFactory.setAddresses("82.156.65.57:5672");
  cachingConnectionFactory.setUsername("guest");
  cachingConnectionFactory.setPassword("guest");
  cachingConnectionFactory.setVirtualHost("/");
  return cachingConnectionFactory;
}

代码解释:

第 1 行,我们使用 Spring 的 Bean 注解将我们声明的 connectionFactory 方法注入到 Spring 容器中。

第 2 行,我们使用 Spring-AMQP 中的 ConnectionFactory 类,来声明了一个名为 connectionFactory 的连接工厂方法,用于对 RabbitMQ 进行初始化。

第 3 行,我们实例化了一个 cachingConnectionFactory 实例,该实例是 Spring-AMQP 中对 RabbitMQ 连接信息进行初始化的基础实例,所有的 RabbitMQ 连接信息均来源于该实例。

第 4-7 行,我们通过 cachingConnectionFactory 实例的 setAddresses 、setUsername 、setPassword 、setVirtualHost 方法来分别初始化 RabbitMQ Server 的服务地址、用户名、密码、虚拟主机。

第 8 行,我们将填充好的 cachingConnectionFactory 实例进行返回,以初始化完成 RabbitMQ 客户端连接。

通过上述代码的配置,我们已经初始化了 RabbitMQ 的客户端连接,接下来我们需要继续初始化 RabbitAdmin ,以在 Spring 中管理 RabbitMQ 。

初始化 RabbitAdmin 的方法如下代码所示:

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
   RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
   rabbitAdmin.setAutoStartup(true);
   return rabbitAdmin;
}

代码解释:

第 2 行,我们 RabbitAdmin 类来声明了一个名为 rabbitAdmin 的方法,用来初始化 RabbitAdmin 。

第 4-6 行,我们首先根据上述的 connectionFactory 来实例化了一个 RabbitAdmin 实例;然后通过 rabbitAdmin 实例的 setAutoStartup 方法置位 true 来设置 RabbitAdmin 的自动启动;最后,我们将设置好的 rabbitAdmin 实例进行返回,以使用 RabbitAdmin 。

Tips: 1. @Bean 注解是 Spring 容器中自带的注解,其作用就是将我们应用程序中的类,或者方法来注入到 Spring 容器中,作为 Spring 配置的一部分。\2. 当我们设置好了 connectionFactory 和 rabbitAdmin 组件之后,一旦启动我们的应用程序,这两个组件就会自动进行初始化。

RabbitAdmin 核心 API 介绍

下面我们来看一下在 RabbitAdmin 中,都有哪些核心的 API 。

declareExchange() 声明交换机的方法;

declareQueue() 声明队列的方法;

declareBinding() 将交换机与队列进行绑定的方法;

purgeQueue() 清空指定队列中所有的消息的方法。

在 RabbitAdmin 中,我们可以通过实例化对象的方法,来创建一个交换机,或者一个队列,像下面这样:

new DirectExchange("test.direct", false, false)
new Queue("test.direct.queue", false)
代码块12

在上述 new DirectExchange 方法中,第一个参数表示交换机的名称;第二个参数表示是否持久化;第三个参数表示是否自动删除。

在上述 new Queue 方法中,第一个参数表示队列的名称;第二个参数表示是否持久化。

Tips: 这两个方法在 RabbitAdmin 中使用频率很高,同学们注意。

那么,我们应该如何使用 RabbitAdmin 这些核心的 API 呢,如下代码所示:

@Autowired
private RabbitAdmin rabbitAdmin;
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE,
                "test.direct", "direct", new HashMap<>()));

代码解释:

第 1-2 行,我们使用 Autowired 注解,来讲 RabbitAdmin 注入进来,并用一个私有变量 rabbitAdmin 来接收。

第 3 行,我们使用 declareExchange 方法来声明了一个名为 test.direct 的交换机。

第 4 行,我们使用 declareQueue 方法来声明了一个名为 test.direct.queue 的队列。

第 5 行,我们使用 declareBinding 方法来将我们声明的交换机和消息队列进行绑定,其中,test.direct.queue 为我们声明的队列;Binding.DestinationType.QUEUE 为绑定的类型;test.direct 为我们声明的交换机;direct 为我们要绑定的交换机的类型;new HashMap<>() 表示队列的最后一个 arguments 属性,其属性是 Map 类型,这里置为了空。

我们可以通过这些核心的 API 来声明 RabbitMQ 中最基础的元素,包括交换机、消息队列、路由 Key ,以及绑定规则。

Tips: 1. Binding.DestinationType 这种类型,除了 QUEUE 队列类型,还有一个 EXCHANGE 类型,同学们可以课下进行了解;\2. 了解 RabbitAdmin 的基础概念和基础配置,是 RabbitMQ 整合 Spring 基础中的基础,同学们要了解每个基础概念和核心 API 。

4. 小结

本小节为同学们介绍了在 Spring 生态中,如何集成 RabbitMQ 。从介绍 RabbitAdmin 基础概念开始,到引入 RabbitAdmin 依赖,最后到 RabbitAdmin 基础配置介绍结束,详细介绍了如何在 Spring 中,初始化 RabbitMQ 、声明交换机、声明消息队列,以及建立绑定关系等,旨在帮助同学们可以清楚地入门 RabbitAdmin ,我们只有在优先掌握了 RabbitAdmin 之后,才可以在 Spring 中进行后续操作,同学们加油。

RabbitTemplate 基础概念详解与配置

1. 前言

Hello,大家好。本小节会为同学们介绍 RabbitMQ 在 Spring 生态中的第二个核心组件,也就是 RabbitTemplate ,RabbitTemplate 作为 Spring 生态中的第二个核心组件,其在整个 RabbitMQ 中也扮演着重要的角色。

本小节首先会为各位同学介绍,什么是 RabbitTemplate ,即 RabbitTemplate 的基础概念,以及 RabbitTemplate 所发挥的作用,最后要给各位同学介绍 RabbitTemplate 的基本使用方法,即 RabbitTemplate 的基础配置。RabbitTemplate 的基础概念和基本使用都需我们同学有所了解和掌握。

本节主要内容:

  • RabbitTemplate 基础概念概述;

  • RabbitTemplate 基础配置概述。

2. RabbitTemplate 基础概念概述

基础概念:

在上一节中,我们接触了 RabbitMQ 在 Spring 生态中的第一个核心组件-RabbitAdmin ,了解了 RabbitAdmin 的基础概念和基本使用方法,知道了如何使用 RabbitAdmin 来配置客户端,也就是我们的应用程序连接 RabbitMQ Server 的配置方法,其实 RabbitAdmin 的主要目的就是与 RabbitMQ Server 建立连接,并声明 RabbitMQ 的核心元素。

在对 RabbitAdmin 有所了解之后,我们只是与 RabbitMQ Server 建立了一个可用的连接,并根据需求声明了 RabbitMQ 中的交换机、消息队列,以及绑定关系,此时,还并没有向 RabbitMQ Server 发送任何消息,我们的应用数据还不能被 RabbitMQ Server 处理,那么我们应该怎样在 Spring 中向 RabbitMQ Server 发送消息呢?

要想在 Spring 中向 RabbitMQ Server 发送消息,我们需要使用 RabbitTemplate 才行。 RabbitTemplate 是 Spring-AMQP 依赖为我们提供的一种 RabbitMQ 消息模板,它像 RabbitAdmin 一样,与 RabbitMQ Server 建立了一种映射关系,我们只需要使用 Java 代码来对 RabbitTemplate 进行配置,就可以将我们应用程序中的数据发送到 RabbitMQ Server 中。

RabbitTemplate 提供了编辑消息、发送消息、发送消息前的监听、发送消息后的监听等消息制造和消息监听功能,可以让我们像操作原生 RabbitMQ API 那样在 Spring 中通过 RabbitTemplate 来操作消息并发送和监听消息,这就是 RabbitTemplate 的作用之处。

我们可以通俗的来这样理解 RabbitTemplate ,对于 RabbitAdmin 而言,RabbitAdmin 是将应用程序和 RabbitMQ Server 建立链接的,而 RabbitTemplate 则是在建立连接之后,将我们应用程序中的数据发送到 RabbitMQ Server 中的。

在介绍完 RabbitTemplate 基础概念之后,下面让我们来看一下如何对 RabbitTemplate 进行配置吧。

3. RabbitTemplate 基础配置概述

对 RabbitTemplate 进行配置,和 RabbitAdmin 的配置步骤相似,都需要首先引入 Spring-AMQP 的依赖,就是我们上节中所述的如下两个依赖:

3.1 引入 RabbitTemplate

以 Maven 引入方式为例,引入代码如下所示:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>3.6.5</version>
</dependency>

和 RabbitAdmin 相同,在将这两个依赖进行引入之后,我们就可以对 RabbitTemplate 进行配置了。

3.2 RabbitTemplate 基础配置

初始化 RabbitTemplate 消息模板

在使用 RabbitTemplate 消息模板来往 RabbitMQ Server 中发送消息之前,我们需要先对 RabbitTemplate 消息模板进行初始化,这个初始化过程非常简单,初始化 RabbitTemplate 消息模板的代码如下所示:

代码实现:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  return rabbitTemplate;
}

代码解释:

第 1 行,我们使用 Spring 的 Bean 注解将我们声明的 rabbitTemplate 方法注入到 Spring 容器中,这样 Spring 容器就可以监听到我们注入的配置。

第 2 行,我们使用 Spring-AMQP 中的 RabbitTemplate 类,来声明了一个名为 rabbitTemplate 的方法,用来对 RabbitTemplate 消息模板进行初始化。

第 3 行,我们实例化了一个 rabbitTemplate 实例,该实例是 Spring-AMQP 中对 RabbitTemplate 消息模板进行初始化的实例,要想使用 RabbitTemplate 发送消息,就必须要初始化该实例。

第 4 行,我们将初始化好的 rabbitTemplate 实例进行返回。

Tips: 1. @Bean 注解是 Spring 容器中自带的注解,其作用就是将我们应用程序中的类,或者方法来注入到 Spring 容器中,作为 Spring 配置的一部分。\2. RabbitTemplate 实例接收我们所定义的 ConnectionFactory 连接,传入的连接名称应该和我们定义的 connectionFactory 名称保持一致,即与使用 Bean 注解来声明的连接名称保持一致,如果名称不保持一致,则 RabbitTemplate 就不会初始化,且会抛出异常。

RabbitTemplate 核心 API 介绍

下面我们来看一下在 RabbitTemplate 中,都有哪些核心的 API 。

MessageProperties 实例,用于对消息的 properties 参数进行描述;

Message 实例,用于对消息体进行描述;

send() 方法,用于将原始消息发送到 RabbitMQ Server 中;

convertAndSend() 方法,用于将原始消息进行转换,并且将转换过后的消息发送到 RabbitMQ Server 中;

addListener() 方法,用于为当前消息模板设置消息监听类型。

在介绍完常用核心 API 之后,接下来我们用一个发送消息的例子,来详细说明一下这下常用核心 API ,如下代码所示:

@Autowired
private RabbitTemplate rabbitTemplate;
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("test1", "test1");
messageProperties.getHeaders().put("test2", "test2");
Message message = new Message("Hello RabbitTemplate".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("test_direct_001", "test.123", message, new MessagePostProcessor() {
  @Override
  public Message postProcessMessage(Message message) throws AmqpException {
  message.getMessageProperties().getHeaders().put("test1", "test111");
  message.getMessageProperties().getHeaders().put("test2", "test222");
  return message;
  }
});

代码解释:

第 1-2 行,我们使用 Autowired 注解,来将 RabbitTemplate 注入进来,并用一个私有变量 rabbitTemplate 来接收。

第 3 行,我们实例化了一个 MessageProperties 的实例 messageProperties 。

第 4-5 行,我们使用 messageProperties 实例的 getHeaders 方法获取到消息的 headers 参数,并分别 put 了两个不同的 header 。

第 6 行,我们初始化了一个 Message 消息实例 message ,并在构造方法中设置了消息体的内容为 Hello RabbitTemplate ,且将该消息的额外参数 messageProperties 也一并进行了设置。

第 7-14 行,我们使用 rabbitTemplate 的 convertAndSend 方法,将消息进行转换之后发送到了 RabbitMQ Server 中, 其中,convertAndSend 方法的第一个参数为要使用的交换机名称,这里是 test_direct_001 ,第二个参数为 routingKey ,这里是 test.123 ,第三个参数是我们需要发送的消息 message ,第四个参数则是消息发送成功后的监听器。

对于这个监听器,这里采用了 new MessagePostProcessor 的匿名内部类的形式进行实现,要添加 MessagePostProcessor 消息监听器,需要重写 postProcessMessage 方法,即消息发送成功后的方法,在该方法中,我们可以对消息发送成功后进行进一步的设置,最后将设置好的 Message 进行返回。

关于 send 方法和 addListener 方法,如下代码所示:

rabbitTemplate.send("test_direct_001", "test.123", message);
// 省略对 channel 的配置
rabbitTemplate.addListener(channel);

代码解释:

第 1 行,我们使用 rabbitTemplate 的 send 方法向 RabbitMQ Server 中发送了一条消息,其中,send 方法的第一个参数为绑定的交换机,这里是 test_direct_001 ,第二个参数为 routingKey ,这里是 test.123,第三个参数为我们要发送的消息 message 。

第 3 行,我们使用 rabbitTemplate 的 addListener 方法,为当前的消息模板添加了消息监听器,addListener 方法需要传递一个 channel ,这个channel 需要我们事先配置好,就像介绍 Channel 小节那样。

Tips: 1. send 方法和 convertAndSend 方法,区别之处在于,前者不会对消息进行转换,我们传递进去的是什么消息,就会往 RabbitMQ Server 中发送什么消息;后者则会将我们传递进去的消息进行转换,具体如何转换的需要我们观察源码之后才会清楚,并将转换过后的消息发送到 RabbitMQ Server中;\2. addListener 方法在 RabbitTemplate 中并不是使用频率最高的,但确是经常使用的一种消息监听的添加方法,如果我们不能在 convertAndSend 方法中添加消息监听,才会考虑使用 addListener 方法。

4. 小结

本小节为同学们介绍了在 Spring 生态中的 RabbitTemplate 消息模板。RabbitTemplate 作为 Spring-AMQP 中的消息模板,其发挥着重要的作用,RabbitTemplate 消息模板是连接应用程序和 RabbitMQ Server 中间的桥梁,如果我们只配置了 RabbitAdmin ,没有配置 RabbitTemplate ,那么我们的消息就无法发送到 RabbitMQ Server 中,也就没法实现业务的闭环。

本小节详细介绍了 RabbitTemplate 的基础概念和作用,以及 RabbitTemplate 的基础常用核心 API ,以及基础配置的配置方法,对于这其中容易出现问题的地方也做了提示,旨在帮助同学们可以对 RabbitTemplate 有一个基础的了解,并且要会使用 RabbitTemplate 进行基本的消息发送和消息监听处理。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
消息队列
领导:谁再用定时任务实现关闭订单,立马滚蛋!
从头开始搭建一个Spring boot+RabbitMQ环境
微服务架构Day10-SpringBoot之消息
Java SpringBoot集成RabbitMq实战和总结
14-RabbitMQ高级特性-TTL
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服