[v0.0.3] 联调测试问题修改。主要内容:1. 更新常发帧线程调度策略,允许动态修改帧频;2. 向Cac发送遥测数据buf时处理发送失败时的资源释放;3.修改航线装订回报处理的错误

master
shiyi 1 month ago
parent decf340f97
commit 41533d6105

@ -10,9 +10,9 @@
</parent> </parent>
<groupId>com.platform</groupId> <groupId>com.platform</groupId>
<artifactId>pass-through</artifactId> <artifactId>pass-through</artifactId>
<version>1.0.2</version> <version>1.0.3</version>
<name>pass-through</name> <name>pass-through</name>
<description>Demo project for Spring Boot</description> <description>中心指控直控无人机哈勃服务</description>
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>

@ -1,9 +1,6 @@
package com.platform.cac; package com.platform.cac;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.platform.info.GlobalData; import com.platform.info.GlobalData;
import com.platform.model.DirectControlUavParam; import com.platform.model.DirectControlUavParam;
import com.platform.model.Result; import com.platform.model.Result;
@ -13,7 +10,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.util.HashMap; import java.util.HashMap;
@ -144,13 +140,6 @@ public class CacHpApi {
return false; return false;
} }
/**
*
*/
public static String queryGcsSnapshot(String body) {
final String gcsSignApi = HTFP_PATH + "/queryGcsSnapshot";
return postRequest(gcsSignApi, body);
}
/** /**
* *
@ -163,26 +152,43 @@ public class CacHpApi {
/** /**
* *
*/ */
public static String commandNotify(String body) { public static boolean commandNotify(String body) {
final String gcsSignApi = HTFP_PATH + "/notifyUavCommand"; final String gcsSignApi = HTFP_PATH + "/notifyUavCommand";
return postRequest(gcsSignApi, body); Result result = postRequestAndGetResult(gcsSignApi, body);
if (result == null || !result.isSuccess()) {
log.error("[cac] 指令执行结果通知失败: {}", result);
return false;
}
return true;
} }
/** /**
* *
*/ */
public static String uavParamQueryResultNotify(String body) { public static boolean uavParamQueryResultNotify(String body) {
final String gcsSignApi = HTFP_PATH + "/uavParamQueryResultNotify"; final String gcsSignApi = HTFP_PATH + "/uavParamQueryResultNotify";
return postRequest(gcsSignApi, body); Result result = postRequestAndGetResult(gcsSignApi, body);
if (result == null || !result.isSuccess()) {
log.error("[cac] 参数查询结果通知失败: {}", result);
return false;
}
return true;
} }
/** /**
* *
*/ */
public static String uavParamBindResultNotify(String body) { public static boolean uavParamBindResultNotify(String body) {
final String gcsSignApi = HTFP_PATH + "/uavParamBindResultNotify"; final String gcsSignApi = HTFP_PATH + "/uavParamBindResultNotify";
return postRequest(gcsSignApi, body); Result result = postRequestAndGetResult(gcsSignApi, body);
if (result == null || !result.isSuccess()) {
log.error("[cac] 装订执行结果通知失败: {}", result);
return false;
}
return true;
} }
public static List<DirectControlUavParam> queryAllCacDirectControlUavList(String body) { public static List<DirectControlUavParam> queryAllCacDirectControlUavList(String body) {
final String gcsSignApi = HTFP_PATH + "/queryAllCacDirectControlUavList"; final String gcsSignApi = HTFP_PATH + "/queryAllCacDirectControlUavList";
try { try {

@ -47,7 +47,7 @@ public class CacUavParamQueryHandler implements IRemoteMessageHandler {
byte[] command14_29 = new byte[16]; byte[] command14_29 = new byte[16];
command14_29[0] = commandCode; command14_29[0] = commandCode;
System.arraycopy(commandContent, 0, command14_29, 1, 15); System.arraycopy(commandContent, 0, command14_29, 1, 15);
if (Arrays.equals(uavCommand.getCrc16(), CRCUtil.getCRCfromGcsTelemetryData(command14_29, 0, 16))) { if (!Arrays.equals(uavCommand.getCrc16(), CRCUtil.getCRCfromGcsTelemetryData(command14_29, 0, 16))) {
log.error("[cac] 中心指控参数查询指令下发失败CRC校验失败"); log.error("[cac] 中心指控参数查询指令下发失败CRC校验失败");
return; return;
} }

@ -48,7 +48,7 @@ public class CacUavRouteBindHandler implements IRemoteMessageHandler {
byte commandCode = uavCommand.getCommandCode(); byte commandCode = uavCommand.getCommandCode();
byte routePointNum = uavCommand.getRoutePointNum(); byte routePointNum = uavCommand.getRoutePointNum();
byte[] routePointInfo = uavCommand.getRoutePointInfo(); // 16 * routePointNum 个字节 byte[] routePointInfo = uavCommand.getRoutePointInfo(); // 16 * routePointNum 个字节
if (Arrays.equals(uavCommand.getCrc16(), CRCUtil.getCRCfromGcsTelemetryData(routePointInfo, 0, routePointInfo.length))) { if (!Arrays.equals(uavCommand.getCrc16(), CRCUtil.getCRCfromGcsTelemetryData(routePointInfo, 0, routePointInfo.length))) {
log.error("[cac] 中心指控航线装订指令下发失败CRC校验失败"); log.error("[cac] 中心指控航线装订指令下发失败CRC校验失败");
return; return;
} }

@ -41,12 +41,12 @@ public class HaborUavMap {
/** 当前地面站是否可控哈勃 */ /** 当前地面站是否可控哈勃 */
public static boolean haborIsControllable(@NotNull String haborSn){ public static boolean haborIsControllable(@NotNull String haborSn){
return habor2uavId.contains(haborSn) && uavId2habor.containsValue(haborSn); return habor2uavId.containsKey(haborSn) && uavId2habor.containsValue(haborSn);
} }
/** 当前地面站是否可控uavId*/ /** 当前地面站是否可控uavId*/
public static boolean uavIsControllable(@NotNull String uavId){ public static boolean uavIsControllable(@NotNull String uavId){
return uavId2habor.contains(uavId) && habor2uavId.containsValue(uavId); return uavId2habor.containsKey(uavId) && habor2uavId.containsValue(uavId);
} }
public static void clear() { public static void clear() {
habor2uavId.clear(); habor2uavId.clear();

@ -5,7 +5,7 @@ import com.platform.service.clientmanage.BaseClient;
import com.platform.service.clientmanage.ClientManager; import com.platform.service.clientmanage.ClientManager;
import com.platform.service.clientmanage.HaborClient; import com.platform.service.clientmanage.HaborClient;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;

@ -3,6 +3,7 @@ package com.platform.service;
import com.platform.cac.RemoteService; import com.platform.cac.RemoteService;
import com.platform.config.ServiceConfig; import com.platform.config.ServiceConfig;
import com.platform.info.GlobalData;
import com.platform.info.enums.ClientTypeEnum; import com.platform.info.enums.ClientTypeEnum;
import com.platform.info.enums.UavTypeEnum; import com.platform.info.enums.UavTypeEnum;
import com.platform.info.mapping.HaborUavMap; import com.platform.info.mapping.HaborUavMap;
@ -25,6 +26,7 @@ import org.springframework.util.StringUtils;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@ -45,7 +47,13 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
private static final int head3 = 0x61; private static final int head3 = 0x61;
private static final int lengthLow = 0x03; private static final int lengthLow = 0x03;
private static final int lengthHigh = 0x00; private static final int lengthHigh = 0x00;
static HashSet<String> haborSnSet = new HashSet<>();
static {
// 测试使用添加100个哈勃序列号和连接端口对应
for (int i = 0; i < 100; i++) {
haborSnSet.add("0"+String.valueOf(40000+i));
}
}
@Autowired @Autowired
public InMessageHandler1(SessionCache sessionCache, StringUtil stringUtil, ServiceConfig config) { public InMessageHandler1(SessionCache sessionCache, StringUtil stringUtil, ServiceConfig config) {
this.stringUtil = stringUtil; this.stringUtil = stringUtil;
@ -94,7 +102,7 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress(); InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
String haborSn = String.format("%06d", remoteAddress.getPort()); // 远端端口当作哈勃序列号 String haborSn = String.format("%06d", remoteAddress.getPort()); // 远端端口当作哈勃序列号
try { try {
if ("040001".equals(haborSn) || "040002".equals(haborSn) || "040003".equals(haborSn)) { if (haborSnSet.contains(haborSn)) {
String deviceCode = haborSn; String deviceCode = haborSn;
// 删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除 // 删除旧的连接。如果存在异常断开的时候,旧连接并未清除。这里在上线的时候检查并清除
String addr0 = sessionCache.snToAddMap.get(deviceCode); String addr0 = sessionCache.snToAddMap.get(deviceCode);
@ -258,8 +266,11 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
*/ */
private void haborSignIntoCac(String haborSn, Channel channel) { private void haborSignIntoCac(String haborSn, Channel channel) {
// UavIdMap.addMap(UavTypeEnum.FP981A, 1, "5"); // UavIdMap.addMap(UavTypeEnum.FP981A, 1, "1");
// HaborUavMap.addMap("5", "23293F"); // HaborUavMap.addMap("1", "040001");
// GlobalData.GCS_ID = "1";
// GlobalData.AUTHORIZATION = "1111111111111";
// UavIdMap.addMap(UavTypeEnum.FP981A, 2, "6"); // UavIdMap.addMap(UavTypeEnum.FP981A, 2, "6");
// HaborUavMap.addMap("6", "040000"); // HaborUavMap.addMap("6", "040000");
// UavIdMap.addMap(UavTypeEnum.FP981A, 3, "3"); // UavIdMap.addMap(UavTypeEnum.FP981A, 3, "3");

@ -60,7 +60,7 @@ public class ServerService {
.addLast("CacDataRouter", cacDataRouterHandler); // cac数据转发 .addLast("CacDataRouter", cacDataRouterHandler); // cac数据转发
} }
}); });
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); // 内存泄露检测上线记得关掉paranoid // ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); // 内存泄露检测上线记得关掉paranoid
bootstrap.option(ChannelOption.SO_BACKLOG, 2048); bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.TCP_NODELAY, true);

@ -47,11 +47,15 @@ public class CommandQueueRecord extends CommandRecord {
@Override @Override
public void clearCommand() { public void clearCommand() {
cancelScheduleTimeout();
this.commandBuf.clear(); this.commandBuf.clear();
this.commandUniId = null; this.commandUniId = null;
this.commandQueue = null; this.commandQueue = null;
this.status = NO_COMMAND; this.status = NO_COMMAND;
} }
public void waitNext() {
cancelScheduleTimeout();
}
} }

@ -60,7 +60,6 @@ public class CommandRecord {
this.status = WAITING_RESULT; this.status = WAITING_RESULT;
this.receiveFrameCount.set(0); this.receiveFrameCount.set(0);
//启动一个超时检查的定时任务 //启动一个超时检查的定时任务
this.timeoutFuture = scheduleTimeoutCheck(); this.timeoutFuture = scheduleTimeoutCheck();
@ -68,7 +67,7 @@ public class CommandRecord {
// 定时任务每5秒检查一次所有命令是否超时 // 定时任务每5秒检查一次所有命令是否超时
private ScheduledFuture<?> scheduleTimeoutCheck() { private ScheduledFuture<?> scheduleTimeoutCheck() {
log.debug("[cac] {} {}指令判断定时任务启动", client.shortInfo(), commandType.getInfo()); log.debug("[cac] {} {}指令超时判断定时任务启动", client.shortInfo(), commandType.getInfo());
// 每5秒检查一次所有命令记录的超时状态 // 每5秒检查一次所有命令记录的超时状态
return scheduler.schedule(() -> { return scheduler.schedule(() -> {
if (status != NO_COMMAND ) { if (status != NO_COMMAND ) {
@ -95,7 +94,7 @@ public class CommandRecord {
public void cancelScheduleTimeout() { public void cancelScheduleTimeout() {
if (this.timeoutFuture != null && !this.timeoutFuture.isCancelled()) { if (this.timeoutFuture != null && !this.timeoutFuture.isCancelled()) {
if (this.timeoutFuture.cancel(true)) { if (this.timeoutFuture.cancel(true)) {
log.debug("[cac] {} 取消{}指令结果判断定时任务", client.shortInfo(), commandType.getInfo()); log.debug("[cac] {} 取消{}指令超时判断定时任务", client.shortInfo(), commandType.getInfo());
} }
} }
} }
@ -119,7 +118,7 @@ public class CommandRecord {
} }
/**回报开关指令执行结果,目前仅收到回报时通知*/ /**回报开关指令执行结果,目前仅收到回报时通知*/
public void notifyControlCommandResult(String uavId, boolean isSuccess) { public boolean notifyControlCommandResult(String uavId, boolean isSuccess) {
Map<String, Object> body = new HashMap<>(); Map<String, Object> body = new HashMap<>();
body.put("gcsId", GlobalData.GCS_ID); body.put("gcsId", GlobalData.GCS_ID);
body.put("uavId", uavId); body.put("uavId", uavId);
@ -128,13 +127,14 @@ public class CommandRecord {
body.put("commandSource", 1); body.put("commandSource", 1);
body.put("commandUniId", this.commandUniId); body.put("commandUniId", this.commandUniId);
try { try {
CacHpApi.commandNotify(JSONUtils.obj2json(body)); return CacHpApi.commandNotify(JSONUtils.obj2json(body));
} catch (IOException e) { } catch (IOException e) {
log.error("[cac] 开关指令回报通知失败:{}", body, e); log.error("[cac] 开关指令回报通知发送失败:{}", body, e);
return false;
} }
} }
public void notifyParamBindCommandResult(String uavId, UavTypeEnum uavType, boolean isSuccess) { public boolean notifyParamBindCommandResult(String uavId, UavTypeEnum uavType, boolean isSuccess) {
Map<String, Object> body = new HashMap<>(); Map<String, Object> body = new HashMap<>();
body.put("gcsId", GlobalData.GCS_ID); body.put("gcsId", GlobalData.GCS_ID);
body.put("uavId", uavId); body.put("uavId", uavId);
@ -144,13 +144,14 @@ public class CommandRecord {
body.put("uavParamBindCode", ByteUtils.byteToInt(this.code)); body.put("uavParamBindCode", ByteUtils.byteToInt(this.code));
body.put("bindResult", isSuccess); body.put("bindResult", isSuccess);
try { try {
CacHpApi.uavParamBindResultNotify(JSONUtils.obj2json(body)); return CacHpApi.uavParamBindResultNotify(JSONUtils.obj2json(body));
} catch (IOException e) { } catch (IOException e) {
log.error("[cac] 参数装订回报通知失败:{}", body, e); log.error("[cac] 参数装订回报通知失败:{}", body, e);
return false;
} }
} }
public void notifyParamQueryCommandResult(String uavId, UavTypeEnum uavType, boolean isSuccess, HaborClient.UavQueryResultParam uavQueryResultParam) { public boolean notifyParamQueryCommandResult(String uavId, UavTypeEnum uavType, boolean isSuccess, HaborClient.UavQueryResultParam uavQueryResultParam) {
Map<String, Object> body = new HashMap<>(); Map<String, Object> body = new HashMap<>();
try { try {
body.put("gcsId", GlobalData.GCS_ID); body.put("gcsId", GlobalData.GCS_ID);
@ -159,14 +160,15 @@ public class CommandRecord {
body.put("commandSource", 1); body.put("commandSource", 1);
body.put("uavType", uavType.getRemoteCode()); body.put("uavType", uavType.getRemoteCode());
body.put("uavParamQueryCode", ByteUtils.byteToInt(this.code)); body.put("uavParamQueryCode", ByteUtils.byteToInt(this.code));
body.put("bindResult", isSuccess); body.put("queryResult", isSuccess);
if (isSuccess) { if (isSuccess) {
String queryResultContent = uavQueryResultParam.toJsonStr(); String queryResultContent = uavQueryResultParam.toJsonStr();
body.put("queryResultContent", queryResultContent); body.put("queryResultContent", queryResultContent);
} }
CacHpApi.uavParamQueryResultNotify(JSONUtils.obj2json(body)); return CacHpApi.uavParamQueryResultNotify(JSONUtils.obj2json(body));
} catch (IOException e) { } catch (IOException e) {
log.error("[cac] 参数装订回报通知失败:{}", body, e); log.error("[cac] 参数装订回报通知失败:{}", body, e);
return false;
} }
} }

@ -10,7 +10,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.util.concurrent.ScheduledFuture; import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -25,8 +26,9 @@ import java.util.concurrent.atomic.AtomicInteger;
* @Date : 2024/9/18 10:38 * @Date : 2024/9/18 10:38
* @Description : * @Description :
*/ */
@Slf4j @Getter @Slf4j
public class HaborClient extends BaseClient { @Getter
public abstract class HaborClient extends BaseClient {
// static HashSet<String> fkSnSet = new HashSet<>(); // static HashSet<String> fkSnSet = new HashSet<>();
protected UavTypeEnum uavType; protected UavTypeEnum uavType;
protected byte fkId; protected byte fkId;
@ -34,12 +36,15 @@ public class HaborClient extends BaseClient {
// 数据交换控制 // 数据交换控制
protected final AtomicBoolean needToSendCommonData = new AtomicBoolean(false); // 是否发送常发帧 protected final AtomicBoolean needToSendCommonData = new AtomicBoolean(false); // 是否发送常发帧
protected int commonDataFreq = 1; // 默认发送频率, 每秒发送帧数, 以各型号子类为准 protected int commonDataFreq; // 默认发送频率, 每秒发送帧数, 以各型号子类为准
protected ScheduledFuture scheduledFuture; protected volatile int targetPeriod;
protected Future scheduledFuture;
protected static final int MIN_SCHEDULE_DELAY = 1; // 最小调度延迟,避免过于频繁的调度
protected static final int DEFAULT_COMMON_DATA_FREQ = 5; // 默认常发帧发送频率,每秒发送帧数
// 数据帧内容 // 数据帧内容
protected AtomicUByte frameIdx = new AtomicUByte(); // 飞控包序号 protected AtomicUByte frameIdx = new AtomicUByte(); // 飞控包序号
protected boolean resourceAllocated;
protected DataByteBuf dataBufToSend; // 默认发送常发数据帧 ; // 实际数据发送定时任务发送的数据内容发送时将引用以下dataBuf protected DataByteBuf dataBufToSend; // 默认发送常发数据帧 ; // 实际数据发送定时任务发送的数据内容发送时将引用以下dataBuf
protected CommonDataBuf commonDataBuf; // 常发数据帧, 复用 protected CommonDataBuf commonDataBuf; // 常发数据帧, 复用
protected ControlDataBuf controlDataBuf; // 开关指令数据帧, 复用 protected ControlDataBuf controlDataBuf; // 开关指令数据帧, 复用
@ -49,7 +54,6 @@ public class HaborClient extends BaseClient {
protected static final int FRAME_SIZE = 32; // 如果各型号不一样该字段取消final并在子类中初始化 protected static final int FRAME_SIZE = 32; // 如果各型号不一样该字段取消final并在子类中初始化
protected static final byte[] HEAD = new byte[]{(byte) 0xeb, (byte) 0x90}; protected static final byte[] HEAD = new byte[]{(byte) 0xeb, (byte) 0x90};
// 控制指令回报判断, 指令队列目前只允许一个指令,前一个指令回报或过期后才能发送下一个指令 // 控制指令回报判断, 指令队列目前只允许一个指令,前一个指令回报或过期后才能发送下一个指令
protected CommandRecord previousControl; protected CommandRecord previousControl;
// 装订指令回报判断 // 装订指令回报判断
@ -63,41 +67,44 @@ public class HaborClient extends BaseClient {
protected ByteBuf telemetryBufHeadToCac; protected ByteBuf telemetryBufHeadToCac;
protected int cacHeadLength; // 中心指控协议头长度, 见协议序号1-10 protected int cacHeadLength; // 中心指控协议头长度, 见协议序号1-10
protected AtomicUShort cacFrameIdx = new AtomicUShort(); // cac包序号 protected AtomicUShort cacFrameIdx = new AtomicUShort(); // cac包序号
protected long lastSendTime = 0;
public HaborClient(String sn, Channel channel) { public HaborClient(String sn, Channel channel) {
super(sn, channel); super(sn, channel);
// channel绑定客户端 // channel绑定客户端
channel.attr(ClientManager.CLIENT_ATTRIBUTE_KEY).setIfAbsent(this); channel.attr(ClientManager.CLIENT_ATTRIBUTE_KEY).setIfAbsent(this);
} }
public static HaborClient createClient(UavTypeEnum uavType, String haborSn, Channel channel) {
switch (uavType) {
case FP981A:
return new HaborClient981A(haborSn, channel);
default:
break;
}
return null;
}
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName()+ "{" + return getClass().getSimpleName() + "{" +
"channel=" + channel.id() + "channel=" + channel.id() +
", sn='" + sn + '\'' + ", sn='" + sn + '\'' +
", ip='" + ip + '\'' + ", ip='" + ip + '\'' +
", port='" + port + '\'' + ", port='" + port + '\'' +
", uavId='" + uavId + '\'' + ", uavId='" + uavId + '\'' +
", fkId=" + fkId + ", fkId=" + fkId +
", uavType=" + uavType.getInfo() + ", uavType=" + uavType.getInfo() +
'}'; '}';
} }
public String shortInfo() { public String shortInfo() {
return "[sn='" + sn + '\'' + return "[sn='" + sn + '\'' +
", uavId='" + uavId + '\'' + ", uavId='" + uavId + '\'' +
", fkId=" + fkId + ", fkId=" + fkId +
", uavType=" + uavType.getInfo() + ", uavType=" + uavType.getInfo() +
']'; ']';
} }
public static HaborClient createClient(UavTypeEnum uavType, String haborSn, Channel channel) {
switch (uavType) {
case FP981A:
return new HaborClient981A(haborSn, channel);
default:
break;
}
return null;
}
@Override @Override
@ -120,14 +127,25 @@ public class HaborClient extends BaseClient {
return ClientTypeEnum.HABOR; return ClientTypeEnum.HABOR;
} }
/**被管理时初始化*/ /**
protected void resourceAllocate() {} *
*/
protected abstract void resourceAllocate();
/**被移除管理时, 释放相关资源*/ /**
* ,
*/
protected void resourceRelease() { protected void resourceRelease() {
commonDataBuf.release(); // FIXME 2025/2/8: 安全释放资源
controlDataBuf.release(); if (!resourceAllocated){
if (telemetryBufHeadToCac.refCnt() > 0) { return;
}
commonDataBuf.end();
controlDataBuf.end();
bindDataBuf.end();
queryDataBuf.end();
routeDataBuf.end();
if (telemetryBufHeadToCac!= null && telemetryBufHeadToCac.refCnt() > 0) {
telemetryBufHeadToCac.release(telemetryBufHeadToCac.refCnt()); telemetryBufHeadToCac.release(telemetryBufHeadToCac.refCnt());
} }
previousControl.end(); previousControl.end();
@ -136,100 +154,203 @@ public class HaborClient extends BaseClient {
previousRouteBind.end(); previousRouteBind.end();
} }
/**只通过get读数不改变readerIndex和writerIndex, 也不可更改refCnt*/ /**
* getreaderIndexwriterIndex, refCnt
*/
public void process(ByteBuf msg) { public void process(ByteBuf msg) {
if (msg.getByte(0) != HEAD[0] && msg.getByte(1) != HEAD[1]) { if (msg.getByte(0) != HEAD[0] && msg.getByte(1) != HEAD[1]) {
return; // 帧头错误,抛弃 return; // 帧头错误,抛弃
} }
if (!processInjectResponse(msg)) { if (!processInjectResponse(msg)) {
// 如果消息未处理,则发送给中心指控
sendToCac(msg.retain()); // 外面SimpleChannelInboundHandler会自动release msg, 所以这里需要retain一次 sendToCac(msg.retain()); // 外面SimpleChannelInboundHandler会自动release msg, 所以这里需要retain一次
} }
}
/**注入回报帧处理*/
protected boolean processInjectResponse(ByteBuf msg) {
return false;
} }
/**发送常发帧*/
protected void writeCommonData() {}
/**
*
*/
protected abstract boolean processInjectResponse(ByteBuf msg);
/**向飞控发送控制指令*/ /**
public void writeControlCommand(byte commandCode, String uavControlUniId) {} *
*/
protected abstract void writeCommonData();
/**向飞控发送参数装订指令*/ /**
public void writeParamBindCommand(byte commandCode, byte[] commandContent, String controlUniId) {} *
*/
public abstract void writeControlCommand(byte commandCode, String uavControlUniId);
/**向飞控发送参数查询指令*/ /**
public void writeParamQueryCommand(byte commandCode, byte[] commandContent, String controlUniId) {} *
*/
public abstract void writeParamBindCommand(byte commandCode, byte[] commandContent, String controlUniId);
/**
*
*/
public abstract void writeParamQueryCommand(byte commandCode, byte[] commandContent, String controlUniId);
/**
* 线
*/
public abstract void writeRouteBindCommand(byte commandCode, List<byte[]> routeInfoList, String controlUniId);
// 启动定时任务常发数据帧
protected void startSendCommonData() {
// scheduledFuture = channel.eventLoop().scheduleAtFixedRate(
// this::writeCommonData, 0, targetPeriod, TimeUnit.MILLISECONDS);
// // 检查任务是否成功启动
// if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
// log.info("[cac] 开始向哈勃终端({})发送常发帧", shortInfo());
// needToSendCommonData.compareAndSet(false, true);
// } else {
// log.info("[cac] 哈勃终端({})发送常发帧启动失败", shortInfo());
// }
needToSendCommonData.set(true);
lastSendTime = System.currentTimeMillis();
scheduledFuture = channel.eventLoop().submit(new Runnable() {
@Override
public void run() {
try {
if (!needToSendCommonData.get() || !channel.isActive()) {
return;
}
long currentTime = System.currentTimeMillis();
long elapsed = currentTime - lastSendTime;
if (elapsed >= targetPeriod) {
// 批量发送逻辑,如果延迟太大,可以考虑补发
int missedFrames = (int) (elapsed / targetPeriod);
if (missedFrames > 2) { // 限制补发帧数
log.warn("常发帧延迟: {}ms", elapsed - targetPeriod);
}
writeCommonData();
lastSendTime = currentTime;
// 使用schedule而不是立即execute给其他任务执行的机会
channel.eventLoop().schedule(this,
Math.max(MIN_SCHEDULE_DELAY, targetPeriod / 2),
TimeUnit.MILLISECONDS);
} else {
// 延迟剩余时间后再次调度
channel.eventLoop().schedule(this,
Math.max(MIN_SCHEDULE_DELAY, targetPeriod - elapsed),
TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.error("[cac] {} 常发帧调度异常: {}", shortInfo(), e.getMessage());
// 发生异常时,延迟一段时间后重试
if (needToSendCommonData.get() && channel.isActive()) {
channel.eventLoop().schedule(this, 500, TimeUnit.MILLISECONDS);
}
}
}
});
log.info("[cac] 开始向哈勃终端({})发送常发帧, 频率:{}帧/秒", shortInfo(), commonDataFreq);
}
/**向飞控发送航线装订指令*/
public void writeRouteBindCommand(byte commandCode, List<byte[]> routeInfoList, String controlUniId) {}
/**设置当前发送数据*/
protected void setCurrentSendData(DataByteBuf dataByteBuf) { protected void setCurrentSendData(DataByteBuf dataByteBuf) {
setCurrentSendData(dataByteBuf, DEFAULT_COMMON_DATA_FREQ);
}
/**
*
*/
protected void setCurrentSendData(DataByteBuf dataByteBuf, int freq) {
dataByteBuf.resetSendCount(); dataByteBuf.resetSendCount();
dataBufToSend = dataByteBuf; dataBufToSend = dataByteBuf;
if (freq != commonDataFreq) {
adjustSendFrequency(freq);
}
} }
/**恢复常发帧数据*/
// 调整常发帧发送频率
protected void adjustSendFrequency(int newFreq) {
if (newFreq <= 0) {
throw new IllegalArgumentException("频率必须大于0");
}
commonDataFreq = newFreq;
targetPeriod = 1000 / newFreq;
log.info("[cac] {} 调整发送频率为 {} 帧/秒", shortInfo(), newFreq);
}
/**
*
*/
protected void switchToCommonData() { protected void switchToCommonData() {
setCurrentSendData(commonDataBuf); setCurrentSendData(commonDataBuf, DEFAULT_COMMON_DATA_FREQ);
dataBufToSend.resetSendCount(); dataBufToSend.resetSendCount();
log.info("[cac] {} 设置当前发送数据为常发数据帧", shortInfo()); log.info("[cac] {} 设置当前发送数据为常发数据帧", shortInfo());
} }
// 启动定时任务常发数据帧
protected void startSendCommonData() {
int period = 1000 / commonDataFreq; // 每帧间隔ms
scheduledFuture = channel.eventLoop().scheduleAtFixedRate(
this::writeCommonData, 0, period, TimeUnit.MILLISECONDS);
// 检查任务是否成功启动
if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
log.info("[cac] 开始向哈勃终端({})发送常发帧", shortInfo());
needToSendCommonData.compareAndSet(false, true);
} else {
log.info("[cac] 哈勃终端({})发送常发帧启动失败", shortInfo());
}
}
// 停止定时任务常发数据帧 // 停止定时任务常发数据帧
protected void stopSendCommonData() { protected void stopSendCommonData() {
needToSendCommonData.set(false);
if (scheduledFuture != null && !scheduledFuture.isCancelled()) { if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
scheduledFuture.cancel(true); // 取消任务并尝试中断 scheduledFuture.cancel(true); // 取消任务并尝试中断
needToSendCommonData.compareAndSet(true, false); scheduledFuture = null;
log.info("[cac] 停止向哈勃终端({})发送常发帧", shortInfo());
} }
log.info("[cac] 停止向哈勃终端({})发送常发帧", shortInfo());
} }
protected boolean isNeedToSendCommonData() { protected boolean isNeedToSendCommonData() {
return needToSendCommonData.get(); return needToSendCommonData.get();
} }
/**暂停发送常发帧*/ /**
*
*/
protected void pauseSendCommonData() { protected void pauseSendCommonData() {
// needToSendCommonData.set(false); needToSendCommonData.set(false);
log.info("[cac] 暂停向哈勃终端({})发送常发帧", shortInfo()); log.info("[cac] 暂停向哈勃终端({})发送常发帧", shortInfo());
} }
/**恢复发送常发帧*/ /**
*
*/
protected void resumeSendCommonData() { protected void resumeSendCommonData() {
// needToSendCommonData.set(true); needToSendCommonData.set(true);
log.info("[cac] 恢复向哈勃终端({})发送常发帧", shortInfo()); log.info("[cac] 恢复向哈勃终端({})发送常发帧", shortInfo());
} }
/** /**
* *
*
* @param srcBuf ByteBuf * @param srcBuf ByteBuf
*/ */
public void sendToCac(ByteBuf srcBuf) { public void sendToCac(ByteBuf srcBuf) {
telemetryBufHeadToCac.writerIndex(cacHeadLength-6); telemetryBufHeadToCac.writerIndex(cacHeadLength - 6);
telemetryBufHeadToCac.writeShort(cacFrameIdx.getAndIncrement()); telemetryBufHeadToCac.writeShort(cacFrameIdx.getAndIncrement());
telemetryBufHeadToCac.writeInt(srcBuf.readableBytes()); telemetryBufHeadToCac.writeInt(srcBuf.readableBytes());
ByteBuf telemetryBufToCac = Unpooled.wrappedBuffer(telemetryBufHeadToCac.retain(), srcBuf); // 遥测数据组合 ByteBuf telemetryBufToCac = Unpooled.wrappedBuffer(telemetryBufHeadToCac.retain(), srcBuf); // 遥测数据组合
CacClient.write(telemetryBufToCac); ChannelFuture future = CacClient.write(telemetryBufToCac);
if (future != null) {
// future.addListener((ChannelFuture f) -> {
// if (!f.isSuccess()) {
// telemetryBufToCac.release();
// }
// });
} else {
telemetryBufToCac.release();
}
} }
/**给数据添加CRC16校验*/
/**
* CRC16
*/
protected void addCRC16(ByteBuf dataBuf) { protected void addCRC16(ByteBuf dataBuf) {
ByteBuf slice = dataBuf.slice(2, 28);// 校验第3-30位共28个字节 ByteBuf slice = dataBuf.slice(2, 28);// 校验第3-30位共28个字节
dataBuf.writerIndex(30); dataBuf.writerIndex(30);
@ -237,56 +358,98 @@ public class HaborClient extends BaseClient {
} }
/**
*
*/
public static abstract class UavQueryResultParam { public static abstract class UavQueryResultParam {
public abstract String toJsonStr() throws IOException; public abstract String toJsonStr() throws IOException;
} }
/**Cac起飞参数查询查询回报具体定义见各型号子类*/
public static abstract class UavTakeOffParamQueryResultParam extends UavQueryResultParam {} /**
/**Cac重量查询回报具体定义见各型号子类*/ * Cac
public static abstract class UavWeightQueryResultParam extends UavQueryResultParam {} */
protected static class DataByteBuf{ public static abstract class UavTakeOffParamQueryResultParam extends UavQueryResultParam {
}
/**
* Cac
*/
public static abstract class UavWeightQueryResultParam extends UavQueryResultParam {
}
/**
*
*/
protected static class DataByteBuf {
protected ByteBuf buf; protected ByteBuf buf;
private AtomicInteger sendCount = new AtomicInteger(0); private AtomicInteger sendCount = new AtomicInteger(0);
public DataByteBuf(int frameSize) {buf = PooledByteBufAllocator.DEFAULT.buffer(frameSize);}
public void release() { public DataByteBuf(int frameSize) {
buf = PooledByteBufAllocator.DEFAULT.buffer(frameSize);
}
public void end() {
if (buf.refCnt() > 0) { if (buf.refCnt() > 0) {
buf.release(buf.refCnt()); buf.release(buf.refCnt());
} }
} }
public void resetSendCount() { public void resetSendCount() {
sendCount.set(0); sendCount.set(0);
} }
public int addSendCount() { public int addSendCount() {
return sendCount.incrementAndGet(); return sendCount.incrementAndGet();
} }
public int getSendCount() { public int getSendCount() {
return sendCount.get(); return sendCount.get();
} }
} }
protected static class CommonDataBuf extends DataByteBuf{ /**
*
*/
protected static class CommonDataBuf extends DataByteBuf {
public CommonDataBuf(int frameSize) { public CommonDataBuf(int frameSize) {
super(frameSize); super(frameSize);
} }
} }
protected static class ControlDataBuf extends DataByteBuf{
/**
*
*/
protected static class ControlDataBuf extends DataByteBuf {
public int needCount = 10; // 默认帧数 public int needCount = 10; // 默认帧数
public ControlDataBuf(int frameSize) { public ControlDataBuf(int frameSize) {
super(frameSize); super(frameSize);
} }
} }
protected static class BindDataBuf extends DataByteBuf{
/**
*
*/
protected static class BindDataBuf extends DataByteBuf {
public BindDataBuf(int frameSize) { public BindDataBuf(int frameSize) {
super(frameSize); super(frameSize);
} }
} }
protected static class QueryDataBuf extends DataByteBuf{
/**
*
*/
protected static class QueryDataBuf extends DataByteBuf {
public QueryDataBuf(int frameSize) { public QueryDataBuf(int frameSize) {
super(frameSize); super(frameSize);
} }
} }
protected static class RouteDataBuf extends DataByteBuf{
/**
* 线
*/
protected static class RouteDataBuf extends DataByteBuf {
public RouteDataBuf(int frameSize) { public RouteDataBuf(int frameSize) {
super(frameSize); super(frameSize);
} }

@ -13,6 +13,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import jdk.nashorn.internal.runtime.regexp.joni.exception.ValueException;
import lombok.Data; import lombok.Data;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -85,7 +86,8 @@ public class HaborClient981A extends HaborClient {
uavId = HaborUavMap.getUavIdByHaborSn(haborSn); uavId = HaborUavMap.getUavIdByHaborSn(haborSn);
uavType = UavIdMap.getUavType(uavId); uavType = UavIdMap.getUavType(uavId);
fkId = (byte) UavIdMap.getFkId(uavId); fkId = (byte) UavIdMap.getFkId(uavId);
super.commonDataFreq = 5; // 发送频率, 每秒发送帧数 commonDataFreq = DEFAULT_COMMON_DATA_FREQ; // 发送频率, 每秒发送帧数
targetPeriod = 1000 / commonDataFreq;
} }
@Override @Override
@ -114,6 +116,12 @@ public class HaborClient981A extends HaborClient {
dataBufToSend = commonDataBuf; // note 默认发送常发数据帧 dataBufToSend = commonDataBuf; // note 默认发送常发数据帧
if (GlobalData.GCS_ID == null) {
throw new ValueException("GCS_ID is null");
}
if (GlobalData.AUTHORIZATION == null) {
throw new ValueException("AUTHORIZATION is null");
}
// 中心指控数据帧初始化 // 中心指控数据帧初始化
cacHeadLength = 5 // 魔术2 + 版本号1 + 序列化算法1 + 帧类型1 cacHeadLength = 5 // 魔术2 + 版本号1 + 序列化算法1 + 帧类型1
+ (GlobalData.GCS_ID.length()+1) + (GlobalData.GCS_ID.length()+1)
@ -138,6 +146,8 @@ public class HaborClient981A extends HaborClient {
previousParamBind = new CommandRecord(CommandTypeEnum.PARAM_BIND, this, 16); previousParamBind = new CommandRecord(CommandTypeEnum.PARAM_BIND, this, 16);
previousParamQuery = new CommandRecord(CommandTypeEnum.PARAM_QUERY, this, 16); previousParamQuery = new CommandRecord(CommandTypeEnum.PARAM_QUERY, this, 16);
previousRouteBind = new CommandQueueRecord(CommandTypeEnum.ROUTE_BIND, this, 16); previousRouteBind = new CommandQueueRecord(CommandTypeEnum.ROUTE_BIND, this, 16);
resourceAllocated = true;
} }
/**初始化发送帧内容*/ /**初始化发送帧内容*/
@ -190,7 +200,6 @@ public class HaborClient981A extends HaborClient {
log.debug("[cac] {} 发送控制指令: {}", shortInfo(), ByteUtils.bytes2HexString(ByteBufUtil.getBytes(dataBufToSend.buf))); log.debug("[cac] {} 发送控制指令: {}", shortInfo(), ByteUtils.bytes2HexString(ByteBufUtil.getBytes(dataBufToSend.buf)));
if (dataBufToSend.getSendCount() >= ((ControlDataBuf) dataBufToSend).needCount) { if (dataBufToSend.getSendCount() >= ((ControlDataBuf) dataBufToSend).needCount) {
log.info("[cac] {} 控制指令发送完毕, {}", shortInfo(), ByteUtils.bytes2HexString(ByteBufUtil.getBytes(dataBufToSend.buf))); log.info("[cac] {} 控制指令发送完毕, {}", shortInfo(), ByteUtils.bytes2HexString(ByteBufUtil.getBytes(dataBufToSend.buf)));
//
switchToCommonData(); switchToCommonData();
} }
} }
@ -220,7 +229,7 @@ public class HaborClient981A extends HaborClient {
controlDataBuf.buf.setByte(7, commandCode); // 控制指令 controlDataBuf.buf.setByte(7, commandCode); // 控制指令
controlDataBuf.buf.setByte(8, commandCode); // 控制指令 controlDataBuf.buf.setByte(8, commandCode); // 控制指令
controlDataBuf.needCount = commandNumber(commandCode); // 设置需要发送的次数 controlDataBuf.needCount = commandNumber(commandCode); // 设置需要发送的次数
setCurrentSendData(controlDataBuf); // 设置当前发送的数据类型 setCurrentSendData(controlDataBuf, 20); // 设置当前发送的数据类型
previousControl.recordCommand(commandCode, controlUniId, controlDataBuf.buf.slice(13,16)); // 记录控制指令 previousControl.recordCommand(commandCode, controlUniId, controlDataBuf.buf.slice(13,16)); // 记录控制指令
// for (int i = 0; i < commandNumber(commandCode); i++) { // for (int i = 0; i < commandNumber(commandCode); i++) {
// controlDataBuf.setByte(5, frameIdx.getAndIncrement()); // 帧序号 // controlDataBuf.setByte(5, frameIdx.getAndIncrement()); // 帧序号
@ -292,8 +301,12 @@ public class HaborClient981A extends HaborClient {
writeRouteBindCommand0(commandCode, routeInfoList.get(0), controlUniId); //发送第一个节点 writeRouteBindCommand0(commandCode, routeInfoList.get(0), controlUniId); //发送第一个节点
} }
/**注入回报帧处理, 如果正常处理返回true否则返回false继续透传*/ /**
* , truefalse
* return boolean
* */
@Override @Override
protected boolean processInjectResponse(ByteBuf msg) { protected boolean processInjectResponse(ByteBuf msg) {
// 只处理注入回报帧A0 // 只处理注入回报帧A0
if (msg.getByte(2) == (byte) 0xA0) { if (msg.getByte(2) == (byte) 0xA0) {
@ -316,8 +329,8 @@ public class HaborClient981A extends HaborClient {
previousControl.addReceiveFrameCount(); previousControl.addReceiveFrameCount();
if (msg.getByte(29) == previousControl.code) { if (msg.getByte(29) == previousControl.code) {
try { try {
previousControl.notifyControlCommandResult(uavId, true); boolean notifyResult = previousControl.notifyControlCommandResult(uavId, true);
log.info("[cac] {} 控制指令成功结果回报...", shortInfo()); log.info("[cac] {} 控制指令成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败");
previousControl.clearCommand(); previousControl.clearCommand();
return true; return true;
} catch (Exception e) { } catch (Exception e) {
@ -341,8 +354,8 @@ public class HaborClient981A extends HaborClient {
previousParamBind.addReceiveFrameCount(); previousParamBind.addReceiveFrameCount();
if (previousParamBind.commandMatch(msg.slice(13,16))) { if (previousParamBind.commandMatch(msg.slice(13,16))) {
try { try {
previousParamBind.notifyParamBindCommandResult(uavId, uavType,true); boolean notifyResult = previousParamBind.notifyParamBindCommandResult(uavId, uavType,true);
log.info("[cac] {} 指令装订成功结果回报...", shortInfo()); log.info("[cac] {} 指令装订成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败");
previousParamBind.clearCommand(); previousParamBind.clearCommand();
return true; return true;
} catch (Exception e) { } catch (Exception e) {
@ -377,8 +390,8 @@ public class HaborClient981A extends HaborClient {
default: default:
log.warn("[cac] {} 指令查询结果回报,未知的指令类型:{}", shortInfo(), ByteUtils.byteToHex(msg.getByte(13))); log.warn("[cac] {} 指令查询结果回报,未知的指令类型:{}", shortInfo(), ByteUtils.byteToHex(msg.getByte(13)));
} }
previousParamQuery.notifyParamQueryCommandResult(uavId, uavType, true, uavQueryResultParam); boolean notifyResult = previousParamQuery.notifyParamQueryCommandResult(uavId, uavType, true, uavQueryResultParam);
log.info("[cac] {} 指令查询成功结果回报...", shortInfo()); log.info("[cac] {} 指令查询成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败");
previousParamQuery.clearCommand(); previousParamQuery.clearCommand();
return true; return true;
} catch (Exception e) { } catch (Exception e) {
@ -404,8 +417,8 @@ public class HaborClient981A extends HaborClient {
if (previousRouteBind.commandMatch(msg.slice(13, 16))) { if (previousRouteBind.commandMatch(msg.slice(13, 16))) {
if (previousRouteBind.completeOrWaitNext()) { // 如果指令列表均已回报,则判定成功,向中心指控回报, 并恢复常发帧 if (previousRouteBind.completeOrWaitNext()) { // 如果指令列表均已回报,则判定成功,向中心指控回报, 并恢复常发帧
try { try {
previousRouteBind.notifyParamBindCommandResult(uavId,uavType,true); boolean notifyResult = previousRouteBind.notifyParamBindCommandResult(uavId,uavType,true);
log.info("[cac] {} 航线装订成功结果成功", shortInfo()); log.info("[cac] {} 航线装订成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败");
previousRouteBind.clearCommand(); previousRouteBind.clearCommand();
return true; return true;
} catch (Exception e) { } catch (Exception e) {
@ -415,7 +428,8 @@ public class HaborClient981A extends HaborClient {
} }
} else { } else {
// 仍有航点未装订 // 仍有航点未装订
writeRouteBindCommand0(previousRouteBind.code, previousRouteBind.commandQueue.peek(), previousControl.commandUniId); previousRouteBind.waitNext();
writeRouteBindCommand0(previousRouteBind.code, previousRouteBind.commandQueue.peek(), previousRouteBind.commandUniId);
log.debug("[cac] {} 当前航点装订成功,剩余航点:{}", shortInfo(), previousRouteBind.commandQueue.size()); log.debug("[cac] {} 当前航点装订成功,剩余航点:{}", shortInfo(), previousRouteBind.commandQueue.size());
} }
} }
@ -540,6 +554,7 @@ public class HaborClient981A extends HaborClient {
} }
@Data
public static class UavWeightQueryResultParam981A extends UavWeightQueryResultParam{ public static class UavWeightQueryResultParam981A extends UavWeightQueryResultParam{
/** /**
* kg; 1 * kg; 1

Loading…
Cancel
Save