消息中间件的实际应用场景有哪些?主要有三种,分别为异步处理、应用解耦、流量削峰。
可以理解为非阻塞处理,作用提高系统响应速度与吞吐量,增加用户体验。
举个例子,领导要求开发的投诉小程序,所有接口响应需要在300ms内完成。大于300ms需要优化,有这样的一个需求:
1、投诉内容需要在投诉系统进行落盘存储(整个流程需要150ms)2、投诉系统需要把通过接口把投诉内容传给大数据进行沉淀(200ms)整个投诉环节形成闭环需要350ms,不能满足响应需求。
解耦的前提是要有耦合,什么是耦合? 耦合可以理解为耦合度,耦合度是指模块或者应用之间的依赖关系,关系越多耦合度就越高,系统的复杂度就越高(不满足架构设计的三大原则之一简单原则),那就意味着需要解耦,解耦的目的就是提高系统的独立性,方便日后开发维护。
举个例子:还是投诉系统,1、投诉系统需要把投诉数据推送到大数据中心进行沉淀汇聚;2、需要调用短信系统进行实时发送处理信息给相关工作人员(您有新的投诉待处理,请快速登录系统处理);3、系统的产生的日志(接口日志、sql日志)需要调用系统管理系统进行存储。如图
投诉后台把投诉内容写入mq后,返回成功,其他业务系统通过订阅mq进行各自业务处理即可,就算短信服务宕机也不影响其他系统。这样一来,增强了系统的健壮性、提高的系统的qps、系统也完美进行了解耦,方便维护。
我们经常看到这样的一个场景,比如双十一0点抢购,或者政府优惠券8点开枪,系统的高峰访问量可能就是持续1到2个小时,之后就恢复平静或者空闲,如何保证高并发的情况下系统正常运作呢?
我们都知道服务器的资源或者处理能力都是有限的,都有最大的qps,对于系统的瞬间海量的请求冲击,服务器可能忙不过来或者直接宕机,很多人就会增加服务器数量来提高系统的高可用,来满足高并发,但是在空闲的时候有没有什么请求,这样一来导致资源浪费,首先肯定的是技术方案是可行的,但是成本太高了,太烧钱了,不是每个老板都是大方的。
所以采用mq进行削峰更优更省钱,其实mq削峰的本质就是延缓请求处理,借用网上一张图
RabbitMQ主要有六种工作模式,通过整合SpringBoot分别介绍工作模式的实现。
简单模式也称为点对点模式 。
方式为:生产者->队列->消费者 ,生产者与消费者之间直接通过队列名称匹配,没有Exchange交换机。
应用场景:单机应用直接消息传递,后续不会在扩展新的服务或者更多的队列,比如投诉系统的日志写入Mq,日志采集系统进行读取存储。
SimpleConfig配置
/** * 直连模式只需要声明队列,所有消息都通过队列转发。 * 无需设置交换机 */ @Configuration public class SimpleConfig { @Bean public Queue setQueue() { return new Queue('logQueue'); } }
生产者
//simpleSend 直连模式 @ApiOperation(value='simpleSend发送接口',notes='直接发送到队列') @GetMapping(value='/simpleSend') public Object simpleSend(String message) throws AmqpException, UnsupportedEncodingException { //设置部分请求参数 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //发消息 rabbitTemplate.send('logQueue',new Message(message.getBytes('UTF-8'),messageProperties)); return 'message sended : '+message; }
消费者
//直连模式的多个消费者,会分到其中一个消费者进行消费。类似task模式
//通过注入RabbitContainerFactory对象,来设置一些属性,相当于task里的channel.basicQos
@RabbitListener(queues='logQueue')
public void helloWorldReceive(String message) {
System.out.println('simpleSend模式 received message : ' +message);
}
工作模式可以理解为竞争消费模式,类似nginx的负载均衡轮询模式。
方式为:生产者->队列->多个消费者,与简单模式不同的是其具有多个消费者,可理解为一对多的关系。
应用场景:集群部署中的消息传递,举个例子,投诉系统与日志系统都实现集群高可用,投诉系统生产了10条日志数据,会循环发送消息给订阅的消费者,3个消费者(日志管理系统)读取到消息累加总数为10。如图所示:
代码实例
工作模式配置
@Configuration public class WorkConfig { //声明队列 @Bean public Queue workQ1() { return new Queue('workLogQueue'); } }
生产者
//工作队列模式 @ApiOperation(value='workqueue发送接口',notes='发送到所有监听该队列的消费') @GetMapping(value='/workqueueSend') public Object workqueueSend(String message) throws AmqpException, UnsupportedEncodingException { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //制造多个消息进行发送操作 for (int i = 0; i <10 ; i++) { rabbitTemplate.send('workLogQueue', new Message((message+i).getBytes('UTF-8'),messageProperties)); } return 'message sended : '+message; }
消费者
//工作队列模式 @RabbitListener(queues='workLogQueue') public void wordQueueReceiveq1(String message) { System.out.println('工作队列模式1 received message : ' +message); } @RabbitListener(queues='workLogQueue') public void wordQueueReceiveq2(String message) { System.out.println('工作队列模式2 received message : ' +message); }
结果
发布订阅模式也称之为广播模式,生产者每一条消息都需要发送给订阅的消费者,和工作模式都是一对多的关系,不同的是工作模式是每条消息只能发送给一个消费者。
方式为:生产者->交换机->队列(多个)->消费者(多个)
应用场景:AI智能系统通过AI摄像头实时监控景区的客流人数,采集现场图片进行分析是否有火灾,从而产生告警事件(客流事件、火灾事件),第三方系统(管控系统、物联系统)都需要实时采集AI系统的告警事件进行各自业务处理。
/** * Fanout模式需要声明exchange,并绑定queue,由exchange负责转发到queue上。 * 广播模式 交换机类型设置为:fanout */ @Configuration public class FanoutConfig { //声明队列 @Bean public Queue fanoutQ1() { return new Queue('fanout.keliu'); } @Bean public Queue fanoutQ2() { return new Queue('fanout.huozai'); } //声明exchange @Bean public FanoutExchange setFanoutExchange() { return new FanoutExchange('fanoutExchange'); } //声明Binding,exchange与queue的绑定关系 @Bean public Binding bindQ1() { return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange()); } @Bean public Binding bindQ2() { return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange()); } }
生产者
// pub/sub 发布订阅模式 交换机类型 fanout @ApiOperation(value='fanout发送接口',notes='发送到fanoutExchange。消息将往该exchange下的所有queue转发') @GetMapping(value='/fanoutSend') public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //fanout模式只往exchange里发送消息。分发到exchange下的所有queue rabbitTemplate.send('fanoutExchange', '', new Message(message.getBytes('UTF-8'),messageProperties)); return 'message sended : '+message; }
消费者
//pub/sub模式进行消息监听 @RabbitListener(queues='fanout.keliu') public void fanoutReceiveq1(String message) { System.out.println('发布订阅模式1received message : ' +message); } @RabbitListener(queues='fanout.huozai') public void fanoutReceiveq2(String message) { System.out.println('发布订阅模式2 received message : ' +message); }
代码运行结果
路由模式就是可以依据routingKey选择性的投递消息
方式为:生产者->交换机->routingKey->队列->消费者,可以理解为生产者发送的消息先到交换机,交换机依据routingKey找到匹配的队列进行发送消息,然后消费者依据交换机及routingKey进行消费。个人理解其实相当于引入二级分类(一级为交换机、二级为routingKey),目的是为了更加精准或者让消息颗粒度更细的投递。
应用场景::AI智能系统通过AI摄像头实时监控景区的客流人数,采集现场图片进行分析是否有火灾,从而产生告警事件(客流事件、火灾事件),第三方管控系统订阅AI系统的客流告警事件,而物联系统对于客流告警事件及或者告警事件都需要订阅。如图所示
代码实例
配置
/* 路由模式|Routing模式 交换机类型:direct */ @Configuration public class DirectConfig { //声明队列 @Bean public Queue directQ1() { return new Queue('direct.keliu'); } @Bean public Queue directQ2() { return new Queue('direct.huozai'); } //声明exchange @Bean public DirectExchange setDirectExchange() { return new DirectExchange('directExchange'); } //声明binding,需要声明一个routingKey @Bean public Binding bindDirectBind1() { return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with('guankong'); } @Bean public Binding bindDirectBind2() { return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with('wulian'); } @Bean public Binding bindDirectBind3() { return BindingBuilder.bind(directQ2()).to(setDirectExchange()).with('wulian'); } }
生产者
//routing路由工作模式 交换机类型 direct @ApiOperation(value='direct发送接口',notes='发送到directExchange。exchange转发消息时,会往routingKey匹配的queue发送') @GetMapping(value='/directSend') public Object routingSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException { if(null == routingKey) { routingKey='guankong'; } MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //direct模式只往exchange里发送消息。分发到exchange下所有绑定routingKey的queue rabbitTemplate.send('directExchange', routingKey, new Message(message.getBytes('UTF-8'),messageProperties)); return 'message sended : routingKey >'+routingKey+';message > '+message; }
消费者
//Routing路由模式 @RabbitListener(queues='direct.keliu') public void routingReceiveq1(String message) { System.out.println('Routing路由模式客流事件 received message : ' +message); } @RabbitListener(queues='direct.huozai') public void routingReceiveq2(String message) { System.out.println('Routing路由模式火灾事件 received message : ' +message); }
运行结果
http://localhost:9001/directSend?message=%E6%B5%8B%E8%AF%95routing%E8%B7%AF%E7%94%B1%E5%B7%A5%E4%BD%9C%E6%A8%A1%E5%BC%8F&routingKey=wulian
主题模式与路由模式类似,也是将消息路由到Routingkey与BindingKey匹配的队列中,但它不是完全匹配,而是模糊匹配。说白了就是路由模式是设置特定的routingKey绑定唯一的队列,而主题模式的是使用通配符匹配一个或者多个队列。
生产者->交换机->routingKey(模糊匹配)->队列->消费者
通配符有两种,*和#,
代码实例
配置
/* Topics模式 交换机类型 topic * */ @Configuration public class TopicConfig { //声明队列 @Bean public Queue topicQ1() { return new Queue('topic.keliu'); } @Bean public Queue topicQ2() { return new Queue('topic.huozai'); } //声明exchange @Bean public TopicExchange setTopicExchange() { return new TopicExchange('topicExchange'); } //声明binding,需要声明一个roytingKey @Bean public Binding bindTopicHebei1() { return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with('#.guankong.#'); } @Bean public Binding bindTopicHebei2() { return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with('#.wulian.#'); } }
生产者
//topic 工作模式 交换机类型 topic @ApiOperation(value='topic发送接口',notes='发送到topicExchange。exchange转发消息时,会往routingKey匹配的queue发送,*代表一个单词,#代表0个或多个单词。') @GetMapping(value='/topicSend') public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException { if(null == routingKey) { routingKey='guankong.wulian'; } MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //fanout模式只往exchange里发送消息。分发到exchange下的所有queue rabbitTemplate.send('topicExchange', routingKey, new Message(message.getBytes('UTF-8'),messageProperties)); return 'message sended : routingKey >'+routingKey+';message > '+message; }
消费者
//topic 模式 @RabbitListener(queues='topic.keliu') public void topicReceiveq1(String message) { System.out.println('Topic模式 客流事件 received message : ' +message); } @RabbitListener(queues='topic.huozai') public void topicReceiveq2(String message) { System.out.println('Topic模式 火灾事件 received message : ' +message); }
运行结果
http://localhost:9001/topicSend?message=%E6%B5%8B%E8%AF%95%E4%B8%BB%E9%A2%98%E6%A8%A1%E5%BC%8F&routingKey=wulian.guankong
PRC工作模式: 通过消息队列实现PRC功能, 客户端发送消息到消费队列, 服务端进行消费消息执行程序将结果再发送到回调队列, 供客户端使用. 是一种双向生产消费模式。
1、 客户端既是生产者又是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、 服务端监听PRC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。
3、服务端将RPC返回的结果发送到RPC响应队列。
代码实例
配置
@Configuration public class RpcConfig { @Bean public Queue rpcQueue() { return new Queue('rpcQueue'); } }
生产者
@GetMapping('/rpcSend')
public void rpcSend() {
Object receive = rabbitTemplate.convertSendAndReceive('rpcQueue','我是客户端 rpc send message');
System.out.println('【发送端接收消息】' + receive);
}
消费者
@RabbitListener(queues='rpcQueue')
public String rpcListener(String message) {
System.out.println('【rpc接收消息】' + message);
return '我是消费端 rpc 返回' + message;
}
运行结果
代码地址https://gitee.com/zhang-le03100110/springboot-rabbitmq.git
联系客服