@@ -5,7 +5,6 @@ import com.aliyun.tablestore.grid.model.GridDataSetMeta;
 import com.aliyun.tablestore.grid.model.StoreOptions;
 import com.aliyun.tablestore.grid.model.grid.Grid2D;
 
-import com.htfp.weather.download.FileInfo;
 import com.htfp.weather.download.gfs.GfsVariableHeightEnum;
 import com.htfp.weather.download.gfs.GfsVariableIsobaricEnum;
 import com.htfp.weather.griddata.common.TableConfig;
@@ -33,7 +32,6 @@ import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 /**
@@ -52,7 +50,7 @@ public class GfsDataImport extends BaseTableOperation {
     private final ExecutorService executorService = Executors.newFixedThreadPool(5);
 
     /**
-     * init meta data to table store.
+     * 表格元数据初始化
      *
      * @param dataSetID
      * @param dataType
@@ -81,7 +79,7 @@ public class GfsDataImport extends BaseTableOperation {
     }
 
     /**
-     * update meta and set status to DONE when data import finished.
+     * 导入完成后,更新meta状态
      *
      * @param meta
      * @return
@@ -92,31 +90,34 @@ public class GfsDataImport extends BaseTableOperation {
         return meta;
     }
 
+    /**
+     * 导入完成后,设置meta状态
+     */
     public GridDataSetMeta putMeta(GridDataSetMeta meta) throws Exception {
         tableStoreGrid.putDataSetMeta(meta);
         return meta;
     }
 
-
     /** 导入指定起报时间的所有文件*/
-    public List<ImportResult> importData(OffsetDateTime refTime) throws Exception {
+    public synchronized List<ImportFileInfo> importData(OffsetDateTime refTime) throws Exception {
         importing = true;
+        try {
             long start = System.currentTimeMillis();
             List<String> fileList = getFiles(refTime);
             if (CollectionUtils.isEmpty(fileList)) {
                 throw new AppException(ErrorCode.NO_NC_OR_GRIB_FILES);
             }
             log.info("[tablestore] 数据导入开始, refTime = {}...", refTime);
             // datasetId和起报时间绑定
             String dataSetId = refTime.format(DateTimeFormatter.ofPattern(Constant.DATA_SET_ID_FORMAT_STRING));
             List<String> fileVariables = getFileVariables(tableConfig.variableList);
             int[] shape = new int[]{tableConfig.timeSizeMax, tableConfig.levSize, tableConfig.latSize, tableConfig.lonSize};
             GridDataSetMeta meta = initMeta(dataSetId, tableConfig.dataType, fileVariables, shape);
             List<String> forecastHours = new ArrayList<>(); // 所有有效预报时效坐标记录到数据库属性中
 
             // todo 2024/5/13: 待优化,用于数据库的索引,必须连续,因此必须保证前一个时刻成功导入后才能导入下一个时刻,数据量大的时候导入时间较长
-            List<ImportResult> finishedList;
+            List<ImportFileInfo> finishedList;
             // List<Future<ImportResult>> futures = new ArrayList<>();
             finishedList = new ArrayList<>();
             for (int i = 0; i < fileList.size(); i++) {
                 String file = fileList.get(i);
@@ -124,37 +125,39 @@ public class GfsDataImport extends BaseTableOperation {
                 int forecastHour = getForecastHourFromFilename(file);
                 // 2024/5/13:使用多线程,并使用Future来获取结果,但是实际瓶颈可能是网络带宽而非计算
                 // futures.add(executorService.submit(() -> importFromNcFile(meta, file, iTime,forecastHour)));
-                ImportResult importResult = importFromNcFile(meta, file, iTime, forecastHour);
-                finishedList.add(importResult);
-                if (importResult.isSuccess()) {
-                    forecastHours.add(String.valueOf(importResult.getForecastHour()));
+                ImportFileInfo importFileInfo = importFromNcFile(meta, file, iTime, forecastHour);
+                finishedList.add(importFileInfo);
+                if (importFileInfo.isSuccess()) {
+                    forecastHours.add(String.valueOf(importFileInfo.getForecastHour()));
                 }
             }
             // for (Future<ImportResult> future : futures) {
             //     ImportResult importResult = future.get();
             //     finishedList.add(importResult);
             //     forecastHours.add(String.valueOf(importResult.getForecastHour()));
             //     if (importResult.isSuccess()) {
             //         forecastHours.add(String.valueOf(importResult.getForecastHour()));
             //     }
             // }
 
-            List<ImportResult> failedList = finishedList.stream().filter(result -> !result.isSuccess()).collect(Collectors.toList());
+            List<ImportFileInfo> failedList = finishedList.stream().filter(result -> !result.isSuccess()).collect(Collectors.toList());
             if (!failedList.isEmpty()) {
                 log.warn("存在气象数据导入数据库失败,失败文件列表: {} ", failedList);
                 meta.addAttribute("status", "WRONG");
             } else {
                 meta.addAttribute("status", "DONE");
             }
             long end = System.currentTimeMillis();
             log.info("[tablestore] 数据导入完成, 耗时: {} s, forecastHours: {}", (end - start)/1000., forecastHours);
             meta.setForecastHours(forecastHours);
             meta.addAttribute("reference_time", refTime.toInstant().toEpochMilli());
             putMeta(meta);
             gfsDataFetcher.lastGridDataSetMeta = meta;
             log.info("[tablestore]: 更新最新的数据元信息: {}", meta);
-            importing = false;
             return finishedList;
+        } finally {
+            importing = false;
+        }
     }
 
     /**
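Note on the commented-out Future block and the 2024/5/13 todo above: submitting each file to `executorService` would overlap the slow NetCDF reads, but the TableStore time index (`iTime`) has to stay contiguous, so the patch keeps the sequential loop. For illustration only, a minimal sketch of the parallel variant, assuming the same class members the diff already uses (`executorService`, `importFromNcFile`, `getForecastHourFromFilename`); the helper name `importAllParallel` is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

// Sketch: submit every file, then collect results in submission order.
private List<ImportFileInfo> importAllParallel(GridDataSetMeta meta, List<String> fileList) throws Exception {
    List<Future<ImportFileInfo>> futures = new ArrayList<>();
    for (int i = 0; i < fileList.size(); i++) {
        final int iTime = i;
        final String file = fileList.get(i);
        final int forecastHour = getForecastHourFromFilename(file);
        // importFromNcFile catches its own exceptions and returns a failed ImportFileInfo
        futures.add(executorService.submit(() -> importFromNcFile(meta, file, iTime, forecastHour)));
    }
    List<ImportFileInfo> finished = new ArrayList<>(fileList.size());
    for (Future<ImportFileInfo> future : futures) {
        finished.add(future.get()); // blocks until that file is done; results keep submission order
    }
    return finished;
}
```

This only orders the collected results; the writes themselves still run concurrently, whereas the todo says each time index must be fully imported before the next one starts — which is why the patch keeps the sequential loop.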
@@ -164,7 +167,7 @@ public class GfsDataImport extends BaseTableOperation {
      * @param iTime 文件序号
      * @param forecastHour 预报时效(相对于起报时间的小时数)
      */
-    public ImportResult importFromNcFile(GridDataSetMeta meta, String file, int iTime, int forecastHour) {
+    public ImportFileInfo importFromNcFile(GridDataSetMeta meta, String file, int iTime, int forecastHour) {
         try (NetcdfFile ncFile = NetcdfFiles.open(file)) {
             GridDataWriter writer = tableStoreGrid.getDataWriter(meta);
 
@@ -176,6 +179,7 @@ public class GfsDataImport extends BaseTableOperation {
             int ysize = meta.getySize();
 
             for (String variableName : meta.getVariables()) {
+                // 风速在最后单独处理
                 if ("Wind_speed_gust_surface".equals(variableName) || "Wind_speed_isobaric".equals(variableName) || "Wind_direction_isobaric".equals(variableName)
                         || "Wind_speed_height_above_ground".equals(variableName) || "Wind_direction_height_above_ground".equals(variableName)) {
                     continue;
@@ -183,7 +187,7 @@ public class GfsDataImport extends BaseTableOperation {
                 Variable variable = ncFile.findVariable(variableName);
                 if (variable != null) {
                     int shapeLength = variable.getShape().length;
-                    if (shapeLength== 4) {
+                    if (shapeLength== 4) { // 存在垂直坐标,即变量随高度变化
                         // 气压坐标和高度坐标保持各自的索引
                         int zSize = variable.getShape(1);
                         for (int z = 0; z < zSize; z++) {
@@ -195,7 +199,7 @@ public class GfsDataImport extends BaseTableOperation {
                             writer.writeGrid2D(variableName, iTime, z, grid2D);
                         }
                     } else if (shapeLength == 3) {
-                        // DONE 2024/5/24: 导入不含高度坐标的数据
+                        // DONE 2024/5/24: 导入不含高度坐标的数据,如降水量,阵风风速
                         Array array = variable.read(new int[]{0, 0, 0}, new int[]{1,xsize, ysize});
                         transferUnit(variableName, array);
                         Grid2D grid2D = new Grid2D(array.getDataAsByteBuffer(), variable.getDataType(),
@@ -210,12 +214,13 @@ public class GfsDataImport extends BaseTableOperation {
             importWind(writer, meta, ncFile, iTime);
         } catch (Exception e) {
             log.error("[tablestore] 导入文件数据失败,_t={}: {}", forecastHour, file, e);
-            return new ImportResult(false, file, forecastHour, iTime);
+            return new ImportFileInfo(false, file, forecastHour, iTime);
         }
         log.info("[tablestore] 导入文件数据成功,_t={}: {}", forecastHour, file);
-        return new ImportResult(true, file, forecastHour, iTime);
+        return new ImportFileInfo(true, file, forecastHour, iTime);
     }
 
+    /**单位转换*/
     private void transferUnit(String variableName, Array array) {
         if (GfsVariableIsobaricEnum.TEMP.getNameInFile().equals(variableName)) {
             MeteoUtils.kelvin2Celsius(array);
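The new /**单位转换*/ comment marks transferUnit as the unit-conversion hook; the diff does not show MeteoUtils itself. For reference, an in-place Kelvin-to-Celsius pass over a ucar.ma2.Array generally looks like the sketch below (illustrative only, not the project's MeteoUtils.kelvin2Celsius):

```java
import ucar.ma2.Array;
import ucar.ma2.IndexIterator;

/** Sketch: convert every element from Kelvin to Celsius in place (°C = K - 273.15). */
static void kelvinToCelsiusSketch(Array array) {
    IndexIterator it = array.getIndexIterator();
    while (it.hasNext()) {
        double kelvin = it.getDoubleNext();   // reads the next element and advances
        it.setDoubleCurrent(kelvin - 273.15); // writes the converted value back in place
    }
}
```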
@@ -322,13 +327,13 @@ public class GfsDataImport extends BaseTableOperation {
     }
 
     @Data
-    public static class ImportResult {
+    public static class ImportFileInfo {
         private boolean success;
         private String file;
         private int forecastHour;
         private int iTime;
 
-        public ImportResult(boolean success, String file, int forecastHour, int iTime) {
+        public ImportFileInfo(boolean success, String file, int forecastHour, int iTime) {
             this.success = success;
             this.file = file;
             this.forecastHour = forecastHour;
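With the rename, callers now receive List<ImportFileInfo> from importData and can rely on the Lombok @Data getters shown above. A hedged usage sketch (the runImportForCycle wrapper, the gfsDataImport instance and the example 00Z cycle time are hypothetical):

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.stream.Collectors;

// Sketch: run one import cycle and return the files that need a retry.
public List<String> runImportForCycle(GfsDataImport gfsDataImport) throws Exception {
    OffsetDateTime refTime = OffsetDateTime.of(2024, 5, 24, 0, 0, 0, 0, ZoneOffset.UTC); // example 00Z run
    List<GfsDataImport.ImportFileInfo> results = gfsDataImport.importData(refTime);
    return results.stream()
            .filter(r -> !r.isSuccess())
            .map(GfsDataImport.ImportFileInfo::getFile)
            .collect(Collectors.toList());
}
```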