如果你不知如何搭建MQTT服务器,可以参考Ubuntu apache-apollo安装,windows操作系统只需下载windows环境下的压缩包即可。
这里需要引入mqtt-client.jar以及gson.jar,调用saveServerLog(message)方法保存日志信息并输出Server.log,相应的jar包文件我会统一写一篇文章整理给大家。
package com.jr.mqtt;import java.util.Date;import java.util.concurrent.ScheduledExecutorService;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.MqttTopic;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import com.google.gson.Gson;import com.jr.entity.DataLog;import com.jr.io.AppendToFile;import com.jr.io.ReadFromFile;/** * 客户端接收消息 * * @author ljx * */public class ClientMqtt { // 公网地址 public static final String HOST = "tcp://103.41.*.*:61613"; // 服务器内置主题,用来监测当前服务器上连接的客户端数量($SYS/broker/clients/connected) public static final String TOPIC1 = "fourth"; private static final String clientid = "client4"; private MqttClient client; private MqttConnectOptions options; private String userName = "admin"; private String passWord = "password"; @SuppressWarnings("unused") private ScheduledExecutorService scheduler; public void start() throws MqttException { // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的连接设置 options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(false); // 设置连接的用户名 options.setUserName(userName); // 设置连接的密码 options.setPassword(passWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 设置回调 client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 System.out.println("连接断开,可以做重连"); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) throws Exception { try { // System.out.println(" 从服务器收到的消息为:" + message.toString()); saveServerLog(message); } catch (Exception e) { e.printStackTrace(); } } }); MqttTopic topic = client.getTopic(TOPIC1); // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 options.setWill(topic, "close".getBytes(), 2, true); client.connect(options); // 订阅消息 int[] Qos = { 1 }; String[] topic1 = { TOPIC1 }; client.subscribe(topic1, Qos); } /** * 保存server日志信息 * * @param message */ public void saveServerLog(MqttMessage message) { String fileName = "Server.log"; Gson gson = new Gson(); DataLog datalog = gson.fromJson(message.toString(), DataLog.class); AppendToFile.appendMethodA( fileName, new Date().toString() + " " + datalog.getVin() + " " + datalog.getState() + " " + datalog.getMessage() + " " + datalog.getType() + " " + datalog.getResult() + " " + datalog.getReqcode() + " " + "\n"); // 读取文件输出到控制台,仅供测试使用 ReadFromFile.readFileByLines(fileName); } public static void main(String[] args) { try { new ClientMqtt().start(); } catch (MqttException e) { e.printStackTrace(); } }}
DataLog.java实现set、get方法,构造方法可根据需求修改。
package com.jr.entity;/** * Entity class * * @author ljx * */public class DataLog { private String vin; private String state; private String message; private String type; private String result; private String reqcode; public DataLog() { } public DataLog(String vin, String state, String message, String type, String result, String reqcode) { super(); this.vin = vin; this.state = state; this.message = message; this.type = type; this.result = result; this.reqcode = reqcode; } public String getVin() { return vin; } public void setVin(String vin) { this.vin = vin; } public String getState() { return state; } public void setState(String state) { this.state = state; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } public String getType() { return type; } public void setType(String type) { this.type = type; } public String getResult() { return result; } public void setResult(String result) { this.result = result; } public String getReqcode() { return reqcode; } public void setReqcode(String reqcode) { this.reqcode = reqcode; }}
java文件io操作可以搜索java API,或者百度,google都有相关的资源。
package com.jr.io;import java.io.FileWriter;import java.io.IOException;import java.io.RandomAccessFile;import java.util.ArrayList;import java.util.Date;import com.jr.entity.DataLog;public class AppendToFile { /** * A方法追加文件:使用RandomAccessFile */ public static void appendMethodA(String fileName, String content) { try { // 打开一个随机访问文件流,按读写方式 RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw"); // 文件长度,字节数 long fileLength = randomFile.length(); // 将写文件指针移到文件尾。 randomFile.seek(fileLength); randomFile.writeBytes(content); randomFile.close(); } catch (IOException e) { e.printStackTrace(); } } /** * B方法追加文件:使用FileWriter */ public static void appendMethodB(String fileName, String content) { try { // 打开一个写文件器,构造函数中的第二个参数true表示以追加形式写文件 FileWriter writer = new FileWriter(fileName, true); writer.write(content); writer.close(); } catch (IOException e) { e.printStackTrace(); } } /* * * Data Test数据测试 * * */ public static void main(String[] args) { String fileName = "ServerTest.txt"; String date = new Date().toString(); ArrayList<DataLog> list = new ArrayList<DataLog>(); DataLog data = new DataLog(); data.setVin("vin"); data.setState("state"); data.setMessage("message"); data.setType("type"); data.setReqcode("reqcode"); list.add(data); for (DataLog dataLog : list) { AppendToFile.appendMethodA(fileName, date + " "); AppendToFile.appendMethodA(fileName, dataLog.getVin() + " " + dataLog.getState() + " " + dataLog.getMessage() + " " + dataLog.getType() + " " + dataLog.getReqcode() + " " + "\n"); } ReadFromFile.readFileByLines(fileName); // String content = "new append!"; // 按方法A追加文件 // AppendToFile.appendMethodA(fileName, date + "\t"); // AppendToFile.appendMethodA(fileName, "append end. \n"); // 显示文件内容 // ReadFromFile.readFileByLines(fileName); // 按方法B追加文件 // AppendToFile.appendMethodB(fileName, date); // AppendToFile.appendMethodB(fileName, "append end. \n"); // 显示文件内容 }}
package com.jr.io;import java.io.BufferedReader;import java.io.File;import java.io.FileInputStream;import java.io.FileReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.RandomAccessFile;import java.io.Reader;public class ReadFromFile { /** * 以字节为单位读取文件,常用于读二进制文件,如图片、声音、影像等文件。 */ public static void readFileByBytes(String fileName) { File file = new File(fileName); InputStream in = null; try { System.out.println("以字节为单位读取文件内容,一次读一个字节:"); // 一次读一个字节 in = new FileInputStream(file); int tempbyte; while ((tempbyte = in.read()) != -1) { System.out.write(tempbyte); } in.close(); } catch (IOException e) { e.printStackTrace(); return; } try { System.out.println("以字节为单位读取文件内容,一次读多个字节:"); // 一次读多个字节 byte[] tempbytes = new byte[100]; int byteread = 0; in = new FileInputStream(fileName); ReadFromFile.showAvailableBytes(in); // 读入多个字节到字节数组中,byteread为一次读入的字节数 while ((byteread = in.read(tempbytes)) != -1) { System.out.write(tempbytes, 0, byteread); } } catch (Exception e1) { e1.printStackTrace(); } finally { if (in != null) { try { in.close(); } catch (IOException e1) { } } } } /** * 以字符为单位读取文件,常用于读文本,数字等类型的文件 */ public static void readFileByChars(String fileName) { File file = new File(fileName); Reader reader = null; try { System.out.println("以字符为单位读取文件内容,一次读一个字节:"); // 一次读一个字符 reader = new InputStreamReader(new FileInputStream(file)); int tempchar; while ((tempchar = reader.read()) != -1) { // 对于windows下,\r\n这两个字符在一起时,表示一个换行。 // 但如果这两个字符分开显示时,会换两次行。 // 因此,屏蔽掉\r,或者屏蔽\n。否则,将会多出很多空行。 if (((char) tempchar) != '\r') { System.out.print((char) tempchar); } } reader.close(); } catch (Exception e) { e.printStackTrace(); } try { System.out.println("以字符为单位读取文件内容,一次读多个字节:"); // 一次读多个字符 char[] tempchars = new char[30]; int charread = 0; reader = new InputStreamReader(new FileInputStream(fileName)); // 读入多个字符到字符数组中,charread为一次读取字符数 while ((charread = reader.read(tempchars)) != -1) { // 同样屏蔽掉\r不显示 if ((charread == tempchars.length) && (tempchars[tempchars.length - 1] != '\r')) { System.out.print(tempchars); } else { for (int i = 0; i < charread; i++) { if (tempchars[i] == '\r') { continue; } else { System.out.print(tempchars[i]); } } } } } catch (Exception e1) { e1.printStackTrace(); } finally { if (reader != null) { try { reader.close(); } catch (IOException e1) { } } } } /** * 以行为单位读取文件,常用于读面向行的格式化文件 */ public static void readFileByLines(String fileName) { File file = new File(fileName); BufferedReader reader = null; try { System.out.println("以行为单位读取文件内容,一次读一整行:"); reader = new BufferedReader(new FileReader(file)); String tempString = null; int line = 1; // 一次读入一行,直到读入null为文件结束 while ((tempString = reader.readLine()) != null) { // 显示行号 System.out.println("line " + line + ": " + tempString); line++; } reader.close(); } catch (IOException e) { e.printStackTrace(); } finally { if (reader != null) { try { reader.close(); } catch (IOException e1) { } } } } /** * 随机读取文件内容 */ public static void readFileByRandomAccess(String fileName) { RandomAccessFile randomFile = null; try { System.out.println("随机读取一段文件内容:"); // 打开一个随机访问文件流,按只读方式 randomFile = new RandomAccessFile(fileName, "r"); // 文件长度,字节数 long fileLength = randomFile.length(); // 读文件的起始位置 int beginIndex = (fileLength > 4) ? 4 : 0; // 将读文件的开始位置移到beginIndex位置。 randomFile.seek(beginIndex); byte[] bytes = new byte[10]; int byteread = 0; // 一次读10个字节,如果文件内容不足10个字节,则读剩下的字节。 // 将一次读取的字节数赋给byteread while ((byteread = randomFile.read(bytes)) != -1) { System.out.write(bytes, 0, byteread); } } catch (IOException e) { e.printStackTrace(); } finally { if (randomFile != null) { try { randomFile.close(); } catch (IOException e1) { } } } } /** * 显示输入流中还剩的字节数 */ private static void showAvailableBytes(InputStream in) { try { System.out.println("当前字节输入流中的字节数为:" + in.available()); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { String fileName = "C:/temp/newTemp.txt"; ReadFromFile.readFileByBytes(fileName); ReadFromFile.readFileByChars(fileName); ReadFromFile.readFileByLines(fileName); ReadFromFile.readFileByRandomAccess(fileName); }}
如果服务器想要外网访问可以借助花生壳工具进行内网渗透,读者可自行研究,本文不做介绍。
联系客服