[v0.0.1] InMessageHandler转发过程buf引用问题修改

master
shiyi 1 month ago
parent 6f4711ff8d
commit 1fd2d6196e

@ -25,7 +25,7 @@ public class CacDataRouterHandler extends SimpleChannelInboundHandler<ByteBuf> {
if (client == null || client.getClientType() != ClientTypeEnum.HABOR) { if (client == null || client.getClientType() != ClientTypeEnum.HABOR) {
return; return;
} }
// log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg)); log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg));
HaborClient haborClient = (HaborClient) client; HaborClient haborClient = (HaborClient) client;
haborClient.process(msg); haborClient.process(msg);
} }

@ -67,16 +67,20 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter {
if (obj instanceof IdleStateEvent) { if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj; IdleStateEvent event = (IdleStateEvent) obj;
IdleState state = event.state(); IdleState state = event.state();
if (state == IdleState.ALL_IDLE) {
BaseClient client = ClientManager.getClient(ctx.channel()); BaseClient client = ClientManager.getClient(ctx.channel());
if (state == IdleState.ALL_IDLE) {
if (client != null) { if (client != null) {
//设备离线,更改设备状态 //设备离线,更改设备状态
log.warn("【客户端" + ctx.channel().remoteAddress() + "进入idle状态】"); log.warn("【客户端" + ctx.channel().remoteAddress() + "读超时】");
ChannelFuture future = ctx.channel().close(); ChannelFuture future = ctx.channel().close();
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.warn("【客户端异常关闭】", future.cause()); log.warn("【客户端异常关闭】", future.cause());
} }
} }
} else if (state == IdleState.WRITER_IDLE) {
if (client != null) {
log.warn("【客户端" + ctx.channel().remoteAddress() + "写超时】");
}
} }
} }
super.userEventTriggered(ctx, obj); super.userEventTriggered(ctx, obj);
@ -106,6 +110,8 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter {
// /* // /*
//上线握手阶段 //上线握手阶段
if (!sessionCache.addToSnMap.containsKey(address)) { // 判断连接是否已经建立,如未建立则先握手 if (!sessionCache.addToSnMap.containsKey(address)) { // 判断连接是否已经建立,如未建立则先握手
try {
// if (msgBuf.getByte(0) != (byte) 0xAA || msgBuf.getByte(1) != (byte) 0x44) { // if (msgBuf.getByte(0) != (byte) 0xAA || msgBuf.getByte(1) != (byte) 0x44) {
// return; // return;
// } // }
@ -126,22 +132,22 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter {
// deviceCode 终端设备取后六位 // deviceCode 终端设备取后六位
String deviceCode; String deviceCode;
String typeName; String typeName;
if(type.equals("1")) { if (type.equals("1")) {
// 首先读取deviceCode 哈勃deviceCode是后6个byte // 首先读取deviceCode 哈勃deviceCode是后6个byte
typeName = "地面站"; typeName = "地面站";
deviceCode = new String(msg, 6, 12, Charset.defaultCharset()).toUpperCase(); // 地面站 deviceCode = new String(msg, 6, 12, Charset.defaultCharset()).toUpperCase(); // 地面站
protocolAck(ctx, (byte)1); protocolAck(ctx, (byte) 1);
} else if (type.equals("2")){ } else if (type.equals("2")) {
typeName = "哈勃终端"; typeName = "哈勃终端";
deviceCode = new String(msg, 12, 6 , Charset.defaultCharset()).toUpperCase(); // 哈勃上线 deviceCode = new String(msg, 12, 6, Charset.defaultCharset()).toUpperCase(); // 哈勃上线
protocolAck(ctx, (byte)1); protocolAck(ctx, (byte) 1);
haborSignIntoCac(deviceCode, ctx.channel()); haborSignIntoCac(deviceCode, ctx.channel());
} else { } else {
protocolAck(ctx,(byte) 0); // 握手失败 protocolAck(ctx, (byte) 0); // 握手失败
return; return;
} }
log.info("【读取握手数据】======> deviceCode = " + deviceCode +" type = " + typeName); log.info("【读取握手数据】======> deviceCode = " + deviceCode + " type = " + typeName);
//删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除 //删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除
String addr0 = sessionCache.snToAddMap.get(deviceCode); String addr0 = sessionCache.snToAddMap.get(deviceCode);
@ -157,6 +163,9 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter {
sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel
return; //握手结束立即返回 return; //握手结束立即返回
} finally {
msgBuf.release();
}
} }
// */ // */
/* /*
@ -195,9 +204,6 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter {
// /* // /*
// 已经建立连接且非地面站控制信息,则进行数据转发 // 已经建立连接且非地面站控制信息,则进行数据转发
else { else {
if (debug) {
log.info("【转发数据】======> " + ByteBufUtil.hexDump(msgBuf));
}
String sn = sessionCache.addToSnMap.get(address); //sn =mac String sn = sessionCache.addToSnMap.get(address); //sn =mac
//监控相关 //监控相关
if (!sessionCache.snToMonitorMap.isEmpty()) { if (!sessionCache.snToMonitorMap.isEmpty()) {
@ -206,13 +212,16 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter {
Channel sendChannel = sendChannelOpt.get(); Channel sendChannel = sendChannelOpt.get();
log.info("(监控)out channel: " + sendChannel.remoteAddress() + "(" + sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", "")) log.info("(监控)out channel: " + sendChannel.remoteAddress() + "(" + sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", ""))
+ ") 即将发送 --- " + "in channel: " + ctx.channel().remoteAddress() + "(" + sn + ")发来的TCP消息"); + ") 即将发送 --- " + "in channel: " + ctx.channel().remoteAddress() + "(" + sn + ")发来的TCP消息");
sendChannel.writeAndFlush(msgBuf.retain()); sendChannel.writeAndFlush(ByteBufUtil.getBytes(msgBuf));
} }
} }
if (!sessionCache.mappingListMap.isEmpty() && sessionCache.mappingListMap.get(sn) != null) { if (!sessionCache.mappingListMap.isEmpty() && sessionCache.mappingListMap.get(sn) != null) {
if (debug) {
log.info("【转发数据】======> " + ByteBufUtil.hexDump(msgBuf));
}
List<Channel> sendChannelOpt = sessionCache.findMappingTargetChannel(sn); // 设备编号对应的通道 List<Channel> sendChannelOpt = sessionCache.findMappingTargetChannel(sn); // 设备编号对应的通道
log.info("send channel count of {} = {}", sn, sendChannelOpt.size()); log.info("send channel count of {} = {}", sn, sendChannelOpt.size());
sendChannelOpt.forEach(sendChannel -> { sendChannelOpt.forEach(sendChannel -> {
@ -221,7 +230,7 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter {
// TODO 2025/1/17: 大量打印日志消耗性能,需要优化,用更合理的方式显示通信对象状态 // TODO 2025/1/17: 大量打印日志消耗性能,需要优化,用更合理的方式显示通信对象状态
log.debug("out channel: {} ({})即将发送 --- from channel: {} ({})", sendChannel.remoteAddress(), sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", "")), ctx.channel().remoteAddress(), sn); log.debug("out channel: {} ({})即将发送 --- from channel: {} ({})", sendChannel.remoteAddress(), sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", "")), ctx.channel().remoteAddress(), sn);
// 如果需要发送则增加一次计数引用便于中心指控继续使用buffer // 如果需要发送则增加一次计数引用便于中心指控继续使用buffer
sendChannel.writeAndFlush(msgBuf.retain()); sendChannel.writeAndFlush(ByteBufUtil.getBytes(msgBuf));
// NOTE 2025/1/17: 线上版本的逻辑没有在控无人机更新, 这里也不再判断飞控的控制关系,直接转发 // NOTE 2025/1/17: 线上版本的逻辑没有在控无人机更新, 这里也不再判断飞控的控制关系,直接转发
// String sendIp = sessionCache.findSnByChannel(sendChannel); // 通道找到IP // String sendIp = sessionCache.findSnByChannel(sendChannel); // 通道找到IP
@ -276,10 +285,10 @@ public class InMessageHandler extends ChannelInboundHandlerAdapter {
BaseClient client = ClientManager.getClient(ctx.channel()); BaseClient client = ClientManager.getClient(ctx.channel());
if (client == null || client.getClientType() != ClientTypeEnum.HABOR) { if (client == null || client.getClientType() != ClientTypeEnum.HABOR) {
if (msgBuf.refCnt() !=0 ) msgBuf.release(msgBuf.refCnt()); msgBuf.release();
return; return;
} else { } else {
if (msgBuf.refCnt() == 1) msgBuf.retain(); // if (msgBuf.refCnt() == 1) msgBuf.retain();
// 以上是哈勃原来的处理逻辑,完成之后向下游传递,进入中心指控的处理逻辑 // 以上是哈勃原来的处理逻辑,完成之后向下游传递,进入中心指控的处理逻辑
// ctx.channel().writeAndFlush(msgBuf.retain()); // 模拟转发过程 // ctx.channel().writeAndFlush(msgBuf.retain()); // 模拟转发过程
ctx.fireChannelRead(msg0); ctx.fireChannelRead(msg0);

@ -0,0 +1,350 @@
package com.platform.service;
import com.platform.cac.RemoteService;
import com.platform.config.ServiceConfig;
import com.platform.info.enums.ClientTypeEnum;
import com.platform.info.enums.UavTypeEnum;
import com.platform.info.mapping.HaborUavMap;
import com.platform.info.mapping.UavIdMap;
import com.platform.model.DirectControlUavParam;
import com.platform.service.clientmanage.BaseClient;
import com.platform.service.clientmanage.ClientManager;
import com.platform.service.clientmanage.HaborClient;
import com.platform.util.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@Slf4j
@Component
@ChannelHandler.Sharable
public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
private boolean debug;
private StringUtil stringUtil;
private SessionCache sessionCache;
@Autowired
RemoteService remoteService;
private static final int head1 = 0xAA;
private static final int head2 = 0x44;
private static final int head3 = 0x61;
private static final int lengthLow = 0x03;
private static final int lengthHigh = 0x00;
@Autowired
public InMessageHandler1(SessionCache sessionCache, StringUtil stringUtil, ServiceConfig config) {
this.stringUtil = stringUtil;
this.sessionCache = sessionCache;
this.debug = config.isDebug();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("【系统异常】======>", cause);
ctx.close();
}
/**
*
* 5;
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
IdleState state = event.state();
BaseClient client = ClientManager.getClient(ctx.channel());
if (state == IdleState.ALL_IDLE) {
if (client != null) {
// 设备离线,更改设备状态
log.warn("【客户端" + ctx.channel().remoteAddress() + "读超时】");
ChannelFuture future = ctx.channel().close();
if (!future.isSuccess()) {
log.warn("【客户端异常关闭】", future.cause());
}
}
} else if (state == IdleState.WRITER_IDLE) {
if (client != null) {
log.warn("【客户端" + ctx.channel().remoteAddress() + "写超时】");
}
}
}
super.userEventTriggered(ctx, obj);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 获取对方ip和端口
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
String haborSn = String.format("%06d", remoteAddress.getPort()); // 远端端口当作哈勃序列号
try {
if ("040001".equals(haborSn) || "040002".equals(haborSn) || "040003".equals(haborSn)) {
String deviceCode = haborSn;
// 删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除
String addr0 = sessionCache.snToAddMap.get(deviceCode);
if (Objects.nonNull(addr0)) {
sessionCache.tcpSession.remove(addr0);
sessionCache.addToSnMap.remove(addr0);
log.info("【握手删除旧连接】====> " + addr0 + "(" + deviceCode + ")删除");
}
// 添加新连接
String address = getAddress(ctx);
sessionCache.addToSnMap.put(address, deviceCode); // ip-mac
sessionCache.snToAddMap.put(deviceCode, address); // mac- ip
sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel
haborSignIntoCac(haborSn, channel);
}
} catch (Exception e) {
log.error("哈勃上线失败, haborSn={}", haborSn, e);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg0) throws Exception {
String address = getAddress(ctx);
final ByteBuf msgBuf = (ByteBuf) msg0;
// 上线握手阶段
if (!sessionCache.addToSnMap.containsKey(address)) { // 判断连接是否已经建立,如未建立则先握手
try {
// if (msgBuf.getByte(0) != (byte) 0xAA || msgBuf.getByte(1) != (byte) 0x44) {
// return;
// }
byte[] msg = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(msg);
log.info("【接收握手原数据】 " + ByteUtils.bytes2HexString(msg));
// 握手数据不满足协议长度,立即返回
// String d = stringUtil.bytesToHexString(msg);
// byte[] crc = CRC.crc16(msg, 2, 16);
// if (crc[0] != msg[18] || crc[1] != msg[19]){
// log.warn("握手数据crc校验错误期望crc " + stringTool.bytesToHexString(crc) + ", 实际收到 " + d);
// protocolAck(ctx,(byte) 0);
// return;
// }
// 数据类型 1为地面站 2为终端设备
String type = new String(msg, 5, 1, Charset.defaultCharset());
// deviceCode 终端设备取后六位
String deviceCode;
String typeName;
if (type.equals("1")) {
// 首先读取deviceCode 哈勃deviceCode是后6个byte
typeName = "地面站";
deviceCode = new String(msg, 6, 12, Charset.defaultCharset()).toUpperCase(); // 地面站
protocolAck(ctx, (byte) 1);
} else if (type.equals("2")) {
typeName = "哈勃终端";
deviceCode = new String(msg, 12, 6, Charset.defaultCharset()).toUpperCase(); // 哈勃上线
protocolAck(ctx, (byte) 1);
haborSignIntoCac(deviceCode, ctx.channel());
} else {
protocolAck(ctx, (byte) 0); // 握手失败
return;
}
log.info("【读取握手数据】======> deviceCode = " + deviceCode + " type = " + typeName);
// 删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除
String addr0 = sessionCache.snToAddMap.get(deviceCode);
if (Objects.nonNull(addr0)) {
sessionCache.tcpSession.remove(addr0);
sessionCache.addToSnMap.remove(addr0);
log.info("【握手删除旧连接】====> " + addr0 + "(" + deviceCode + ")删除");
}
// 添加新连接
sessionCache.addToSnMap.put(address, deviceCode); // ip-mac
sessionCache.snToAddMap.put(deviceCode, address); // mac- ip
sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel
return; // 握手结束立即返回
} finally {
msgBuf.release();
}
}
// 已经建立连接且非地面站控制信息,则进行数据转发
else {
final ByteBuf bufToTransfer = msgBuf.duplicate(); // 转发,保留原始索引
String sn = sessionCache.addToSnMap.get(address); // sn =mac
// 监控相关
if (!sessionCache.snToMonitorMap.isEmpty()) {
Optional<Channel> sendChannelOpt = sessionCache.findMonitorChannel(address);
if (sendChannelOpt.isPresent()) {
Channel sendChannel = sendChannelOpt.get();
log.info("(监控)out channel: " + sendChannel.remoteAddress() + "(" + sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", ""))
+ ") 即将发送 --- " + "in channel: " + ctx.channel().remoteAddress() + "(" + sn + ")发来的TCP消息");
sendChannel.writeAndFlush(bufToTransfer.retain()).addListener(
future -> {
// TODO 2025/1/17: 需优化
if (!future.isSuccess()) {
bufToTransfer.release();
}
}
);
}
}
if (!sessionCache.mappingListMap.isEmpty() && sessionCache.mappingListMap.get(sn) != null) {
if (debug) {
// log.info("【转发数据】======> " + ByteBufUtil.hexDump(msgBuf));
}
List<Channel> sendChannelOpt = sessionCache.findMappingTargetChannel(sn); // 设备编号对应的通道
// log.info("send channel count of {} = {}", sn, sendChannelOpt.size());
sendChannelOpt.forEach(sendChannel -> {
// log.info("out channel: " + sendChannel.remoteAddress() + "(" + sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", "")) + ")即将发送 --- " + "in channel: " + ctx.channel().remoteAddress() + "(" + sn + ")发来的TCP消息");
// TODO 2025/1/17: 大量打印日志消耗性能,需要优化,用更合理的方式显示通信对象状态
// log.debug("out channel: {} ({})即将发送 --- from channel: {} ({})", sendChannel.remoteAddress(), sessionCache.addToSnMap.get(sendChannel.remoteAddress().toString().replace("/", "")), ctx.channel().remoteAddress(), sn);
// 如果需要发送则增加一次计数引用便于中心指控继续使用buffer
sendChannel.writeAndFlush(bufToTransfer.retain()).addListener(
future -> {
// TODO 2025/1/17: 需优化
if (!future.isSuccess()) {
bufToTransfer.release();
} else {
// log.info("发送成功: {}, {} ", sendChannel.remoteAddress(), ByteBufUtil.hexDump(bufToTransfer));
}
}
);
});
}
}
BaseClient client = ClientManager.getClient(ctx.channel());
if (client == null || client.getClientType() != ClientTypeEnum.HABOR) {
if (msgBuf != null) {
msgBuf.release();
}
return;
} else {
// 直接传递当前引用,不需要增加引用计数
ctx.fireChannelRead(msgBuf);
}
}
/**
* , 线
*/
private void haborSignIntoCac(String haborSn, Channel channel) {
// UavIdMap.addMap(UavTypeEnum.FP981A, 1, "5");
// HaborUavMap.addMap("5", "23293F");
// UavIdMap.addMap(UavTypeEnum.FP981A, 2, "6");
// HaborUavMap.addMap("6", "040000");
// UavIdMap.addMap(UavTypeEnum.FP981A, 3, "3");
// HaborUavMap.addMap("7", "000777");
if (!HaborUavMap.haborIsControllable(haborSn)) {
DirectControlUavParam info = remoteService.queryCacDirectControlUavInfo(haborSn);// 缓存没查到就向中心指控查一遍,查不到就直接退出
if (info == null) {
log.info("[cac] 未查询到哈勃终端-{}的信息,跳过上线", haborSn);
return;
}
}
if (ClientManager.getClient(channel) != null) {
// TODO 2025/1/17: 重复上线的如何处理
return;
}
String uavId = HaborUavMap.getUavIdByHaborSn(haborSn);
UavTypeEnum uavType = UavIdMap.getUavType(uavId);
// 新建哈勃客户端并加入管理
HaborClient client = HaborClient.createClient(uavType, haborSn, channel);
if (client != null) {
ClientManager.addAndOnline(client);
// channel.pipeline().addAfter(); // 上线后添加相关处理器
}
}
/**
* 线
*/
private void haborSignOutOfCac(Channel channel) {
BaseClient client = ClientManager.getClient(channel);
if (client != null) {
ClientManager.removeAndOffline(client);
}
}
private void protocolAck(ChannelHandlerContext ctx, byte success) {
byte[] ack = new byte[8];
ack[0] = (byte) head1;
ack[1] = (byte) head2;
ack[2] = (byte) head3;
ack[3] = (byte) lengthLow;
ack[4] = (byte) lengthHigh;
ack[5] = success;
byte[] crc = CRCUtil.crc16(ack, 2, 4);
ack[6] = crc[0];
ack[7] = crc[1];
ctx.channel().writeAndFlush(ack);
}
private void response(ChannelHandlerContext ctx, byte[] content) {
ByteBuf buf = Unpooled.buffer(content.length + 4);
buf.writeByte((byte) 0xcc);
buf.writeByte((byte) 0x06);
buf.writeByte((byte) 0x04);
buf.writeByte((byte) 0x01);
buf.writeBytes(content);
// ack[4] = (byte) 0x01;
ctx.channel().writeAndFlush(buf);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.warn("【设备上线】====> " + getAddress(ctx) + "上线");
sessionCache.tcpSession.put(getAddress(ctx), ctx.channel());
super.handlerAdded(ctx);
}
private String getAddress(ChannelHandlerContext ctx) {
String address = ctx.channel().remoteAddress().toString().toUpperCase();
return address.replace("/", "");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String addr = getAddress(ctx);
String sn = sessionCache.addToSnMap.get(addr);
log.warn("【设备离线】====> " + addr + "(" + sn + ")离线");
ControlDevice.clearCurrenCtrDeviceByMac(sn);
sessionCache.tcpSession.remove(addr);
sessionCache.addToSnMap.remove(addr);
if (!StringUtils.isEmpty(sn)) {
// 只有当要被删除的地址跟snToAddMap里面的地址匹配的时候才删除
// 因为存在一种情况同一个sn上线了snToAddMap数据已经被正确的地址覆盖此时不应该删除
if (addr.equals(sessionCache.snToAddMap.get(sn))) {
sessionCache.snToAddMap.remove(sn);
}
}
haborSignOutOfCac(ctx.channel());
super.handlerRemoved(ctx);
}
}

@ -9,6 +9,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.ResourceLeakDetector;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -30,7 +31,7 @@ public class ServerService {
private SessionCache sessionCache; private SessionCache sessionCache;
@Autowired @Autowired
InMessageHandler inMessageHandler; InMessageHandler1 inMessageHandler;
@Autowired @Autowired
CacDataRouterHandler cacDataRouterHandler; CacDataRouterHandler cacDataRouterHandler;
@ -59,7 +60,7 @@ public class ServerService {
.addLast("CacDataRouter", cacDataRouterHandler); // cac数据转发 .addLast("CacDataRouter", cacDataRouterHandler); // cac数据转发
} }
}); });
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); // 内存泄露检测上线记得关掉paranoid
bootstrap.option(ChannelOption.SO_BACKLOG, 2048); bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.TCP_NODELAY, true);

@ -60,7 +60,7 @@ public class HaborClient extends BaseClient {
protected CommandQueueRecord previousRouteBind; protected CommandQueueRecord previousRouteBind;
// 遥测频率较高转发中心指控的时候不在CacClient中进行协议类实例化这里直接创建ByteBuf模板 // 遥测频率较高转发中心指控的时候不在CacClient中进行协议类实例化这里直接创建ByteBuf模板
protected static ByteBuf telemetryBufHeadToCac; protected ByteBuf telemetryBufHeadToCac;
protected int cacHeadLength; // 中心指控协议头长度, 见协议序号1-10 protected int cacHeadLength; // 中心指控协议头长度, 见协议序号1-10
protected AtomicUShort cacFrameIdx = new AtomicUShort(); // cac包序号 protected AtomicUShort cacFrameIdx = new AtomicUShort(); // cac包序号
public HaborClient(String sn, Channel channel) { public HaborClient(String sn, Channel channel) {
@ -249,7 +249,11 @@ public class HaborClient extends BaseClient {
protected ByteBuf buf; protected ByteBuf buf;
private AtomicInteger sendCount = new AtomicInteger(0); private AtomicInteger sendCount = new AtomicInteger(0);
public DataByteBuf(int frameSize) {buf = PooledByteBufAllocator.DEFAULT.buffer(frameSize);} public DataByteBuf(int frameSize) {buf = PooledByteBufAllocator.DEFAULT.buffer(frameSize);}
public void release() { if (buf.refCnt() > 0) buf.release(buf.refCnt());} public void release() {
if (buf.refCnt() > 0) {
buf.release(buf.refCnt());
}
}
public void resetSendCount() { public void resetSendCount() {
sendCount.set(0); sendCount.set(0);
} }

@ -1,7 +1,7 @@
server: server:
port: 11727 port: 11112
service: service:
port: 11728 port: 11111
# 中心指控系统 tcp udp 配置 # 中心指控系统 tcp udp 配置

@ -3,9 +3,6 @@ server:
service: service:
port: 11111 port: 11111
app:
debug: false
# 中心指控系统 tcp udp 配置 # 中心指控系统 tcp udp 配置
remote-cac: remote-cac:
ip: 123.57.54.1 ip: 123.57.54.1

Loading…
Cancel
Save