打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
ActiveMQ使用笔记
(一)ActiveMQ的安装
安装要求:
部署需要jdk1.5及以上,编译需要jdk1.5(java5)及以上
Java的环境变量(JAVA_HOME)必须设置,即jdk安装的目录,比如c:\Program Files\jsdk.1.6
下载ActiveMQ:http://activemq.apache.org/download.html
解压,如图:
运行bin文件夹下的activemq.bat,出现如下图所示:
验证是否运行成功:
在浏览器中输入:http://localhost:8161/admin/,出现如下图所示表示成功:
此时,ActiveMQ已经安装完成了,接下来配置登录监视控制台的用户名和密码。
打开conf文件夹下的jetty.xml,找到
1
2
3
4
5
6
7
8
9
<beanid="securityConstraint">
<property name="name"value="BASIC"/>
<property name="roles"value="admin"/>
<property name="authenticate"value="false"/>
</bean>
把authenticate属性的值改成true即可,重启activemq.bat,再登录监视控制台,就需要输入密码了,默认的用户名和密码是admin/admin。roles属性指的是登录的用户角色,这些登录的用户在jetty-realm.properties配置。
修改web的访问端口,在jetty.xml找到一下配置,修改8161即可。
1
2
3
4
5
6
7
<property name="connectors">
<list>
<beanid="Connector"class="org.eclipse.jetty.server.nio.SelectChannelConnector">
<propertyname="port"value="8161"/>
</bean>
</list>
</property>
ActiveMQ的运行日志存放在data文件夹下的activemq.log中。
Linux和Aix系统下的安装:
解压:tar zxvf activemq-x.x.x.tar.gz,进入bin文件夹,运行:./activemq start &,也可以只运行:./activemq console。
验证方式和安全性配置和windows下的配置一样。
(二)ActiveMQ消息持久化(1)
在broker中设置属性persistent=”true”(默认是true),同时发送的消息也应该是persitent类型的。ActiveMQ消息持久化有三种方式:AMQ、KahaDB、JDBC。
1、AMQ
AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32兆,如果一条消息的大小超过了32兆,那么这个值必须设置大点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。默认配置如下:
Java
1
2
3
<persistenceAdapter>
<amqPersistenceAdapterdirectory="activemq-data"maxFileLength="32mb"/>
</persistenceAdapter>
AMQ的属性:
属性名称默认值描述
directoryactivemq-data消息文件和日志的存储目录
useNIOtrue使用NIO协议存储消息
syncOnWritefalse同步写到磁盘,这个选项对性能影响非常大
maxFileLength32mb一个消息文件的大小
persistentIndextrue消息索引的持久化,如果为false,那么索引保存在内存中
maxCheckpointMessageAddSize4kb一个事务允许的最大消息量
cleanupInterval30000清除操作周期,单位ms
indexBinSize1024索引文件缓存页面数,缺省为1024,当amq扩充或者缩减存储时,会锁定整个broker,导致一定时间的阻塞,所以这个值应该调整到比较大,但是代码中实现会动态伸缩,调整效果并不理想。
indexKeySize96索引key的大小,key是消息ID
indexPageSize16kb索引的页大小
directoryArchivearchive存储被归档的消息文件目录
archiveDataLogsfalse当为true时,归档的消息文件被移到directoryArchive,而不是直接删除
(二)ActiveMQ消息持久化(2)
2、KahaDB
KahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间比AMQ短,从5.4版本之后KahaDB做为默认的持久化方式。默认配置如下:
Java
1
2
3
<persistenceAdapter>
<kahaDBdirectory="activemq-data"journalMaxFileLength="32mb"/>
</persistenceAdapter>
KahaDB的属性:
property namedefault valueComments
directoryactivemq-data消息文件和日志的存储目录
indexWriteBatchSize1000一批索引的大小,当要更新的索引量到达这个值时,更新到消息文件中
indexCacheSize10000内存中,索引的页大小
enableIndexWriteAsyncfalse索引是否异步写到消息文件中
journalMaxFileLength32mb一个消息文件的大小
enableJournalDiskSyncstrue是否讲非事务的消息同步写入到磁盘
cleanupInterval30000清除操作周期,单位ms
checkpointInterval5000索引写入到消息文件的周期,单位ms
ignoreMissingJournalfilesfalse忽略丢失的消息文件,false,当丢失了消息文件,启动异常
checkForCorruptJournalFilesfalse检查消息文件是否损坏,true,检查发现损坏会尝试修复
checksumJournalFilesfalse产生一个checksum,以便能够检测journal文件是否损坏。
5.4版本之后有效的属性:
archiveDataLogsfalse当为true时,归档的消息文件被移到directoryArchive,而不是直接删除
directoryArchivenull存储被归档的消息文件目录
databaseLockedWaitDelay10000在使用负载时,等待获得文件锁的延迟时间,单位ms
maxAsyncJobs10000同个生产者产生等待写入的异步消息最大量
concurrentStoreAndDispatchTopicsfalse当写入消息的时候,是否转发主题消息
concurrentStoreAndDispatchQueuestrue当写入消息的时候,是否转发队列消息
5.6版本之后有效的属性:
archiveCorruptedIndexfalse是否归档错误的索引
从5.6版本之后,有可能发布通过多个kahadb持久适配器来实现分布式目标队列存储。什么时候用呢?如果有一个快速的生产者和消费者,当某一个时刻生产者发生了不规范的消费,那么有可能产生一条消息被存储在两个消息文件中,同时,有些目标队列是危险的并且要求访问磁盘。在这种情况下,你应该用通配符来使用mKahaDB。如果目标队列是分布的,事务是可以跨越多个消息文件的。
每个KahaDB的实例都可以配置单独的适配器,如果没有目标队列提交给filteredKahaDB,那么意味着对所有的队列有效。如果一个队列没有对应的适配器,那么将会抛出一个异常。配置如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<persistenceAdapter>
<mKahaDBdirectory="${activemq.base}/data/kahadb">
<filteredPersistenceAdapters>
<!--matchallqueues-->
<filteredKahaDB queue=">">
<persistenceAdapter>
<kahaDB journalMaxFileLength="32mb"/>
</persistenceAdapter>
</filteredKahaDB>
<!--matchalldestinations-->
<filteredKahaDB>
<persistenceAdapter>
<kahaDBenableJournalDiskSyncs="false"/>
</persistenceAdapter>
</filteredKahaDB>
</filteredPersistenceAdapters>
</mKahaDB>
</persistenceAdapter>
如果filteredKahaDB的perDestination属性设置为true,那么匹配的目标队列将会得到自己对应的KahaDB实例。配置如下:
1
2
3
4
5
6
7
8
9
10
11
12
<persistenceAdapter>
<mKahaDBdirectory="${activemq.base}/data/kahadb">
<filteredPersistenceAdapters>
<!--kahaDBperdestinations-->
<filteredKahaDB perDestination="true">
<persistenceAdapter>
<kahaDB journalMaxFileLength="32mb"/>
</persistenceAdapter>
</filteredKahaDB>
</filteredPersistenceAdapters>
</mKahaDB>
</persistenceAdapter>
(二)ActiveMQ消息持久化(3)
3、JDBC
配置JDBC适配器:
1
2
3
<persistenceAdapter>
<jdbcPersistenceAdapterdataSource="#mysql-ds"createTablesOnStartup="false"/>
</persistenceAdapter>
dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。
MYSQL持久化bean
1
2
3
4
5
6
7
<beanid="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close">
<propertyname="driverClassName"value="com.mysql.jdbc.Driver"/>
<propertyname="url"value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<propertyname="username"value="activemq"/>
<propertyname="password"value="activemq"/>
<propertyname="poolPreparedStatements"value="true"/>
</bean>
SQL Server持久化bean
1
2
3
4
5
6
7
<beanid="mssql-ds"class="net.sourceforge.jtds.jdbcx.JtdsDataSource"destroy-method="close">
<propertyname="serverName"value="SERVERNAME"/>
<propertyname="portNumber"value="PORTNUMBER"/>
<propertyname="databaseName"value="DATABASENAME"/>
<propertyname="user"value="USER"/>
<propertyname="password"value="PASSWORD"/>
</bean>
Oracle持久化bean
1
2
3
4
5
6
7
8
<beanid="oracle-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close">
<propertyname="driverClassName"value="oracle.jdbc.driver.OracleDriver"/>
<propertyname="url"value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/>
<propertyname="username"value="activemq"/>
<propertyname="password"value="activemq"/>
<propertyname="maxActive"value="200"/>
<propertyname="poolPreparedStatements"value="true"/>
</bean>
DB2持久化bean
1
2
3
4
5
6
7
8
<beanid="db2-ds"class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<propertyname="driverClassName"value="com.ibm.db2.jcc.DB2Driver"/>
<property name="url"value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/>
<propertyname="username"value="activemq"/>
<property name="password"value="activemq"/>
<propertyname="maxActive"value="200"/>
<property name="poolPreparedStatements"value="true"/>
</bean>
(三)ActiveMQ消息发送与接收
配置完了持久化之后,我们就可以使用代码来发送和接收ActiveMQ中的消息了,我这里配置的持久化是KahaDB。
需要导入的jar包:
一段发送消息的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
publicstaticvoidsend(){
try{
// 创建一个连接工厂
Stringurl="tcp://localhost:61616";
ActiveMQConnectionFactory connectionFactory=newActiveMQConnectionFactory(url);
// 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置
connectionFactory.setUserName("system");
connectionFactory.setPassword("manager");
// 创建连接
Connectionconnection=connectionFactory.createConnection();
connection.start();
// 创建Session,参数解释:
// 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。
// 第二个参数消息的确认模式:
// AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。
// CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息)
// DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 创建目标,就创建主题也可以创建队列
Destinationdestination=session.createQueue("test");
// 创建消息生产者
MessageProducerproducer=session.createProducer(destination);
// 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 创建消息
Stringtext="Hello ActiveMQ!";
TextMessagemessage=session.createTextMessage(text);
// 发送消息到ActiveMQ
producer.send(message);
System.out.println("Message is sent!");
// 关闭资源
session.close();
connection.close();
}
catch (Exceptione){
e.printStackTrace();
}
}
执行了上面的发送方法之后,在ActiveMQ的监视控制可以看到有一个test队列,并且有一条消息,如图:
点击队列名test,然后点击消息ID即可查看消息内容,如图:
如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失。
一段接收消息的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
publicstaticvoidget(){
try{
String url="tcp://localhost:61616";
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(url);
// 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置
connectionFactory.setUserName("system");
connectionFactory.setPassword("manager");
// 创建连接
Connection connection=connectionFactory.createConnection();
connection.start();
// 创建Session
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 创建目标,就创建主题也可以创建队列
Destinationdestination=session.createQueue("test");
// 创建消息消费者
MessageConsumerconsumer=session.createConsumer(destination);
// 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null
Messagemessage=consumer.receive(1000);
if (messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
Stringtext=textMessage.getText();
System.out.println("Received: "+ text);
} else{
System.out.println("Received: "+ message);
}
consumer.close();
session.close();
connection.close();
} catch(Exceptione){
e.printStackTrace();
}
}
执行了上面的接收方法之后,在ActiveMQ的监视控制可以看到test队列的消息已经被消费了,如图:
这里的代码只是测试用,在正式开发中一般与Spring结合使用jmsTemplate来发送消息,现实JMS的MessageListener来监听消息。
(四)ActiveMQ配置安全性
监视ActiveMQ的方式有多种,在第一部分中已经说到了Web监视控制台,设置登录用户名和密码,这里再说一下JMX监控。运行了ActiveMQ之后,再运行jdk自带的jconsole即可以看到ActiveMQ的进程,如图:
点击连接之后就可以看到ActiveMQ的运行情况。默认情况下是不需要用户名和口令的,修改activemq.bat,找到
1
2
3
SUNJMX=-Dcom.sun.management.jmxremote.port=1099
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
修改成
1
2
3
4
5
SUNJMX=-Dcom.sun.management.jmxremote.port=1616
-Dcom.sun.management.jmxremote.authenticate=true
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.password.file=%ACTIVEMQ_BASE%/conf/jmx.password
-Dcom.sun.management.jmxremote.access.file=%ACTIVEMQ_BASE%/conf/jmx.access
Linux下的找到:
1
2
3
4
5
#ACTIVEMQ_SUNJMX_START="-Dcom.sun.management.jmxremote.port=11099 "
#ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.password.file=${ACTIVEMQ_CONFIG_DIR}/jmx.password"
#ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.access.file=${ACTIVEMQ_CONFIG_DIR}/jmx.access"
#ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.ssl=false"
ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote"
去掉注释即可。
重启ActiveMQ之后,在用jconsole连接就需要输入用户名和密码,jmx.access文件配置用户的访问权限readonly和readwrite,admin readwrite表示用户admin具有读写权限。Jmx.password文件配置用户的密码,admin activemq 表示admin用户的密码是activemq。
除了监视台可以设置用户名和密码之后,ActiveMQ也可以对各个主题和队列设置用户名和密码,配置如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<plugins>
<!--Configureauthentication;Username,passwordsandgroups-->
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system"password="manager"groups="users,admins"/>
<authenticationUserusername="user"password="password"groups="users"/>
<authenticationUser username="guest"password="password"groups="guests"/>
<authenticationUserusername="testUser"password="123456"groups="testGroup"/>
</users>
</simpleAuthenticationPlugin>
<!-- Letsconfigureadestinationbasedauthorizationmechanism-->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue="queue.group.uum"read="users"write="users"admin="users"/>
<authorizationEntryqueue=">"read="admins"write="admins"admin="admins"/>
<authorizationEntry queue="USERS.>"read="users"write="users"admin="users"/>
<authorizationEntryqueue="GUEST.>"read="guests"write="guests,users"admin="guests,users"/>
<authorizationEntryqueue="TEST.Q"read="guests"write="guests"/>
<authorizationEntry queue="test"read=" testGroup "write=" testGroup "/>
<authorizationEntry topic=">"read="admins"write="admins"admin="admins"/>
<authorizationEntrytopic="USERS.>"read="users"write="users"admin="users"/>
<authorizationEntry topic="GUEST.>"read="guests"write="guests,users"admin="guests,users"/>
<authorizationEntry topic="ActiveMQ.Advisory.>"read="guests,users ,testGroup"write="guests,users ,testGroup "admin="guests,users ,testGroup "/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
simpleAuthenticationPlugin中设置用户名、密码和群组,authorizationPlugin设置主题和队列的访问群组,“>”表示所有的主题或者队列。上面的配置中添加了一个testUser,属于群组testGroup,同时设置test这个队列的访问读写权限为testGroup,当然admins也可以访问的,因为admins是对所有的队列都有访问权限。将第三部分代码中的设置用户名和密码改成刚刚添加的用户testUser,如果密码不正确,将会抛出User name or password is invalid.异常,如果testUser所属的群组不能访问test队列,那么会抛出User guest is not authorized to write to: queue://test异常。需要注意的是所有的群组都需要对以ActiveMQ.Advisory为前缀的主题具有访问权限。
(五)ActiveMQ负载均衡
ActiveMQ可以实现多个mq之间进行路由,假设有两个mq,分别为brokerA和brokerB,当有一条消息发送到brokerA的队列test中,有一个客户端连接到brokerB上,并且要求获取test队列的消息时,brokerA中队列test的消息就会路由到brokerB上,反之brokerB的消息也会路由到brokerA。
静态路由配置,brokerA不需要特别的配置,brokerB需要配置networkConnectors节点,具体配置如下:
1
2
3
<networkConnectors>
<networkConnectoruri="static:(tcp://localhost:61616)"duplex="true"/>
</networkConnectors>
静态路由支持failover,如:static:failover://(tcp://host1:61616,tcp://host2:61616)。
动态路由配置,每个mq都需要配置如下:
1
2
3
4
5
6
7
<networkConnectors>
<networkConnectoruri="multicast://default"/>
</networkConnectors>
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618"discoveryUri="multicast://default"/>
</transportConnectors>
注意:networkConnectors需要配置在persistenceAdapter之前。
重启ActiveMQ,可以看到brokerA的日志如图:
networkConnector的属性请参照:http://activemq.apache.org/networks-of-brokers.html
(六)ActiveMQ主备配置
ActiveMQ的主备有三种方式:纯Master/Slave、文件共享方式、数据库共享方式。
1、纯Master/Slave
这种方式的主备不需要对Master Broker做特殊的配置,只要在Slave Broker中指定他的Master就可以了,指定Master有两种方式,最简单的配置就是在broker节点中添加masterConnectorURI=”tcp://localhost:61616″即可,还有一种方式就是添加一个services节点,可以指定连接的用户名和密码,配置如下:
1
2
3
<services>
<masterConnectorremoteURI="tcp://localhost:61616"userName="system"password="manager"/>
</services>
纯Master/Slave只允许一个Slave连接到Master上面,也就是说只能有2台MQ做集群,同时当Master挂了之后需要停止Slave来恢复负载。
2、数据库共享方式
这种方式的主备采用数据库做消息的持久化,支持多个Slave,所有broker持久化数据源配置成同一个数据源,当一个broker获取的数据库锁之后,其他的broker都成为slave并且等待获取锁,当master挂了之后,其中的一个slave将会立刻获得数据库锁成为master,重启之前挂掉的master之后,这个master也就成了slave,不需要停止slave来恢复。由于采用的是数据库做为持久化,它的性能是有限的。
3、文件共享方式
这种方式的主备具有和数据库共享方式的负载一样的特性,不同的是broker的持久化采用的是文件(我这里用KahaDB),slave等待获取的锁是文件锁,它具有更高的性能,但是需要文件共享系统的支持。
Window下共享KahaDB持久化的目录,配置如下:
1
2
3
<persistenceAdapter>
<kahaDBdirectory="//172.16.1.202/mqdata/kahadb"/>
</persistenceAdapter>
Linux下需要开启NFS服务,具体操作如下:
创建共享目录(192.168.0.1):
1、 修改etc/exports,添加需要共享的目录:/opt/mq/data *(rw,no_root_squash)
2、 启动NFS服务 service nfs start/restart
3、 查看共享 showmount –e
4、 NFS服务自启动 chkconfig –level 35 nfs on
挂载共享目录(192.168.0.2):
1、 挂载:mount –t nfs 192.168.0.1:/opt/mq/data /opt/mq/data
2、 启动自动挂载:在etc/fstab文件添加10.175.40.244:/opt/mq/data /opt/mq/data nfs defaults 0 0
然后指定KahaDB的持久化目录为/opt/mq/data即可。
AIX系统的文件共享和Linux类似,也是启动NFS服务。
注意:如果Master服务器宕机了,Slave是不会获得文件锁而启动,直到Master服务器重启。
Window下Master上有Slave连接时如图:
客户端连接的brokerURL为failover:(tcp://localhost:61616,tcp://localhost:61617)。用第三部分的代码测试,先向Master Broker发送一个消息,然后关闭master,运行获取消息的方法,即可获取之前发送的消息。
(七)ActiveMQ性能优化
1、目标策略
在节点destinationPolicy配置策略,可以对单个或者所有的主题和队列进行设置,使用流量监控,当消息达到memoryLimit的时候,ActiveMQ会减慢消息的产生甚至阻塞,destinationPolicy的配置如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntrytopic=">"producerFlowControl="true"memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor/>
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">"producerFlowControl="true"memoryLimit="1mb">
<!--UseVMcursorforbetterlatency
Formoreinformation,see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
producerFlowControl表示是否监控流量,默认为true,如果设置为false,消息就会存在磁盘中以防止内存溢出;memoryLimit表示在producerFlowControl=”true”的情况下,消息存储在内存中最大量,当消息达到这个值时,ActiveMQ会减慢消息的产生甚至阻塞。policyEntry的属性参考:http://activemq.apache.org/per-destination-policies.html
当producer发送的持久化消息到达broker之后,broker首先会把它保存在持久存储中。接下来,如果发现当前有活跃的consumer,如果这个consumer消费消息的速度能跟上producer生产消息的速度,那么ActiveMQ会直接把消息传递给broker内部跟这个consumer关联的dispatch queue;如果当前没有活跃的consumer或者consumer消费消息的速度跟不上producer生产消息的速度,那么ActiveMQ会使用Pending Message Cursors保存对消息的引用。在需要的时候,Pending Message Cursors把消息引用传递给broker内部跟这个consumer关联的dispatch queue。以下是两种Pending Message Cursors:
VM Cursor:在内存中保存消息的引用。
File Cursor:首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中。
在缺省情况下,ActiveMQ 会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配置Message Cursors。
对于topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有
vmDurableCursor 和 fileDurableSubscriberCursor;对于queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。
Message Cursors的使用参考:http://activemq.apache.org/message-cursors.html
2、存储设置
设置消息在内存、磁盘中存储的大小,配置如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsagelimit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<tempUsage>
<tempUsagelimit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
memoryUsage表示ActiveMQ使用的内存,这个值要大于等于destinationPolicy中设置的所有队列的内存之和。
storeUsage表示持久化存储文件的大小。
tempUsage表示非持久化消息存储的临时内存大小。
3.  优化ActiveMQ性能
3.1.  一般技术
3.1.1.  Persistent vs Non-Persistent Message
持久化和非持久化传递
1.PERSISTENT(持久性消息)
这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
2.NON_PERSISTENT(非持久性消息)
保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。
此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。
有两种方法指定传送模式:
1.使用setDeliveryMode 方法,这样所有的消息都采用此传送模式;
2.使用send 方法为每一条消息设置传送模式;
方法一:void send(Destination destination, Message message, int deliveryMode, int priority,long timeToLive);
方法二:void send(Message message, int deliveryMode, int priority, longtimeToLive);
其中 deliveryMode 为传送模式,priority 为消息优先级,timeToLive 为消息过期
时间。
方法三:producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
JMS 规范1.1允许消息传递包括Persistent和Non-Persistent。
Non-persistent传递消息比Persistents传递消息速度更快,原因如下:
1)      Non-persistent发送消息是异步的,Producer不需要等待Consumer的receipt消息。如下图:
2)      Persisting 传递消息是需要把消息存储起来。然后在传递,这样很慢 。
3.1.2.  Transactions
事务
以下列子说明了Transaction比Non-transaction的性能高。
Transaction和Non-transaction代码如下:
3.1.3.  超快回应消息
内嵌 broker;如下图:
下面以Co-lcate (合作定位)with a broker为例。
其运行原理如下图:
Java代码如下:
创建一个queue服务:
创建一个queueRequestor:
注意:
设置发送的消息不需要copy。
3.1.4.  Tuning  the OpenWire protocol
跨语言协议
//TODO
3.1.5.  Tuning the TCP Transport
TCP协议是ActiveMQ使用最常见的协议。
有以下两点影响TCP协议性能:
1)      socketBufferSize=缓存,默认是65536。
2)      tcpNoDelay=默认是false,
示例如下:
3.2.  优化消息发送
3.2.1.  Asynchronous Send
在ActiveMQ4.0以上,所有的异步或同步对于Consumer来说是变得可配置了。
默认是在ConnectionFactory、Connection、Connection URI等方面配置对于一个基于Destination 的Consumer来说。
众所周之,如果你想传递给Slow Consumer那么你可能使用异步的消息传递,但是对于Fast Consumer你可能使用同步发送消息。(这样可以避免同步和上下文切换额外的增加Queue堵塞花费。如果对于一个Slow Consumer,你使用同步发送消息可能出现Producer堵塞等显现。
ActiveMQ默认设置dispatcheAsync=true是最好的性能设置。如果你处理的是Slow Consumer则使用dispatcheAsync=true,反之,那你使用的是Fast Consumer则使用dispatcheAsync=false。
用Connection URI来配置Async如下:
ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");用ConnectionFactory配置Async如下:
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);用Connection配置Async如下:
((ActiveMQConnection)connection).setUseAsyncSend(true);
3.2.2.  Producer Flow Control
这种适合于慢的消费者,大量的消息暂时存储到内存中,然后慢慢的dispatche。
运行原理如图下:
Java代码如下:
Xml配置的策略如下:
Disabled Producer Flow Control运行原理:
1.3.  优化消息消费者
消息消费的内部流程结构如下:
3.3.1.  Prefetch Limit
ActiveMQ默认的prefetch大小不同的:
Queue Consumer 默认大小=1000
Queue Browser Consumer默认大小=500
Persistent Topic Consumer默认大小=100
Non-persistent Topic Consumer默认大小=32766
Prefecth policy设置如下:
设置prefetch policy在 Destinations上:
3.3.2.  Delivery and Acknowledgement of messages
传递和回执消息。
建议使用Session.DUPS_ACKNOWLEDGE。
JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。
在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:
Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Sessiion.TRANSACTION。用session.commit()回执确认。
Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝第确认消息的提交。当消息到达一定数量后,才开始消费该消息。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。
优化回执:
3.3.3.  Slow Consumer Handling
慢消费者绑定策略
Slow Consumer将一起一个问题,对于非持久主题上,强迫Broker发送的消息堆积起来,使得Broker对于Producer发送慢了下来,同时Fast Consumer也慢了下来。
目前,我们有一个策略来配置在原有Consumer的预存的大小的基础上增加了一定的缓存大小。因此,这个大小最终一旦满了,则旧消息将会丢弃,新消息则会进入。这将保持了一定的缓存大小。
Pending Message Limit Strategy
等待消息限制策略
对于Slow Consumer来说,你将配置PendingMessageLimitStrategy策略来处理不同的策略。
以下有两种实现的策略:
ConstantPendingMessageLimitStrategy
Limit可以设置0、>0、-1三种方式:
0表示:不额外的增加其预存大小。
>0表示:在额外的增加其预存大小。
-1表示:不增加预存也不丢弃旧的消息。
<constantPendingMessageLimitStrategy limit="50"/>PrefetchRatePendingMessageLimitStrategy
这种策略是利用Consumer的之前的预存的大小乘以其倍数等于现在的预存大小。
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>Configuring the Eviction Policy
配置去除策略
ActiveMQ有两种方式,默认配置方式如下:
<oldestMessageEvictionStrategy/>
方式二:
去除旧消息根据它的优先级来判断,如果你有一个较高的优先级的旧消息,则去除低优先级的消息。
<oldestMessageWithLowestPriorityEvictionStrategy/>                   1.3.4.  Destination Options
Destination Options 这种方式提供了扩展了JMS Consumer但并不是扩展了JMS  API。以URL的形式来编码的。
Consumer Options
示例如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
(八)使用过程中出现的问题
1、消息文件越来越多,导致超出了存储空间
报错日志:Usage Manager Store is Full, 100% of 1073741824. Stopping producer (ID:db01-48754-1336034955132-0:5:1:1) to prevent flooding queue://queue.land.group. See http://activemq.apache.org/producer-flow-control.html for more info (blocking for: 1s) | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///172.24.99.41:44716
这是由于我们在配置文件中设置了storeUsage ,当存储的消息文件(log文件)超过了这值就会报这个异常,在官方网站看到说消息文件不删除是5.3版本的一个bug,在5.5版本的时候已经被解决了,但是我们使用的是5.5.1版本啊,然后在看存储下来的消息文件,文件名不是连续的,那么说明其中还是有被删除的,后来在评论中看到Jeff Genender 说的这个可能是ActiveMQ的线程调度问题,只要不使用线程调度就可以了,在broker中设置属性schedulerSupport=”false” ,这样消息文件就会自动在cleanup阶段删除了。
官方网址:https://issues.apache.org/jira/browse/AMQ-2736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
成小胖学习 ActiveMQ · 基础篇
ActiveMQ --- 高级篇
ActiveMQ的几种消息持久化机制
ActiveMQ讯息策略
ActiveMQ  延时投递
ActiveMQ in Action(5)
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服