应很多朋友的要求,今天分享一下如何使用SpringBoot和Netty构建高并发稳健的JT808网关,并且是兼容JT808-2011和JT808-2019的网关,此网关已经有多个客户在商用。
JT808网关作为部标终端连接的服务端,承载了终端登录、心跳、位置、拍照等基础业务以及信令交互,是整个系统最核心的模块,一旦崩溃,则所有部标终端都会离线,所有信令交互包括1078和主动安全的信令交互也会大受影响。所以,JT808网关的并发性稳定性健壮性成为整个系统最重要的考量之一。
很多朋友用Mina或者Netty编写网关程序时遇到过很多问题:
本文使用JDK8 的环境开发,使用SpringBoot2.x以及Netty4.x,如有不懂JDK8的新语法,请查阅资料。
此网关的特性:
1.支持JT808-2011、JT808-2019、JT1078报警、主动安全报警
2.使用MQ和Redis解耦,多模块数据共享订阅,不与任何数据库关联
3.多环境开发
4.跨平台,部署简单
5.支持ProtoBuf和JSON序列化
6.本公司首创的利用策略模式的底层封装库,模板可用于任何协议的开发,简化了网络编程的复杂度,只专注于业务开发,无任何网络编程经验的人员都可接手,节省开发成本。
public class TcpServer { public TcpServer(int threadPoolSize, int port) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { } }); serverBootstrap.bind(port).addListener(future -> { if (future.isSuccess()) { //启动成功 } else { //启动失败 } }); }}
先贴上我们pipeline的处理器都有哪些:
ch.pipeline().addLast(new IdleStateHandler(Jt808Constant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));ch.pipeline().addLast(new Jt808FrameDecoder());ch.pipeline().addLast(Jt808ProtocolDecoder.INSTANCE, new Jt808ProtocolEncoder());ch.pipeline().addLast(Jt808LoginHandler.INSTANCE);ch.pipeline().addLast(executorGroup, Jt808BusinessHandler.INSTANCE);
以下是Jt808Message的代码,我们要把每条消息所有字段都看成一个整体,没必要把消息头消息体分离出去新建其他类,最后还派生出一堆子类,只会把自己和别人绕晕。
public class Jt808Message extends BaseMessage { /** * 消息ID */ private int msgId; /** * 终端手机号 */ private String phoneNumber; /** * 终端手机号数组 */ private byte[] phoneNumberArr; /** * 协议版本号 */ private int protocolVersion; /** * 消息流水号 */ private int msgFlowId; /** * 是否分包 */ private boolean multiPacket; /** * 版本标识 */ private int versionFlag; /** * 加密方式,0:不加密,1:RSA加密 */ private int encryptType; /** * 消息总包数 */ private int packetTotalCount; /** * 包序号 */ private int packetOrder; /** * 协议类型(JT808_2011、JT808_2013、JT905、JT808_2019) */ private ProtocolEnum protocolType;}
协议解码器代码:
@Slf4j@Sharablepublic class Jt808ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> { public static final Jt808ProtocolDecoder INSTANCE = new Jt808ProtocolDecoder(); private Jt808ProtocolDecoder() { } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { //消息长度 int msgLen = msg.readableBytes(); //包头 msg.readByte(); //消息ID int msgId = msg.readUnsignedShort(); //消息体属性 short msgBodyAttr = msg.readShort(); //消息体长度 int msgBodyLen = msgBodyAttr & 0b00000011_11111111; //是否分包 boolean multiPacket = (msgBodyAttr & 0b00100000_00000000) > 0; //版本标识(版本标识0为2011年的版本,1为2019年的版本) int versionFlag = (msgBodyAttr & 0b01000000_00000000) >> 14; //去除消息体的基础长度 int baseLen = Jt808Constant.MSG_BASE_LENGTH; ProtocolEnum protocolType = ProtocolEnum.JT808_2011; if (versionFlag == 1) { baseLen = Jt808Constant.JT2019_MSG_BASE_LENGTH; protocolType = ProtocolEnum.JT808_2019; } //根据消息体长度和是否分包得出后面的包长 int ensureLen = multiPacket ? baseLen msgBodyLen 4 : baseLen msgBodyLen; if (msgLen < ensureLen) { log.info('包长不对,数据长度:{},正确长度:{},数据:{}', msgLen, ensureLen, ByteBufUtil.hexDump(msg)); return; } //数据加密方式 int encryptType = (msgBodyAttr & 0b00011100_00000000) >> 10; //协议版本号 int protocolVersion = 0; //终端手机号数组,JT808-2019为10个字节 byte[] phoneNumberArr; if (protocolType == ProtocolEnum.JT808_2019) { protocolVersion = msg.readByte(); phoneNumberArr = new byte[10]; } else { phoneNumberArr = new byte[6]; } msg.readBytes(phoneNumberArr); //终端手机号(去除前面的0) String phoneNumber = StringUtils.stripStart(ByteBufUtil.hexDump(phoneNumberArr), '0'); //消息流水号 int msgFlowId = msg.readUnsignedShort(); //消息总包数 int packetTotalCount = 0; //包序号 int packetOrder = 0; //分包 if (multiPacket) { packetTotalCount = msg.readShort(); packetOrder = msg.readShort(); } //消息体 byte[] msgBodyArr = new byte[msgBodyLen]; msg.readBytes(msgBodyArr); //校验码 int checkCode = msg.readUnsignedByte(); //包尾 msg.readByte(); //计算和验证校验码 ByteBuf checksumBuf = msg.slice(1, msgLen - 3); int checksumResult = CommonUtil.xor(checksumBuf); if (checksumResult != checkCode) { log.error('校验码验证失败,计算结果:{},校验码:{},消息ID:{},手机号:{},数据:{}', checksumResult, checkCode, NumberUtil.formatMessageId(msgId), phoneNumber, ByteBufUtil.hexDump(msg)); return; } //构造Jt808消息,传递到下一个handler处理 Jt808Message jt808Msg = new Jt808Message(); jt808Msg.setMsgId(msgId); jt808Msg.setEncryptType(encryptType); jt808Msg.setVersionFlag(versionFlag); jt808Msg.setProtocolType(protocolType); jt808Msg.setMultiPacket(multiPacket); jt808Msg.setProtocolVersion(protocolVersion); jt808Msg.setPhoneNumber(phoneNumber); jt808Msg.setPhoneNumberArr(phoneNumberArr); jt808Msg.setMsgFlowId(msgFlowId); jt808Msg.setPacketTotalCount(packetTotalCount); jt808Msg.setPacketOrder(packetOrder); jt808Msg.setMsgBodyArr(msgBodyArr); out.add(jt808Msg); }}
@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Jt808Message msg) throws Exception { //未接收完的分包不进入业务处理 Jt808Message wholeMsg = handleMultiPacket(ctx, msg); if (wholeMsg == null) { return; } //获取对应的消息处理器 int msgId = wholeMsg.getMsgId(); BaseMessageService messageService = messageServiceProvider.getMessageService(msgId); ByteBuf msgBodyBuf = Unpooled.wrappedBuffer(wholeMsg.getMsgBodyArr()); try { Object result = messageService.process(ctx, wholeMsg, msgBodyBuf); log.info('收到{}({}),终端手机号:{},消息流水号:{},内容:{}', messageService.getDesc(), NumberUtil.formatMessageId(msgId), wholeMsg.getPhoneNumber(), wholeMsg.getMsgFlowId(), wholeMsg.getMsgBodyItems()); //发送指令应答给调用方 if (result != null) { downCommandReceiver.sendUpCommand(ctx, NumberUtil.hexStr(msgId), result); } } catch (Exception e) { printExceptionLog(wholeMsg, messageService, e); } finally { //处理完业务逻辑统一释放资源 ReferenceCountUtil.release(msgBodyBuf); }}
首先我们先处理分包,不分包的消息直接返回给业务处理器处理。如果是分包的,收到第一包时会创建一个分包接收器,里面会自动判断有无接收完,接收完后会自动把所有分包数据整合在一起,然后返回给业务处理器处理。分包接收器代码篇幅有限暂时不贴出:
private Jt808Message handleMultiPacket(ChannelHandlerContext ctx, Jt808Message msg) { //不分包 if (!msg.isMultiPacket()) { return msg; } //总包数 int packetTotalCount = msg.getPacketTotalCount(); //当前包序号 int packetOrder = msg.getPacketOrder(); //第一包,创建分包接收器 if (packetTotalCount > 1 && packetOrder == 1) { multiPacketService.createMultiPacketReceiver(ctx, msg); Jt808PacketUtil.reply8001(ctx, msg); log.info('收到{},终端手机号:{},消息流水号:{},分包总包数:{},第{}包,内容:{}', NumberUtil.formatMessageId(msg.getMsgId()), msg.getPhoneNumber(), msg.getMsgFlowId(), packetTotalCount, packetOrder, ByteBufUtil.hexDump(msg.getMsgBodyArr())); return null; } //后续包 if (packetTotalCount > 1 && packetOrder > 1) { Jt808Message wholeMsg = multiPacketService.addSubPacket(msg); Jt808PacketUtil.reply8001(ctx, msg); log.info('收到{},终端手机号:{},消息流水号:{},分包总包数:{},第{}包,内容:{}', NumberUtil.formatMessageId(msg.getMsgId()), msg.getPhoneNumber(), msg.getMsgFlowId(), packetTotalCount, packetOrder, ByteBufUtil.hexDump(msg.getMsgBodyArr())); return wholeMsg; } //单个数据包 return msg;}
再往下看业务处理,我们设计了一个通用的泛型消息服务类BaseMessageService<T>,T表示各种协议的消息实体类,可以处理任何私有协议(JT809和主动安全程序也使用了这种处理方式),有些私有协议的消息ID是字符串类型的,这个服务类也做了兼容,只需要实现里面的process方法即可。这个方法传递了socket上下文可以获取该socket绑定的终端信息,消息实体类T以及消息体内容的ByteBuf。每种消息类型的处理都集中在这个方法中,按照协议从ByteBuf解析消息体内容即可。
以下是BaseService的代码:
public abstract class BaseMessageService<T extends BaseMessage> { /** * 消息ID */ private int messageId; /** * 字符串消息ID */ private String strMessageId; /** * 消息处理器描述 */ private String desc; /** * 获取终端信息 * * @param ctx socket上下文 * @return 终端信息 */ public TerminalProto getTerminalInfo(ChannelHandlerContext ctx) { return SessionUtil.getTerminalInfo(ctx); } /** * 检查消息体长度 * * @param msg 消息 * @param msgBodyLen 消息体长度 * @throws ApplicationException 应用异常 */ public void checkMessageBodyLen(T msg, int msgBodyLen) throws ApplicationException { byte[] msgBody = msg.getMsgBodyArr(); if (msgBody.length < msgBodyLen) { throw new ApplicationException('消息体长度不对,不能小于' msgBodyLen); } } /** * 处理消息 * * @param ctx socket上下文 * @param msg 消息 * @param msgBodyBuf 消息体 * @return 返回结果 * @throws Exception 异常 */ public abstract Object process(ChannelHandlerContext ctx, T msg, ByteBuf msgBodyBuf) throws Exception;}
这里贴出0x0200位置汇报的服务类:
public class Message0200Service extends BaseMessageService<Jt808Message> { @Autowired private RabbitMessageSender messageSender; @Override public Object process(ChannelHandlerContext ctx, Jt808Message msg, ByteBuf msgBodyBuf) throws Exception { //检查消息体长度 checkMessageBodyLen(msg, 28); //通用应答 Jt808PacketUtil.reply8001(ctx, msg); //解析位置信息和附加信息 LocationProto location = LocationParser.parse(msg, msgBodyBuf); //发送到MQ messageSender.sendLocation(getTerminalInfo(ctx), location); msg.putMessageBodyItem('位置', location); return null; }}
几行代码完成了消息应答、位置解析、位置发送到MQ。
以下是拍照的0x0801多媒体数据上传处理:
public class Message0801Service extends BaseMessageService<Jt808Message> { @Autowired private RabbitMessageSender messageSender; @Override public Object process(ChannelHandlerContext ctx, Jt808Message msg, ByteBuf msgBodyBuf) throws Exception { //多媒体ID long mediaId = msgBodyBuf.readUnsignedInt(); //多媒体类型 int mediaType = msgBodyBuf.readByte(); //多媒体格式编码 int mediaFormatCode = msgBodyBuf.readByte(); //事件项编码 int eventItemCode = msgBodyBuf.readByte(); //通道ID int channelId = msgBodyBuf.readByte(); //老协议不带位置数据(28 bytes),图片数据以0xFFD8开头 LocationProto location = null; if (mediaFormatCode != 0 || msgBodyBuf.getUnsignedShort(0) != 0xFFD8) { location = LocationParser.parseLocation(msgBodyBuf); } //多媒体数据 byte[] mediaData = new byte[msgBodyBuf.readableBytes()]; msgBodyBuf.readBytes(mediaData); MediaFileProto mediaFile = new MediaFileProto(); mediaFile.setMediaId(mediaId); mediaFile.setMediaType(mediaType); mediaFile.setMediaFormatCode(mediaFormatCode); mediaFile.setEventItemCode(eventItemCode); mediaFile.setChannelId(channelId); mediaFile.setLocation(location); mediaFile.setMediaData(mediaData); mediaFile.setTerminalInfo(getTerminalInfo(ctx)); //发送到MQ messageSender.sendMediaFile(mediaFile); return mediaFile; }}
其他协议的处理也是采用这个方法,比如JT809的从链路连接保持请求消息处理:
public class DownLinkTestReqProcessor extends BaseMessageService<Jt809Message> { @Autowired private MessageSendService messageSendService; @Autowired private DownLinkTestRspSender downLinkTestRspSender; @Override public Object process(ChannelHandlerContext ctx, Jt809Message jt809Msg, ByteBuf msgBodyBuf) throws Exception { //发送JT809日志到MQ Jt809Status jt809Status = Jt809Manager.getStatusAttr(ctx); Jt809ConfigDTO jt809Config = jt809Status.getJt809Config(); messageSendService.publishJt809Log(jt809Config, jt809Msg); //发送从链路连接保持应答消息 downLinkTestRspSender.send(ctx, jt809Status); return null; }}
至此,整个网关的工作量就全部集中在每种消息服务的开发了。内存溢出、资源未释放、异常等问题全部都得到了统一的处理,可以放心大胆的开发业务逻辑。有了通用处理模板,开发效率大幅提升,其他私有协议网关的开发也变得异常简单。如果MQ消息传输格式定义好的话,整个网关程序2-3天就能全部开发完。而且无任何网络编程经验的人员都能很快接手。
整个工程除了业务service,其他类只有十来个:
TcpServer需要另外开启线程启动的,不要占用阻塞SpringBoot的主线程。
配置多环境开发:
生产环境的配置:application-prod.yml。
gnss: jt808: tcpPort: 6608 middleware-ip: 127.0.0.1 threadPool: size: 10 message: converter: protospring: redis: host: ${gnss.middleware-ip} port: 6379 password: gps-pro@cn rabbitmq: host: ${gnss.middleware-ip} port: 5672 username: guest password: guest
支持2种MQ序列化方式:ProtoBuf和JSON,可以在配置文件切换。
ProtoBuf性能高,安全性高,传输量少,Web后台是JAVA开发的话可以选用这种方式,虽然也跨语言但要复杂一些。
JSON性能差,安全性差,传输量大,优点是跨语言兼容性好。如果后台是非JAVA的可以选择这种方式。
如图,发送一条相同的位置到MQ,ProtoBuf需要152字节,JSON需要675字节,传输量差了5倍。
启动后会自动加载消息处理器:
联系客服