[v0.0.7] 向中心指控的网络请求均改用线程池提交,减少遥测数据处理过程中的阻塞

master
shiyi 4 weeks ago
parent 79b7b27be5
commit 41f5412e57

@ -10,7 +10,7 @@
</parent>
<groupId>com.platform</groupId>
<artifactId>pass-through</artifactId>
<version>0.0.4</version>
<version>0.0.6</version>
<name>pass-through</name>
<description>中心指控直控无人机哈勃服务</description>

@ -1,9 +1,13 @@
package com.platform.config;
import com.platform.service.clientmanage.CacRequestExecutor;
import com.platform.service.clientmanage.CommandManager;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
@Data
public class ServiceConfig {
@ -11,4 +15,10 @@ public class ServiceConfig {
private boolean debug;
@Value("${service.port}")
private Integer port;
@PostConstruct
public void initConfig() {
CommandManager.getInstance();
CacRequestExecutor.getInstance();
}
}

@ -1,33 +0,0 @@
package com.platform.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(8);
// 设置最大线程数
executor.setMaxPoolSize(100);
// 设置队列容量
executor.setQueueCapacity(8);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(300);
// 设置默认线程名称
executor.setThreadNamePrefix("thread-");
// 设置拒绝策略rejection-policy当pool已经达到max size的时候如何处理新任务 CALLER_RUNS不在新线程中执行任务而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}

@ -61,7 +61,7 @@ public class ServerService {
.addLast("CacDataRouter", cacDataRouterHandler); // cac数据转发
}
});
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); // 内存泄露检测,上线记得关掉paranoid
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); // 内存泄露检测,上线记得关掉
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);

