[v0.0.2] 1. InMessageHandler1代码方法抽取;2. TelemetryDecoder空buf释放;3. CacUavParamBindHandler和 CacUavParamQueryHandler向飞控传递数据内容错误修复

master
shiyi 1 month ago
parent 1fd2d6196e
commit decf340f97

@ -10,7 +10,7 @@
</parent> </parent>
<groupId>com.platform</groupId> <groupId>com.platform</groupId>
<artifactId>pass-through</artifactId> <artifactId>pass-through</artifactId>
<version>1.0.0</version> <version>1.0.2</version>
<name>pass-through</name> <name>pass-through</name>
<description>Demo project for Spring Boot</description> <description>Demo project for Spring Boot</description>

@ -54,7 +54,7 @@ public class CacUavParamBindHandler implements IRemoteMessageHandler {
} }
String uavControlUniId = uavCommand.getUavControlUniId(); String uavControlUniId = uavCommand.getUavControlUniId();
log.info("[cac] 中心指控下发参数装订指令...{}", uavCommand); log.info("[cac] 中心指控下发参数装订指令...{}", uavCommand);
client.writeParamBindCommand(commandCode, commandContent, uavControlUniId); client.writeParamBindCommand(commandCode, command14_29, uavControlUniId);
} else { } else {
log.warn("[cac] 当前服务未连接中心指控无法下发参数装订指令"); log.warn("[cac] 当前服务未连接中心指控无法下发参数装订指令");
} }

@ -54,7 +54,7 @@ public class CacUavParamQueryHandler implements IRemoteMessageHandler {
String uavControlUniId = uavCommand.getUavControlUniId(); String uavControlUniId = uavCommand.getUavControlUniId();
log.info("[cac] 中心指控下发参数查询指令...{}", uavCommand); log.info("[cac] 中心指控下发参数查询指令...{}", uavCommand);
client.writeParamQueryCommand(commandCode, commandContent, uavControlUniId); client.writeParamQueryCommand(commandCode, command14_29, uavControlUniId);
} else { } else {
log.warn("[cac] 当前服务未连接中心指控无法下发参数查询指令"); log.warn("[cac] 当前服务未连接中心指控无法下发参数查询指令");
} }

@ -25,7 +25,7 @@ public class CacDataRouterHandler extends SimpleChannelInboundHandler<ByteBuf> {
if (client == null || client.getClientType() != ClientTypeEnum.HABOR) { if (client == null || client.getClientType() != ClientTypeEnum.HABOR) {
return; return;
} }
log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg)); // log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg));
HaborClient haborClient = (HaborClient) client; HaborClient haborClient = (HaborClient) client;
haborClient.process(msg); haborClient.process(msg);
} }

@ -13,7 +13,6 @@ import com.platform.service.clientmanage.ClientManager;
import com.platform.service.clientmanage.HaborClient; import com.platform.service.clientmanage.HaborClient;
import com.platform.util.*; import com.platform.util.*;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleState;
@ -120,15 +119,41 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg0) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg0) throws Exception {
String address = getAddress(ctx); String address = getAddress(ctx);
final ByteBuf msgBuf = (ByteBuf) msg0; final ByteBuf msgBuf = (ByteBuf) msg0;
try {
// 上线握手阶段 // 上线握手阶段
if (!sessionCache.addToSnMap.containsKey(address)) { // 判断连接是否已经建立,如未建立则先握手 if (!sessionCache.addToSnMap.containsKey(address)) { // 判断连接是否已经建立,如未建立则先握手
try { handshake(ctx, msgBuf, address);
msgBuf.release();
return; // 握手结束立即返回
}
else {
// 已经建立连接且非地面站控制信息,则进行数据转发
transfer(ctx, msgBuf, address);
}
} catch (RuntimeException e) {
ReferenceCountUtil.safeRelease(msgBuf);
return;
}
// if (msgBuf.getByte(0) != (byte) 0xAA || msgBuf.getByte(1) != (byte) 0x44) { BaseClient client = ClientManager.getClient(ctx.channel());
// return; if (client == null || client.getClientType() != ClientTypeEnum.HABOR) {
// } if (msgBuf != null) {
msgBuf.release();
}
return;
} else {
// 直接传递当前引用,不需要增加引用计数
ctx.fireChannelRead(msgBuf);
}
}
/**处理哈勃、地面站的握手信息*/
private boolean handshake(ChannelHandlerContext ctx, ByteBuf msgBuf, String address) {
if (msgBuf.getByte(0) != (byte) 0xAA || msgBuf.getByte(1) != (byte) 0x44) {
return false;
}
byte[] msg = new byte[msgBuf.readableBytes()]; byte[] msg = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(msg); msgBuf.readBytes(msg);
log.info("【接收握手原数据】 " + ByteUtils.bytes2HexString(msg)); log.info("【接收握手原数据】 " + ByteUtils.bytes2HexString(msg));
@ -151,17 +176,19 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
typeName = "地面站"; typeName = "地面站";
deviceCode = new String(msg, 6, 12, Charset.defaultCharset()).toUpperCase(); // 地面站 deviceCode = new String(msg, 6, 12, Charset.defaultCharset()).toUpperCase(); // 地面站
protocolAck(ctx, (byte) 1); protocolAck(ctx, (byte) 1);
log.info("{} {} 握手成功: {}", typeName, deviceCode, ByteUtils.bytes2HexString(msg));
} else if (type.equals("2")) { } else if (type.equals("2")) {
typeName = "哈勃终端"; typeName = "哈勃终端";
deviceCode = new String(msg, 12, 6, Charset.defaultCharset()).toUpperCase(); // 哈勃上线 deviceCode = new String(msg, 12, 6, Charset.defaultCharset()).toUpperCase(); // 哈勃上线
protocolAck(ctx, (byte) 1); protocolAck(ctx, (byte) 1);
log.info("{} {} 握手成功: {}", typeName, deviceCode, ByteUtils.bytes2HexString(msg));
haborSignIntoCac(deviceCode, ctx.channel()); haborSignIntoCac(deviceCode, ctx.channel());
} else { } else {
protocolAck(ctx, (byte) 0); // 握手失败 protocolAck(ctx, (byte) 0); // 握手失败
return; log.info("握手失败: {}", ByteUtils.bytes2HexString(msg));
return false;
} }
log.info("【读取握手数据】======> deviceCode = " + deviceCode + " type = " + typeName);
// 删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除 // 删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除
String addr0 = sessionCache.snToAddMap.get(deviceCode); String addr0 = sessionCache.snToAddMap.get(deviceCode);
@ -175,16 +202,14 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
sessionCache.addToSnMap.put(address, deviceCode); // ip-mac sessionCache.addToSnMap.put(address, deviceCode); // ip-mac
sessionCache.snToAddMap.put(deviceCode, address); // mac- ip sessionCache.snToAddMap.put(deviceCode, address); // mac- ip
sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel
return true;
return; // 握手结束立即返回
} finally {
msgBuf.release();
}
} }
// 已经建立连接且非地面站控制信息,则进行数据转发
else { /**地面站和哈勃之间的数据转发(原哈勃服务功能)*/
private void transfer(ChannelHandlerContext ctx, ByteBuf msgBuf, String address) throws RuntimeException {
final ByteBuf bufToTransfer = msgBuf.duplicate(); // 转发,保留原始索引 final ByteBuf bufToTransfer = msgBuf.duplicate(); // 转发,保留原始索引
String sn = sessionCache.addToSnMap.get(address); // sn =mac String sn = sessionCache.addToSnMap.get(address); // sn =mac
// 监控相关 // 监控相关
if (!sessionCache.snToMonitorMap.isEmpty()) { if (!sessionCache.snToMonitorMap.isEmpty()) {
Optional<Channel> sendChannelOpt = sessionCache.findMonitorChannel(address); Optional<Channel> sendChannelOpt = sessionCache.findMonitorChannel(address);
@ -204,7 +229,6 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
} }
if (!sessionCache.mappingListMap.isEmpty() && sessionCache.mappingListMap.get(sn) != null) { if (!sessionCache.mappingListMap.isEmpty() && sessionCache.mappingListMap.get(sn) != null) {
if (debug) { if (debug) {
// log.info("【转发数据】======> " + ByteBufUtil.hexDump(msgBuf)); // log.info("【转发数据】======> " + ByteBufUtil.hexDump(msgBuf));
@ -222,8 +246,6 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
// TODO 2025/1/17: 需优化 // TODO 2025/1/17: 需优化
if (!future.isSuccess()) { if (!future.isSuccess()) {
bufToTransfer.release(); bufToTransfer.release();
} else {
// log.info("发送成功: {}, {} ", sendChannel.remoteAddress(), ByteBufUtil.hexDump(bufToTransfer));
} }
} }
); );
@ -231,18 +253,6 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
} }
} }
BaseClient client = ClientManager.getClient(ctx.channel());
if (client == null || client.getClientType() != ClientTypeEnum.HABOR) {
if (msgBuf != null) {
msgBuf.release();
}
return;
} else {
// 直接传递当前引用,不需要增加引用计数
ctx.fireChannelRead(msgBuf);
}
}
/** /**
* , 线 * , 线
*/ */

