打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
消息队列RockMQ
http://baijiahao.baidu.com/s?id=1594427484993646585&wfr=spider&for=pc
https://www.jianshu.com/p/e5cfb4ba925e
1. 点对点模式和发布订阅模式:是否可以重复消费
============
最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性,
目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景
比kafka还是有过之无不及,其实kafka文档很丰富
但RocketMQ网上的文章太少,找不到相关的操作教程
于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究
下载源码的地址 https://github.com/alibaba/RocketMQ/releases
首选通过在java项目里面Maven依赖方式引用RocketMQ Java SDK
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version></dependency>
Downloads
11.3 MBalibaba-rocketmq-3.2.6.tar.gz
2.46 MBalibaba-rocketmq-client-java-3.2.6.tar.gz
Source code (zip)
Source code (tar.gz)
在linux 下用wget 下载源码然后解压出来
在runserver.sh里面可以配置 jvm启动的参数 JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
可以 vi runserver.sh
分别给 mqnamesrv mqbroker play.sh 执行的权限
chmod +x  mqnamersrv
chmod +x  mqbroker
chmod +x  play.sh
下面红线框的这段 命令输入错误了,忽略不用看
通过 nohup sh mqnamesrv& 启动 RocketMq
目前没看到结束的命令,也没找到相关的介绍,
我这里用的 ps -ef|grep rocketmq  查到进程pid
然后kill pid号
或则pkill -9 java [慎用]
用jps -v 查看下java进程的参数
rocketmq启动后监听 9876端口,这里还是在看源码里面看到的,资料实在是太少了
在防火墙配置里面加上 9876端口,设置iptables对外开放
部署Broker
nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-2s-async/broker-a.properties &
这里ip换成本机的就是单机实例,如果配置主从 这里可以配其他的ip
Master和Slave的配置文件参考conf目录下的配置文件
Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数
一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分
部署一Master一Slave,集群采用异步复制方式:
Master: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a.properties &
Slave:   nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a-s.properties &
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.pgsqlmybatis.common.rocketmq;/*
***************************************************************
* 公司名称    :
* 系统名称    :信用管家专业版
* 类 名 称    :Ios渠道idfa统计,推广统计用
* 功能描述    :
* 业务描述    :
* 作 者 名    :@Author Royal
* 开发日期    :2016-05-15
* Created     :IntelliJ IDEA
***************************************************************
* 修改日期    :
* 修 改 者    :
* 修改内容    :
***************************************************************
*/
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.setNamesrvAddr("xxxxxxxxxx:9876");
try {
producer.start();
String pushMsg="kafka activeMq rocketMq 消息队列使用1";
Message msg = new Message("PushTopic","push","1",
pushMsg.getBytes("UTF-8"));
SendResult result = producer.send(msg);
System.out.println("id:" + result.getMsgId() +
" result:" + result.getSendStatus());
String pushMsg2="海量级消息记录单机测试2";
msg = new Message("PushTopic","push","2",pushMsg2.getBytes("UTF-8"));
result = producer.send(msg);
System.out.println("id:" + result.getMsgId() +
" result:" + result.getSendStatus());
String pushMsg3="海量级消息记录单机测试3";
msg = new Message("PullTopic","pull","1",pushMsg3.getBytes());
result = producer.send(msg);
System.out.println("id:" + result.getMsgId() +
" result:" + result.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
}
启动生成者
启动消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.pgsqlmybatis.common.rocketmq;/*
***************************************************************
* 公司名称    :
* 系统名称    :信用管家专业版
* 类 名 称    :Ios渠道idfa统计,推广统计用
* 功能描述    :
* 业务描述    :
* 作 者 名    :@Author Royal
* 开发日期    :2016-05-15
* Created     :IntelliJ IDEA
***************************************************************
* 修改日期    :
* 修 改 者    :
* 修改内容    :
***************************************************************
*/
import java.io.UnsupportedEncodingException;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args){
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("xxxxxxxxxxxx:9876");
try {
consumer.subscribe("PushTopic", "push");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list,
ConsumeConcurrentlyContext Context) {
Message msg = list.get(0);
System.out.println(msg.toString());
String recString= null;
try {
recString = new String(msg.getBody() ,"UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(recString);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
以上为单机实例配置
如果你遇到什么问题可以私信我,如果觉得此文对你很有帮助,点下赞推荐下额^_^
参考:http://blog.csdn.net/a19881029/article/details/34446629
http://sofar.blog.51cto.com/353572/1540874
http://blog.csdn.net/loongshawn/article/details/51086876
 RocketMq最佳实践
《RocketMQ原理简介》
分布式开放消息系统(RocketMQ)的原理与实践
《RocketMQ用户指南》
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
分布式消息中间件
分布式开放消息系统(RocketMQ)的原理与实践
RocketMQ-顺序消费
了解消息队列中间件的基本概念——注较为规范地介绍ROCKETMQ
metaq原理介绍
后端程序员必备:RocketMQ相关流程图/原理图
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服