[v0.0.5] 联调优化

master
shiyi 1 month ago
parent 196bcd23c4
commit dc5f756573

@ -10,7 +10,7 @@
</parent>
<groupId>com.platform</groupId>
<artifactId>pass-through</artifactId>
<version>1.0.3</version>
<version>0.0.4</version>
<name>pass-through</name>
<description>中心指控直控无人机哈勃服务</description>

@ -130,7 +130,6 @@ public class GcsService {
*/
public boolean gcsAuthRequestToCtrl() {
TcpRemoteAuthCacRequest tcpGcsAuthCacRequest = new TcpRemoteAuthCacRequest();
ChannelFuture sendFuture = null;
try {
if (StringUtils.isEmpty(GlobalData.AUTHORIZATION)) {
log.info("[tcpAuth] authorization 为空,重新发起上线请求");
@ -143,10 +142,12 @@ public class GcsService {
tcpGcsAuthCacRequest.setReadableDataBytes(null);
if (CacClient.channelIsActive()) {
sendFuture = CacClient.write(tcpGcsAuthCacRequest).sync();
if (sendFuture.isSuccess()){
log.info("[tcpAuth] 认证请求已成功发送发送,认证码:{}", GlobalData.AUTHORIZATION);
ChannelFuture sendFuture = CacClient.write(tcpGcsAuthCacRequest);
if (sendFuture != null && sendFuture.isSuccess()){
log.info("[tcpAuth] 认证请求已成功发送,认证码:{}", GlobalData.AUTHORIZATION);
return true;
} else {
log.error("[tcpAuth] 认证请求发送失败: {}", sendFuture!=null?sendFuture.cause():"");
}
} else {
log.error("[tcpAuth] 和中心指控之间tcp连接异常tcp认证发送失败");

@ -50,6 +50,7 @@ public class CacConnectionHandler extends ChannelInboundHandlerAdapter {
log.info("地面站发送上线请求");
if (gcsService.gcsSignInRequest()) {
gcsService.gcsAuthRequestToCtrl();
sendHeartBeat();
}
},
1, TimeUnit.SECONDS);
@ -81,12 +82,16 @@ public class CacConnectionHandler extends ChannelInboundHandlerAdapter {
if (IdleState.WRITER_IDLE.equals(state)) {
//心跳
TcpHeartBeatRequest tcpHeartBeatRequest = buildHeatBeatRequest();
ctx.channel().writeAndFlush(tcpHeartBeatRequest);
sendHeartBeat();
sendHeartBeat(); // 加一条心跳作为分隔符
}
}
}
private void sendHeartBeat() {
CacClient.write(buildHeatBeatRequest());
}
private TcpHeartBeatRequest buildHeatBeatRequest(){
TcpHeartBeatRequest tcpHeartBeatRequest = new TcpHeartBeatRequest();
tcpHeartBeatRequest.setMagicCode(GlobalData.REMOTE_HEAD);

@ -3,13 +3,13 @@ package com.platform.cac.tcp.message.dataframe.receive;
import com.platform.cac.tcp.message.dataframe.RemoteTcpBaseDataFrame;
import com.platform.util.ByteUtils;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.nio.ByteBuffer;
@Data
@Getter
@Slf4j
public class TcpRemoteAuthorization extends RemoteTcpBaseDataFrame {

@ -1,6 +1,9 @@
package com.platform.service;
import com.platform.config.ServiceConfig;
import com.platform.service.handler.CacDataRouterHandler;
import com.platform.service.handler.InMessageHandler1;
import com.platform.service.handler.TelemetryDecoder;
import com.platform.util.SessionCache;
import com.platform.util.StringUtil;
import io.netty.bootstrap.ServerBootstrap;
@ -34,12 +37,11 @@ public class ServerService {
InMessageHandler1 inMessageHandler;
@Autowired
CacDataRouterHandler cacDataRouterHandler;
private void startServer() {
//服务端需要2个线程组 boss处理客户端连接 work进行客服端连接之后的处理
log.warn("【读取配置端口】 端口 = " + config.getPort());
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup(10);
EventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
//服务器 配置
@ -48,7 +50,6 @@ public class ServerService {
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
// channel.pipeline().addLast(new ByteArrayDecoder()); // revise by shiyi: 解码移动到InMessageHandler中
channel.pipeline().addLast(new ByteArrayEncoder());
if (!config.isDebug()) {
channel.pipeline().addLast(new IdleStateHandler(0, 0,

@ -9,6 +9,8 @@ import com.platform.util.ByteUtils;
import com.platform.util.JSONUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@ -23,17 +25,22 @@ import java.util.concurrent.atomic.AtomicInteger;
/** 记录指令状态,回报指令结果*/
@Slf4j
public class CommandRecord {
@Getter
private final HaborClient client;
@Getter
private final CommandTypeEnum commandType;
protected byte code; // 指令码
protected ByteBuf commandBuf = null; // 指令注入内容
protected String commandUniId; // 中心指控指令唯一码
// 指令状态
@Getter
private AtomicInteger receiveFrameCount = new AtomicInteger(0); // 接收到的指令帧数, 150帧还未判断成功则判定为失败
// private long startTime = 0 ; // 接收到的指令帧数, 150帧还未判断成功则判定为失败
private final int MAX_RECEIVE_COUNT = 150; // 接收到的指令帧数, 150帧还未判断成功则判定为失败
private final long TIMEOUT_THRESHOLD = 30*1000; // 从下发超过5s未回报则判定为失败
@Getter@Setter
protected int status;
public static final int FAILED = -1; // 指令失败
public static final int NO_COMMAND = 0; // 空闲
@ -44,14 +51,16 @@ public class CommandRecord {
ScheduledFuture<?> timeoutFuture = null; // 超时任务
// todo 过期时间控制
public CommandRecord(CommandTypeEnum commandType, HaborClient client) {
this.commandType = commandType;
this.client = client;
this(commandType, client, 16);
}
public CommandRecord(CommandTypeEnum commandType, HaborClient client, int contentSize) {
this.commandType = commandType;
this.client = client;
this.commandBuf = PooledByteBufAllocator.DEFAULT.buffer(contentSize);
this.status = NO_COMMAND;
}
public void recordCommand(byte commandCode, String uniId, ByteBuf commandSlice) {
this.code = commandCode;
this.commandBuf.clear();
@ -105,9 +114,6 @@ public class CommandRecord {
this.status = NO_COMMAND;
this.receiveFrameCount.set(0);
}
public int getState() {
return this.status;
}
public boolean commandMatch(ByteBuf commandSlice) {
return commandSlice.equals(this.commandBuf);

@ -112,6 +112,7 @@ public abstract class HaborClient extends BaseClient {
boolean online = CacHpApi.uavMasterControlNotify(uavId);
if (online) {
resourceAllocate();
startSendCommonData();
}
return online;
}
@ -119,6 +120,7 @@ public abstract class HaborClient extends BaseClient {
@Override
public boolean offline() {
resourceRelease();
stopSendCommonData();
return CacHpApi.uavPowerOff(uavId);
}
@ -136,7 +138,6 @@ public abstract class HaborClient extends BaseClient {
* ,
*/
protected void resourceRelease() {
// FIXME 2025/2/8: 安全释放资源
if (!resourceAllocated){
return;
}
@ -152,6 +153,7 @@ public abstract class HaborClient extends BaseClient {
previousParamBind.end();
previousParamQuery.end();
previousRouteBind.end();
resourceAllocated = false;
}
/**
@ -199,7 +201,7 @@ public abstract class HaborClient extends BaseClient {
public abstract void writeRouteBindCommand(byte commandCode, List<byte[]> routeInfoList, String controlUniId);
// 启动定时任务常发数据帧
// 常发数据帧
protected void startSendCommonData() {
// scheduledFuture = channel.eventLoop().scheduleAtFixedRate(
// this::writeCommonData, 0, targetPeriod, TimeUnit.MILLISECONDS);
@ -225,7 +227,6 @@ public abstract class HaborClient extends BaseClient {
long elapsed = currentTime - lastSendTime;
if (elapsed >= targetPeriod) {
// 批量发送逻辑,如果延迟太大,可以考虑补发
int missedFrames = (int) (elapsed / targetPeriod);
if (missedFrames > 2) { // 限制补发帧数
@ -330,8 +331,8 @@ public abstract class HaborClient extends BaseClient {
*
* @param srcBuf ByteBuf
*/
public void sendToCac(ByteBuf srcBuf) {
telemetryBufHeadToCac.writerIndex(cacHeadLength - 6);
private void sendToCac(ByteBuf srcBuf) {
telemetryBufHeadToCac.writerIndex(cacHeadLength - 6); // 从包序号和包长度开始写入
telemetryBufHeadToCac.writeShort(cacFrameIdx.getAndIncrement());
telemetryBufHeadToCac.writeInt(srcBuf.readableBytes());
ByteBuf telemetryBufToCac = Unpooled.wrappedBuffer(telemetryBufHeadToCac.retain(), srcBuf); // 遥测数据组合

@ -163,22 +163,6 @@ public class HaborClient981A extends HaborClient {
}
@Override
public boolean online() {
boolean online = super.online();
if (online) {
startSendCommonData();
}
return online;
}
@Override
public boolean offline() {
boolean offline = super.offline();
if (offline) {
stopSendCommonData();
}
return offline;
}
/**发送常发帧*/
@Override
@ -324,7 +308,7 @@ public class HaborClient981A extends HaborClient {
/** 检查控制指令回报*/
private boolean checkControlResponse(ByteBuf msg) {
if (CommandRecord.WAITING_RESULT == previousControl.getState()) {
if (CommandRecord.WAITING_RESULT == previousControl.getStatus()) {
log.debug("当前指令{}, uniId:{}, A1帧指令回报: {}", ByteUtils.byteToHex(previousControl.code), previousControl.commandUniId, ByteUtils.byteToHex(msg.getByte(29)));
previousControl.addReceiveFrameCount();
if (msg.getByte(29) == previousControl.code) {
@ -348,18 +332,19 @@ public class HaborClient981A extends HaborClient {
}
return false;
}
/** 检查参数装订回报*/
private boolean checkParamBindResponse(ByteBuf msg) {
if (CommandRecord.WAITING_RESULT == previousParamBind.getState()) {
if (CommandRecord.WAITING_RESULT == previousParamBind.getStatus()) {
previousParamBind.addReceiveFrameCount();
if (previousParamBind.commandMatch(msg.slice(13,16))) {
try {
boolean notifyResult = previousParamBind.notifyParamBindCommandResult(uavId, uavType,true);
log.info("[cac] {} 指令装订成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败");
log.info("[cac] {} 参数装订成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败");
previousParamBind.clearCommand();
return true;
} catch (Exception e) {
log.warn("[cac] {} 指令装订结果回报处理异常:{}",shortInfo(), e.getMessage());
log.warn("[cac] {} 参数装订结果回报处理异常:{}",shortInfo(), e.getMessage());
} finally {
switchToCommonData(); // 恢复常发数据帧
}
@ -375,7 +360,7 @@ public class HaborClient981A extends HaborClient {
}
/**检查参数查询回报*/
private boolean checkParamQueryResponse(ByteBuf msg) {
if (CommandRecord.WAITING_RESULT == previousParamQuery.getState()) {
if (CommandRecord.WAITING_RESULT == previousParamQuery.getStatus()) {
previousParamQuery.addReceiveFrameCount();
if (msg.getByte(13) == previousParamQuery.code) { // fixme 匹配的不是同样的内容
try {
@ -388,14 +373,14 @@ public class HaborClient981A extends HaborClient {
uavQueryResultParam = new UavWeightQueryResultParam981A(msg.slice(13, 16));
break;
default:
log.warn("[cac] {} 指令查询结果回报,未知的指令类型:{}", shortInfo(), ByteUtils.byteToHex(msg.getByte(13)));
log.warn("[cac] {} 参数查询结果回报,未知的指令类型:{}", shortInfo(), ByteUtils.byteToHex(msg.getByte(13)));
}
boolean notifyResult = previousParamQuery.notifyParamQueryCommandResult(uavId, uavType, true, uavQueryResultParam);
log.info("[cac] {} 指令查询成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败");
log.info("[cac] {} 参数查询成功结果 发送{}", shortInfo(), notifyResult?"成功":"失败");
previousParamQuery.clearCommand();
return true;
} catch (Exception e) {
log.warn("[cac] {} 指令查询结果回报处理异常:{}",shortInfo(), e.getMessage());
log.warn("[cac] {} 参数查询结果回报处理异常:{}",shortInfo(), e.getMessage());
} finally {
switchToCommonData(); // 恢复常发数据帧
}
@ -412,7 +397,7 @@ public class HaborClient981A extends HaborClient {
/**检查航线装订回报*/
private boolean checkRouteBindResponse(ByteBuf msg) {
if (CommandQueueRecord.WAITING_NEXT == previousRouteBind.getState()) {
if (CommandQueueRecord.WAITING_NEXT == previousRouteBind.getStatus()) {
previousRouteBind.addReceiveFrameCount();
if (previousRouteBind.commandMatch(msg.slice(13, 16))) {
if (previousRouteBind.completeOrWaitNext()) { // 如果指令列表均已回报,则判定成功,向中心指控回报, 并恢复常发帧

@ -1,4 +1,4 @@
package com.platform.service;
package com.platform.service.handler;
import com.platform.info.enums.ClientTypeEnum;
import com.platform.service.clientmanage.BaseClient;

@ -1,4 +1,4 @@
package com.platform.service;
package com.platform.service.handler;
import com.platform.cac.RemoteService;

@ -1,4 +1,4 @@
package com.platform.service;
package com.platform.service.handler;
import com.platform.cac.RemoteService;
@ -95,6 +95,33 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
super.userEventTriggered(ctx, obj);
}
// @Override
// public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// log.info("【设备上线】====> " + getAddress(ctx) + "上线");
// sessionCache.tcpSession.put(getAddress(ctx), ctx.channel());
// super.handlerAdded(ctx);
// }
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String addr = getAddress(ctx);
String sn = sessionCache.addToSnMap.get(addr);
log.info("【设备离线】====> " + addr + "(" + sn + ")离线");
ControlDevice.clearCurrenCtrDeviceByMac(sn);
sessionCache.tcpSession.remove(addr);
sessionCache.addToSnMap.remove(addr);
if (!StringUtils.isEmpty(sn)) {
// 只有当要被删除的地址跟snToAddMap里面的地址匹配的时候才删除
// 因为存在一种情况同一个sn上线了snToAddMap数据已经被正确的地址覆盖此时不应该删除
if (addr.equals(sessionCache.snToAddMap.get(sn))) {
sessionCache.snToAddMap.remove(sn);
}
}
haborSignOutOfCac(ctx.channel());
super.channelInactive(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
@ -122,6 +149,7 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
} catch (Exception e) {
log.error("哈勃上线失败, haborSn={}", haborSn, e);
}
super.channelActive(ctx);
}
@Override
@ -234,7 +262,6 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
}
);
}
}
if (!sessionCache.mappingListMap.isEmpty() && sessionCache.mappingListMap.get(sn) != null) {
@ -336,36 +363,9 @@ public class InMessageHandler1 extends ChannelInboundHandlerAdapter {
ctx.channel().writeAndFlush(buf);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.warn("【设备上线】====> " + getAddress(ctx) + "上线");
sessionCache.tcpSession.put(getAddress(ctx), ctx.channel());
super.handlerAdded(ctx);
}
private String getAddress(ChannelHandlerContext ctx) {
String address = ctx.channel().remoteAddress().toString().toUpperCase();
return address.replace("/", "");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String addr = getAddress(ctx);
String sn = sessionCache.addToSnMap.get(addr);
log.warn("【设备离线】====> " + addr + "(" + sn + ")离线");
ControlDevice.clearCurrenCtrDeviceByMac(sn);
sessionCache.tcpSession.remove(addr);
sessionCache.addToSnMap.remove(addr);
if (!StringUtils.isEmpty(sn)) {
// 只有当要被删除的地址跟snToAddMap里面的地址匹配的时候才删除
// 因为存在一种情况同一个sn上线了snToAddMap数据已经被正确的地址覆盖此时不应该删除
if (addr.equals(sessionCache.snToAddMap.get(sn))) {
sessionCache.snToAddMap.remove(sn);
}
}
haborSignOutOfCac(ctx.channel());
super.handlerRemoved(ctx);
}
}

@ -1,4 +1,4 @@
package com.platform.service;
package com.platform.service.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Loading…
Cancel
Save