@ -27,10 +27,19 @@ public class TelemetryDecoder extends DelimiterBasedFrameDecoder {
@Override @Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
ByteBuf decodeBuf = (ByteBuf) super.decode(ctx, buffer); ByteBuf decodeBuf = (ByteBuf) super.decode(ctx, buffer);
if (decodeBuf == null || decodeBuf.readableBytes()==0) { if (decodeBuf == null) {
return null; // 如果 decodeBuf 为 null直接返回 null return null;
} else if (decodeBuf.readableBytes()==0){
decodeBuf.release(); // 释放空buffer
return null;
}
try {
// 创建组合buffer注意HEAD需要复制因为它是静态共享的
return Unpooled.wrappedBuffer(HEAD.retainedDuplicate(), decodeBuf);
} catch (Exception e) {
decodeBuf.release();
return null; // Add return statement in catch block
} }
return Unpooled.wrappedBuffer(HEAD.retain(), decodeBuf); // 只有在 decodeBuf 不为 null 时,才补上帧头
// return decodeBuf;
} }
} }

@ -241,6 +241,11 @@ public class HaborClient981A extends HaborClient {
@Override @Override
public void writeParamBindCommand(byte commandCode, byte[] commandContent, String controlUniId) { public void writeParamBindCommand(byte commandCode, byte[] commandContent, String controlUniId) {
// try { // try {
// byte[] test = {
// (byte) 0x39, (byte) 0x32, (byte) 0x00, (byte) 0x36, (byte) 0x32, (byte) 0x14, (byte) 0xF6, (byte) 0x00, (byte) 0x00, (byte) 0x28, (byte) 0x05, (byte) 0x05, (byte) 0x00, (byte) 0x00, (byte) 0x04, (byte) 0x00
// };
// 39 32 00 36 32 14 F6 00 00 28 05 05 00 00 04 00
// 39 32 00 36 32 14 F6 00 00 28 05 05 00 05 00 03
bindDataBuf.buf.setBytes(13, commandContent); bindDataBuf.buf.setBytes(13, commandContent);
setCurrentSendData(bindDataBuf); // 设置当前发送的数据类型 setCurrentSendData(bindDataBuf); // 设置当前发送的数据类型
previousParamBind.recordCommand(commandCode, controlUniId, bindDataBuf.buf.slice(13,16)); // 记录参数装订指令 previousParamBind.recordCommand(commandCode, controlUniId, bindDataBuf.buf.slice(13,16)); // 记录参数装订指令

@ -1,6 +1,6 @@
spring: spring:
profiles: profiles:
active: pre active: dev
app: app:
debug: true debug: true
Loading…
Cancel
Save