|
|
@ -5,8 +5,9 @@ import com.aliyun.tablestore.grid.model.GridDataSetMeta;
|
|
|
|
import com.aliyun.tablestore.grid.model.StoreOptions;
|
|
|
|
import com.aliyun.tablestore.grid.model.StoreOptions;
|
|
|
|
import com.aliyun.tablestore.grid.model.grid.Grid2D;
|
|
|
|
import com.aliyun.tablestore.grid.model.grid.Grid2D;
|
|
|
|
|
|
|
|
|
|
|
|
import com.htfp.weather.griddata.common.GfsVariableHeightEnum;
|
|
|
|
import com.htfp.weather.download.FileInfo;
|
|
|
|
import com.htfp.weather.griddata.common.GfsVariableIsobaricEnum;
|
|
|
|
import com.htfp.weather.download.gfs.GfsVariableHeightEnum;
|
|
|
|
|
|
|
|
import com.htfp.weather.download.gfs.GfsVariableIsobaricEnum;
|
|
|
|
import com.htfp.weather.griddata.common.TableConfig;
|
|
|
|
import com.htfp.weather.griddata.common.TableConfig;
|
|
|
|
import com.htfp.weather.info.Constant;
|
|
|
|
import com.htfp.weather.info.Constant;
|
|
|
|
import com.htfp.weather.utils.MeteoUtils;
|
|
|
|
import com.htfp.weather.utils.MeteoUtils;
|
|
|
@ -32,6 +33,8 @@ import java.time.format.DateTimeFormatter;
|
|
|
|
import java.util.*;
|
|
|
|
import java.util.*;
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
|
|
|
import java.util.concurrent.Future;
|
|
|
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* @author shi_y
|
|
|
|
* @author shi_y
|
|
|
@ -85,57 +88,64 @@ public class GfsDataImport extends BaseTableOperation {
|
|
|
|
* @throws Exception
|
|
|
|
* @throws Exception
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
public GridDataSetMeta updateMeta(GridDataSetMeta meta) throws Exception {
|
|
|
|
public GridDataSetMeta updateMeta(GridDataSetMeta meta) throws Exception {
|
|
|
|
meta.addAttribute("status", "DONE");
|
|
|
|
|
|
|
|
tableStoreGrid.updateDataSetMeta(meta);
|
|
|
|
tableStoreGrid.updateDataSetMeta(meta);
|
|
|
|
return meta;
|
|
|
|
return meta;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public GridDataSetMeta putMeta(GridDataSetMeta meta) throws Exception {
|
|
|
|
public GridDataSetMeta putMeta(GridDataSetMeta meta) throws Exception {
|
|
|
|
meta.addAttribute("status", "DONE");
|
|
|
|
|
|
|
|
tableStoreGrid.putDataSetMeta(meta);
|
|
|
|
tableStoreGrid.putDataSetMeta(meta);
|
|
|
|
return meta;
|
|
|
|
return meta;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** 导入指定起报时间的所有文件*/
|
|
|
|
public List<ImportResult> importData(OffsetDateTime refTime) throws Exception {
|
|
|
|
public List<ImportResult> importData(OffsetDateTime refTime) throws Exception {
|
|
|
|
|
|
|
|
importing = true;
|
|
|
|
long start = System.currentTimeMillis();
|
|
|
|
long start = System.currentTimeMillis();
|
|
|
|
List<String> fileList = getFiles(refTime);
|
|
|
|
List<String> fileList = getFiles(refTime);
|
|
|
|
if (CollectionUtils.isEmpty(fileList)) {
|
|
|
|
if (CollectionUtils.isEmpty(fileList)) {
|
|
|
|
throw new AppException(ErrorCode.NO_NC_OR_GRIB_FILES);
|
|
|
|
throw new AppException(ErrorCode.NO_NC_OR_GRIB_FILES);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
log.info("[tablestore] 数据导入开始, refTime = {}...", refTime);
|
|
|
|
log.info("[tablestore] 数据导入开始, refTime = {}...", refTime);
|
|
|
|
|
|
|
|
// datasetId和起报时间绑定
|
|
|
|
String dataSetId = refTime.format(DateTimeFormatter.ofPattern(Constant.DATA_SET_ID_FORMAT_STRING));
|
|
|
|
String dataSetId = refTime.format(DateTimeFormatter.ofPattern(Constant.DATA_SET_ID_FORMAT_STRING));
|
|
|
|
List<String> fileVariables = getFileVariables(tableConfig.variableList);
|
|
|
|
List<String> fileVariables = getFileVariables(tableConfig.variableList);
|
|
|
|
int[] shape = new int[]{tableConfig.timeSizeMax, tableConfig.levSize, tableConfig.latSize, tableConfig.lonSize};
|
|
|
|
int[] shape = new int[]{tableConfig.timeSizeMax, tableConfig.levSize, tableConfig.latSize, tableConfig.lonSize};
|
|
|
|
GridDataSetMeta meta = initMeta(dataSetId, tableConfig.dataType, fileVariables, shape);
|
|
|
|
GridDataSetMeta meta = initMeta(dataSetId, tableConfig.dataType, fileVariables, shape);
|
|
|
|
List<String> forecastHours = new ArrayList<>(); // 记录到数据库属性中
|
|
|
|
List<String> forecastHours = new ArrayList<>(); // 所有有效预报时效坐标记录到数据库属性中
|
|
|
|
|
|
|
|
|
|
|
|
// done 2024/5/13: 待优化,用于数据库的索引,必须连续,因此必须保证前一个时刻成功导入后才能导入下一个时刻,数据量大的时候导入时间较长
|
|
|
|
|
|
|
|
// done 2024/5/13: 优化方案,使用多线程,并使用Future来获取结果,但是必须保证每个时刻都成功导入
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// todo 2024/5/13: 待优化,用于数据库的索引,必须连续,因此必须保证前一个时刻成功导入后才能导入下一个时刻,数据量大的时候导入时间较长
|
|
|
|
List<ImportResult> finishedList;
|
|
|
|
List<ImportResult> finishedList;
|
|
|
|
try {
|
|
|
|
// List<Future<ImportResult>> futures = new ArrayList<>();
|
|
|
|
finishedList = new ArrayList<>();
|
|
|
|
finishedList = new ArrayList<>();
|
|
|
|
for (int i = 0; i < fileList.size(); i++) {
|
|
|
|
for (int i = 0; i < fileList.size(); i++) {
|
|
|
|
// for(int i = 0; i < 200; i++) {
|
|
|
|
|
|
|
|
String file = fileList.get(i);
|
|
|
|
String file = fileList.get(i);
|
|
|
|
int iTime = i;
|
|
|
|
int iTime = i;
|
|
|
|
int forecastHour = getForecastHourFromFilename(file);
|
|
|
|
int forecastHour = getForecastHourFromFilename(file);
|
|
|
|
// futures.add(executorService.submit(() -> importFromNcFile(meta, file, iTime)));
|
|
|
|
// 2024/5/13:使用多线程,并使用Future来获取结果,但是实际瓶颈可能是网络带宽而非计算
|
|
|
|
|
|
|
|
// futures.add(executorService.submit(() -> importFromNcFile(meta, file, iTime,forecastHour)));
|
|
|
|
ImportResult importResult = importFromNcFile(meta, file, iTime, forecastHour);
|
|
|
|
ImportResult importResult = importFromNcFile(meta, file, iTime, forecastHour);
|
|
|
|
finishedList.add(importResult);
|
|
|
|
finishedList.add(importResult);
|
|
|
|
// ImportResult importResult = importFromNcFile(meta, file, 0, forecastHour);
|
|
|
|
|
|
|
|
if (importResult.isSuccess()) {
|
|
|
|
if (importResult.isSuccess()) {
|
|
|
|
forecastHours.add(String.valueOf(importResult.getForecastHour()));
|
|
|
|
forecastHours.add(String.valueOf(importResult.getForecastHour()));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
|
|
|
|
importing = false;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// for (Future<ImportResult> future : futures) {
|
|
|
|
// for (Future<ImportResult> future : futures) {
|
|
|
|
// ImportResult importResult = future.get();
|
|
|
|
// ImportResult importResult = future.get();
|
|
|
|
// forecastHours.add(String.valueOf(importResult.getForcastHour()));
|
|
|
|
// finishedList.add(importResult);
|
|
|
|
|
|
|
|
// forecastHours.add(String.valueOf(importResult.getForecastHour()));
|
|
|
|
|
|
|
|
// if (importResult.isSuccess()) {
|
|
|
|
|
|
|
|
// forecastHours.add(String.valueOf(importResult.getForecastHour()));
|
|
|
|
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
List<ImportResult> failedList = finishedList.stream().filter(result -> !result.isSuccess()).collect(Collectors.toList());
|
|
|
|
|
|
|
|
if (!failedList.isEmpty()) {
|
|
|
|
|
|
|
|
log.warn("存在气象数据导入数据库失败,失败文件列表: {} ", failedList);
|
|
|
|
|
|
|
|
meta.addAttribute("status", "WRONG");
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
meta.addAttribute("status", "DONE");
|
|
|
|
|
|
|
|
}
|
|
|
|
long end = System.currentTimeMillis();
|
|
|
|
long end = System.currentTimeMillis();
|
|
|
|
log.info("[tablestore] 数据导入完成, 耗时: {} s, forecastHours: {}", (end - start)/1000., forecastHours);
|
|
|
|
log.info("[tablestore] 数据导入完成, 耗时: {} s, forecastHours: {}", (end - start)/1000., forecastHours);
|
|
|
|
meta.setForecastHours(forecastHours);
|
|
|
|
meta.setForecastHours(forecastHours);
|
|
|
@ -143,15 +153,16 @@ public class GfsDataImport extends BaseTableOperation {
|
|
|
|
putMeta(meta);
|
|
|
|
putMeta(meta);
|
|
|
|
gfsDataFetcher.lastGridDataSetMeta = meta;
|
|
|
|
gfsDataFetcher.lastGridDataSetMeta = meta;
|
|
|
|
log.info("[tablestore]: 更新最新的数据元信息: {}", meta);
|
|
|
|
log.info("[tablestore]: 更新最新的数据元信息: {}", meta);
|
|
|
|
|
|
|
|
importing = false;
|
|
|
|
return finishedList;
|
|
|
|
return finishedList;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* read data from netcdf file and write data to table store.
|
|
|
|
* 读取grib/nc文件,并进行相关计算,最后导入tablestore数据库
|
|
|
|
*
|
|
|
|
|
|
|
|
* @param meta 数据元信息
|
|
|
|
* @param meta 数据元信息
|
|
|
|
* @param file netcdf文件路径
|
|
|
|
* @param file netcdf文件路径
|
|
|
|
* @param iTime 文件序号
|
|
|
|
* @param iTime 文件序号
|
|
|
|
|
|
|
|
* @param forecastHour 预报时效(相对于起报时间的小时数)
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
public ImportResult importFromNcFile(GridDataSetMeta meta, String file, int iTime, int forecastHour) {
|
|
|
|
public ImportResult importFromNcFile(GridDataSetMeta meta, String file, int iTime, int forecastHour) {
|
|
|
|
try (NetcdfFile ncFile = NetcdfFiles.open(file)) {
|
|
|
|
try (NetcdfFile ncFile = NetcdfFiles.open(file)) {
|
|
|
@ -191,7 +202,6 @@ public class GfsDataImport extends BaseTableOperation {
|
|
|
|
new int[]{0, 0}, new int[]{xsize, ysize});
|
|
|
|
new int[]{0, 0}, new int[]{xsize, ysize});
|
|
|
|
writer.writeGrid2D(variableName, iTime, 0, grid2D);
|
|
|
|
writer.writeGrid2D(variableName, iTime, 0, grid2D);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
log.warn("[tablestore] 数据文件 {} 中没有变量 {}", ncFile.getLocation(), variableName);
|
|
|
|
log.warn("[tablestore] 数据文件 {} 中没有变量 {}", ncFile.getLocation(), variableName);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -218,8 +228,9 @@ public class GfsDataImport extends BaseTableOperation {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** 计算风速风向,包括地面坐标和气压坐标,并导入数据库*/
|
|
|
|
private void importWind(GridDataWriter writer, GridDataSetMeta meta, NetcdfFile ncFile, int iTime) throws Exception {
|
|
|
|
private void importWind(GridDataWriter writer, GridDataSetMeta meta, NetcdfFile ncFile, int iTime) throws Exception {
|
|
|
|
// TODO 2024/5/8: 风速风向需要保存到文件中
|
|
|
|
// TODO 2024/5/8: 风速风向最好单独保存为一个文件,如果后续需要二维可视化,可以直接本地读取出图,减少网络请求
|
|
|
|
for (String suffix : new String[]{"_isobaric", "_height_above_ground"}) {
|
|
|
|
for (String suffix : new String[]{"_isobaric", "_height_above_ground"}) {
|
|
|
|
Variable uwnd = ncFile.findVariable("u-component_of_wind" + suffix);
|
|
|
|
Variable uwnd = ncFile.findVariable("u-component_of_wind" + suffix);
|
|
|
|
Variable vwnd = ncFile.findVariable("v-component_of_wind" + suffix);
|
|
|
|
Variable vwnd = ncFile.findVariable("v-component_of_wind" + suffix);
|
|
|
|