diff --git a/pom.xml b/pom.xml
index c6d04b3..7c1855e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
com.platform
pass-through
- 0.0.4
+ 0.0.6
pass-through
中心指控直控无人机哈勃服务
diff --git a/src/main/java/com/platform/config/ServiceConfig.java b/src/main/java/com/platform/config/ServiceConfig.java
index 0d981a7..5e54648 100644
--- a/src/main/java/com/platform/config/ServiceConfig.java
+++ b/src/main/java/com/platform/config/ServiceConfig.java
@@ -1,9 +1,13 @@
package com.platform.config;
+import com.platform.service.clientmanage.CacRequestExecutor;
+import com.platform.service.clientmanage.CommandManager;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
+import javax.annotation.PostConstruct;
+
@Configuration
@Data
public class ServiceConfig {
@@ -11,4 +15,10 @@ public class ServiceConfig {
private boolean debug;
@Value("${service.port}")
private Integer port;
+
+ @PostConstruct
+ public void initConfig() {
+ CommandManager.getInstance();
+ CacRequestExecutor.getInstance();
+ }
}
diff --git a/src/main/java/com/platform/config/ThreadPoolConfig.java b/src/main/java/com/platform/config/ThreadPoolConfig.java
deleted file mode 100644
index 10b98b2..0000000
--- a/src/main/java/com/platform/config/ThreadPoolConfig.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.platform.config;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.core.task.TaskExecutor;
-import org.springframework.scheduling.annotation.EnableAsync;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.util.concurrent.ThreadPoolExecutor;
-
-@Configuration
-@EnableAsync
-public class ThreadPoolConfig {
- @Bean
- public TaskExecutor taskExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- // 设置核心线程数
- executor.setCorePoolSize(8);
- // 设置最大线程数
- executor.setMaxPoolSize(100);
- // 设置队列容量
- executor.setQueueCapacity(8);
- // 设置线程活跃时间(秒)
- executor.setKeepAliveSeconds(300);
- // 设置默认线程名称
- executor.setThreadNamePrefix("thread-");
- // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,如何处理新任务 CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- // 等待所有任务结束后再关闭线程池
- executor.setWaitForTasksToCompleteOnShutdown(true);
- return executor;
- }
-}
diff --git a/src/main/java/com/platform/service/ServerService.java b/src/main/java/com/platform/service/ServerService.java
index 2e7228b..8eac071 100644
--- a/src/main/java/com/platform/service/ServerService.java
+++ b/src/main/java/com/platform/service/ServerService.java
@@ -61,7 +61,7 @@ public class ServerService {
.addLast("CacDataRouter", cacDataRouterHandler); // cac数据转发
}
});
- ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); // 内存泄露检测,上线记得关掉paranoid
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); // 内存泄露检测,上线记得关掉
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
diff --git a/src/main/java/com/platform/service/clientmanage/BaseClient.java b/src/main/java/com/platform/service/clientmanage/BaseClient.java
index 26bd298..5f2b11e 100644
--- a/src/main/java/com/platform/service/clientmanage/BaseClient.java
+++ b/src/main/java/com/platform/service/clientmanage/BaseClient.java
@@ -24,7 +24,7 @@ public abstract class BaseClient {
protected String ip;
protected String port;
protected String serverPort;
- protected boolean online = false;
+ protected volatile boolean online = false;
// protected boolean online = false;
public BaseClient(String sn, Channel channel) {
this.sn = sn;
diff --git a/src/main/java/com/platform/service/clientmanage/CacRequestExecutor.java b/src/main/java/com/platform/service/clientmanage/CacRequestExecutor.java
new file mode 100644
index 0000000..211787f
--- /dev/null
+++ b/src/main/java/com/platform/service/clientmanage/CacRequestExecutor.java
@@ -0,0 +1,60 @@
+package com.platform.service.clientmanage;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.*;
+
+/**
+ * 在处理哈勃数据流程过程中处理和Cac的交互,减少数据传输的时延
+ */
+@Slf4j
+public class CacRequestExecutor {
+
+ private static final CacRequestExecutor INSTANCE = new CacRequestExecutor();
+ private final ExecutorService executorService; // 根据实际需求调整线程数
+
+ public CacRequestExecutor() {
+ // 启动任务通知线程池
+ ThreadFactory notifyThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("cacRequest" + "-%d")
+ .setDaemon(true).build();
+ int availableProcessors = Runtime.getRuntime().availableProcessors();
+ int corePoolSize = availableProcessors * 2;
+ int maxPoolSize = availableProcessors * 4;
+ long keepAliveTime = 60L; // 60秒
+ int queueCapacity = 200;
+
+ executorService = new ThreadPoolExecutor(
+ corePoolSize,
+ maxPoolSize,
+ keepAliveTime,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(queueCapacity),
+ notifyThreadFactory
+ // 默认使用AbortPolicy, 会抛异常
+ );
+ log.info("cacRequest线程池启动");
+ // 注册 JVM 关闭挂钩,确保线程池在 JVM 退出时被关闭
+ Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
+ }
+ public static CacRequestExecutor getInstance() {
+ return INSTANCE;
+ }
+ public Future> submit(Runnable task) {
+ return INSTANCE.executorService.submit(task);
+ }
+ public void shutdown() {
+ log.info("cacRequest线程池关闭...");
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ executorService.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+}
+
diff --git a/src/main/java/com/platform/service/clientmanage/CommandManager.java b/src/main/java/com/platform/service/clientmanage/CommandManager.java
new file mode 100644
index 0000000..527b957
--- /dev/null
+++ b/src/main/java/com/platform/service/clientmanage/CommandManager.java
@@ -0,0 +1,113 @@
+package com.platform.service.clientmanage;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.*;
+
+/**
+ * CommandManager 负责集中管理所有指令的超时判断,使用 Netty 的 HashedWheelTimer 实现
+ */
+@Slf4j
+public class CommandManager {
+ private static final CommandManager INSTANCE = new CommandManager();
+
+ // Netty的HashedWheelTimer,用于管理指令超时
+ private final HashedWheelTimer timer;
+
+ // 线程池用于处理通知任务,固定大小以限制并发数
+ private final CacRequestExecutor cacRequestExecutor;
+
+
+ private CommandManager() {
+ // 初始化HashedWheelTimer,tickDuration和ticksPerWheel可以根据实际需求调整
+ // tickDuration:一个刻度的时间间隔即时间精度, 每个刻度500毫秒,
+ // ticksPerWheel:总的刻度数, 需要为2^N次方
+ int tickDuration = 500;
+ int tickPerWheel = 64;
+ ThreadFactory wheelTimerThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("wheelTimer" + "-%d")
+ .setDaemon(true).build();
+ this.timer = new HashedWheelTimer(wheelTimerThreadFactory, tickDuration, TimeUnit.MILLISECONDS, tickPerWheel);
+
+ cacRequestExecutor = CacRequestExecutor.getInstance();
+ // 注册 JVM 关闭挂钩,确保线程池在 JVM 退出时被关闭
+ Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
+ }
+
+ public static CommandManager getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * 添加一个待回报的指令记录,并设置超时任务
+ *
+ * @param record 指令记录
+ */
+ public void addCommandRecord(CommandRecord record) {
+ // 创建并提交超时任务
+ TimerTask task = timeout -> {
+ if (record.getStatus() == CommandRecord.WAITING_RESULT) {
+ record.setStatus(CommandRecord.FAILED);
+ log.warn("[cac] {} {}指令超时: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), record.shortInfo());
+ // 提交超时通知任务
+ submitResultNotification(record, false);
+ } else {
+ log.warn("[cac] {} {}指令已完成,状态为{}: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), record.getStatus(), record.shortInfo());
+ }
+ };
+ // 设置超时时间
+ Timeout timeout = this.timer.newTimeout(task, record.getExpireMills(), TimeUnit.MILLISECONDS);
+ record.setTimeout(timeout);
+ }
+
+
+ /**
+ * 提交通知任务到线程池,处理超时和成功的通知
+ *
+ * @param record 指令记录
+ * @param isSuccess 通知类型,true 表示成功,false 表示失败(超时)
+ */
+ public void submitResultNotification(CommandRecord record, boolean isSuccess) {
+ try {
+ cacRequestExecutor.submit(() -> notifyCommandResult(record, isSuccess));
+ } catch (RejectedExecutionException e) {
+ log.error("[cac] {} {}指令通知任务提交失败: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), record.shortInfo(), e);
+ }
+ }
+
+ private void notifyCommandResult(CommandRecord record, boolean isSuccess) {
+ try {
+ HaborClient client = record.getClient();
+ switch (record.getCommandType()) {
+ case CONTROL:
+ record.notifyControlCommandResult(client.uavId, isSuccess);
+ break;
+ case PARAM_BIND:
+ case ROUTE_BIND:
+ record.notifyParamBindCommandResult(client.uavId, client.uavType, isSuccess);
+ break;
+ case PARAM_QUERY:
+ record.notifyParamQueryCommandResult(client.uavId, client.uavType, isSuccess);
+ break;
+ default:
+ log.warn("[cac] 未知的指令类型:{}", record.getCommandType());
+ }
+ } catch (Exception e) {
+ log.error("[cac] {} 处理{}指令回报通知时发生异常: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), e.getMessage(), e);
+ } finally {
+ record.clearCommand();
+ }
+ }
+
+ /**
+ * 关闭管理器,释放资源
+ */
+ public void shutdown() {
+ log.info("时间轮关闭...");
+ this.timer.stop();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/platform/service/clientmanage/CommandQueueRecord.java b/src/main/java/com/platform/service/clientmanage/CommandQueueRecord.java
index e5f6c10..40fe317 100644
--- a/src/main/java/com/platform/service/clientmanage/CommandQueueRecord.java
+++ b/src/main/java/com/platform/service/clientmanage/CommandQueueRecord.java
@@ -13,7 +13,7 @@ import java.util.Queue;
@Slf4j
public class CommandQueueRecord extends CommandRecord {
protected Queue commandQueue;
- public static final int WAITING_NEXT = 3; // 等待下一个节点回报
+ public static final int WAITING_NEXT = 2; // 等待下一个节点回报
public CommandQueueRecord(CommandTypeEnum commandType, HaborClient client, int contentSize){
super(commandType, client, contentSize);
@@ -39,6 +39,7 @@ public class CommandQueueRecord extends CommandRecord {
return false;
}
+ /** 设置指令状态为等待下一个节点*/
@Override
public void recordCommand(byte commandCode, String uniId, ByteBuf commandSlice) {
super.recordCommand(commandCode, uniId, commandSlice);
diff --git a/src/main/java/com/platform/service/clientmanage/CommandRecord.java b/src/main/java/com/platform/service/clientmanage/CommandRecord.java
index 28e60d1..cd641dd 100644
--- a/src/main/java/com/platform/service/clientmanage/CommandRecord.java
+++ b/src/main/java/com/platform/service/clientmanage/CommandRecord.java
@@ -8,21 +8,17 @@ import com.platform.info.dto.CommandResultParam;
import com.platform.info.dto.QueryResultParam;
import com.platform.info.enums.CommandTypeEnum;
import com.platform.info.enums.UavTypeEnum;
+import com.platform.service.clientmanage.HaborClient.UavQueryResultParam;
import com.platform.util.ByteUtils;
-import com.platform.util.JSONUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.Timeout;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** 记录指令状态,回报指令结果*/
@@ -36,34 +32,49 @@ public class CommandRecord {
protected ByteBuf commandBuf = null; // 指令注入内容
protected String commandUniId; // 中心指控指令唯一码
+ @Getter@Setter
+ private UavQueryResultParam uavQueryResultParam;
+
// 指令状态
@Getter
private AtomicInteger receiveFrameCount = new AtomicInteger(0); // 接收到的指令帧数, 150帧还未判断成功则判定为失败
// private long startTime = 0 ; // 接收到的指令帧数, 150帧还未判断成功则判定为失败
private final int MAX_RECEIVE_COUNT = 150; // 接收到的指令帧数, 150帧还未判断成功则判定为失败
- private final long TIMEOUT_THRESHOLD = 30*1000; // 从下发超过5s未回报,则判定为失败
+ private static final long DEFAULT_TIMEOUT_THRESHOLD = 10*1000; // 默认超时时间, 从下发超过5s未回报,则判定为失败
@Getter@Setter
protected int status;
public static final int FAILED = -1; // 指令失败
public static final int NO_COMMAND = 0; // 空闲
- public static final int WAITING_RESULT = 2; // 等待回报
+ public static final int WAITING_RESULT = 1; // 等待回报
// 定时任务管理
- private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50); // 单线程池管理超时任务
- ScheduledFuture> timeoutFuture = null; // 超时任务
+ @Getter @Setter
+ private Timeout timeout;
+ @Getter
+ long expireMills; // 超时时间 ms
+ // private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50); // 单线程池管理超时任务
+ // ScheduledFuture> timeoutFuture = null; // 超时任务
// todo 过期时间控制
public CommandRecord(CommandTypeEnum commandType, HaborClient client) {
this(commandType, client, 16);
}
- public CommandRecord(CommandTypeEnum commandType, HaborClient client, int contentSize) {
+ public CommandRecord(CommandTypeEnum commandType, HaborClient client, int commandContentSize) {
+ this(commandType, client, 16, DEFAULT_TIMEOUT_THRESHOLD);
+ }
+
+ public CommandRecord(CommandTypeEnum commandType, HaborClient client, int commandContentSize, long expireMills) {
this.commandType = commandType;
this.client = client;
- this.commandBuf = PooledByteBufAllocator.DEFAULT.buffer(contentSize);
+ this.commandBuf = PooledByteBufAllocator.DEFAULT.buffer(commandContentSize);
this.status = NO_COMMAND;
+ this.expireMills = expireMills;
}
-
+ public String shortInfo() {
+ return String.format("{commandType=%s, uniIsd=%s, code=%s}", commandType.getInfo(), commandUniId, ByteUtils.byteToHex(code));
+ }
+ /** 记录指令信息,启动定时判断任务*/
public void recordCommand(byte commandCode, String uniId, ByteBuf commandSlice) {
this.code = commandCode;
this.commandBuf.clear();
@@ -72,41 +83,49 @@ public class CommandRecord {
this.status = WAITING_RESULT;
this.receiveFrameCount.set(0);
- //启动一个超时检查的定时任务
- this.timeoutFuture = scheduleTimeoutCheck();
-
+ // 启动延时任务
+ CommandManager commandManager = CommandManager.getInstance();
+ commandManager.addCommandRecord(this);
+ // this.timeoutFuture = scheduleTimeoutCheck();
}
- // 定时任务:每5秒检查一次所有命令是否超时
- private ScheduledFuture> scheduleTimeoutCheck() {
- log.debug("[cac] {} {}指令超时判断定时任务启动", client.shortInfo(), commandType.getInfo());
- // 每5秒检查一次所有命令记录的超时状态
- return scheduler.schedule(() -> {
- if (status != NO_COMMAND ) {
- status = FAILED; // 标记超时为失败
- log.error("[cac] {} {}指令超时", client.shortInfo(), commandType.getInfo());
- switch (commandType) {
- case CONTROL:
- this.notifyControlCommandResult(client.uavId, false);
- break;
- case PARAM_BIND:
- this.notifyParamBindCommandResult(client.uavId, client.uavType, false);
- break;
- case ROUTE_BIND:
- this.notifyParamBindCommandResult(client.uavId, client.uavType, false);
- break;
- case PARAM_QUERY:
- this.notifyParamQueryCommandResult(client.uavId, client.uavType, false, null);
- }
- client.switchToCommonData(); // todo 需要优化,尽量不要在里面用client的控制逻辑
- }
- }, TIMEOUT_THRESHOLD, TimeUnit.MILLISECONDS); // 设置延迟时间
- }
+ // // 定时任务:每5秒检查一次所有命令是否超时
+ // private ScheduledFuture> scheduleTimeoutCheck() {
+ // log.debug("[cac] {} {}指令超时判断定时任务启动", client.shortInfo(), commandType.getInfo());
+ // // 每5秒检查一次所有命令记录的超时状态
+ // return scheduler.schedule(() -> {
+ // if (status != NO_COMMAND ) {
+ // status = FAILED; // 标记超时为失败
+ // log.error("[cac] {} {}指令超时", client.shortInfo(), commandType.getInfo());
+ // switch (commandType) {
+ // case CONTROL:
+ // this.notifyControlCommandResult(client.uavId, false);
+ // break;
+ // case PARAM_BIND:
+ // this.notifyParamBindCommandResult(client.uavId, client.uavType, false);
+ // break;
+ // case ROUTE_BIND:
+ // this.notifyParamBindCommandResult(client.uavId, client.uavType, false);
+ // break;
+ // case PARAM_QUERY:
+ // this.notifyParamQueryCommandResult(client.uavId, client.uavType, false);
+ // }
+ // client.switchToCommonData(); // todo 需要优化,尽量不要在里面用client的控制逻辑
+ // }
+ // }, DEFAULT_TIMEOUT_THRESHOLD, TimeUnit.MILLISECONDS); // 设置延迟时间
+ // }
public void cancelScheduleTimeout() {
- if (this.timeoutFuture != null && !this.timeoutFuture.isCancelled()) {
- if (this.timeoutFuture.cancel(true)) {
- log.debug("[cac] {} 取消{}指令超时判断定时任务", client.shortInfo(), commandType.getInfo());
+ // if (this.timeoutFuture != null && !this.timeoutFuture.isCancelled()) {
+ // if (this.timeoutFuture.cancel(true)) {
+ // log.debug("[cac] {} 取消{}指令超时判断定时任务", client.shortInfo(), commandType.getInfo());
+ // }
+ // }
+ if (timeout!= null && !timeout.isCancelled()) {
+ if (timeout.cancel()){
+ log.debug("[cac] {} 取消{}指令超时判断任务: {}", client.shortInfo(), commandType.getInfo(), shortInfo());
+ } else {
+ log.warn("[cac] {} 无法取消{}指令的超时判断任务:{},任务可能已在执行中", client.shortInfo(), commandType.getInfo(), shortInfo());
}
}
}
@@ -152,7 +171,7 @@ public class CommandRecord {
bindResultParam.setCommandSource(1);
bindResultParam.setUavType(uavType.getRemoteCode());
bindResultParam.setUavParamBindCode(ByteUtils.byteToInt(code));
- bindResultParam.setBindResult(isSuccess?0:1);
+ bindResultParam.setBindResult(isSuccess);
boolean notifySuccess = CacHpApi.uavParamBindResultNotify(bindResultParam);
log.info("[cac] {} 参数装订{}结果 发送{}", client.shortInfo(), isSuccess?"成功":"失败", notifySuccess?"成功":"失败");
} catch (IOException e) {
@@ -160,7 +179,7 @@ public class CommandRecord {
}
}
- public void notifyParamQueryCommandResult(String uavId, UavTypeEnum uavType, boolean isSuccess, HaborClient.UavQueryResultParam uavQueryResultParam) {
+ public void notifyParamQueryCommandResult(String uavId, UavTypeEnum uavType, boolean isSuccess) {
QueryResultParam queryResultParam = new QueryResultParam();
try {
queryResultParam.setGcsId(GlobalData.GCS_ID);
@@ -169,8 +188,8 @@ public class CommandRecord {
queryResultParam.setCommandSource(1);
queryResultParam.setUavType(uavType.getRemoteCode());
queryResultParam.setUavParamQueryCode(ByteUtils.byteToInt(code));
- queryResultParam.setQueryResult(isSuccess?0:1);
- if (isSuccess) {
+ queryResultParam.setQueryResult(isSuccess);
+ if (isSuccess && uavQueryResultParam != null) {
String queryResultContent = uavQueryResultParam.toJsonStr();
queryResultParam.setQueryResultContent(queryResultContent);
}
diff --git a/src/main/java/com/platform/service/clientmanage/HaborClient.java b/src/main/java/com/platform/service/clientmanage/HaborClient.java
index bf32cb7..2989a01 100644
--- a/src/main/java/com/platform/service/clientmanage/HaborClient.java
+++ b/src/main/java/com/platform/service/clientmanage/HaborClient.java
@@ -110,11 +110,12 @@ public abstract class HaborClient extends BaseClient {
@Override
public boolean online() {
// 仅当上线成功时才分配资源并进行和中心指控的通信
- online = CacHpApi.uavMasterControlNotify(uavId);
- if (online) {
+ boolean notifySuccess = CacHpApi.uavMasterControlNotify(uavId);
+ if (notifySuccess) {
resourceAllocate();
startSendCommonData();
}
+ online = notifySuccess; // 资源分配完毕再确定为上线
return online;
}
@@ -164,10 +165,6 @@ public abstract class HaborClient extends BaseClient {
* 只通过get读数,不改变readerIndex和writerIndex, 也不可更改refCnt
*/
public void process(ByteBuf msg) {
- if (!online) {
- return;
- }
-
if (msg.getByte(0) != HEAD[0] && msg.getByte(1) != HEAD[1]) {
return; // 帧头错误,抛弃
}
diff --git a/src/main/java/com/platform/service/clientmanage/HaborClient981A.java b/src/main/java/com/platform/service/clientmanage/HaborClient981A.java
index 3aecfe3..fdb84a3 100644
--- a/src/main/java/com/platform/service/clientmanage/HaborClient981A.java
+++ b/src/main/java/com/platform/service/clientmanage/HaborClient981A.java
@@ -29,6 +29,7 @@ import java.util.List;
@Slf4j @Getter
public class HaborClient981A extends HaborClient {
+ private final CommandManager commandManager = CommandManager.getInstance();
private static final byte[] UPPER_DATA_3_4 = new byte[]{0x02, 0x01};
/**常发数据内容,被填入ByteBuf初始内容,所有飞机共用, 禁止修改*/
@@ -316,8 +317,7 @@ public class HaborClient981A extends HaborClient {
previousControl.addReceiveFrameCount();
if (msg.getByte(29) == previousControl.code) {
try {
- previousControl.notifyControlCommandResult(uavId, true);
- previousControl.clearCommand();
+ commandManager.submitResultNotification(previousControl, true);
return true;
} catch (Exception e) {
log.warn("[cac] {} 控制指令回报处理异常:{}",shortInfo(), e.getMessage());
@@ -325,12 +325,7 @@ public class HaborClient981A extends HaborClient {
switchToCommonData(); // 恢复常发数据帧
}
}
- // else {
- // if (previousControl.isTimeout()) {
- // log.info("[cac] {} 控制指令回报失败, 超时", shortInfo());
- // previousControl.clearCommand();
- // }
- // }
+
}
return false;
}
@@ -341,21 +336,12 @@ public class HaborClient981A extends HaborClient {
previousParamBind.addReceiveFrameCount();
if (previousParamBind.commandMatch(msg.slice(13,16))) {
try {
- previousParamBind.notifyParamBindCommandResult(uavId, uavType,true);
- previousParamBind.clearCommand();
+ commandManager.submitResultNotification(previousParamBind, true);
return true;
- } catch (Exception e) {
- log.warn("[cac] {} 参数装订结果回报处理异常:{}",shortInfo(), e.getMessage());
} finally {
switchToCommonData(); // 恢复常发数据帧
}
}
- // else {
- // if (previousParamBind.isTimeout()) {
- // log.info("[cac] {} 指令装订失败, 超过最大帧数仍未回报", shortInfo());
- // previousParamBind.clearCommand();
- // }
- // }
}
return false;
}
@@ -376,8 +362,8 @@ public class HaborClient981A extends HaborClient {
default:
log.warn("[cac] {} 参数查询结果回报,未知的指令类型:{}", shortInfo(), ByteUtils.byteToHex(msg.getByte(13)));
}
- previousParamQuery.notifyParamQueryCommandResult(uavId, uavType, true, uavQueryResultParam);
- previousParamQuery.clearCommand();
+ previousParamQuery.setUavQueryResultParam(uavQueryResultParam);
+ commandManager.submitResultNotification(previousParamQuery, true);
return true;
} catch (Exception e) {
log.warn("[cac] {} 参数查询结果回报处理异常:{}",shortInfo(), e.getMessage());
@@ -385,12 +371,6 @@ public class HaborClient981A extends HaborClient {
switchToCommonData(); // 恢复常发数据帧
}
}
- // else {
- // if (previousControl.isTimeout()) {
- // log.info("[cac] {} 指令查询结果回报失败, 超过最大帧数仍未回报", shortInfo());
- // previousControl.clearCommand();
- // }
- // }
}
return false;
}
@@ -402,11 +382,8 @@ public class HaborClient981A extends HaborClient {
if (previousRouteBind.commandMatch(msg.slice(13, 16))) {
if (previousRouteBind.completeOrWaitNext()) { // 如果指令列表均已回报,则判定成功,向中心指控回报, 并恢复常发帧
try {
- previousRouteBind.notifyParamBindCommandResult(uavId,uavType,true);
- previousRouteBind.clearCommand();
+ commandManager.submitResultNotification(previousRouteBind, true);
return true;
- } catch (Exception e) {
- log.warn("[cac] {} 航线装订结果回报处理异常:{}", shortInfo(), e.getMessage());
} finally {
switchToCommonData(); // 恢复常发数据帧
}
diff --git a/src/main/java/com/platform/service/handler/CacDataRouterHandler.java b/src/main/java/com/platform/service/handler/CacDataRouterHandler.java
index a200227..d486030 100644
--- a/src/main/java/com/platform/service/handler/CacDataRouterHandler.java
+++ b/src/main/java/com/platform/service/handler/CacDataRouterHandler.java
@@ -26,7 +26,7 @@ public class CacDataRouterHandler extends SimpleChannelInboundHandler {
if (client == null || client.getClientType() != ClientTypeEnum.HABOR) {
return;
}
- log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg));
+ // log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg));
HaborClient haborClient = (HaborClient) client;
haborClient.process(msg);
}
diff --git a/src/main/java/com/platform/service/handler/InMessageHandler1.java b/src/main/java/com/platform/service/handler/InMessageHandler1.java
index 5353abb..10f0bdb 100644
--- a/src/main/java/com/platform/service/handler/InMessageHandler1.java
+++ b/src/main/java/com/platform/service/handler/InMessageHandler1.java
@@ -9,6 +9,7 @@ import com.platform.info.mapping.HaborUavMap;
import com.platform.info.mapping.UavIdMap;
import com.platform.info.dto.DirectControlUavParam;
import com.platform.service.clientmanage.BaseClient;
+import com.platform.service.clientmanage.CacRequestExecutor;
import com.platform.service.clientmanage.ClientManager;
import com.platform.service.clientmanage.HaborClient;
import com.platform.util.*;
@@ -118,7 +119,7 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
sessionCache.snToAddMap.remove(sn);
}
}
- haborSignOutOfCac(ctx.channel());
+ CacRequestExecutor.getInstance().submit(() -> haborSignOutOfCac(ctx.channel()));
super.channelInactive(ctx);
}
@@ -144,7 +145,8 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
sessionCache.snToAddMap.put(deviceCode, address); // mac- ip
sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel
- haborSignIntoCac(haborSn, channel);
+ CacRequestExecutor.getInstance().submit(() -> haborSignIntoCac(haborSn, channel));
+
}
} catch (Exception e) {
log.error("哈勃上线失败, haborSn={}", haborSn, e);
@@ -174,7 +176,7 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
}
BaseClient client = ClientManager.getClient(ctx.channel());
- if (client == null || client.getClientType() != ClientTypeEnum.HABOR) {
+ if (client == null || client.getClientType() != ClientTypeEnum.HABOR || !client.isOnline()) {
if (msgBuf != null) {
msgBuf.release();
}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 9003989..bd75e9a 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,6 +1,6 @@
spring:
profiles:
- active: pre
+ active: dev
app:
debug: true
\ No newline at end of file