From 1fd2d6196e0abaff50f8c0ffda32327f7ffdf84b Mon Sep 17 00:00:00 2001 From: shiyi Date: Wed, 5 Feb 2025 19:16:03 +0800 Subject: [PATCH] =?UTF-8?q?[v0.0.1]=20InMessageHandler=E8=BD=AC=E5=8F=91?= =?UTF-8?q?=E8=BF=87=E7=A8=8Bbuf=E5=BC=95=E7=94=A8=E9=97=AE=E9=A2=98?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/CacDataRouterHandler.java | 2 +- .../platform/service/InMessageHandler.java | 107 +++--- .../platform/service/InMessageHandler1.java | 350 ++++++++++++++++++ .../com/platform/service/ServerService.java | 5 +- .../service/clientmanage/HaborClient.java | 8 +- src/main/resources/application-dev.yml | 4 +- src/main/resources/application-pre.yml | 3 - 7 files changed, 420 insertions(+), 59 deletions(-) create mode 100644 src/main/java/com/platform/service/InMessageHandler1.java diff --git a/src/main/java/com/platform/service/CacDataRouterHandler.java b/src/main/java/com/platform/service/CacDataRouterHandler.java index 1440c53..8d2d499 100644 --- a/src/main/java/com/platform/service/CacDataRouterHandler.java +++ b/src/main/java/com/platform/service/CacDataRouterHandler.java @@ -25,7 +25,7 @@ public class CacDataRouterHandler extends SimpleChannelInboundHandler { if (client == null || client.getClientType() != ClientTypeEnum.HABOR) { return; } - // log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg)); + log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg)); HaborClient haborClient = (HaborClient) client; haborClient.process(msg); } diff --git a/src/main/java/com/platform/service/InMessageHandler.java b/src/main/java/com/platform/service/InMessageHandler.java index e6d83a3..53cb604 100644 --- a/src/main/java/com/platform/service/InMessageHandler.java +++ b/src/main/java/com/platform/service/InMessageHandler.java @@ -67,16 +67,20 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter { if (obj instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) obj; IdleState state = event.state(); + BaseClient client = ClientManager.getClient(ctx.channel()); if (state == IdleState.ALL_IDLE) { - BaseClient client = ClientManager.getClient(ctx.channel()); if (client != null) { //设备离线,更改设备状态 - log.warn("【客户端" + ctx.channel().remoteAddress() + "进入idle状态】"); + log.warn("【客户端" + ctx.channel().remoteAddress() + "读超时】"); ChannelFuture future = ctx.channel().close(); if (!future.isSuccess()) { log.warn("【客户端异常关闭】", future.cause()); } } + } else if (state == IdleState.WRITER_IDLE) { + if (client != null) { + log.warn("【客户端" + ctx.channel().remoteAddress() + "写超时】"); + } } } super.userEventTriggered(ctx, obj); @@ -106,14 +110,16 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter { // /* //上线握手阶段 if (!sessionCache.addToSnMap.containsKey(address)) { // 判断连接是否已经建立,如未建立则先握手 - // if (msgBuf.getByte(0) != (byte) 0xAA || msgBuf.getByte(1) != (byte) 0x44) { - // return; - // } - byte[] msg = new byte[msgBuf.readableBytes()]; - msgBuf.readBytes(msg); - log.info("【接收握手原数据】 " + ByteUtils.bytes2HexString(msg)); - // 握手数据不满足协议长度,立即返回 - // String d = stringUtil.bytesToHexString(msg); + try { + + // if (msgBuf.getByte(0) != (byte) 0xAA || msgBuf.getByte(1) != (byte) 0x44) { + // return; + // } + byte[] msg = new byte[msgBuf.readableBytes()]; + msgBuf.readBytes(msg); + log.info("【接收握手原数据】 " + ByteUtils.bytes2HexString(msg)); + // 握手数据不满足协议长度,立即返回 + // String d = stringUtil.bytesToHexString(msg); // byte[] crc = CRC.crc16(msg, 2, 16); // if (crc[0] != msg[18] || crc[1] != msg[19]){ // log.warn("握手数据crc校验错误,期望crc " + stringTool.bytesToHexString(crc) + ", 实际收到 " + d); @@ -121,42 +127,45 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter { // return; // } - // 数据类型 1为地面站 2为终端设备 - String type = new String(msg, 5, 1, Charset.defaultCharset()); - // deviceCode 终端设备取后六位 - String deviceCode; - String typeName; - if(type.equals("1")) { - // 首先读取deviceCode, 哈勃deviceCode是后6个byte - typeName = "地面站"; - deviceCode = new String(msg, 6, 12, Charset.defaultCharset()).toUpperCase(); // 地面站 - protocolAck(ctx, (byte)1); - } else if (type.equals("2")){ - typeName = "哈勃终端"; - deviceCode = new String(msg, 12, 6 , Charset.defaultCharset()).toUpperCase(); // 哈勃上线 - protocolAck(ctx, (byte)1); - haborSignIntoCac(deviceCode, ctx.channel()); - } else { - protocolAck(ctx,(byte) 0); // 握手失败 - return; - } + // 数据类型 1为地面站 2为终端设备 + String type = new String(msg, 5, 1, Charset.defaultCharset()); + // deviceCode 终端设备取后六位 + String deviceCode; + String typeName; + if (type.equals("1")) { + // 首先读取deviceCode, 哈勃deviceCode是后6个byte + typeName = "地面站"; + deviceCode = new String(msg, 6, 12, Charset.defaultCharset()).toUpperCase(); // 地面站 + protocolAck(ctx, (byte) 1); + } else if (type.equals("2")) { + typeName = "哈勃终端"; + deviceCode = new String(msg, 12, 6, Charset.defaultCharset()).toUpperCase(); // 哈勃上线 + protocolAck(ctx, (byte) 1); + haborSignIntoCac(deviceCode, ctx.channel()); + } else { + protocolAck(ctx, (byte) 0); // 握手失败 + return; + } - log.info("【读取握手数据】======> deviceCode = " + deviceCode +" type = " + typeName); + log.info("【读取握手数据】======> deviceCode = " + deviceCode + " type = " + typeName); - //删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除 - String addr0 = sessionCache.snToAddMap.get(deviceCode); - if (Objects.nonNull(addr0)) { - sessionCache.tcpSession.remove(addr0); - sessionCache.addToSnMap.remove(addr0); - log.info("【握手删除旧连接】====> " + addr0 + "(" + deviceCode + ")删除"); - } + //删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除 + String addr0 = sessionCache.snToAddMap.get(deviceCode); + if (Objects.nonNull(addr0)) { + sessionCache.tcpSession.remove(addr0); + sessionCache.addToSnMap.remove(addr0); + log.info("【握手删除旧连接】====> " + addr0 + "(" + deviceCode + ")删除"); + } - //添加新连接 - sessionCache.addToSnMap.put(address, deviceCode); // ip-mac - sessionCache.snToAddMap.put(deviceCode, address); // mac- ip - sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel + //添加新连接 + sessionCache.addToSnMap.put(address, deviceCode); // ip-mac + sessionCache.snToAddMap.put(deviceCode, address); // mac- ip + sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel - return; //握手结束立即返回 + return; //握手结束立即返回 + } finally { + msgBuf.release(); + } } // */ /* @@ -195,9 +204,6 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter { // /* // 已经建立连接且非地面站控制信息,则进行数据转发 else { - if (debug) { - log.info("【转发数据】======> " + ByteBufUtil.hexDump(msgBuf)); - } String sn = sessionCache.addToSnMap.get(address); //sn =mac //监控相关 if (!sessionCache.snToMonitorMap.isEmpty()) { @@ -206,13 +212,16 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter { Channel sendChannel = sendChannelOpt.get(); log.info("(监控)out channel: " + sendChannel.remoteAddress() + "(" + sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", "")) + ") 即将发送 --- " + "in channel: " + ctx.channel().remoteAddress() + "(" + sn + ")发来的TCP消息"); - sendChannel.writeAndFlush(msgBuf.retain()); + sendChannel.writeAndFlush(ByteBufUtil.getBytes(msgBuf)); } } if (!sessionCache.mappingListMap.isEmpty() && sessionCache.mappingListMap.get(sn) != null) { + if (debug) { + log.info("【转发数据】======> " + ByteBufUtil.hexDump(msgBuf)); + } List sendChannelOpt = sessionCache.findMappingTargetChannel(sn); // 设备编号对应的通道 log.info("send channel count of {} = {}", sn, sendChannelOpt.size()); sendChannelOpt.forEach(sendChannel -> { @@ -221,7 +230,7 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter { // TODO 2025/1/17: 大量打印日志消耗性能,需要优化,用更合理的方式显示通信对象状态 log.debug("out channel: {} ({})即将发送 --- from channel: {} ({})", sendChannel.remoteAddress(), sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", "")), ctx.channel().remoteAddress(), sn); // 如果需要发送,则增加一次计数引用,便于中心指控继续使用buffer - sendChannel.writeAndFlush(msgBuf.retain()); + sendChannel.writeAndFlush(ByteBufUtil.getBytes(msgBuf)); // NOTE 2025/1/17: 线上版本的逻辑没有在控无人机更新, 这里也不再判断飞控的控制关系,直接转发 // String sendIp = sessionCache.findSnByChannel(sendChannel); // 通道找到IP @@ -276,10 +285,10 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter { BaseClient client = ClientManager.getClient(ctx.channel()); if (client == null || client.getClientType() != ClientTypeEnum.HABOR) { - if (msgBuf.refCnt() !=0 ) msgBuf.release(msgBuf.refCnt()); + msgBuf.release(); return; } else { - if (msgBuf.refCnt() == 1) msgBuf.retain(); + // if (msgBuf.refCnt() == 1) msgBuf.retain(); // 以上是哈勃原来的处理逻辑,完成之后向下游传递,进入中心指控的处理逻辑 // ctx.channel().writeAndFlush(msgBuf.retain()); // 模拟转发过程 ctx.fireChannelRead(msg0); diff --git a/src/main/java/com/platform/service/InMessageHandler1.java b/src/main/java/com/platform/service/InMessageHandler1.java new file mode 100644 index 0000000..eba24a7 --- /dev/null +++ b/src/main/java/com/platform/service/InMessageHandler1.java @@ -0,0 +1,350 @@ +package com.platform.service; + + +import com.platform.cac.RemoteService; +import com.platform.config.ServiceConfig; +import com.platform.info.enums.ClientTypeEnum; +import com.platform.info.enums.UavTypeEnum; +import com.platform.info.mapping.HaborUavMap; +import com.platform.info.mapping.UavIdMap; +import com.platform.model.DirectControlUavParam; +import com.platform.service.clientmanage.BaseClient; +import com.platform.service.clientmanage.ClientManager; +import com.platform.service.clientmanage.HaborClient; +import com.platform.util.*; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.ReferenceCountUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +@Slf4j +@Component +@ChannelHandler.Sharable +public class InMessageHandler1 extends ChannelInboundHandlerAdapter { + + private boolean debug; + private StringUtil stringUtil; + private SessionCache sessionCache; + + @Autowired + RemoteService remoteService; + private static final int head1 = 0xAA; + private static final int head2 = 0x44; + private static final int head3 = 0x61; + private static final int lengthLow = 0x03; + private static final int lengthHigh = 0x00; + + @Autowired + public InMessageHandler1(SessionCache sessionCache, StringUtil stringUtil, ServiceConfig config) { + this.stringUtil = stringUtil; + this.sessionCache = sessionCache; + this.debug = config.isDebug(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + log.error("【系统异常】======>", cause); + ctx.close(); + } + + /** + * 超时处理 + * 如果5秒没有接受客户端的心跳,就触发; + */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { + if (obj instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) obj; + IdleState state = event.state(); + BaseClient client = ClientManager.getClient(ctx.channel()); + if (state == IdleState.ALL_IDLE) { + if (client != null) { + // 设备离线,更改设备状态 + log.warn("【客户端" + ctx.channel().remoteAddress() + "读超时】"); + ChannelFuture future = ctx.channel().close(); + if (!future.isSuccess()) { + log.warn("【客户端异常关闭】", future.cause()); + } + } + } else if (state == IdleState.WRITER_IDLE) { + if (client != null) { + log.warn("【客户端" + ctx.channel().remoteAddress() + "写超时】"); + } + } + } + super.userEventTriggered(ctx, obj); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + // 获取对方ip和端口 + InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress(); + String haborSn = String.format("%06d", remoteAddress.getPort()); // 远端端口当作哈勃序列号 + try { + if ("040001".equals(haborSn) || "040002".equals(haborSn) || "040003".equals(haborSn)) { + String deviceCode = haborSn; + // 删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除 + String addr0 = sessionCache.snToAddMap.get(deviceCode); + if (Objects.nonNull(addr0)) { + sessionCache.tcpSession.remove(addr0); + sessionCache.addToSnMap.remove(addr0); + log.info("【握手删除旧连接】====> " + addr0 + "(" + deviceCode + ")删除"); + } + // 添加新连接 + String address = getAddress(ctx); + sessionCache.addToSnMap.put(address, deviceCode); // ip-mac + sessionCache.snToAddMap.put(deviceCode, address); // mac- ip + sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel + + haborSignIntoCac(haborSn, channel); + } + } catch (Exception e) { + log.error("哈勃上线失败, haborSn={}", haborSn, e); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg0) throws Exception { + String address = getAddress(ctx); + + final ByteBuf msgBuf = (ByteBuf) msg0; + // 上线握手阶段 + if (!sessionCache.addToSnMap.containsKey(address)) { // 判断连接是否已经建立,如未建立则先握手 + try { + + // if (msgBuf.getByte(0) != (byte) 0xAA || msgBuf.getByte(1) != (byte) 0x44) { + // return; + // } + byte[] msg = new byte[msgBuf.readableBytes()]; + msgBuf.readBytes(msg); + log.info("【接收握手原数据】 " + ByteUtils.bytes2HexString(msg)); + // 握手数据不满足协议长度,立即返回 + // String d = stringUtil.bytesToHexString(msg); + // byte[] crc = CRC.crc16(msg, 2, 16); + // if (crc[0] != msg[18] || crc[1] != msg[19]){ + // log.warn("握手数据crc校验错误,期望crc " + stringTool.bytesToHexString(crc) + ", 实际收到 " + d); + // protocolAck(ctx,(byte) 0); + // return; + // } + + // 数据类型 1为地面站 2为终端设备 + String type = new String(msg, 5, 1, Charset.defaultCharset()); + // deviceCode 终端设备取后六位 + String deviceCode; + String typeName; + if (type.equals("1")) { + // 首先读取deviceCode, 哈勃deviceCode是后6个byte + typeName = "地面站"; + deviceCode = new String(msg, 6, 12, Charset.defaultCharset()).toUpperCase(); // 地面站 + protocolAck(ctx, (byte) 1); + } else if (type.equals("2")) { + typeName = "哈勃终端"; + deviceCode = new String(msg, 12, 6, Charset.defaultCharset()).toUpperCase(); // 哈勃上线 + protocolAck(ctx, (byte) 1); + haborSignIntoCac(deviceCode, ctx.channel()); + } else { + protocolAck(ctx, (byte) 0); // 握手失败 + return; + } + + log.info("【读取握手数据】======> deviceCode = " + deviceCode + " type = " + typeName); + + // 删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除 + String addr0 = sessionCache.snToAddMap.get(deviceCode); + if (Objects.nonNull(addr0)) { + sessionCache.tcpSession.remove(addr0); + sessionCache.addToSnMap.remove(addr0); + log.info("【握手删除旧连接】====> " + addr0 + "(" + deviceCode + ")删除"); + } + + // 添加新连接 + sessionCache.addToSnMap.put(address, deviceCode); // ip-mac + sessionCache.snToAddMap.put(deviceCode, address); // mac- ip + sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel + + return; // 握手结束立即返回 + } finally { + msgBuf.release(); + } + } + // 已经建立连接且非地面站控制信息,则进行数据转发 + else { + final ByteBuf bufToTransfer = msgBuf.duplicate(); // 转发,保留原始索引 + String sn = sessionCache.addToSnMap.get(address); // sn =mac + // 监控相关 + if (!sessionCache.snToMonitorMap.isEmpty()) { + Optional sendChannelOpt = sessionCache.findMonitorChannel(address); + if (sendChannelOpt.isPresent()) { + Channel sendChannel = sendChannelOpt.get(); + log.info("(监控)out channel: " + sendChannel.remoteAddress() + "(" + sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", "")) + + ") 即将发送 --- " + "in channel: " + ctx.channel().remoteAddress() + "(" + sn + ")发来的TCP消息"); + sendChannel.writeAndFlush(bufToTransfer.retain()).addListener( + future -> { + // TODO 2025/1/17: 需优化 + if (!future.isSuccess()) { + bufToTransfer.release(); + } + } + ); + } + + } + + + if (!sessionCache.mappingListMap.isEmpty() && sessionCache.mappingListMap.get(sn) != null) { + if (debug) { + // log.info("【转发数据】======> " + ByteBufUtil.hexDump(msgBuf)); + } + List sendChannelOpt = sessionCache.findMappingTargetChannel(sn); // 设备编号对应的通道 + // log.info("send channel count of {} = {}", sn, sendChannelOpt.size()); + sendChannelOpt.forEach(sendChannel -> { + // log.info("out channel: " + sendChannel.remoteAddress() + "(" + sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", "")) + ")即将发送 --- " + "in channel: " + ctx.channel().remoteAddress() + "(" + sn + ")发来的TCP消息"); + + // TODO 2025/1/17: 大量打印日志消耗性能,需要优化,用更合理的方式显示通信对象状态 + // log.debug("out channel: {} ({})即将发送 --- from channel: {} ({})", sendChannel.remoteAddress(), sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", "")), ctx.channel().remoteAddress(), sn); + // 如果需要发送,则增加一次计数引用,便于中心指控继续使用buffer + sendChannel.writeAndFlush(bufToTransfer.retain()).addListener( + future -> { + // TODO 2025/1/17: 需优化 + if (!future.isSuccess()) { + bufToTransfer.release(); + } else { + // log.info("发送成功: {}, {} ", sendChannel.remoteAddress(), ByteBufUtil.hexDump(bufToTransfer)); + } + } + ); + }); + } + } + + BaseClient client = ClientManager.getClient(ctx.channel()); + if (client == null || client.getClientType() != ClientTypeEnum.HABOR) { + if (msgBuf != null) { + msgBuf.release(); + } + return; + } else { + // 直接传递当前引用,不需要增加引用计数 + ctx.fireChannelRead(msgBuf); + } + } + + /** + * 接入中心指控系统, 上线 + */ + private void haborSignIntoCac(String haborSn, Channel channel) { + + // UavIdMap.addMap(UavTypeEnum.FP981A, 1, "5"); + // HaborUavMap.addMap("5", "23293F"); + // UavIdMap.addMap(UavTypeEnum.FP981A, 2, "6"); + // HaborUavMap.addMap("6", "040000"); + // UavIdMap.addMap(UavTypeEnum.FP981A, 3, "3"); + // HaborUavMap.addMap("7", "000777"); + if (!HaborUavMap.haborIsControllable(haborSn)) { + DirectControlUavParam info = remoteService.queryCacDirectControlUavInfo(haborSn);// 缓存没查到就向中心指控查一遍,查不到就直接退出 + if (info == null) { + log.info("[cac] 未查询到哈勃终端-{}的信息,跳过上线", haborSn); + return; + } + } + + if (ClientManager.getClient(channel) != null) { + // TODO 2025/1/17: 重复上线的如何处理 + return; + } + String uavId = HaborUavMap.getUavIdByHaborSn(haborSn); + UavTypeEnum uavType = UavIdMap.getUavType(uavId); + // 新建哈勃客户端并加入管理 + HaborClient client = HaborClient.createClient(uavType, haborSn, channel); + if (client != null) { + ClientManager.addAndOnline(client); + // channel.pipeline().addAfter(); // 上线后添加相关处理器 + } + } + + /** + * 哈勃下线 + */ + private void haborSignOutOfCac(Channel channel) { + BaseClient client = ClientManager.getClient(channel); + if (client != null) { + ClientManager.removeAndOffline(client); + } + } + + private void protocolAck(ChannelHandlerContext ctx, byte success) { + byte[] ack = new byte[8]; + ack[0] = (byte) head1; + ack[1] = (byte) head2; + ack[2] = (byte) head3; + ack[3] = (byte) lengthLow; + ack[4] = (byte) lengthHigh; + ack[5] = success; + byte[] crc = CRCUtil.crc16(ack, 2, 4); + ack[6] = crc[0]; + ack[7] = crc[1]; + ctx.channel().writeAndFlush(ack); + } + + + private void response(ChannelHandlerContext ctx, byte[] content) { + ByteBuf buf = Unpooled.buffer(content.length + 4); + buf.writeByte((byte) 0xcc); + buf.writeByte((byte) 0x06); + buf.writeByte((byte) 0x04); + buf.writeByte((byte) 0x01); + + buf.writeBytes(content); + + + // ack[4] = (byte) 0x01; + ctx.channel().writeAndFlush(buf); + } + + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + log.warn("【设备上线】====> " + getAddress(ctx) + "上线"); + sessionCache.tcpSession.put(getAddress(ctx), ctx.channel()); + super.handlerAdded(ctx); + } + + private String getAddress(ChannelHandlerContext ctx) { + String address = ctx.channel().remoteAddress().toString().toUpperCase(); + return address.replace("/", ""); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + String addr = getAddress(ctx); + String sn = sessionCache.addToSnMap.get(addr); + log.warn("【设备离线】====> " + addr + "(" + sn + ")离线"); + ControlDevice.clearCurrenCtrDeviceByMac(sn); + sessionCache.tcpSession.remove(addr); + sessionCache.addToSnMap.remove(addr); + if (!StringUtils.isEmpty(sn)) { + // 只有当要被删除的地址跟snToAddMap里面的地址匹配的时候才删除, + // 因为存在一种情况,同一个sn上线了,snToAddMap数据已经被正确的地址覆盖,此时不应该删除 + if (addr.equals(sessionCache.snToAddMap.get(sn))) { + sessionCache.snToAddMap.remove(sn); + } + } + haborSignOutOfCac(ctx.channel()); + super.handlerRemoved(ctx); + } + +} diff --git a/src/main/java/com/platform/service/ServerService.java b/src/main/java/com/platform/service/ServerService.java index d83dd0c..cffcbe9 100644 --- a/src/main/java/com/platform/service/ServerService.java +++ b/src/main/java/com/platform/service/ServerService.java @@ -9,6 +9,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.ResourceLeakDetector; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -30,7 +31,7 @@ public class ServerService { private SessionCache sessionCache; @Autowired - InMessageHandler inMessageHandler; + InMessageHandler1 inMessageHandler; @Autowired CacDataRouterHandler cacDataRouterHandler; @@ -59,7 +60,7 @@ public class ServerService { .addLast("CacDataRouter", cacDataRouterHandler); // cac数据转发 } }); - + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); // 内存泄露检测,上线记得关掉paranoid bootstrap.option(ChannelOption.SO_BACKLOG, 2048); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); diff --git a/src/main/java/com/platform/service/clientmanage/HaborClient.java b/src/main/java/com/platform/service/clientmanage/HaborClient.java index bbc7ec1..210d98b 100644 --- a/src/main/java/com/platform/service/clientmanage/HaborClient.java +++ b/src/main/java/com/platform/service/clientmanage/HaborClient.java @@ -60,7 +60,7 @@ public class HaborClient extends BaseClient { protected CommandQueueRecord previousRouteBind; // 遥测频率较高,转发中心指控的时候不在CacClient中进行协议类实例化,这里直接创建ByteBuf模板 - protected static ByteBuf telemetryBufHeadToCac; + protected ByteBuf telemetryBufHeadToCac; protected int cacHeadLength; // 中心指控协议头长度, 见协议序号1-10 protected AtomicUShort cacFrameIdx = new AtomicUShort(); // cac包序号 public HaborClient(String sn, Channel channel) { @@ -249,7 +249,11 @@ public class HaborClient extends BaseClient { protected ByteBuf buf; private AtomicInteger sendCount = new AtomicInteger(0); public DataByteBuf(int frameSize) {buf = PooledByteBufAllocator.DEFAULT.buffer(frameSize);} - public void release() { if (buf.refCnt() > 0) buf.release(buf.refCnt());} + public void release() { + if (buf.refCnt() > 0) { + buf.release(buf.refCnt()); + } + } public void resetSendCount() { sendCount.set(0); } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 8da5a2d..5d0aa2c 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -1,7 +1,7 @@ server: - port: 11727 + port: 11112 service: - port: 11728 + port: 11111 # 中心指控系统 tcp udp 配置 diff --git a/src/main/resources/application-pre.yml b/src/main/resources/application-pre.yml index 324e416..8eb3162 100644 --- a/src/main/resources/application-pre.yml +++ b/src/main/resources/application-pre.yml @@ -3,9 +3,6 @@ server: service: port: 11111 -app: - debug: false - # 中心指控系统 tcp udp 配置 remote-cac: ip: 123.57.54.1