#情景引入
小白:起床起床起床起床。。。。快起床~
我:怎么了又,大惊小怪,吓到我了。
小白:我有事有事想找你,十万火急呢~~
我:你能有什么事?反正我不信。。那你说说看~~
小白:就是我有两个小表弟,叫大白和二白,他们现在每天睡觉之前都要分别和我聊天,让我给他们讲故事,如果不讲他们就不睡觉。但是,如果一个个的跟他们轮流来说的话,我就需要每天说两遍,而且我还要找准他们的时间点,这个有时候我有事情都无法实现这个问题,他们就会很生气。。。
我:这不是挺好的嘛,小孩子就是爱听故事的呀。。。
小白:我也愿意讲,但是时间这个不是很好控制,有没有类似,比如我可以之前就描述好了,然后定点给他们两个一起发消息,而可以抛开时间和其他因素的影响呢?
我:这个嘛,很简单呀,你可以让他们关注你的一个公众号,这样你再定时的推送给他们故事不就可以了嘛。。或者,你可以拉他们进你的一个群这样,就方便了呀~
小白:这样是可以,但是如果以后还有小表妹要听我讲,我就要如此反复的做。。感谢好麻烦好麻烦。。。
我:emmm,我理解你的意思,你就想实现一种很多人都能够进行类似一种消息推送的方式嘛。。。
小白:对的对的。。就是这样一种,,,我记得我们在技术方面好像也有一种类似的技术,这个叫做什么去了呢?
我:这就是消息中间件,一种生产者和消费者的关系。
小白:我也想学我也想学,,你快给我讲讲,给我讲讲呗。。
我:真拿你没办法,好吧。。。下面我就给你讲一下这方面的知识。
#情景分析
其实,小白的这个问题,是一种比较普遍的问题。既然我们作为技术人员,当然我们就要从技术成分去分析如何解决了。这里面其实就是包含着一种消息中间件的技术。它也是最近技术层面用得非常非常多的,这也是非常值得我们进行学习。。这在如今的秒杀系统,推荐系统等等,都有广泛的应用。。所以,这章我就主要来跟大家说说这方面的知识。
#基本概念的引导
本模块主要讲解关于消息中间件的相关基础知识,也是方便我们后面的学习。
###什么是中间件?
非操作系统软件,非业务应用软件,不是直接给最终用户使用,不能直接给用户带来价值的软件,我们就可以称为中间件(比如Dubbo,Tomcat,Jetty,Jboss都是属于的)。
###什么是消息中间件?
百度百科解释:消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
关键点:关注于数据的发送和接受,利用高效可靠的异步消息机制传递机制集成分布式系统。
先简单的用下面这个图说明:
提供者:实现JMS的消息服务中间件服务器。
客户端:发送或接受消息的应用。
生产者/发布者:创建并发送消息的客户端。
消费者/订阅者:接受并处理消息的客户端。
消息:应用程序之间传递的数据。
消息模式:在客户端之间传递消息的模式,JMS主要是队列模式和主体模式。
队列模式特点:
(1)客户端包括生产者和消费者。
(2)队列中的一个消息只能被一个消费者使用。
(3)消费者可以随时取消息。
主体模式特点:
(1)客户端包括发布者和订阅者。
(2)主题中的消息可以被所有订阅者消费。
(3)消费者不能消费订阅之前发送的消息。
###什么是AMQP?
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
简单点说:就是对于消息中间件所接受的消息传输层的协议(不懂传输层,那么就需要多看看计算机网络相关知识了,OSI的层次划分),只有这样才能保证客户端和消息中间件能够进行交互(换位思考:HTTP和HTTPS甚至说是TCP/IP与UDP协议都要的道理)。
emmm,比较一下JMS和AMQP的不同吧。。
JMS是定义与Java,而AMQP是一种传输层协议。
JMS是属于Java的API,而AMQP是跨语言的。
JMS消息类型只有两种(主题和队列,后续会说),而AMQP是有五种。
JMS主要就是针对Java的开发的Client,而AMQP是面向消息,队列,路由。
###什么是ActiveMQ呢?
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
简单点说:不就是为了实现我上述所想要的需求嘛。然后它就是一种实现的方式。就比如,Tomcat是什么?不就是为了实现一种client与服务器之间的交互的一种产品嘛。。所以,不需要死记概念,自己理解就好。
#ActiveMQ的安装
##环境:Windows
步骤:
(1)登录到ActiveMQ的官网,下载安装包。http://activemq.apache.org/activemq-5154-release.html
(2)下载Zip文件
wget https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz
(2)然后解压下载的文件
(3)同样进入相对应的目录,运行
./activemq start
(4)然后再访问相同的地址就可以看到啦。(具体看windows安装步骤)
#ActiveMQ的使用(基于Maven)
首先要再回头看看JMS中的一些关键接口。
<!--添加activemq的依赖-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
###情形一:队列模型的消息
3. 编写生产者代码(使用队列模型的消息)
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:06 2018/7/14 0014
* @ Description:用于消息的创建类
* @ Modified By:
* @Version: $version$
*/
public class MessageProducer {
//定义ActivMQ的连接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定义发送消息的队列名称
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//打开连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列目标
Destination destination = session.createQueue(QUEUE_NAME);
//创建一个生产者
javax.jms.MessageProducer producer = session.createProducer(destination);
//创建模拟100个消息
for (int i = 1 ; i <= 100 ; i++){
TextMessage message = session.createTextMessage("我发送message:" + i);
//发送消息
producer.send(message);
//在本地打印消息
System.out.println("我现在发的消息是:" + message.getText());
}
//关闭连接
connection.close();
}
}
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:30 2018/7/14 0014
* @ Description:消息消费者
* @ Modified By:
* @Version: $version$
*/
public class MessageConsumer {
//定义ActivMQ的连接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定义发送消息的队列名称
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//打开连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列目标
Destination destination = session.createQueue(QUEUE_NAME);
//创建消费者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
//创建消费的监听
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("获取消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
查看是否进行了消费
其实,这就是解释了,我之前说的,队列模式的消息,是只会被一个消费者所使用的,而不会被共享,这也就是和主题模型的差别哦~~~哈哈
###情形二:主题模型的消息
前面的步骤都一样,只是生产者和消费者的代码有点区别:
编写生产者(这个和队列模型其实很像,稍微修改就可以)
package com.hnu.scw.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:48 2018/7/14 0014
* @ Description:${description}
* @ Modified By:
* @Version: $version$
*/
public class MessageTopicProducer {
//定义ActivMQ的连接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定义发送消息的主题名称
private static final String TOPIC_NAME = "MyTopicMessage";
public static void main(String[] args) throws JMSException {
//创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//打开连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列目标
Destination destination = session.createTopic(TOPIC_NAME);
//创建一个生产者
javax.jms.MessageProducer producer = session.createProducer(destination);
//创建模拟100个消息
for (int i = 1; i <= 100; i++) {
TextMessage message = session.createTextMessage("当前message是(主题模型):" + i);
//发送消息
producer.send(message);
//在本地打印消息
System.out.println("我现在发的消息是:" + message.getText());
}
//关闭连接
connection.close();
}
}
package com.hnu.scw.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:50 2018/7/14 0014
* @ Description:${description}
* @ Modified By:
* @Version: $version$
*/
public class MessageTopicConsumer {
//定义ActivMQ的连接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定义发送消息的队列名称
private static final String TOPIC_NAME = "MyTopicMessage";
public static void main(String[] args) throws JMSException {
//创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//打开连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列目标
Destination destination = session.createTopic(TOPIC_NAME);
//创建消费者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
//创建消费的监听
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("获取消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hnu.scw</groupId>
<artifactId>activemq</artifactId>
<version>1.0-SNAPSHOT</version>
<name>activemq</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<spring.version>4.2.5.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!--添加activemq的依赖-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<!--spring整合activemq所需要的依赖-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
<exclusions>
<exclusion>
<artifactId>spring-context</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.20.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd ">
<context:annotation-config />
<!--Activemq的连接工厂-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<!--spring jms为我们提供的连接池 获取一个连接工厂-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 消息目的地 点对点的模式-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="SpringActiveMQMsg"/>
</bean>
<!-- jms模板 用于进行消息发送-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
</beans>
package com.hnu.scw.spring;
/**
* @ Author :scw
* @ Date :Created in 下午 12:19 2018/7/14 0014
* @ Description:生产者的接口
* @ Modified By:
* @Version: $version$
*/
public interface ProduceService {
void sendMessage(String msg);
}
package com.hnu.scw.spring;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.annotation.Resource;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 下午 2:21 2018/7/15 0015
* @ Description:生产者的实现类
* @ Modified By:
* @Version: $version$
*/
public class ProduceServiceImpl implements ProduceService {
@Autowired
private JmsTemplate jmsTemplate;
@Resource(name = "queueDestination")
private Destination destination;
/**
* 发送消息
* @param msg
*/
@Override
public void sendMessage(final String msg) {
jmsTemplate.send(destination , new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(msg);
return textMessage;
}
});
System.out.println("现在发送的消息为: " + msg);
}
}
<!--注入我们的生产者-->
<bean class="com.hnu.scw.spring.ProduceServiceImpl"/>
package com.hnu.scw.spring;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @ Author :scw
* @ Date :Created in 下午 2:27 2018/7/15 0015
* @ Description:生产者的测试
* @ Modified By:
* @Version: $version$
*/
public class ProducerTest {
public static void main(String[] args){
ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("producer.xml");
ProduceService bean = classPathXmlApplicationContext.getBean(ProduceService.class);
//进行发送消息
for (int i = 0; i < 100 ; i++) {
bean.sendMessage("test" + i);
}
//当消息发送完后,关闭容器
classPathXmlApplicationContext.close();
}
}
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd ">
<context:annotation-config />
<!--Activemq的连接工厂-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<!--spring jms为我们提供的连接池 获取一个连接工厂-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 消息目的地 点对点的模式-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="SpringActiveMQMsg"/>
</bean>
<!-- 配置消息监听器-->
<bean id="consumerMessageListener" class="com.hnu.scw.spring.ComsumerMessageListener"/>
<!--配置消息容器-->
<bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!--配置连接工厂-->
<property name="connectionFactory" ref="connectionFactory"/>
<!--配置监听的队列-->
<property name="destination" ref="queueDestination"/>
<!--配置消息监听器-->
<property name="messageListener" ref="consumerMessageListener"/>
</bean>
</beans>
package com.hnu.scw.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* @ Author :scw
* @ Date :Created in 下午 3:06 2018/7/15 0015
* @ Description:消息的监听者,用于处理消息
* @ Modified By:
* @Version: $version$
*/
public class ComsumerMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接受到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
package com.hnu.scw.spring;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @ Author :scw
* @ Date :Created in 下午 3:13 2018/7/15 0015
* @ Description:消费者的测试
* @ Modified By:
* @Version: $version$
*/
public class ConsumerTest {
public static void main(String[] args){
//启动消费者
ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("consumer.xml");
}
}
<!-- 消息目的地 (主题模式)-->
<!--<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQTopic">
<!–配置队列模型的消息名称–>
<constructor-arg value="SpringActiveMQMsgTopic"/>
</bean>-->
将上面的代码替换之前的就可以了。。。
总结:总的来说,基于Spring来使用消息队列还是非常方便的,这比我们正常进行JMS规范操作要简单很多,毕竟很多对象都是通过Spring的IOC进行容器管理了,所以,值得推荐使用哦~~~
#ActiveMQ的集群
###为什么要进行集群呢?
原因一:实现高可用:以排除单点故障所引起的服务终端。
原因二:实现负载均衡:以提升效率为更多的客户进行服务。
###集群的方式有哪些?
方式一:客户端集群:多个客户端消费同一个队列。
方式二:Broker clusters:多个Broker之间同步消息。(实现负载均衡)
<networkConnectors>
<networkConnector name="local_network" uri ="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" />
</networkConnectors>
<!--修改服务端口-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
<networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" />
</networkConnectors>
<!--并修改下面这个标签的内容 , 作为B和C的共享文件,目录就是自己之前创建的一个文件(可以回看上面的整个结构)-->
<persistenceAdapter>
<kahaDB directory="D:\Download\MQJiQun\shareDB"/>
</persistenceAdapter>
(2)修改jetty.xml内容,修改服务器的服务端口
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
<!--修改服务端口-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
<networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" />
</networkConnectors>
<!--并修改下面这个标签的内容 , 作为B和C的共享文件,目录就是自己之前创建的一个文件(可以回看上面的整个结构)-->
<persistenceAdapter>
<kahaDB directory="D:\Download\MQJiQun\shareDB"/>
</persistenceAdapter>
(2)修改jetty.xml中的内容
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
步骤:
(1)创建Maven项目
(2)导入依赖
<!--添加activemq的依赖-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
(3)编写生产者代码
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:06 2018/7/14 0014
* @ Description:用于消息的创建类
* @ Modified By:
* @Version: $version$
*/
public class MessageProducer {
//通过集群的方式进行消息服务器的管理(failover就是进行动态转移,当某个服务器宕机,
// 那么就进行其他的服务器选择,randomize表示随机选择)
private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
//定义发送消息的队列名称
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//打开连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列目标
Destination destination = session.createQueue(QUEUE_NAME);
//创建一个生产者
javax.jms.MessageProducer producer = session.createProducer(destination);
//创建模拟100个消息
for (int i = 1 ; i <= 100 ; i++){
TextMessage message = session.createTextMessage("当前message是:" + i);
//发送消息
producer.send(message);
//在本地打印消息
System.out.println("我现在发的消息是:" + message.getText());
}
//关闭连接
connection.close();
}
}
(4)编写消费者代码
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:30 2018/7/14 0014
* @ Description:消息消费者
* @ Modified By:
* @Version: $version$
*/
public class MessageConsumer {
//通过集群的方式进行消息服务器的管理(failover就是进行动态转移,当某个服务器宕机,
// 那么就进行其他的服务器选择,randomize表示随机选择)
private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
//定义发送消息的队列名称
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//打开连接
connection.start();
//创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列目标
Destination destination = session.createQueue(QUEUE_NAME);
//创建消费者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
//创建消费的监听
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("获取消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
(5)进行查看各自的服务器的消息队列的情况。
#其他的消息中间件
其实,类似ActiveMQ这样的消息中间件,用得比较多的还有就是RabbitMQ和Kafka。它们三者各自有各自的优势。大家可以百度进行了解,我就不进行多说了。后面我会同样把这两种消息中间件的使用进行详细的讲解,欢迎大家的关注哦~总的来说,只有适合的场景对应的消息中间件才能发挥最大的作用,没有一种是只有好处而没有坏处的~
#总结
联系客服