打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
springboot整合rabbitMQ

  当前社区活跃度最好的消息中间件就是kafka和rabbitmq了,前面对kafaka的基础使用做了一些总结,最近开始研究rabbitmq,查看了很多资料,自己仿着写了一些demo,在博客园记录一下。

rabbitmq基础知识

  关于rabbitmq基础知识,可以看这篇博客,介绍的很详细了:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html

rabbitmq安装

  rabbitmq的安装很简单,我们可以根据自己的系统去网上找对应的安装说明,这里我为了方便,采用docker镜像的方式,我的虚拟机装的是centos7。步骤如下:

  1、启动docker,关闭防火墙

  2、拉取镜像:docker pull rabbitmq,如需要管理界面:docker pull rabbitmq:management

  3、执行指令启动RabbitMQ

  无管理界面:

  docker run --hostname rabbit-host --name rabbit -d -p 5672:5672 rabbitmq

  有管理界面:

  docker run --hostname rabbit-host --name rabbit -d -p 5672:5672 -p 15672:15672 rabbitmq:management

  4、启动后输入你的虚拟机地址 端口号15672,即可访问到rabbitmq登录界面,默认用户名和密码都是guest。

springboot与rabbitmq整合

  IDE:STS,这是spring官方推荐的开发工具,构建springboot项目非常方便。JDK:1.8

  1、pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>powerx.io</groupId>    <artifactId>springboot-rabbitmq</artifactId>    <version>0.0.1-SNAPSHOT</version>    <packaging>jar</packaging>    <name>springboot-rabbitmq</name>    <description>Demo project for Spring Boot</description>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.0.1.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <properties>        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>        <java.version>1.8</java.version>    </properties>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>            </plugin>        </plugins>    </build></project>

  2、定义常量

package com.example.demo.constant;public interface QueueConstants {    // 消息交换    String MESSAGE_EXCHANGE = "message.direct.myexchange";    // 消息队列名称    String MESSAGE_QUEUE_NAME = "message.myqueue";    // 消息路由键    String MESSAGE_ROUTE_KEY = "message.myroute";    // 死信消息交换    String MESSAGE_EXCHANGE_DL = "message.direct.dlexchange";    // 死信消息队列名称    String MESSAGE_QUEUE_NAME_DL = "message.dlqueue";    // 死信消息路由键    String MESSAGE_ROUTE_KEY_DL = "message.dlroute";}

 

  3、rabbitmq配置

package com.example.demo.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.ExchangeBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.QueueBuilder;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import com.example.demo.constant.QueueConstants;@Configurationpublic class MyRabbitMqConfiguration {    /**     * 交换配置     *     * @return     */    @Bean    public DirectExchange messageDirectExchange() {        return (DirectExchange) ExchangeBuilder.directExchange(QueueConstants.MESSAGE_EXCHANGE)                .durable(true)                .build();    }    /**     * 消息队列声明     *     * @return     */    @Bean    public Queue messageQueue() {        return QueueBuilder.durable(QueueConstants.MESSAGE_QUEUE_NAME)                .build();    }    /**     * 消息绑定     *     * @return     */    @Bean    public Binding messageBinding() {        return BindingBuilder.bind(messageQueue())                .to(messageDirectExchange())                .with(QueueConstants.MESSAGE_ROUTE_KEY);    }}

  4、生产者

package com.example.demo.producer;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import com.example.demo.constant.QueueConstants;@Componentpublic class MessageProducer {    @Autowired    private RabbitTemplate rabbitTemplate;    public void sendMessage(String str) {                rabbitTemplate.convertAndSend(QueueConstants.MESSAGE_EXCHANGE, QueueConstants.MESSAGE_ROUTE_KEY, str);    }}

  5、消费者

package com.example.demo.consumer;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.example.demo.constant.QueueConstants;import com.rabbitmq.client.Channel;@Componentpublic class MessageConsumer {    @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)    public void processMessage(Channel channel,Message  message) {        System.out.println("MessageConsumer收到消息:" new String(message.getBody()));        try {            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        } catch (Exception e) {                    }     }}

  6、控制器类

package com.example.demo.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import com.example.demo.producer.MessageProducer;@RestControllerpublic class TestController {    @Autowired    private MessageProducer messageProducer;        @RequestMapping(value = "/index")    public String index(String str) {        // 将实体实例写入消息队列        messageProducer.sendMessage(str);        return "Success";    }}

  7、application.properties

#用户名spring.rabbitmq.username=guest#密码spring.rabbitmq.password=guest#服务器ipspring.rabbitmq.host=192.168.1.124#虚拟空间地址spring.rabbitmq.virtual-host=/#端口号spring.rabbitmq.port=5672

  至此,springboot整合rabbitmq基本demo完毕,这里不再贴出演示截图。

消息确认

  RabbitMQ的消息确认有两种。

  一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。

  第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

消息发送确认

  1、ConfirmCallback

  确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调,使用该功能需要开启确认,spring-boot中配置如下:

  spring.rabbitmq.publisher-confirms = true

  在MessageProducer.java加入如下代码:

rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {            System.out.println("消息唯一标识"   correlationData);            System.out.println("消息确认结果"   ack);            System.out.println("失败原因"   cause);        });

  2、ReturnCallback

  通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发),使用该功能需要开启确认,spring-boot中配置如下:spring.rabbitmq.publisher-returns = true

  在MessageProducer.java加入如下代码:

rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText,                String exchange, String routingKey) ->{                    System.out.println("消息主体message"   message);                    System.out.println("消息replyCode"   replyCode);                    System.out.println("消息replyText"   replyText);                    System.out.println("消息使用的交换器"   exchange);                    System.out.println("消息使用的路由键"   routingKey);                });

消息消费确认

  消费确认模式有三种:NONE、AUTO、MANUAL。

  开启手动确认需要在配置中加入:spring.rabbitmq.listener.direct.acknowledge-mode=manual

  消息在处理失败后将再次返回队列,重新尝试消费,如果再次失败则直接拒绝。

  实例代码如下:

package com.example.demo.consumer;import java.io.IOException;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.example.demo.constant.QueueConstants;import com.rabbitmq.client.Channel;@Componentpublic class MessageConsumer {    @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)    public void processMessage(Channel channel, Message message) {        System.out.println("MessageConsumer收到消息:"   new String(message.getBody()));        try {            //模拟消息处理失败            int a = 3 / 0;            // false只确认当前一个消息收到,true确认所有consumer获得的消息            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        } catch (Exception e) {            if (message.getMessageProperties().getRedelivered()) {                System.out.println("消息已重复处理失败,拒绝再次接收...");                try {                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//requeue为false,拒绝                } catch (IOException e1) {                }            } else {                System.out.println("消息即将再次返回队列处理...");                try {                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // requeue为true重新回到队列                } catch (IOException e1) {                }            }        }    }}

死信队列

  DLX, Dead-Letter-Exchange。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这样我们就可以重新去处理这个消息。DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理 。消息变成死信一向有一下几种情况:

  利用DLX,我们可以实现消息的延迟消费,可参考:https://www.jianshu.com/p/b74a14c7f31d,还可以像我的demo那样,对于有问题的消息进行重新处理,实例代码如下

  首先在MyRabbitMqConfiguration上加入如下配置:

@Bean    DirectExchange messagedlDirect() {        return (DirectExchange) ExchangeBuilder.directExchange(QueueConstants.MESSAGE_EXCHANGE_DL).durable(true)                .build();    }    @Bean    Queue messagedlQueue() {        return QueueBuilder.durable(QueueConstants.MESSAGE_QUEUE_NAME_DL)                // 配置到期后转发的交换                .withArgument("x-dead-letter-exchange", QueueConstants.MESSAGE_EXCHANGE)                // 配置到期后转发的路由键                .withArgument("x-dead-letter-routing-key", QueueConstants.MESSAGE_ROUTE_KEY).build();    }    @Bean    public Binding messageTtlBinding(Queue messagedlQueue, DirectExchange messagedlDirect) {        return BindingBuilder.bind(messagedlQueue).to(messagedlDirect).with(QueueConstants.MESSAGE_ROUTE_KEY_DL);    }

  其次,修改我们的消息发送者,发送消息到我们新加入的交换器和路由键上,如下:

rabbitTemplate.convertAndSend(QueueConstants.MESSAGE_EXCHANGE_DL, QueueConstants.MESSAGE_ROUTE_KEY_DL, str);

  新添加一个消费者,同时将原来的消费者的监听队列换成新加入的

package com.example.demo.consumer;import java.io.IOException;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.example.demo.constant.QueueConstants;import com.rabbitmq.client.Channel;@Componentpublic class MessageConsumer {    @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME_DL)    public void processMessage(Channel channel, Message message) {        System.out.println("MessageConsumer收到消息:"   new String(message.getBody()));        try {            //模拟消息处理失败            int a = 3 / 0;            // false只确认当前一个消息收到,true确认所有consumer获得的消息            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        } catch (Exception e) {            if (message.getMessageProperties().getRedelivered()) {                System.out.println("消息已重复处理失败,拒绝再次接收...");                try {                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//requeue为false,拒绝                } catch (IOException e1) {                }            } else {                System.out.println("消息即将再次返回队列处理...");                try {                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // requeue为true重新回到队列                } catch (IOException e1) {                }            }        }    }}
package com.example.demo.consumer;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.example.demo.constant.QueueConstants;import com.rabbitmq.client.Channel;@Componentpublic class MessageConsumer2 {    @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)    public void processMessage(Channel channel,Message  message) {        System.out.println("MessageConsumer2收到消息:" new String(message.getBody()));        try {            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        } catch (Exception e) {                    }     }}

  启动项目,发送请求http://localhost:8082/index?str=asdfgh,可以看到后台日志如下:

 

  rabbitmq支持四种交换器,同时还支持很多种插件,功能非常强大,这里我自己还没亲手用过,所以不再展开。

 

来源:http://www.icode9.com/content-4-27671.html
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
RabbitMQ教程
了解如何使用 Spring 和 RabbitMQ 创建一个简单的发布和订阅应用程序
Spring集成RabbitMQ并实现延迟队列
领导:谁再用定时任务实现关闭订单,立马滚蛋!
SpringMVC和rabbitmq集成的使用案例
3W字带你迅速上手MQ
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服