From 41f5412e57df37854d7a7477d763e6422a8f8c19 Mon Sep 17 00:00:00 2001 From: shiyi Date: Fri, 14 Feb 2025 18:56:51 +0800 Subject: [PATCH] =?UTF-8?q?[v0.0.7]=20=E5=90=91=E4=B8=AD=E5=BF=83=E6=8C=87?= =?UTF-8?q?=E6=8E=A7=E7=9A=84=E7=BD=91=E7=BB=9C=E8=AF=B7=E6=B1=82=E5=9D=87?= =?UTF-8?q?=E6=94=B9=E7=94=A8=E7=BA=BF=E7=A8=8B=E6=B1=A0=E6=8F=90=E4=BA=A4?= =?UTF-8?q?=EF=BC=8C=E5=87=8F=E5=B0=91=E9=81=A5=E6=B5=8B=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=A4=84=E7=90=86=E8=BF=87=E7=A8=8B=E4=B8=AD=E7=9A=84=E9=98=BB?= =?UTF-8?q?=E5=A1=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- .../com/platform/config/ServiceConfig.java | 10 ++ .../com/platform/config/ThreadPoolConfig.java | 33 ----- .../com/platform/service/ServerService.java | 2 +- .../service/clientmanage/BaseClient.java | 2 +- .../clientmanage/CacRequestExecutor.java | 60 +++++++++ .../service/clientmanage/CommandManager.java | 113 +++++++++++++++++ .../clientmanage/CommandQueueRecord.java | 3 +- .../service/clientmanage/CommandRecord.java | 115 ++++++++++-------- .../service/clientmanage/HaborClient.java | 9 +- .../service/clientmanage/HaborClient981A.java | 37 ++---- .../service/handler/CacDataRouterHandler.java | 2 +- .../service/handler/InMessageHandler1.java | 8 +- src/main/resources/application.yml | 2 +- 14 files changed, 272 insertions(+), 126 deletions(-) delete mode 100644 src/main/java/com/platform/config/ThreadPoolConfig.java create mode 100644 src/main/java/com/platform/service/clientmanage/CacRequestExecutor.java create mode 100644 src/main/java/com/platform/service/clientmanage/CommandManager.java diff --git a/pom.xml b/pom.xml index c6d04b3..7c1855e 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ com.platform pass-through - 0.0.4 + 0.0.6 pass-through 中心指控直控无人机哈勃服务 diff --git a/src/main/java/com/platform/config/ServiceConfig.java b/src/main/java/com/platform/config/ServiceConfig.java index 0d981a7..5e54648 100644 --- a/src/main/java/com/platform/config/ServiceConfig.java +++ b/src/main/java/com/platform/config/ServiceConfig.java @@ -1,9 +1,13 @@ package com.platform.config; +import com.platform.service.clientmanage.CacRequestExecutor; +import com.platform.service.clientmanage.CommandManager; import lombok.Data; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; +import javax.annotation.PostConstruct; + @Configuration @Data public class ServiceConfig { @@ -11,4 +15,10 @@ public class ServiceConfig { private boolean debug; @Value("${service.port}") private Integer port; + + @PostConstruct + public void initConfig() { + CommandManager.getInstance(); + CacRequestExecutor.getInstance(); + } } diff --git a/src/main/java/com/platform/config/ThreadPoolConfig.java b/src/main/java/com/platform/config/ThreadPoolConfig.java deleted file mode 100644 index 10b98b2..0000000 --- a/src/main/java/com/platform/config/ThreadPoolConfig.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.platform.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.core.task.TaskExecutor; -import org.springframework.scheduling.annotation.EnableAsync; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -import java.util.concurrent.ThreadPoolExecutor; - -@Configuration -@EnableAsync -public class ThreadPoolConfig { - @Bean - public TaskExecutor taskExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - // 设置核心线程数 - executor.setCorePoolSize(8); - // 设置最大线程数 - executor.setMaxPoolSize(100); - // 设置队列容量 - executor.setQueueCapacity(8); - // 设置线程活跃时间(秒) - executor.setKeepAliveSeconds(300); - // 设置默认线程名称 - executor.setThreadNamePrefix("thread-"); - // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,如何处理新任务 CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); - // 等待所有任务结束后再关闭线程池 - executor.setWaitForTasksToCompleteOnShutdown(true); - return executor; - } -} diff --git a/src/main/java/com/platform/service/ServerService.java b/src/main/java/com/platform/service/ServerService.java index 2e7228b..8eac071 100644 --- a/src/main/java/com/platform/service/ServerService.java +++ b/src/main/java/com/platform/service/ServerService.java @@ -61,7 +61,7 @@ public class ServerService { .addLast("CacDataRouter", cacDataRouterHandler); // cac数据转发 } }); - ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); // 内存泄露检测,上线记得关掉paranoid + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); // 内存泄露检测,上线记得关掉 bootstrap.option(ChannelOption.SO_BACKLOG, 2048); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); diff --git a/src/main/java/com/platform/service/clientmanage/BaseClient.java b/src/main/java/com/platform/service/clientmanage/BaseClient.java index 26bd298..5f2b11e 100644 --- a/src/main/java/com/platform/service/clientmanage/BaseClient.java +++ b/src/main/java/com/platform/service/clientmanage/BaseClient.java @@ -24,7 +24,7 @@ public abstract class BaseClient { protected String ip; protected String port; protected String serverPort; - protected boolean online = false; + protected volatile boolean online = false; // protected boolean online = false; public BaseClient(String sn, Channel channel) { this.sn = sn; diff --git a/src/main/java/com/platform/service/clientmanage/CacRequestExecutor.java b/src/main/java/com/platform/service/clientmanage/CacRequestExecutor.java new file mode 100644 index 0000000..211787f --- /dev/null +++ b/src/main/java/com/platform/service/clientmanage/CacRequestExecutor.java @@ -0,0 +1,60 @@ +package com.platform.service.clientmanage; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.*; + +/** + * 在处理哈勃数据流程过程中处理和Cac的交互,减少数据传输的时延 + */ +@Slf4j +public class CacRequestExecutor { + + private static final CacRequestExecutor INSTANCE = new CacRequestExecutor(); + private final ExecutorService executorService; // 根据实际需求调整线程数 + + public CacRequestExecutor() { + // 启动任务通知线程池 + ThreadFactory notifyThreadFactory = new ThreadFactoryBuilder() + .setNameFormat("cacRequest" + "-%d") + .setDaemon(true).build(); + int availableProcessors = Runtime.getRuntime().availableProcessors(); + int corePoolSize = availableProcessors * 2; + int maxPoolSize = availableProcessors * 4; + long keepAliveTime = 60L; // 60秒 + int queueCapacity = 200; + + executorService = new ThreadPoolExecutor( + corePoolSize, + maxPoolSize, + keepAliveTime, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(queueCapacity), + notifyThreadFactory + // 默认使用AbortPolicy, 会抛异常 + ); + log.info("cacRequest线程池启动"); + // 注册 JVM 关闭挂钩,确保线程池在 JVM 退出时被关闭 + Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); + } + public static CacRequestExecutor getInstance() { + return INSTANCE; + } + public Future submit(Runnable task) { + return INSTANCE.executorService.submit(task); + } + public void shutdown() { + log.info("cacRequest线程池关闭..."); + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } +} + diff --git a/src/main/java/com/platform/service/clientmanage/CommandManager.java b/src/main/java/com/platform/service/clientmanage/CommandManager.java new file mode 100644 index 0000000..527b957 --- /dev/null +++ b/src/main/java/com/platform/service/clientmanage/CommandManager.java @@ -0,0 +1,113 @@ +package com.platform.service.clientmanage; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.*; + +/** + * CommandManager 负责集中管理所有指令的超时判断,使用 Netty 的 HashedWheelTimer 实现 + */ +@Slf4j +public class CommandManager { + private static final CommandManager INSTANCE = new CommandManager(); + + // Netty的HashedWheelTimer,用于管理指令超时 + private final HashedWheelTimer timer; + + // 线程池用于处理通知任务,固定大小以限制并发数 + private final CacRequestExecutor cacRequestExecutor; + + + private CommandManager() { + // 初始化HashedWheelTimer,tickDuration和ticksPerWheel可以根据实际需求调整 + // tickDuration:一个刻度的时间间隔即时间精度, 每个刻度500毫秒, + // ticksPerWheel:总的刻度数, 需要为2^N次方 + int tickDuration = 500; + int tickPerWheel = 64; + ThreadFactory wheelTimerThreadFactory = new ThreadFactoryBuilder() + .setNameFormat("wheelTimer" + "-%d") + .setDaemon(true).build(); + this.timer = new HashedWheelTimer(wheelTimerThreadFactory, tickDuration, TimeUnit.MILLISECONDS, tickPerWheel); + + cacRequestExecutor = CacRequestExecutor.getInstance(); + // 注册 JVM 关闭挂钩,确保线程池在 JVM 退出时被关闭 + Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); + } + + public static CommandManager getInstance() { + return INSTANCE; + } + + /** + * 添加一个待回报的指令记录,并设置超时任务 + * + * @param record 指令记录 + */ + public void addCommandRecord(CommandRecord record) { + // 创建并提交超时任务 + TimerTask task = timeout -> { + if (record.getStatus() == CommandRecord.WAITING_RESULT) { + record.setStatus(CommandRecord.FAILED); + log.warn("[cac] {} {}指令超时: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), record.shortInfo()); + // 提交超时通知任务 + submitResultNotification(record, false); + } else { + log.warn("[cac] {} {}指令已完成,状态为{}: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), record.getStatus(), record.shortInfo()); + } + }; + // 设置超时时间 + Timeout timeout = this.timer.newTimeout(task, record.getExpireMills(), TimeUnit.MILLISECONDS); + record.setTimeout(timeout); + } + + + /** + * 提交通知任务到线程池,处理超时和成功的通知 + * + * @param record 指令记录 + * @param isSuccess 通知类型,true 表示成功,false 表示失败(超时) + */ + public void submitResultNotification(CommandRecord record, boolean isSuccess) { + try { + cacRequestExecutor.submit(() -> notifyCommandResult(record, isSuccess)); + } catch (RejectedExecutionException e) { + log.error("[cac] {} {}指令通知任务提交失败: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), record.shortInfo(), e); + } + } + + private void notifyCommandResult(CommandRecord record, boolean isSuccess) { + try { + HaborClient client = record.getClient(); + switch (record.getCommandType()) { + case CONTROL: + record.notifyControlCommandResult(client.uavId, isSuccess); + break; + case PARAM_BIND: + case ROUTE_BIND: + record.notifyParamBindCommandResult(client.uavId, client.uavType, isSuccess); + break; + case PARAM_QUERY: + record.notifyParamQueryCommandResult(client.uavId, client.uavType, isSuccess); + break; + default: + log.warn("[cac] 未知的指令类型:{}", record.getCommandType()); + } + } catch (Exception e) { + log.error("[cac] {} 处理{}指令回报通知时发生异常: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), e.getMessage(), e); + } finally { + record.clearCommand(); + } + } + + /** + * 关闭管理器,释放资源 + */ + public void shutdown() { + log.info("时间轮关闭..."); + this.timer.stop(); + } +} \ No newline at end of file diff --git a/src/main/java/com/platform/service/clientmanage/CommandQueueRecord.java b/src/main/java/com/platform/service/clientmanage/CommandQueueRecord.java index e5f6c10..40fe317 100644 --- a/src/main/java/com/platform/service/clientmanage/CommandQueueRecord.java +++ b/src/main/java/com/platform/service/clientmanage/CommandQueueRecord.java @@ -13,7 +13,7 @@ import java.util.Queue; @Slf4j public class CommandQueueRecord extends CommandRecord { protected Queue commandQueue; - public static final int WAITING_NEXT = 3; // 等待下一个节点回报 + public static final int WAITING_NEXT = 2; // 等待下一个节点回报 public CommandQueueRecord(CommandTypeEnum commandType, HaborClient client, int contentSize){ super(commandType, client, contentSize); @@ -39,6 +39,7 @@ public class CommandQueueRecord extends CommandRecord { return false; } + /** 设置指令状态为等待下一个节点*/ @Override public void recordCommand(byte commandCode, String uniId, ByteBuf commandSlice) { super.recordCommand(commandCode, uniId, commandSlice); diff --git a/src/main/java/com/platform/service/clientmanage/CommandRecord.java b/src/main/java/com/platform/service/clientmanage/CommandRecord.java index 28e60d1..cd641dd 100644 --- a/src/main/java/com/platform/service/clientmanage/CommandRecord.java +++ b/src/main/java/com/platform/service/clientmanage/CommandRecord.java @@ -8,21 +8,17 @@ import com.platform.info.dto.CommandResultParam; import com.platform.info.dto.QueryResultParam; import com.platform.info.enums.CommandTypeEnum; import com.platform.info.enums.UavTypeEnum; +import com.platform.service.clientmanage.HaborClient.UavQueryResultParam; import com.platform.util.ByteUtils; -import com.platform.util.JSONUtils; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.Timeout; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** 记录指令状态,回报指令结果*/ @@ -36,34 +32,49 @@ public class CommandRecord { protected ByteBuf commandBuf = null; // 指令注入内容 protected String commandUniId; // 中心指控指令唯一码 + @Getter@Setter + private UavQueryResultParam uavQueryResultParam; + // 指令状态 @Getter private AtomicInteger receiveFrameCount = new AtomicInteger(0); // 接收到的指令帧数, 150帧还未判断成功则判定为失败 // private long startTime = 0 ; // 接收到的指令帧数, 150帧还未判断成功则判定为失败 private final int MAX_RECEIVE_COUNT = 150; // 接收到的指令帧数, 150帧还未判断成功则判定为失败 - private final long TIMEOUT_THRESHOLD = 30*1000; // 从下发超过5s未回报,则判定为失败 + private static final long DEFAULT_TIMEOUT_THRESHOLD = 10*1000; // 默认超时时间, 从下发超过5s未回报,则判定为失败 @Getter@Setter protected int status; public static final int FAILED = -1; // 指令失败 public static final int NO_COMMAND = 0; // 空闲 - public static final int WAITING_RESULT = 2; // 等待回报 + public static final int WAITING_RESULT = 1; // 等待回报 // 定时任务管理 - private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50); // 单线程池管理超时任务 - ScheduledFuture timeoutFuture = null; // 超时任务 + @Getter @Setter + private Timeout timeout; + @Getter + long expireMills; // 超时时间 ms + // private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50); // 单线程池管理超时任务 + // ScheduledFuture timeoutFuture = null; // 超时任务 // todo 过期时间控制 public CommandRecord(CommandTypeEnum commandType, HaborClient client) { this(commandType, client, 16); } - public CommandRecord(CommandTypeEnum commandType, HaborClient client, int contentSize) { + public CommandRecord(CommandTypeEnum commandType, HaborClient client, int commandContentSize) { + this(commandType, client, 16, DEFAULT_TIMEOUT_THRESHOLD); + } + + public CommandRecord(CommandTypeEnum commandType, HaborClient client, int commandContentSize, long expireMills) { this.commandType = commandType; this.client = client; - this.commandBuf = PooledByteBufAllocator.DEFAULT.buffer(contentSize); + this.commandBuf = PooledByteBufAllocator.DEFAULT.buffer(commandContentSize); this.status = NO_COMMAND; + this.expireMills = expireMills; } - + public String shortInfo() { + return String.format("{commandType=%s, uniIsd=%s, code=%s}", commandType.getInfo(), commandUniId, ByteUtils.byteToHex(code)); + } + /** 记录指令信息,启动定时判断任务*/ public void recordCommand(byte commandCode, String uniId, ByteBuf commandSlice) { this.code = commandCode; this.commandBuf.clear(); @@ -72,41 +83,49 @@ public class CommandRecord { this.status = WAITING_RESULT; this.receiveFrameCount.set(0); - //启动一个超时检查的定时任务 - this.timeoutFuture = scheduleTimeoutCheck(); - + // 启动延时任务 + CommandManager commandManager = CommandManager.getInstance(); + commandManager.addCommandRecord(this); + // this.timeoutFuture = scheduleTimeoutCheck(); } - // 定时任务:每5秒检查一次所有命令是否超时 - private ScheduledFuture scheduleTimeoutCheck() { - log.debug("[cac] {} {}指令超时判断定时任务启动", client.shortInfo(), commandType.getInfo()); - // 每5秒检查一次所有命令记录的超时状态 - return scheduler.schedule(() -> { - if (status != NO_COMMAND ) { - status = FAILED; // 标记超时为失败 - log.error("[cac] {} {}指令超时", client.shortInfo(), commandType.getInfo()); - switch (commandType) { - case CONTROL: - this.notifyControlCommandResult(client.uavId, false); - break; - case PARAM_BIND: - this.notifyParamBindCommandResult(client.uavId, client.uavType, false); - break; - case ROUTE_BIND: - this.notifyParamBindCommandResult(client.uavId, client.uavType, false); - break; - case PARAM_QUERY: - this.notifyParamQueryCommandResult(client.uavId, client.uavType, false, null); - } - client.switchToCommonData(); // todo 需要优化,尽量不要在里面用client的控制逻辑 - } - }, TIMEOUT_THRESHOLD, TimeUnit.MILLISECONDS); // 设置延迟时间 - } + // // 定时任务:每5秒检查一次所有命令是否超时 + // private ScheduledFuture scheduleTimeoutCheck() { + // log.debug("[cac] {} {}指令超时判断定时任务启动", client.shortInfo(), commandType.getInfo()); + // // 每5秒检查一次所有命令记录的超时状态 + // return scheduler.schedule(() -> { + // if (status != NO_COMMAND ) { + // status = FAILED; // 标记超时为失败 + // log.error("[cac] {} {}指令超时", client.shortInfo(), commandType.getInfo()); + // switch (commandType) { + // case CONTROL: + // this.notifyControlCommandResult(client.uavId, false); + // break; + // case PARAM_BIND: + // this.notifyParamBindCommandResult(client.uavId, client.uavType, false); + // break; + // case ROUTE_BIND: + // this.notifyParamBindCommandResult(client.uavId, client.uavType, false); + // break; + // case PARAM_QUERY: + // this.notifyParamQueryCommandResult(client.uavId, client.uavType, false); + // } + // client.switchToCommonData(); // todo 需要优化,尽量不要在里面用client的控制逻辑 + // } + // }, DEFAULT_TIMEOUT_THRESHOLD, TimeUnit.MILLISECONDS); // 设置延迟时间 + // } public void cancelScheduleTimeout() { - if (this.timeoutFuture != null && !this.timeoutFuture.isCancelled()) { - if (this.timeoutFuture.cancel(true)) { - log.debug("[cac] {} 取消{}指令超时判断定时任务", client.shortInfo(), commandType.getInfo()); + // if (this.timeoutFuture != null && !this.timeoutFuture.isCancelled()) { + // if (this.timeoutFuture.cancel(true)) { + // log.debug("[cac] {} 取消{}指令超时判断定时任务", client.shortInfo(), commandType.getInfo()); + // } + // } + if (timeout!= null && !timeout.isCancelled()) { + if (timeout.cancel()){ + log.debug("[cac] {} 取消{}指令超时判断任务: {}", client.shortInfo(), commandType.getInfo(), shortInfo()); + } else { + log.warn("[cac] {} 无法取消{}指令的超时判断任务:{},任务可能已在执行中", client.shortInfo(), commandType.getInfo(), shortInfo()); } } } @@ -152,7 +171,7 @@ public class CommandRecord { bindResultParam.setCommandSource(1); bindResultParam.setUavType(uavType.getRemoteCode()); bindResultParam.setUavParamBindCode(ByteUtils.byteToInt(code)); - bindResultParam.setBindResult(isSuccess?0:1); + bindResultParam.setBindResult(isSuccess); boolean notifySuccess = CacHpApi.uavParamBindResultNotify(bindResultParam); log.info("[cac] {} 参数装订{}结果 发送{}", client.shortInfo(), isSuccess?"成功":"失败", notifySuccess?"成功":"失败"); } catch (IOException e) { @@ -160,7 +179,7 @@ public class CommandRecord { } } - public void notifyParamQueryCommandResult(String uavId, UavTypeEnum uavType, boolean isSuccess, HaborClient.UavQueryResultParam uavQueryResultParam) { + public void notifyParamQueryCommandResult(String uavId, UavTypeEnum uavType, boolean isSuccess) { QueryResultParam queryResultParam = new QueryResultParam(); try { queryResultParam.setGcsId(GlobalData.GCS_ID); @@ -169,8 +188,8 @@ public class CommandRecord { queryResultParam.setCommandSource(1); queryResultParam.setUavType(uavType.getRemoteCode()); queryResultParam.setUavParamQueryCode(ByteUtils.byteToInt(code)); - queryResultParam.setQueryResult(isSuccess?0:1); - if (isSuccess) { + queryResultParam.setQueryResult(isSuccess); + if (isSuccess && uavQueryResultParam != null) { String queryResultContent = uavQueryResultParam.toJsonStr(); queryResultParam.setQueryResultContent(queryResultContent); } diff --git a/src/main/java/com/platform/service/clientmanage/HaborClient.java b/src/main/java/com/platform/service/clientmanage/HaborClient.java index bf32cb7..2989a01 100644 --- a/src/main/java/com/platform/service/clientmanage/HaborClient.java +++ b/src/main/java/com/platform/service/clientmanage/HaborClient.java @@ -110,11 +110,12 @@ public abstract class HaborClient extends BaseClient { @Override public boolean online() { // 仅当上线成功时才分配资源并进行和中心指控的通信 - online = CacHpApi.uavMasterControlNotify(uavId); - if (online) { + boolean notifySuccess = CacHpApi.uavMasterControlNotify(uavId); + if (notifySuccess) { resourceAllocate(); startSendCommonData(); } + online = notifySuccess; // 资源分配完毕再确定为上线 return online; } @@ -164,10 +165,6 @@ public abstract class HaborClient extends BaseClient { * 只通过get读数,不改变readerIndex和writerIndex, 也不可更改refCnt */ public void process(ByteBuf msg) { - if (!online) { - return; - } - if (msg.getByte(0) != HEAD[0] && msg.getByte(1) != HEAD[1]) { return; // 帧头错误,抛弃 } diff --git a/src/main/java/com/platform/service/clientmanage/HaborClient981A.java b/src/main/java/com/platform/service/clientmanage/HaborClient981A.java index 3aecfe3..fdb84a3 100644 --- a/src/main/java/com/platform/service/clientmanage/HaborClient981A.java +++ b/src/main/java/com/platform/service/clientmanage/HaborClient981A.java @@ -29,6 +29,7 @@ import java.util.List; @Slf4j @Getter public class HaborClient981A extends HaborClient { + private final CommandManager commandManager = CommandManager.getInstance(); private static final byte[] UPPER_DATA_3_4 = new byte[]{0x02, 0x01}; /**常发数据内容,被填入ByteBuf初始内容,所有飞机共用, 禁止修改*/ @@ -316,8 +317,7 @@ public class HaborClient981A extends HaborClient { previousControl.addReceiveFrameCount(); if (msg.getByte(29) == previousControl.code) { try { - previousControl.notifyControlCommandResult(uavId, true); - previousControl.clearCommand(); + commandManager.submitResultNotification(previousControl, true); return true; } catch (Exception e) { log.warn("[cac] {} 控制指令回报处理异常:{}",shortInfo(), e.getMessage()); @@ -325,12 +325,7 @@ public class HaborClient981A extends HaborClient { switchToCommonData(); // 恢复常发数据帧 } } - // else { - // if (previousControl.isTimeout()) { - // log.info("[cac] {} 控制指令回报失败, 超时", shortInfo()); - // previousControl.clearCommand(); - // } - // } + } return false; } @@ -341,21 +336,12 @@ public class HaborClient981A extends HaborClient { previousParamBind.addReceiveFrameCount(); if (previousParamBind.commandMatch(msg.slice(13,16))) { try { - previousParamBind.notifyParamBindCommandResult(uavId, uavType,true); - previousParamBind.clearCommand(); + commandManager.submitResultNotification(previousParamBind, true); return true; - } catch (Exception e) { - log.warn("[cac] {} 参数装订结果回报处理异常:{}",shortInfo(), e.getMessage()); } finally { switchToCommonData(); // 恢复常发数据帧 } } - // else { - // if (previousParamBind.isTimeout()) { - // log.info("[cac] {} 指令装订失败, 超过最大帧数仍未回报", shortInfo()); - // previousParamBind.clearCommand(); - // } - // } } return false; } @@ -376,8 +362,8 @@ public class HaborClient981A extends HaborClient { default: log.warn("[cac] {} 参数查询结果回报,未知的指令类型:{}", shortInfo(), ByteUtils.byteToHex(msg.getByte(13))); } - previousParamQuery.notifyParamQueryCommandResult(uavId, uavType, true, uavQueryResultParam); - previousParamQuery.clearCommand(); + previousParamQuery.setUavQueryResultParam(uavQueryResultParam); + commandManager.submitResultNotification(previousParamQuery, true); return true; } catch (Exception e) { log.warn("[cac] {} 参数查询结果回报处理异常:{}",shortInfo(), e.getMessage()); @@ -385,12 +371,6 @@ public class HaborClient981A extends HaborClient { switchToCommonData(); // 恢复常发数据帧 } } - // else { - // if (previousControl.isTimeout()) { - // log.info("[cac] {} 指令查询结果回报失败, 超过最大帧数仍未回报", shortInfo()); - // previousControl.clearCommand(); - // } - // } } return false; } @@ -402,11 +382,8 @@ public class HaborClient981A extends HaborClient { if (previousRouteBind.commandMatch(msg.slice(13, 16))) { if (previousRouteBind.completeOrWaitNext()) { // 如果指令列表均已回报,则判定成功,向中心指控回报, 并恢复常发帧 try { - previousRouteBind.notifyParamBindCommandResult(uavId,uavType,true); - previousRouteBind.clearCommand(); + commandManager.submitResultNotification(previousRouteBind, true); return true; - } catch (Exception e) { - log.warn("[cac] {} 航线装订结果回报处理异常:{}", shortInfo(), e.getMessage()); } finally { switchToCommonData(); // 恢复常发数据帧 } diff --git a/src/main/java/com/platform/service/handler/CacDataRouterHandler.java b/src/main/java/com/platform/service/handler/CacDataRouterHandler.java index a200227..d486030 100644 --- a/src/main/java/com/platform/service/handler/CacDataRouterHandler.java +++ b/src/main/java/com/platform/service/handler/CacDataRouterHandler.java @@ -26,7 +26,7 @@ public class CacDataRouterHandler extends SimpleChannelInboundHandler { if (client == null || client.getClientType() != ClientTypeEnum.HABOR) { return; } - log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg)); + // log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg)); HaborClient haborClient = (HaborClient) client; haborClient.process(msg); } diff --git a/src/main/java/com/platform/service/handler/InMessageHandler1.java b/src/main/java/com/platform/service/handler/InMessageHandler1.java index 5353abb..10f0bdb 100644 --- a/src/main/java/com/platform/service/handler/InMessageHandler1.java +++ b/src/main/java/com/platform/service/handler/InMessageHandler1.java @@ -9,6 +9,7 @@ import com.platform.info.mapping.HaborUavMap; import com.platform.info.mapping.UavIdMap; import com.platform.info.dto.DirectControlUavParam; import com.platform.service.clientmanage.BaseClient; +import com.platform.service.clientmanage.CacRequestExecutor; import com.platform.service.clientmanage.ClientManager; import com.platform.service.clientmanage.HaborClient; import com.platform.util.*; @@ -118,7 +119,7 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter { sessionCache.snToAddMap.remove(sn); } } - haborSignOutOfCac(ctx.channel()); + CacRequestExecutor.getInstance().submit(() -> haborSignOutOfCac(ctx.channel())); super.channelInactive(ctx); } @@ -144,7 +145,8 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter { sessionCache.snToAddMap.put(deviceCode, address); // mac- ip sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel - haborSignIntoCac(haborSn, channel); + CacRequestExecutor.getInstance().submit(() -> haborSignIntoCac(haborSn, channel)); + } } catch (Exception e) { log.error("哈勃上线失败, haborSn={}", haborSn, e); @@ -174,7 +176,7 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter { } BaseClient client = ClientManager.getClient(ctx.channel()); - if (client == null || client.getClientType() != ClientTypeEnum.HABOR) { + if (client == null || client.getClientType() != ClientTypeEnum.HABOR || !client.isOnline()) { if (msgBuf != null) { msgBuf.release(); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9003989..bd75e9a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,6 @@ spring: profiles: - active: pre + active: dev app: debug: true \ No newline at end of file