@ -24,7 +24,7 @@ public abstract class BaseClient {
protected String ip;
protected String port;
protected String serverPort;
protected boolean online = false;
protected volatile boolean online = false;
// protected boolean online = false;
public BaseClient(String sn, Channel channel) {
this.sn = sn;

@ -0,0 +1,60 @@
package com.platform.service.clientmanage;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* Cac
*/
@Slf4j
public class CacRequestExecutor {
private static final CacRequestExecutor INSTANCE = new CacRequestExecutor();
private final ExecutorService executorService; // 根据实际需求调整线程数
public CacRequestExecutor() {
// 启动任务通知线程池
ThreadFactory notifyThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("cacRequest" + "-%d")
.setDaemon(true).build();
int availableProcessors = Runtime.getRuntime().availableProcessors();
int corePoolSize = availableProcessors * 2;
int maxPoolSize = availableProcessors * 4;
long keepAliveTime = 60L; // 60秒
int queueCapacity = 200;
executorService = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueCapacity),
notifyThreadFactory
// 默认使用AbortPolicy, 会抛异常
);
log.info("cacRequest线程池启动");
// 注册 JVM 关闭挂钩,确保线程池在 JVM 退出时被关闭
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
public static CacRequestExecutor getInstance() {
return INSTANCE;
}
public Future<?> submit(Runnable task) {
return INSTANCE.executorService.submit(task);
}
public void shutdown() {
log.info("cacRequest线程池关闭...");
executorService.shutdown();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

@ -0,0 +1,113 @@
package com.platform.service.clientmanage;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* CommandManager 使 Netty HashedWheelTimer
*/
@Slf4j
public class CommandManager {
private static final CommandManager INSTANCE = new CommandManager();
// Netty的HashedWheelTimer用于管理指令超时
private final HashedWheelTimer timer;
// 线程池用于处理通知任务,固定大小以限制并发数
private final CacRequestExecutor cacRequestExecutor;
private CommandManager() {
// 初始化HashedWheelTimertickDuration和ticksPerWheel可以根据实际需求调整
// tickDuration一个刻度的时间间隔即时间精度, 每个刻度500毫秒
// ticksPerWheel总的刻度数, 需要为2^N次方
int tickDuration = 500;
int tickPerWheel = 64;
ThreadFactory wheelTimerThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("wheelTimer" + "-%d")
.setDaemon(true).build();
this.timer = new HashedWheelTimer(wheelTimerThreadFactory, tickDuration, TimeUnit.MILLISECONDS, tickPerWheel);
cacRequestExecutor = CacRequestExecutor.getInstance();
// 注册 JVM 关闭挂钩,确保线程池在 JVM 退出时被关闭
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
public static CommandManager getInstance() {
return INSTANCE;
}
/**
*
*
* @param record
*/
public void addCommandRecord(CommandRecord record) {
// 创建并提交超时任务
TimerTask task = timeout -> {
if (record.getStatus() == CommandRecord.WAITING_RESULT) {
record.setStatus(CommandRecord.FAILED);
log.warn("[cac] {} {}指令超时: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), record.shortInfo());
// 提交超时通知任务
submitResultNotification(record, false);
} else {
log.warn("[cac] {} {}指令已完成,状态为{}: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), record.getStatus(), record.shortInfo());
}
};
// 设置超时时间
Timeout timeout = this.timer.newTimeout(task, record.getExpireMills(), TimeUnit.MILLISECONDS);
record.setTimeout(timeout);
}
/**
* 线
*
* @param record
* @param isSuccess true false
*/
public void submitResultNotification(CommandRecord record, boolean isSuccess) {
try {
cacRequestExecutor.submit(() -> notifyCommandResult(record, isSuccess));
} catch (RejectedExecutionException e) {
log.error("[cac] {} {}指令通知任务提交失败: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), record.shortInfo(), e);
}
}
private void notifyCommandResult(CommandRecord record, boolean isSuccess) {
try {
HaborClient client = record.getClient();
switch (record.getCommandType()) {
case CONTROL:
record.notifyControlCommandResult(client.uavId, isSuccess);
break;
case PARAM_BIND:
case ROUTE_BIND:
record.notifyParamBindCommandResult(client.uavId, client.uavType, isSuccess);
break;
case PARAM_QUERY:
record.notifyParamQueryCommandResult(client.uavId, client.uavType, isSuccess);
break;
default:
log.warn("[cac] 未知的指令类型:{}", record.getCommandType());
}
} catch (Exception e) {
log.error("[cac] {} 处理{}指令回报通知时发生异常: {}", record.getClient().shortInfo(), record.getCommandType().getInfo(), e.getMessage(), e);
} finally {
record.clearCommand();
}
}
/**
*
*/
public void shutdown() {
log.info("时间轮关闭...");
this.timer.stop();
}
}

@ -13,7 +13,7 @@ import java.util.Queue;
@Slf4j
public class CommandQueueRecord extends CommandRecord {
protected Queue<byte[]> commandQueue;
public static final int WAITING_NEXT = 3; // 等待下一个节点回报
public static final int WAITING_NEXT = 2; // 等待下一个节点回报
public CommandQueueRecord(CommandTypeEnum commandType, HaborClient client, int contentSize){
super(commandType, client, contentSize);
@ -39,6 +39,7 @@ public class CommandQueueRecord extends CommandRecord {
return false;
}
/** 设置指令状态为等待下一个节点*/
@Override
public void recordCommand(byte commandCode, String uniId, ByteBuf commandSlice) {
super.recordCommand(commandCode, uniId, commandSlice);

@ -8,21 +8,17 @@ import com.platform.info.dto.CommandResultParam;
import com.platform.info.dto.QueryResultParam;
import com.platform.info.enums.CommandTypeEnum;
import com.platform.info.enums.UavTypeEnum;
import com.platform.service.clientmanage.HaborClient.UavQueryResultParam;
import com.platform.util.ByteUtils;
import com.platform.util.JSONUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.Timeout;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** 记录指令状态,回报指令结果*/
@ -36,34 +32,49 @@ public class CommandRecord {
protected ByteBuf commandBuf = null; // 指令注入内容
protected String commandUniId; // 中心指控指令唯一码
@Getter@Setter
private UavQueryResultParam uavQueryResultParam;
// 指令状态
@Getter
private AtomicInteger receiveFrameCount = new AtomicInteger(0); // 接收到的指令帧数, 150帧还未判断成功则判定为失败
// private long startTime = 0 ; // 接收到的指令帧数, 150帧还未判断成功则判定为失败
private final int MAX_RECEIVE_COUNT = 150; // 接收到的指令帧数, 150帧还未判断成功则判定为失败
private final long TIMEOUT_THRESHOLD = 30*1000; // 从下发超过5s未回报则判定为失败
private static final long DEFAULT_TIMEOUT_THRESHOLD = 10*1000; // 默认超时时间, 从下发超过5s未回报则判定为失败
@Getter@Setter
protected int status;
public static final int FAILED = -1; // 指令失败
public static final int NO_COMMAND = 0; // 空闲
public static final int WAITING_RESULT = 2; // 等待回报
public static final int WAITING_RESULT = 1; // 等待回报
// 定时任务管理
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50); // 单线程池管理超时任务
ScheduledFuture<?> timeoutFuture = null; // 超时任务
@Getter @Setter
private Timeout timeout;
@Getter
long expireMills; // 超时时间 ms
// private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(50); // 单线程池管理超时任务
// ScheduledFuture<?> timeoutFuture = null; // 超时任务
// todo 过期时间控制
public CommandRecord(CommandTypeEnum commandType, HaborClient client) {
this(commandType, client, 16);
}
public CommandRecord(CommandTypeEnum commandType, HaborClient client, int contentSize) {
public CommandRecord(CommandTypeEnum commandType, HaborClient client, int commandContentSize) {
this(commandType, client, 16, DEFAULT_TIMEOUT_THRESHOLD);
}
public CommandRecord(CommandTypeEnum commandType, HaborClient client, int commandContentSize, long expireMills) {
this.commandType = commandType;
this.client = client;
this.commandBuf = PooledByteBufAllocator.DEFAULT.buffer(contentSize);
this.commandBuf = PooledByteBufAllocator.DEFAULT.buffer(commandContentSize);
this.status = NO_COMMAND;
this.expireMills = expireMills;
}
public String shortInfo() {
return String.format("{commandType=%s, uniIsd=%s, code=%s}", commandType.getInfo(), commandUniId, ByteUtils.byteToHex(code));
}
/** 记录指令信息,启动定时判断任务*/
public void recordCommand(byte commandCode, String uniId, ByteBuf commandSlice) {
this.code = commandCode;
this.commandBuf.clear();
@ -72,41 +83,49 @@ public class CommandRecord {
this.status = WAITING_RESULT;
this.receiveFrameCount.set(0);
//启动一个超时检查的定时任务
this.timeoutFuture = scheduleTimeoutCheck();
// 启动延时任务
CommandManager commandManager = CommandManager.getInstance();
commandManager.addCommandRecord(this);
// this.timeoutFuture = scheduleTimeoutCheck();
}
// 定时任务每5秒检查一次所有命令是否超时
private ScheduledFuture<?> scheduleTimeoutCheck() {
log.debug("[cac] {} {}指令超时判断定时任务启动", client.shortInfo(), commandType.getInfo());
// 每5秒检查一次所有命令记录的超时状态
return scheduler.schedule(() -> {
if (status != NO_COMMAND ) {
status = FAILED; // 标记超时为失败
log.error("[cac] {} {}指令超时", client.shortInfo(), commandType.getInfo());
switch (commandType) {
case CONTROL:
this.notifyControlCommandResult(client.uavId, false);
break;
case PARAM_BIND:
this.notifyParamBindCommandResult(client.uavId, client.uavType, false);
break;
case ROUTE_BIND:
this.notifyParamBindCommandResult(client.uavId, client.uavType, false);
break;
case PARAM_QUERY:
this.notifyParamQueryCommandResult(client.uavId, client.uavType, false, null);
}
client.switchToCommonData(); // todo 需要优化尽量不要在里面用client的控制逻辑
}
}, TIMEOUT_THRESHOLD, TimeUnit.MILLISECONDS); // 设置延迟时间
}
// // 定时任务每5秒检查一次所有命令是否超时
// private ScheduledFuture<?> scheduleTimeoutCheck() {
// log.debug("[cac] {} {}指令超时判断定时任务启动", client.shortInfo(), commandType.getInfo());
// // 每5秒检查一次所有命令记录的超时状态
// return scheduler.schedule(() -> {
// if (status != NO_COMMAND ) {
// status = FAILED; // 标记超时为失败
// log.error("[cac] {} {}指令超时", client.shortInfo(), commandType.getInfo());
// switch (commandType) {
// case CONTROL:
// this.notifyControlCommandResult(client.uavId, false);
// break;
// case PARAM_BIND:
// this.notifyParamBindCommandResult(client.uavId, client.uavType, false);
// break;
// case ROUTE_BIND:
// this.notifyParamBindCommandResult(client.uavId, client.uavType, false);
// break;
// case PARAM_QUERY:
// this.notifyParamQueryCommandResult(client.uavId, client.uavType, false);
// }
// client.switchToCommonData(); // todo 需要优化尽量不要在里面用client的控制逻辑
// }
// }, DEFAULT_TIMEOUT_THRESHOLD, TimeUnit.MILLISECONDS); // 设置延迟时间
// }
public void cancelScheduleTimeout() {
if (this.timeoutFuture != null && !this.timeoutFuture.isCancelled()) {
if (this.timeoutFuture.cancel(true)) {
log.debug("[cac] {} 取消{}指令超时判断定时任务", client.shortInfo(), commandType.getInfo());
// if (this.timeoutFuture != null && !this.timeoutFuture.isCancelled()) {
// if (this.timeoutFuture.cancel(true)) {
// log.debug("[cac] {} 取消{}指令超时判断定时任务", client.shortInfo(), commandType.getInfo());
// }
// }
if (timeout!= null && !timeout.isCancelled()) {
if (timeout.cancel()){
log.debug("[cac] {} 取消{}指令超时判断任务: {}", client.shortInfo(), commandType.getInfo(), shortInfo());
} else {
log.warn("[cac] {} 无法取消{}指令的超时判断任务:{},任务可能已在执行中", client.shortInfo(), commandType.getInfo(), shortInfo());
}
}
}
@ -152,7 +171,7 @@ public class CommandRecord {
bindResultParam.setCommandSource(1);
bindResultParam.setUavType(uavType.getRemoteCode());
bindResultParam.setUavParamBindCode(ByteUtils.byteToInt(code));
bindResultParam.setBindResult(isSuccess?0:1);
bindResultParam.setBindResult(isSuccess);
boolean notifySuccess = CacHpApi.uavParamBindResultNotify(bindResultParam);
log.info("[cac] {} 参数装订{}结果 发送{}", client.shortInfo(), isSuccess?"成功":"失败", notifySuccess?"成功":"失败");
} catch (IOException e) {
@ -160,7 +179,7 @@ public class CommandRecord {
}
}
public void notifyParamQueryCommandResult(String uavId, UavTypeEnum uavType, boolean isSuccess, HaborClient.UavQueryResultParam uavQueryResultParam) {
public void notifyParamQueryCommandResult(String uavId, UavTypeEnum uavType, boolean isSuccess) {
QueryResultParam queryResultParam = new QueryResultParam();
try {
queryResultParam.setGcsId(GlobalData.GCS_ID);
@ -169,8 +188,8 @@ public class CommandRecord {
queryResultParam.setCommandSource(1);
queryResultParam.setUavType(uavType.getRemoteCode());
queryResultParam.setUavParamQueryCode(ByteUtils.byteToInt(code));
queryResultParam.setQueryResult(isSuccess?0:1);
if (isSuccess) {
queryResultParam.setQueryResult(isSuccess);
if (isSuccess && uavQueryResultParam != null) {
String queryResultContent = uavQueryResultParam.toJsonStr();
queryResultParam.setQueryResultContent(queryResultContent);
}

@ -110,11 +110,12 @@ public abstract class HaborClient extends BaseClient {
@Override
public boolean online() {
// 仅当上线成功时才分配资源并进行和中心指控的通信
online = CacHpApi.uavMasterControlNotify(uavId);
if (online) {
boolean notifySuccess = CacHpApi.uavMasterControlNotify(uavId);
if (notifySuccess) {
resourceAllocate();
startSendCommonData();
}
online = notifySuccess; // 资源分配完毕再确定为上线
return online;
}
@ -164,10 +165,6 @@ public abstract class HaborClient extends BaseClient {
* getreaderIndexwriterIndex, refCnt
*/
public void process(ByteBuf msg) {
if (!online) {
return;
}
if (msg.getByte(0) != HEAD[0] && msg.getByte(1) != HEAD[1]) {
return; // 帧头错误,抛弃
}

@ -29,6 +29,7 @@ import java.util.List;
@Slf4j @Getter
public class HaborClient981A extends HaborClient {
private final CommandManager commandManager = CommandManager.getInstance();
private static final byte[] UPPER_DATA_3_4 = new byte[]{0x02, 0x01};
/**常发数据内容被填入ByteBuf初始内容所有飞机共用, 禁止修改*/
@ -316,8 +317,7 @@ public class HaborClient981A extends HaborClient {
previousControl.addReceiveFrameCount();
if (msg.getByte(29) == previousControl.code) {
try {
previousControl.notifyControlCommandResult(uavId, true);
previousControl.clearCommand();
commandManager.submitResultNotification(previousControl, true);
return true;
} catch (Exception e) {
log.warn("[cac] {} 控制指令回报处理异常:{}",shortInfo(), e.getMessage());
@ -325,12 +325,7 @@ public class HaborClient981A extends HaborClient {
switchToCommonData(); // 恢复常发数据帧
}
}
// else {
// if (previousControl.isTimeout()) {
// log.info("[cac] {} 控制指令回报失败, 超时", shortInfo());
// previousControl.clearCommand();
// }
// }
}
return false;
}
@ -341,21 +336,12 @@ public class HaborClient981A extends HaborClient {
previousParamBind.addReceiveFrameCount();
if (previousParamBind.commandMatch(msg.slice(13,16))) {
try {
previousParamBind.notifyParamBindCommandResult(uavId, uavType,true);
previousParamBind.clearCommand();
commandManager.submitResultNotification(previousParamBind, true);
return true;
} catch (Exception e) {
log.warn("[cac] {} 参数装订结果回报处理异常:{}",shortInfo(), e.getMessage());
} finally {
switchToCommonData(); // 恢复常发数据帧
}
}
// else {
// if (previousParamBind.isTimeout()) {
// log.info("[cac] {} 指令装订失败, 超过最大帧数仍未回报", shortInfo());
// previousParamBind.clearCommand();
// }
// }
}
return false;
}
@ -376,8 +362,8 @@ public class HaborClient981A extends HaborClient {
default:
log.warn("[cac] {} 参数查询结果回报,未知的指令类型:{}", shortInfo(), ByteUtils.byteToHex(msg.getByte(13)));
}
previousParamQuery.notifyParamQueryCommandResult(uavId, uavType, true, uavQueryResultParam);
previousParamQuery.clearCommand();
previousParamQuery.setUavQueryResultParam(uavQueryResultParam);
commandManager.submitResultNotification(previousParamQuery, true);
return true;
} catch (Exception e) {
log.warn("[cac] {} 参数查询结果回报处理异常:{}",shortInfo(), e.getMessage());
@ -385,12 +371,6 @@ public class HaborClient981A extends HaborClient {
switchToCommonData(); // 恢复常发数据帧
}
}
// else {
// if (previousControl.isTimeout()) {
// log.info("[cac] {} 指令查询结果回报失败, 超过最大帧数仍未回报", shortInfo());
// previousControl.clearCommand();
// }
// }
}
return false;
}
@ -402,11 +382,8 @@ public class HaborClient981A extends HaborClient {
if (previousRouteBind.commandMatch(msg.slice(13, 16))) {
if (previousRouteBind.completeOrWaitNext()) { // 如果指令列表均已回报,则判定成功,向中心指控回报, 并恢复常发帧
try {
previousRouteBind.notifyParamBindCommandResult(uavId,uavType,true);
previousRouteBind.clearCommand();
commandManager.submitResultNotification(previousRouteBind, true);
return true;
} catch (Exception e) {
log.warn("[cac] {} 航线装订结果回报处理异常:{}", shortInfo(), e.getMessage());
} finally {
switchToCommonData(); // 恢复常发数据帧
}

@ -26,7 +26,7 @@ public class CacDataRouterHandler extends SimpleChannelInboundHandler<ByteBuf> {
if (client == null || client.getClientType() != ClientTypeEnum.HABOR) {
return;
}
log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg));
// log.debug("[cac] {} 收到数据({}bytes): {}", ((HaborClient) client).shortInfo(), msg.readableBytes(), ByteBufUtil.hexDump(msg));
HaborClient haborClient = (HaborClient) client;
haborClient.process(msg);
}

@ -9,6 +9,7 @@ import com.platform.info.mapping.HaborUavMap;
import com.platform.info.mapping.UavIdMap;
import com.platform.info.dto.DirectControlUavParam;
import com.platform.service.clientmanage.BaseClient;
import com.platform.service.clientmanage.CacRequestExecutor;
import com.platform.service.clientmanage.ClientManager;
import com.platform.service.clientmanage.HaborClient;
import com.platform.util.*;
@ -118,7 +119,7 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
sessionCache.snToAddMap.remove(sn);
}
}
haborSignOutOfCac(ctx.channel());
CacRequestExecutor.getInstance().submit(() -> haborSignOutOfCac(ctx.channel()));
super.channelInactive(ctx);
}
@ -144,7 +145,8 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
sessionCache.snToAddMap.put(deviceCode, address); // mac- ip
sessionCache.tcpSession.put(address, ctx.channel()); // ip - channel
haborSignIntoCac(haborSn, channel);
CacRequestExecutor.getInstance().submit(() -> haborSignIntoCac(haborSn, channel));
}
} catch (Exception e) {
log.error("哈勃上线失败, haborSn={}", haborSn, e);
@ -174,7 +176,7 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
}
BaseClient client = ClientManager.getClient(ctx.channel());
if (client == null || client.getClientType() != ClientTypeEnum.HABOR) {
if (client == null || client.getClientType() != ClientTypeEnum.HABOR || !client.isOnline()) {
if (msgBuf != null) {
msgBuf.release();
}

@ -1,6 +1,6 @@
spring:
profiles:
active: pre
active: dev
app:
debug: true
Loading…
Cancel
Save