ActiveMQ Study Series (4): Persisting Messages to MySQL

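Foreword: my study notes are still rather scattered, and I have not yet found a systematic way to learn ActiveMQ. I came across a message persistence demo online, worked through it, and am recording it here.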

 

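I. Persistence options currently supported by ActiveMQ

URL: http://activemq.apache.org/persistence.html

1. Replicated LevelDB Store

2. LevelDB Store

3. KahaDB

4. JDBC, combined with the bundled high performance journal. According to the official documentation, the built-in high performance journal works somewhat like a cache layer: messages are written to the journal first, and a background task periodically checks for messages that still need to be written to the JDBC store.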
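
The journal-then-checkpoint idea in item 4 can be illustrated with a toy model. This is only a sketch under my own assumptions: `JournalSketch` and its methods are hypothetical names, not ActiveMQ APIs, and the two lists merely stand in for the journal and the JDBC store. It also assumes that a message consumed before the next checkpoint never has to be written to the database.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical class, not part of ActiveMQ.
final class JournalSketch {
    // Fast append-only buffer standing in for the journal.
    private final List<String> journal = new ArrayList<>();
    // Slower durable store standing in for the JDBC database.
    private final List<String> database = new ArrayList<>();

    // A send only appends to the journal; the database is not touched.
    void send(String message) {
        journal.add(message);
    }

    // Consuming removes the message from wherever it currently lives.
    boolean consume(String message) {
        return journal.remove(message) || database.remove(message);
    }

    // The periodic checkpoint moves everything still pending into the database.
    int checkpoint() {
        int moved = journal.size();
        database.addAll(journal);
        journal.clear();
        return moved;
    }

    int databaseRows() {
        return database.size();
    }

    public static void main(String[] args) {
        JournalSketch store = new JournalSketch();
        store.send("m1");
        store.send("m2");
        store.consume("m1");            // consumed before the checkpoint
        int moved = store.checkpoint(); // only m2 is left to write
        System.out.println("moved=" + moved + " rows=" + store.databaseRows());
    }
}
```

In this model only the message that outlives the checkpoint interval costs a database write, which is the benefit the journal is meant to provide.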

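II. Configuring activemq.xml

1. Modify the persistenceAdapter

        <persistenceAdapter>
            <!-- <kahaDB directory="${activemq.data}/kahadb"/>-->
            <jdbcPersistenceAdapter dataSource="#my-ds"/>
        </persistenceAdapter>

Here the default KahaDB adapter is commented out and a JDBC persistence adapter pointing at the data source "my-ds" is added.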

2. Add the data source

<bean id="my-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="jdbc:mysql://192.168.2.140:3306/activemq?characterEncoding=utf-8" />
    <property name="username" value="root" />
    <property name="password" value="123456" />
    <property name="initialSize" value="5" />
    <property name="maxTotal" value="100" />
    <property name="maxIdle" value="30" />
    <property name="maxWaitMillis" value="10000" />
    <property name="minIdle" value="1" />
</bean>

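A few points to note here:

2.1: The data source class configured above belongs to dbcp2. If a 1.x dbcp jar is used instead, startup fails with a ClassNotFoundException.

2.2: Copy the MySQL driver jar and the dbcp jar into the lib directory under the main installation directory:

/usr/local/apache-activemq-5.14.4/lib

2.3: Starting the broker with bin/activemq console is recommended, because errors are shown immediately. On startup I got the following error: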

 WARN | Failure Details: Table 'xckk_star_act.ACTIVEMQ_ACKS' doesn't exist
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'xckk_star_act.ACTIVEMQ_ACKS' doesn't exist
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)[:1.8.0_121]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)[:1.8.0_121]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)[:1.8.0_121]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)[:1.8.0_121]
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.Util.getInstance(Util.java:387)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2551)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2073)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2009)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeLargeUpdate(PreparedStatement.java:5094)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:1994)[mysql-connector-java-5.1.38.jar:5.1.38]
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)[commons-dbcp2-2.1.1.jar:2.1.1]
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)[commons-dbcp2-2.1.1.jar:2.1.1]
        at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doDeleteOldMessages(DefaultJDBCAdapter.java:832)[activemq-jdbc-store-5.14.4.jar:5.14.4]
        at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.cleanup(JDBCPersistenceAdapter.java:349)[activemq-jdbc-store-5.14.4.jar:5.14.4]
        at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter$3.run(JDBCPersistenceAdapter.java:327)[activemq-jdbc-store-5.14.4.jar:5.14.4]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)[:1.8.0_121]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)[:1.8.0_121]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)[:1.8.0_121]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]

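A search online turned up the cause: the database character set needs to be latin1.

So I created a new database with the latin1 character set and started ActiveMQ again. Checking the database, I found three new tables: ACTIVEMQ_ACKS, ACTIVEMQ_LOCK and ACTIVEMQ_MSGS.

The table structure is exported here in case the tables need to be created manually: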

/*
SQLyog v10.2
MySQL - 5.1.71 : Database - activemq
*********************************************************************
*/

/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;

CREATE DATABASE /*!32312 IF NOT EXISTS*/`activemq` /*!40100 DEFAULT CHARACTER SET latin1 COLLATE latin1_bin */;

USE `activemq`;

/*Table structure for table `ACTIVEMQ_ACKS` */

DROP TABLE IF EXISTS `ACTIVEMQ_ACKS`;

CREATE TABLE `ACTIVEMQ_ACKS` (
  `CONTAINER` varchar(250) COLLATE latin1_bin NOT NULL,
  `SUB_DEST` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `CLIENT_ID` varchar(250) COLLATE latin1_bin NOT NULL,
  `SUB_NAME` varchar(250) COLLATE latin1_bin NOT NULL,
  `SELECTOR` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `LAST_ACKED_ID` bigint(20) DEFAULT NULL,
  `PRIORITY` bigint(20) NOT NULL DEFAULT '5',
  `XID` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`),
  KEY `ACTIVEMQ_ACKS_XIDX` (`XID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin;

/*Table structure for table `ACTIVEMQ_LOCK` */

DROP TABLE IF EXISTS `ACTIVEMQ_LOCK`;

CREATE TABLE `ACTIVEMQ_LOCK` (
  `ID` bigint(20) NOT NULL,
  `TIME` bigint(20) DEFAULT NULL,
  `BROKER_NAME` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin;

/*Table structure for table `ACTIVEMQ_MSGS` */

DROP TABLE IF EXISTS `ACTIVEMQ_MSGS`;

CREATE TABLE `ACTIVEMQ_MSGS` (
  `ID` bigint(20) NOT NULL,
  `CONTAINER` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `MSGID_PROD` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `MSGID_SEQ` bigint(20) DEFAULT NULL,
  `EXPIRATION` bigint(20) DEFAULT NULL,
  `MSG` longblob,
  `PRIORITY` bigint(20) DEFAULT NULL,
  `XID` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),
  KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),
  KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`),
  KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`),
  KEY `ACTIVEMQ_MSGS_XIDX` (`XID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin;

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
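
The dump hints at why the character set matters, although what follows is my own guess rather than something I verified. The primary key of ACTIVEMQ_ACKS spans three varchar(250) columns plus a bigint, and the tables use MyISAM, whose index keys are limited to 1000 bytes. The rough arithmetic below assumes 1 byte per character for latin1 and up to 3 for MySQL's utf8, and ignores small per-column overheads; `KeyLengthSketch` is a hypothetical helper written for this note.

```java
// Rough arithmetic only: real MySQL key sizes also include small per-column overheads.
final class KeyLengthSketch {
    // Assumed limit for a MyISAM index key, in bytes.
    static final int MYISAM_MAX_KEY_BYTES = 1000;

    // Approximate size of PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY).
    static int ackPrimaryKeyBytes(int bytesPerChar) {
        int varcharColumns = 3;   // CONTAINER, CLIENT_ID, SUB_NAME
        int charsPerColumn = 250; // each is varchar(250)
        int bigintBytes = 8;      // PRIORITY
        return varcharColumns * charsPerColumn * bytesPerChar + bigintBytes;
    }

    static boolean fits(int bytesPerChar) {
        return ackPrimaryKeyBytes(bytesPerChar) <= MYISAM_MAX_KEY_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("latin1: " + ackPrimaryKeyBytes(1) + " bytes, fits=" + fits(1));
        System.out.println("utf8:   " + ackPrimaryKeyBytes(3) + " bytes, fits=" + fits(3));
    }
}
```

Under these assumptions the key needs about 758 bytes with latin1 but about 2258 bytes with utf8, so the CREATE TABLE statement would be rejected in a utf8 database. That would leave ACTIVEMQ_ACKS missing, which matches the "doesn't exist" error seen earlier.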

 

III. Testing message persistence

package com.ckl.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class HelloActiveMQ {

    public static void main(String[] args) throws Exception {
        HelloWorldProducer producer = new HelloWorldProducer();
        HelloWorldConsumer consumer = new HelloWorldConsumer();

        Thread threadProducer = new Thread(producer);
        threadProducer.start();

        // The consumer is commented out; otherwise it would consume the message
        // right away and we could not observe it in the database.
//        Thread threadConsumer = new Thread(consumer);
//        threadConsumer.start();
    }

    public static class HelloWorldProducer implements Runnable {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.2.140:61616");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();

                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("DemoQueue");

                // Create a MessageProducer from the Session to the Topic or Queue
                MessageProducer producer = session.createProducer(destination);
//                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT); // the message must be persisted

                // Create a message
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
                TextMessage message = session.createTextMessage(text);

                // Tell the producer to send the message
                System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);

                // Clean up
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        "tcp://192.168.2.140:61616");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
                connection.setExceptionListener(this);

                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("DemoQueue");

                // Create a MessageConsumer from the Session to the Topic or Queue
                MessageConsumer consumer = session.createConsumer(destination);

                // Wait up to one second for a message
                Message message = consumer.receive(1000);

                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                } else {
                    System.out.println("Received: " + message);
                }

                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }

        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occurred.  Shutting down client.");
        }
    }
}

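After running it, the message is persisted in the database (in the ACTIVEMQ_MSGS table).

IV. Start only the consumer and check the database

After commenting out the producer above and starting only the consumer, the message in the database turns out to have been consumed.

V. Notes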

Although the JDBC Store does not offer the best performance, it makes fairly simple to create a simple Master-Slave robust broker setup. 
When a group of Active MQ brokers is configured to use a shared database, they’ll all try to connect and grab a lock in the lock table,
but only one will succeed and become the master. The remaining brokers will be slaves, and will be in a wait state, not accepting client
connections until the master fails.
The lock table, is ACTIVEMQ_LOCK , and it is used to ensure that only one Active MQ broker instance can access the database at one time.
If an Active MQ broker can’t grab the database lock, that broker won’t initialize fully, and will wait until the lock becomes free, or it’s shut down.

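I came across the passage above and add it here as a supplement. When ActiveMQ is configured in master-slave mode, every broker tries to connect to the same data source; the ACTIVEMQ_LOCK table was added to prevent the problems that contention for this resource would cause.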
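
The election described in the quoted passage can be sketched as a small simulation. It is not ActiveMQ code: `LockTableSketch` is a hypothetical class, and a single synchronized field stands in for the row in ACTIVEMQ_LOCK. The real broker takes a database lock through JDBC and keeps waiting rather than returning false.

```java
// Simulation only: one field stands in for the lock row in ACTIVEMQ_LOCK.
final class LockTableSketch {
    private String lockHolder; // null means the lock is free

    // A broker becomes master only if nobody holds the lock yet.
    synchronized boolean tryBecomeMaster(String brokerName) {
        if (lockHolder != null) {
            return false; // stays a slave and has to retry later
        }
        lockHolder = brokerName;
        return true;
    }

    // Models the master stopping or failing; only the holder can release.
    synchronized boolean release(String brokerName) {
        if (!brokerName.equals(lockHolder)) {
            return false;
        }
        lockHolder = null;
        return true;
    }

    synchronized String master() {
        return lockHolder;
    }

    public static void main(String[] args) {
        LockTableSketch lock = new LockTableSketch();
        System.out.println("brokerA master? " + lock.tryBecomeMaster("brokerA"));
        System.out.println("brokerB master? " + lock.tryBecomeMaster("brokerB"));
        lock.release("brokerA"); // the master fails
        System.out.println("brokerB master after failover? " + lock.tryBecomeMaster("brokerB"));
    }
}
```

The first broker to take the lock becomes master, the second is refused while the lock is held, and it succeeds only after the master releases it.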

 
