1.需求分析
近期笔者项目需要用到mqtt实现消息推送,笔者选择emq作为mqtt服务器载体,上篇笔者讲解了如何在linux中安装mqtt服务,安装链接:https://blog.csdn.net/zhangxing52077/article/details/80567270,接下来笔者将讲解如何在springboot中集成mqtt
2.实现方案
①pom依赖
<!--mqtt-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
②yml中配置mqtt(自定义配置)
#mq配置
com:
mqtt:
host: tcp://ip:1883
clientid: mqttjs_e8022a4d0b
topic: good,test,yes
username: zhangxing
password: zxp52077
timeout: 10
keepalive: 20
③创建mqtt消息属性配置类
@Component
@ConfigurationProperties(prefix = "com.mqtt")
@Setter
@Getter
public class MqttConfiguration {
private String host;
private String clientid;
private String topic;
private String username;
private String password;
private int timeout;
private int keepalive;
}
④创建mqtt消息推送实体
@Slf4j
@Setter
@Getter
public class PushPayload {
//推送类型
private String type;
//推送对象
private String mobile;
//标题
private String title;
//内容
private String content;
//数量
private Integer badge = 1;
//铃声
private String sound = "default";
public PushPayload(String type, String mobile, String title, String content, Integer badge , String sound){
this.type = type;
this.mobile = mobile;
this.title = title;
this.content = content;
this.badge = badge;
this.sound = sound;
}
public static class Builder{
//推送类型
private String type;
//推送对象
private String mobile;
//标题
private String title;
//内容
private String content;
//数量
private Integer badge = 1;
//铃声
private String sound = "default";
public Builder setType(String type) {
this.type = type;
return this;
}
public Builder setMobile(String mobile) {
this.mobile = mobile;
return this;
}
public Builder setTitle(String title) {
this.title = title;
return this;
}
public Builder setContent(String content) {
this.content = content;
return this;
}
public Builder setBadge(Integer badge) {
this.badge = badge;
return this;
}
public Builder setSound(String sound) {
this.sound = sound;
return this;
}
public PushPayload bulid(){
return new PushPayload(type,mobile,title,content,badge,sound);
}
}
public static Builder getPushPayloadBuider(){
return new Builder();
}
@Override
public String toString() {
return JSON.toJSONString(this, SerializerFeature.DisableCircularReferenceDetect);
}
}
⑤创建mqtt消息推送或订阅客户端
@Slf4j
public class MqttPushClient {
private MqttClient client;
private static volatile MqttPushClient mqttPushClient = null;
public static MqttPushClient getInstance(){
if(null == mqttPushClient){
synchronized (MqttPushClient.class){
if(null == mqttPushClient){
mqttPushClient = new MqttPushClient();
}
}
}
return mqttPushClient;
}
private MqttPushClient() {
connect();
}
private void connect(){
try {
client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENTID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(PropertiesUtil.MQTT_USER_NAME);
options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
try {
client.setCallback(new PushCallback());
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发布,默认qos为0,非持久化
* @param topic
* @param pushMessage
*/
public void publish(String topic,PushPayload pushMessage){
publish(0, false, topic, pushMessage);
}
/**
* 发布
* @param qos
* @param retained
* @param topic
* @param pushMessage
*/
public void publish(int qos,boolean retained,String topic,PushPayload pushMessage){
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.toString().getBytes());
MqttTopic mTopic = client.getTopic(topic);
if(null == mTopic){
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 订阅某个主题,qos默认为0
* @param topic
*/
public void subscribe(String topic){
subscribe(topic,0);
}
/**
* 订阅某个主题
* @param topic
* @param qos
*/
public void subscribe(String topic,int qos){
try {
client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
String kdTopic = "good";
PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("15345715326")
.setContent("designModel")
.bulid();
MqttPushClient.getInstance().publish(0, false, kdTopic, pushMessage);
}
}
⑥配置获取类的编写
public class PropertiesUtil {
public static String MQTT_HOST;
public static String MQTT_CLIENTID;
public static String MQTT_USER_NAME;
public static String MQTT_PASSWORD;
public static int MQTT_TIMEOUT;
public static int MQTT_KEEP_ALIVE;
public static final String ELASTIC_SEARCH_HOST;
public static final int ELASTIC_SEARCH_PORT;
public static final String ELASTIC_SEARCH_CLUSTER_NAME;
static {
MQTT_HOST = loadMqttProperties().getProperty("MQTT_HOST");
MQTT_CLIENTID = loadMqttProperties().getProperty("MQTT_CLIENTID");
MQTT_USER_NAME = loadMqttProperties().getProperty("MQTT_USER_NAME");
MQTT_PASSWORD = loadMqttProperties().getProperty("MQTT_PASSWORD");
MQTT_TIMEOUT = Integer.valueOf(loadMqttProperties().getProperty("MQTT_TIMEOUT"));
MQTT_KEEP_ALIVE = Integer.valueOf(loadMqttProperties().getProperty("MQTT_KEEP_ALIVE"));
}
static {
ELASTIC_SEARCH_HOST = loadEsProperties().getProperty("ES_HOST");
ELASTIC_SEARCH_PORT = Integer.valueOf(loadEsProperties().getProperty("ES_PORT"));
ELASTIC_SEARCH_CLUSTER_NAME = loadEsProperties().getProperty("ES_CLUSTER_NAME");
}
private static Properties loadMqttProperties() {
InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/mqtt.yml");
Properties properties = new Properties();
try {
properties.load(inputstream);
return properties;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
if (inputstream != null) {
inputstream.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private static Properties loadEsProperties() {
InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/elasticsearch.properties");
Properties properties = new Properties();
try {
properties.load(inputstream);
return properties;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
if (inputstream != null) {
inputstream.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
⑦mqtt推送回调类
/**
* @auther zx
* @date 2018/5/28 9:20
*/
public class PushCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
}
3.效果测试
@Test
public void test() {
PushPayload pushPayload = PushPayload.getPushPayloadBuider().setContent("test")
.setMobile("119")
.setType("2018")
.bulid();
mqttClientComponent.push("yes",pushPayload);
}
mqtt客户端效果显示