diff --git a/pom.xml b/pom.xml index 5311626..52cd71b 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ </parent> <groupId>com.platform</groupId> <artifactId>pass-through</artifactId> - <version>1.0.3</version> + <version>0.0.4</version> <name>pass-through</name> <description>中心指控直控无人机哈勃服务</description> diff --git a/src/main/java/com/platform/cac/GcsService.java b/src/main/java/com/platform/cac/GcsService.java index 704021f..9371251 100644 --- a/src/main/java/com/platform/cac/GcsService.java +++ b/src/main/java/com/platform/cac/GcsService.java @@ -130,7 +130,6 @@ public class GcsService { */ public boolean gcsAuthRequestToCtrl() { TcpRemoteAuthCacRequest tcpGcsAuthCacRequest = new TcpRemoteAuthCacRequest(); - ChannelFuture sendFuture = null; try { if (StringUtils.isEmpty(GlobalData.AUTHORIZATION)) { log.info("[tcpAuth] authorization 为空,重新发起上线请求"); @@ -143,10 +142,12 @@ public class GcsService { tcpGcsAuthCacRequest.setReadableDataBytes(null); if (CacClient.channelIsActive()) { - sendFuture = CacClient.write(tcpGcsAuthCacRequest).sync(); - if (sendFuture.isSuccess()){ - log.info("[tcpAuth] 认证请求已成功发送发送,认证码:{}", GlobalData.AUTHORIZATION); + ChannelFuture sendFuture = CacClient.write(tcpGcsAuthCacRequest); + if (sendFuture != null && sendFuture.isSuccess()){ + log.info("[tcpAuth] 认证请求已成功发送,认证码:{}", GlobalData.AUTHORIZATION); return true; + } else { + log.error("[tcpAuth] 认证请求发送失败: {}", sendFuture!=null?sendFuture.cause():""); } } else { log.error("[tcpAuth] 和中心指控之间tcp连接异常,tcp认证发送失败!"); diff --git a/src/main/java/com/platform/cac/tcp/CacConnectionHandler.java b/src/main/java/com/platform/cac/tcp/CacConnectionHandler.java index 538a9d8..c84a98c 100644 --- a/src/main/java/com/platform/cac/tcp/CacConnectionHandler.java +++ b/src/main/java/com/platform/cac/tcp/CacConnectionHandler.java @@ -50,6 +50,7 @@ public class CacConnectionHandler extends ChannelInboundHandlerAdapter { log.info("地面站发送上线请求"); if (gcsService.gcsSignInRequest()) { gcsService.gcsAuthRequestToCtrl(); + sendHeartBeat(); } }, 1, TimeUnit.SECONDS); @@ -81,12 +82,16 @@ public class CacConnectionHandler extends ChannelInboundHandlerAdapter { if (IdleState.WRITER_IDLE.equals(state)) { //心跳 TcpHeartBeatRequest tcpHeartBeatRequest = buildHeatBeatRequest(); - ctx.channel().writeAndFlush(tcpHeartBeatRequest); + sendHeartBeat(); + sendHeartBeat(); // 加一条心跳作为分隔符 } } } + private void sendHeartBeat() { + CacClient.write(buildHeatBeatRequest()); + } private TcpHeartBeatRequest buildHeatBeatRequest(){ TcpHeartBeatRequest tcpHeartBeatRequest = new TcpHeartBeatRequest(); tcpHeartBeatRequest.setMagicCode(GlobalData.REMOTE_HEAD); diff --git a/src/main/java/com/platform/cac/tcp/message/dataframe/receive/TcpRemoteAuthorization.java b/src/main/java/com/platform/cac/tcp/message/dataframe/receive/TcpRemoteAuthorization.java index 76d5731..d8dc025 100644 --- a/src/main/java/com/platform/cac/tcp/message/dataframe/receive/TcpRemoteAuthorization.java +++ b/src/main/java/com/platform/cac/tcp/message/dataframe/receive/TcpRemoteAuthorization.java @@ -3,13 +3,13 @@ package com.platform.cac.tcp.message.dataframe.receive; import com.platform.cac.tcp.message.dataframe.RemoteTcpBaseDataFrame; import com.platform.util.ByteUtils; -import lombok.Data; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.nio.ByteBuffer; -@Data +@Getter @Slf4j public class TcpRemoteAuthorization extends RemoteTcpBaseDataFrame { diff --git a/src/main/java/com/platform/service/ServerService.java b/src/main/java/com/platform/service/ServerService.java index b5a7d90..ec50398 100644 --- a/src/main/java/com/platform/service/ServerService.java +++ b/src/main/java/com/platform/service/ServerService.java @@ -1,6 +1,9 @@ package com.platform.service; import com.platform.config.ServiceConfig; +import com.platform.service.handler.CacDataRouterHandler; +import com.platform.service.handler.InMessageHandler1; +import com.platform.service.handler.TelemetryDecoder; import com.platform.util.SessionCache; import com.platform.util.StringUtil; import io.netty.bootstrap.ServerBootstrap; @@ -34,12 +37,11 @@ public class ServerService { InMessageHandler1 inMessageHandler; @Autowired CacDataRouterHandler cacDataRouterHandler; - private void startServer() { //服务端需要2个线程组 boss处理客户端连接 work进行客服端连接之后的处理 log.warn("【读取配置端口】 端口 = " + config.getPort()); EventLoopGroup boss = new NioEventLoopGroup(); - EventLoopGroup work = new NioEventLoopGroup(10); + EventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); //服务器 配置 @@ -48,7 +50,6 @@ public class ServerService { bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { - // channel.pipeline().addLast(new ByteArrayDecoder()); // revise by shiyi: 解码移动到InMessageHandler中 channel.pipeline().addLast(new ByteArrayEncoder()); if (!config.isDebug()) { channel.pipeline().addLast(new IdleStateHandler(0, 0, diff --git a/src/main/java/com/platform/service/clientmanage/CommandRecord.java b/src/main/java/com/platform/service/clientmanage/CommandRecord.java index 4869c63..c59fbe3 100644 --- a/src/main/java/com/platform/service/clientmanage/CommandRecord.java +++ b/src/main/java/com/platform/service/clientmanage/CommandRecord.java @@ -9,6 +9,8 @@ import com.platform.util.ByteUtils; import com.platform.util.JSONUtils; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -23,17 +25,22 @@ import java.util.concurrent.atomic.AtomicInteger; /** 记录指令状态,回报指令结果*/ @Slf4j public class CommandRecord { + @Getter private final HaborClient client; + @Getter private final CommandTypeEnum commandType; protected byte code; // 指令码 protected ByteBuf commandBuf = null; // 指令注入内容 protected String commandUniId; // 中心指控指令唯一码 // 指令状态 + @Getter private AtomicInteger receiveFrameCount = new AtomicInteger(0); // 接收到的指令帧数, 150帧还未判断成功则判定为失败 // private long startTime = 0 ; // 接收到的指令帧数, 150帧还未判断成功则判定为失败 private final int MAX_RECEIVE_COUNT = 150; // 接收到的指令帧数, 150帧还未判断成功则判定为失败 private final long TIMEOUT_THRESHOLD = 30*1000; // 从下发超过5s未回报,则判定为失败 + + @Getter@Setter protected int status; public static final int FAILED = -1; // 指令失败 public static final int NO_COMMAND = 0; // 空闲 @@ -44,14 +51,16 @@ public class CommandRecord { ScheduledFuture<?> timeoutFuture = null; // 超时任务 // todo 过期时间控制 public CommandRecord(CommandTypeEnum commandType, HaborClient client) { - this.commandType = commandType; - this.client = client; + this(commandType, client, 16); } + public CommandRecord(CommandTypeEnum commandType, HaborClient client, int contentSize) { this.commandType = commandType; this.client = client; this.commandBuf = PooledByteBufAllocator.DEFAULT.buffer(contentSize); + this.status = NO_COMMAND; } + public void recordCommand(byte commandCode, String uniId, ByteBuf commandSlice) { this.code = commandCode; this.commandBuf.clear(); @@ -105,9 +114,6 @@ public class CommandRecord { this.status = NO_COMMAND; this.receiveFrameCount.set(0); } - public int getState() { - return this.status; - } public boolean commandMatch(ByteBuf commandSlice) { return commandSlice.equals(this.commandBuf); diff --git a/src/main/java/com/platform/service/clientmanage/HaborClient.java b/src/main/java/com/platform/service/clientmanage/HaborClient.java index 456abef..eb0d43c 100644 --- a/src/main/java/com/platform/service/clientmanage/HaborClient.java +++ b/src/main/java/com/platform/service/clientmanage/HaborClient.java @@ -112,6 +112,7 @@ public abstract class HaborClient extends BaseClient { boolean online = CacHpApi.uavMasterControlNotify(uavId); if (online) { resourceAllocate(); + startSendCommonData(); } return online; } @@ -119,6 +120,7 @@ public abstract class HaborClient extends BaseClient { @Override public boolean offline() { resourceRelease(); + stopSendCommonData(); return CacHpApi.uavPowerOff(uavId); } @@ -136,7 +138,6 @@ public abstract class HaborClient extends BaseClient { * 被移除管理时, 释放相关资源 */ protected void resourceRelease() { - // FIXME 2025/2/8: 安全释放资源 if (!resourceAllocated){ return; } @@ -152,6 +153,7 @@ public abstract class HaborClient extends BaseClient { previousParamBind.end(); previousParamQuery.end(); previousRouteBind.end(); + resourceAllocated = false; } /** @@ -199,7 +201,7 @@ public abstract class HaborClient extends BaseClient { public abstract void writeRouteBindCommand(byte commandCode, List<byte[]> routeInfoList, String controlUniId); - // 启动定时任务常发数据帧 + // 常发数据帧 protected void startSendCommonData() { // scheduledFuture = channel.eventLoop().scheduleAtFixedRate( // this::writeCommonData, 0, targetPeriod, TimeUnit.MILLISECONDS); @@ -225,7 +227,6 @@ public abstract class HaborClient extends BaseClient { long elapsed = currentTime - lastSendTime; if (elapsed >= targetPeriod) { - // 批量发送逻辑,如果延迟太大,可以考虑补发 int missedFrames = (int) (elapsed / targetPeriod); if (missedFrames > 2) { // 限制补发帧数 @@ -330,8 +331,8 @@ public abstract class HaborClient extends BaseClient { * * @param srcBuf 原始遥测ByteBuf */ - public void sendToCac(ByteBuf srcBuf) { - telemetryBufHeadToCac.writerIndex(cacHeadLength - 6); + private void sendToCac(ByteBuf srcBuf) { + telemetryBufHeadToCac.writerIndex(cacHeadLength - 6); // 从包序号和包长度开始写入 telemetryBufHeadToCac.writeShort(cacFrameIdx.getAndIncrement()); telemetryBufHeadToCac.writeInt(srcBuf.readableBytes()); ByteBuf telemetryBufToCac = Unpooled.wrappedBuffer(telemetryBufHeadToCac.retain(), srcBuf); // 遥测数据组合 diff --git a/src/main/java/com/platform/service/clientmanage/HaborClient981A.java b/src/main/java/com/platform/service/clientmanage/HaborClient981A.java index a0357ab..8bd7770 100644 --- a/src/main/java/com/platform/service/clientmanage/HaborClient981A.java +++ b/src/main/java/com/platform/service/clientmanage/HaborClient981A.java @@ -163,22 +163,6 @@ public class HaborClient981A extends HaborClient { } - @Override - public boolean online() { - boolean online = super.online(); - if (online) { - startSendCommonData(); - } - return online; - } - @Override - public boolean offline() { - boolean offline = super.offline(); - if (offline) { - stopSendCommonData(); - } - return offline; - } /**发送常发帧*/ @Override @@ -324,7 +308,7 @@ public class HaborClient981A extends HaborClient { /** 检查控制指令回报*/ private boolean checkControlResponse(ByteBuf msg) { - if (CommandRecord.WAITING_RESULT == previousControl.getState()) { + if (CommandRecord.WAITING_RESULT == previousControl.getStatus()) { log.debug("当前指令{}, uniId:{}, A1帧指令回报: {}", ByteUtils.byteToHex(previousControl.code), previousControl.commandUniId, ByteUtils.byteToHex(msg.getByte(29))); previousControl.addReceiveFrameCount(); if (msg.getByte(29) == previousControl.code) { @@ -348,18 +332,19 @@ public class HaborClient981A extends HaborClient { } return false; } + /** 检查参数装订回报*/ private boolean checkParamBindResponse(ByteBuf msg) { - if (CommandRecord.WAITING_RESULT == previousParamBind.getState()) { + if (CommandRecord.WAITING_RESULT == previousParamBind.getStatus()) { previousParamBind.addReceiveFrameCount(); if (previousParamBind.commandMatch(msg.slice(13,16))) { try { boolean notifyResult = previousParamBind.notifyParamBindCommandResult(uavId, uavType,true); - log.info("[cac] {} 指令装订成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败"); + log.info("[cac] {} 参数装订成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败"); previousParamBind.clearCommand(); return true; } catch (Exception e) { - log.warn("[cac] {} 指令装订结果回报处理异常:{}",shortInfo(), e.getMessage()); + log.warn("[cac] {} 参数装订结果回报处理异常:{}",shortInfo(), e.getMessage()); } finally { switchToCommonData(); // 恢复常发数据帧 } @@ -375,7 +360,7 @@ public class HaborClient981A extends HaborClient { } /**检查参数查询回报*/ private boolean checkParamQueryResponse(ByteBuf msg) { - if (CommandRecord.WAITING_RESULT == previousParamQuery.getState()) { + if (CommandRecord.WAITING_RESULT == previousParamQuery.getStatus()) { previousParamQuery.addReceiveFrameCount(); if (msg.getByte(13) == previousParamQuery.code) { // fixme 匹配的不是同样的内容 try { @@ -388,14 +373,14 @@ public class HaborClient981A extends HaborClient { uavQueryResultParam = new UavWeightQueryResultParam981A(msg.slice(13, 16)); break; default: - log.warn("[cac] {} 指令查询结果回报,未知的指令类型:{}", shortInfo(), ByteUtils.byteToHex(msg.getByte(13))); + log.warn("[cac] {} 参数查询结果回报,未知的指令类型:{}", shortInfo(), ByteUtils.byteToHex(msg.getByte(13))); } boolean notifyResult = previousParamQuery.notifyParamQueryCommandResult(uavId, uavType, true, uavQueryResultParam); - log.info("[cac] {} 指令查询成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败"); + log.info("[cac] {} 参数查询成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败"); previousParamQuery.clearCommand(); return true; } catch (Exception e) { - log.warn("[cac] {} 指令查询结果回报处理异常:{}",shortInfo(), e.getMessage()); + log.warn("[cac] {} 参数查询结果回报处理异常:{}",shortInfo(), e.getMessage()); } finally { switchToCommonData(); // 恢复常发数据帧 } @@ -412,7 +397,7 @@ public class HaborClient981A extends HaborClient { /**检查航线装订回报*/ private boolean checkRouteBindResponse(ByteBuf msg) { - if (CommandQueueRecord.WAITING_NEXT == previousRouteBind.getState()) { + if (CommandQueueRecord.WAITING_NEXT == previousRouteBind.getStatus()) { previousRouteBind.addReceiveFrameCount(); if (previousRouteBind.commandMatch(msg.slice(13, 16))) { if (previousRouteBind.completeOrWaitNext()) { // 如果指令列表均已回报,则判定成功,向中心指控回报, 并恢复常发帧 diff --git a/src/main/java/com/platform/service/CacDataRouterHandler.java b/src/main/java/com/platform/service/handler/CacDataRouterHandler.java similarity index 97% rename from src/main/java/com/platform/service/CacDataRouterHandler.java rename to src/main/java/com/platform/service/handler/CacDataRouterHandler.java index 7deee59..eddce2b 100644 --- a/src/main/java/com/platform/service/CacDataRouterHandler.java +++ b/src/main/java/com/platform/service/handler/CacDataRouterHandler.java @@ -1,4 +1,4 @@ -package com.platform.service; +package com.platform.service.handler; import com.platform.info.enums.ClientTypeEnum; import com.platform.service.clientmanage.BaseClient; diff --git a/src/main/java/com/platform/service/InMessageHandler.java b/src/main/java/com/platform/service/handler/InMessageHandler.java similarity index 99% rename from src/main/java/com/platform/service/InMessageHandler.java rename to src/main/java/com/platform/service/handler/InMessageHandler.java index 53cb604..01458b9 100644 --- a/src/main/java/com/platform/service/InMessageHandler.java +++ b/src/main/java/com/platform/service/handler/InMessageHandler.java @@ -1,4 +1,4 @@ -package com.platform.service; +package com.platform.service.handler; import com.platform.cac.RemoteService; diff --git a/src/main/java/com/platform/service/InMessageHandler1.java b/src/main/java/com/platform/service/handler/InMessageHandler1.java similarity index 96% rename from src/main/java/com/platform/service/InMessageHandler1.java rename to src/main/java/com/platform/service/handler/InMessageHandler1.java index 4dbb845..8ba61eb 100644 --- a/src/main/java/com/platform/service/InMessageHandler1.java +++ b/src/main/java/com/platform/service/handler/InMessageHandler1.java @@ -1,4 +1,4 @@ -package com.platform.service; +package com.platform.service.handler; import com.platform.cac.RemoteService; @@ -95,6 +95,33 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter { super.userEventTriggered(ctx, obj); } + // @Override + // public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + // log.info("【设备上线】====> " + getAddress(ctx) + "上线"); + // sessionCache.tcpSession.put(getAddress(ctx), ctx.channel()); + // super.handlerAdded(ctx); + // } + + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + String addr = getAddress(ctx); + String sn = sessionCache.addToSnMap.get(addr); + log.info("【设备离线】====> " + addr + "(" + sn + ")离线"); + ControlDevice.clearCurrenCtrDeviceByMac(sn); + sessionCache.tcpSession.remove(addr); + sessionCache.addToSnMap.remove(addr); + if (!StringUtils.isEmpty(sn)) { + // 只有当要被删除的地址跟snToAddMap里面的地址匹配的时候才删除, + // 因为存在一种情况,同一个sn上线了,snToAddMap数据已经被正确的地址覆盖,此时不应该删除 + if (addr.equals(sessionCache.snToAddMap.get(sn))) { + sessionCache.snToAddMap.remove(sn); + } + } + haborSignOutOfCac(ctx.channel()); + super.channelInactive(ctx); + } + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); @@ -122,6 +149,7 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter { } catch (Exception e) { log.error("哈勃上线失败, haborSn={}", haborSn, e); } + super.channelActive(ctx); } @Override @@ -234,7 +262,6 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter { } ); } - } if (!sessionCache.mappingListMap.isEmpty() && sessionCache.mappingListMap.get(sn) != null) { @@ -336,36 +363,9 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter { ctx.channel().writeAndFlush(buf); } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - log.warn("【设备上线】====> " + getAddress(ctx) + "上线"); - sessionCache.tcpSession.put(getAddress(ctx), ctx.channel()); - super.handlerAdded(ctx); - } - private String getAddress(ChannelHandlerContext ctx) { String address = ctx.channel().remoteAddress().toString().toUpperCase(); return address.replace("/", ""); } - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - String addr = getAddress(ctx); - String sn = sessionCache.addToSnMap.get(addr); - log.warn("【设备离线】====> " + addr + "(" + sn + ")离线"); - ControlDevice.clearCurrenCtrDeviceByMac(sn); - sessionCache.tcpSession.remove(addr); - sessionCache.addToSnMap.remove(addr); - if (!StringUtils.isEmpty(sn)) { - // 只有当要被删除的地址跟snToAddMap里面的地址匹配的时候才删除, - // 因为存在一种情况,同一个sn上线了,snToAddMap数据已经被正确的地址覆盖,此时不应该删除 - if (addr.equals(sessionCache.snToAddMap.get(sn))) { - sessionCache.snToAddMap.remove(sn); - } - } - haborSignOutOfCac(ctx.channel()); - super.handlerRemoved(ctx); - } - } diff --git a/src/main/java/com/platform/service/TelemetryDecoder.java b/src/main/java/com/platform/service/handler/TelemetryDecoder.java similarity index 97% rename from src/main/java/com/platform/service/TelemetryDecoder.java rename to src/main/java/com/platform/service/handler/TelemetryDecoder.java index 96d9648..13f6ce5 100644 --- a/src/main/java/com/platform/service/TelemetryDecoder.java +++ b/src/main/java/com/platform/service/handler/TelemetryDecoder.java @@ -1,4 +1,4 @@ -package com.platform.service; +package com.platform.service.handler; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled;