diff --git a/weather-service/src/main/java/com/htfp/weather/download/BaseDataDownloader.java b/weather-service/src/main/java/com/htfp/weather/download/BaseDataDownloader.java index 633eef9..a70af62 100644 --- a/weather-service/src/main/java/com/htfp/weather/download/BaseDataDownloader.java +++ b/weather-service/src/main/java/com/htfp/weather/download/BaseDataDownloader.java @@ -25,22 +25,22 @@ public abstract class BaseDataDownloader { /** * 获取所有下载文件的相关信息 */ - public abstract List getFilesInfo(); + public abstract List getFilesInfo(); /** * 单个文件下载 - * @param fileInfo: 目标文件的下载信息 + * @param downLoadFileInfo: 目标文件的下载信息 * @return 下载成功/失败 * @throws IOException */ - public abstract FileInfo download(FileInfo fileInfo) throws IOException; + public abstract DownLoadFileInfo download(DownLoadFileInfo downLoadFileInfo) throws IOException; /** * 下载所有目标文件 - * @param fileInfoList + * @param downLoadFileInfoList * @return */ - public abstract List downloadAll(List fileInfoList); + public abstract List downloadAll(List downLoadFileInfoList); } diff --git a/weather-service/src/main/java/com/htfp/weather/download/FileInfo.java b/weather-service/src/main/java/com/htfp/weather/download/DownLoadFileInfo.java similarity index 90% rename from weather-service/src/main/java/com/htfp/weather/download/FileInfo.java rename to weather-service/src/main/java/com/htfp/weather/download/DownLoadFileInfo.java index 98e831c..a5e5a31 100644 --- a/weather-service/src/main/java/com/htfp/weather/download/FileInfo.java +++ b/weather-service/src/main/java/com/htfp/weather/download/DownLoadFileInfo.java @@ -9,7 +9,7 @@ import lombok.ToString; @Data @ToString -public class FileInfo { +public class DownLoadFileInfo { /**保存文件名*/ private String fileName; /**数据起报时间*/ @@ -26,7 +26,7 @@ public class FileInfo { private String savePath; private boolean isDownloadSuccess; - public FileInfo() { + public DownLoadFileInfo() { } } diff --git a/weather-service/src/main/java/com/htfp/weather/download/gfs/GfsDownloader.java b/weather-service/src/main/java/com/htfp/weather/download/gfs/GfsDownloader.java index 7eb02a0..077616b 100644 --- a/weather-service/src/main/java/com/htfp/weather/download/gfs/GfsDownloader.java +++ b/weather-service/src/main/java/com/htfp/weather/download/gfs/GfsDownloader.java @@ -1,7 +1,7 @@ package com.htfp.weather.download.gfs; import com.htfp.weather.download.BaseDataDownloader; -import com.htfp.weather.download.FileInfo; +import com.htfp.weather.download.DownLoadFileInfo; import com.htfp.weather.info.Constant; import com.htfp.weather.utils.DateTimeUtils; import com.htfp.weather.utils.HttpClientUtils; @@ -26,7 +26,7 @@ import java.util.concurrent.Future; /** * @author shiyi - * @ + * @Description GFS数据下载类, 目前初始时间默认为调用初始化方法的时间,不允许设置 */ @Slf4j @Component("gfsDownloader") @@ -62,33 +62,33 @@ public class GfsDownloader extends BaseDataDownloader { } @Override - public FileInfo download(FileInfo fileInfo) { - return download0(fileInfo, 2); + public DownLoadFileInfo download(DownLoadFileInfo downLoadFileInfo) { + return download0(downLoadFileInfo, 2); } - private FileInfo download0(FileInfo fileInfo, int retryNum) { - String url = fileInfo.getUrl(); - File destDir = new File(fileInfo.getSavePath()); - File fileOut = new File(destDir, fileInfo.getFileName()); + private DownLoadFileInfo download0(DownLoadFileInfo downLoadFileInfo, int retryNum) { + String url = downLoadFileInfo.getUrl(); + File destDir = new File(downLoadFileInfo.getSavePath()); + File fileOut = new File(destDir, downLoadFileInfo.getFileName()); log.info("[GFS Download] 文件下载中,保存至 {}", fileOut); try { // DONE: 改用okhttp FIXME 2024/6/8: 如果连接是https,jar包启动下载会报错 javax.net.ssl.SSLException: Received fatal alert: internal_error HttpClientUtils.downloadFileByUrl(url, fileOut.getPath()); log.info("[GFS Download] 文件下载成功: {}", fileOut); - fileInfo.setDownloadSuccess(fileValid(fileOut.getAbsolutePath())); + downLoadFileInfo.setDownloadSuccess(fileValid(fileOut.getAbsolutePath())); } catch (Exception e) { // DONE 2024/5/24: 文件服务器在外网不稳定,增加重试机制 - fileInfo.setDownloadSuccess(false); + downLoadFileInfo.setDownloadSuccess(false); if (retryNum > 0) { log.error("[GFS Download] 文件下载失败,重试中: {}", fileOut); - return download0(fileInfo, retryNum - 1); + return download0(downLoadFileInfo, retryNum - 1); } else { e.printStackTrace(); log.error("[GFS Download] 文件下载失败: {}", fileOut); } } - return fileInfo; + return downLoadFileInfo; } /** 简单校验验证文件完整性,并生成索引文件*/ @@ -101,14 +101,14 @@ public class GfsDownloader extends BaseDataDownloader { } } @Override - public List downloadAll(List fileInfoList) { - log.info("[GFS Download] 下载任务启动,共 {} 个文件", fileInfoList.size()); - List> futures = new ArrayList<>(); - List finishList = new ArrayList<>(); - for (FileInfo fileInfo : fileInfoList) { - futures.add(executorService.submit(() -> download(fileInfo))); + public List downloadAll(List downLoadFileInfoList) { + log.info("[GFS Download] 下载任务启动,共 {} 个文件", downLoadFileInfoList.size()); + List> futures = new ArrayList<>(); + List finishList = new ArrayList<>(); + for (DownLoadFileInfo downLoadFileInfo : downLoadFileInfoList) { + futures.add(executorService.submit(() -> download(downLoadFileInfo))); } - for (Future future : futures) { + for (Future future : futures) { try { finishList.add(future.get()); } catch (InterruptedException | ExecutionException e) { @@ -120,7 +120,7 @@ public class GfsDownloader extends BaseDataDownloader { } @Override - public List getFilesInfo() { + public List getFilesInfo() { // 时间选项填充 String lonlatBoxStr = null; String levelsStr = null; @@ -145,28 +145,28 @@ public class GfsDownloader extends BaseDataDownloader { // 分辨率为3h,整数截断;包括目标时刻本身 final int nFiles = 1 + gfsDataConfig.getDuration() / forecastStep; // 存储文件信息 - List fileInfoList = new ArrayList<>(nFiles); + List downLoadFileInfoList = new ArrayList<>(nFiles); for (int i = 0; i < nFiles; i++) { - FileInfo fileInfo = new FileInfo(); + DownLoadFileInfo downLoadFileInfo = new DownLoadFileInfo(); int forecastHour = i * forecastStep + hourDiff; - fileInfo.setForecastHour(forecastHour); + downLoadFileInfo.setForecastHour(forecastHour); String baseURL = getBaseURL(forecastHour, levelsStr, variablesStr, lonlatBoxStr); - fileInfo.setUrl(baseURL); - fileInfo.setRefTimeStr(refTimeStr); - fileInfo.setForecastUTCTimeStr( - this.getRefTime().plusHours(fileInfo.getForecastHour()) + downLoadFileInfo.setUrl(baseURL); + downLoadFileInfo.setRefTimeStr(refTimeStr); + downLoadFileInfo.setForecastUTCTimeStr( + this.getRefTime().plusHours(downLoadFileInfo.getForecastHour()) .format(DateTimeFormatter.ofPattern(Constant.UTC_TIME_STRING)) ); - fileInfo.setForecastBJTimeStr( - DateTimeUtils.getLocalZoneDateTime(this.getRefTime().plusHours(fileInfo.getForecastHour())) + downLoadFileInfo.setForecastBJTimeStr( + DateTimeUtils.getLocalZoneDateTime(this.getRefTime().plusHours(downLoadFileInfo.getForecastHour())) .format(DateTimeFormatter.ofPattern(Constant.BJT_TIME_STRING)) ); - fileInfo.setFileName(String.format("%s-from-%s+%d.grib2", fileInfo.getForecastBJTimeStr(), fileInfo.getRefTimeStr(), forecastHour)); - fileInfo.setSavePath(savePath); - fileInfoList.add(fileInfo); + downLoadFileInfo.setFileName(String.format("%s-from-%s+%d.grib2", downLoadFileInfo.getForecastBJTimeStr(), downLoadFileInfo.getRefTimeStr(), forecastHour)); + downLoadFileInfo.setSavePath(savePath); + downLoadFileInfoList.add(downLoadFileInfo); } - return fileInfoList; + return downLoadFileInfoList; } diff --git a/weather-service/src/main/java/com/htfp/weather/download/gfs/GfsVariableIsobaricEnum.java b/weather-service/src/main/java/com/htfp/weather/download/gfs/GfsVariableIsobaricEnum.java index 6051dec..9c881da 100644 --- a/weather-service/src/main/java/com/htfp/weather/download/gfs/GfsVariableIsobaricEnum.java +++ b/weather-service/src/main/java/com/htfp/weather/download/gfs/GfsVariableIsobaricEnum.java @@ -43,8 +43,8 @@ public enum GfsVariableIsobaricEnum { public static String getGfsVariableName(GfsVariableIsobaricEnum gfsVariableIsobaricEnum) { return variableName.get(gfsVariableIsobaricEnum.nameInApi); } - public static String getGfsVariableName(String tableVariableName) { - return variableName.get(tableVariableName); + public static String getGfsVariableName(String nameInApi) { + return variableName.get(nameInApi); } public static String getVariableNameInApi(String nameInFile) { diff --git a/weather-service/src/main/java/com/htfp/weather/griddata/operation/GfsDataImport.java b/weather-service/src/main/java/com/htfp/weather/griddata/operation/GfsDataImport.java index a640cdc..9a694d0 100644 --- a/weather-service/src/main/java/com/htfp/weather/griddata/operation/GfsDataImport.java +++ b/weather-service/src/main/java/com/htfp/weather/griddata/operation/GfsDataImport.java @@ -5,7 +5,6 @@ import com.aliyun.tablestore.grid.model.GridDataSetMeta; import com.aliyun.tablestore.grid.model.StoreOptions; import com.aliyun.tablestore.grid.model.grid.Grid2D; -import com.htfp.weather.download.FileInfo; import com.htfp.weather.download.gfs.GfsVariableHeightEnum; import com.htfp.weather.download.gfs.GfsVariableIsobaricEnum; import com.htfp.weather.griddata.common.TableConfig; @@ -33,7 +32,6 @@ import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.stream.Collectors; /** @@ -52,7 +50,7 @@ public class GfsDataImport extends BaseTableOperation { private final ExecutorService executorService = Executors.newFixedThreadPool(5); /** - * init meta data to table store. + * 表格元数据初始化 * * @param dataSetID * @param dataType @@ -81,7 +79,7 @@ public class GfsDataImport extends BaseTableOperation { } /** - * update meta and set status to DONE when data import finished. + * 导入完成后,更新meta状态 * * @param meta * @return @@ -92,31 +90,34 @@ public class GfsDataImport extends BaseTableOperation { return meta; } + /** + * 导入完成后,设置meta状态 + */ public GridDataSetMeta putMeta(GridDataSetMeta meta) throws Exception { tableStoreGrid.putDataSetMeta(meta); return meta; } - /** 导入指定起报时间的所有文件*/ - public List importData(OffsetDateTime refTime) throws Exception { + public synchronized List importData(OffsetDateTime refTime) throws Exception { importing = true; - long start = System.currentTimeMillis(); - List fileList = getFiles(refTime); - if (CollectionUtils.isEmpty(fileList)) { - throw new AppException(ErrorCode.NO_NC_OR_GRIB_FILES); - } - log.info("[tablestore] 数据导入开始, refTime = {}...", refTime); - // datasetId和起报时间绑定 - String dataSetId = refTime.format(DateTimeFormatter.ofPattern(Constant.DATA_SET_ID_FORMAT_STRING)); - List fileVariables = getFileVariables(tableConfig.variableList); - int[] shape = new int[]{tableConfig.timeSizeMax, tableConfig.levSize, tableConfig.latSize, tableConfig.lonSize}; - GridDataSetMeta meta = initMeta(dataSetId, tableConfig.dataType, fileVariables, shape); - List forecastHours = new ArrayList<>(); // 所有有效预报时效坐标记录到数据库属性中 + try { + long start = System.currentTimeMillis(); + List fileList = getFiles(refTime); + if (CollectionUtils.isEmpty(fileList)) { + throw new AppException(ErrorCode.NO_NC_OR_GRIB_FILES); + } + log.info("[tablestore] 数据导入开始, refTime = {}...", refTime); + // datasetId和起报时间绑定 + String dataSetId = refTime.format(DateTimeFormatter.ofPattern(Constant.DATA_SET_ID_FORMAT_STRING)); + List fileVariables = getFileVariables(tableConfig.variableList); + int[] shape = new int[]{tableConfig.timeSizeMax, tableConfig.levSize, tableConfig.latSize, tableConfig.lonSize}; + GridDataSetMeta meta = initMeta(dataSetId, tableConfig.dataType, fileVariables, shape); + List forecastHours = new ArrayList<>(); // 所有有效预报时效坐标记录到数据库属性中 - // todo 2024/5/13: 待优化,用于数据库的索引,必须连续,因此必须保证前一个时刻成功导入后才能导入下一个时刻,数据量大的时候导入时间较长 - List finishedList; - // List> futures = new ArrayList<>(); + // todo 2024/5/13: 待优化,用于数据库的索引,必须连续,因此必须保证前一个时刻成功导入后才能导入下一个时刻,数据量大的时候导入时间较长 + List finishedList; + // List> futures = new ArrayList<>(); finishedList = new ArrayList<>(); for (int i = 0; i < fileList.size(); i++) { String file = fileList.get(i); @@ -124,37 +125,39 @@ public class GfsDataImport extends BaseTableOperation { int forecastHour = getForecastHourFromFilename(file); // 2024/5/13:使用多线程,并使用Future来获取结果,但是实际瓶颈可能是网络带宽而非计算 // futures.add(executorService.submit(() -> importFromNcFile(meta, file, iTime,forecastHour))); - ImportResult importResult = importFromNcFile(meta, file, iTime, forecastHour); - finishedList.add(importResult); - if (importResult.isSuccess()) { - forecastHours.add(String.valueOf(importResult.getForecastHour())); + ImportFileInfo importFileInfo = importFromNcFile(meta, file, iTime, forecastHour); + finishedList.add(importFileInfo); + if (importFileInfo.isSuccess()) { + forecastHours.add(String.valueOf(importFileInfo.getForecastHour())); } } - // for (Future future : futures) { - // ImportResult importResult = future.get(); - // finishedList.add(importResult); - // forecastHours.add(String.valueOf(importResult.getForecastHour())); - // if (importResult.isSuccess()) { - // forecastHours.add(String.valueOf(importResult.getForecastHour())); - // } - // } + // for (Future future : futures) { + // ImportResult importResult = future.get(); + // finishedList.add(importResult); + // forecastHours.add(String.valueOf(importResult.getForecastHour())); + // if (importResult.isSuccess()) { + // forecastHours.add(String.valueOf(importResult.getForecastHour())); + // } + // } - List failedList = finishedList.stream().filter(result -> !result.isSuccess()).collect(Collectors.toList()); - if (!failedList.isEmpty()) { - log.warn("存在气象数据导入数据库失败,失败文件列表: {} ", failedList); - meta.addAttribute("status", "WRONG"); - } else { - meta.addAttribute("status", "DONE"); + List failedList = finishedList.stream().filter(result -> !result.isSuccess()).collect(Collectors.toList()); + if (!failedList.isEmpty()) { + log.warn("存在气象数据导入数据库失败,失败文件列表: {} ", failedList); + meta.addAttribute("status", "WRONG"); + } else { + meta.addAttribute("status", "DONE"); + } + long end = System.currentTimeMillis(); + log.info("[tablestore] 数据导入完成, 耗时: {} s, forecastHours: {}", (end - start)/1000., forecastHours); + meta.setForecastHours(forecastHours); + meta.addAttribute("reference_time", refTime.toInstant().toEpochMilli()); + putMeta(meta); + gfsDataFetcher.lastGridDataSetMeta = meta; + log.info("[tablestore]: 更新最新的数据元信息: {}", meta); + return finishedList; + } finally { + importing = false; } - long end = System.currentTimeMillis(); - log.info("[tablestore] 数据导入完成, 耗时: {} s, forecastHours: {}", (end - start)/1000., forecastHours); - meta.setForecastHours(forecastHours); - meta.addAttribute("reference_time", refTime.toInstant().toEpochMilli()); - putMeta(meta); - gfsDataFetcher.lastGridDataSetMeta = meta; - log.info("[tablestore]: 更新最新的数据元信息: {}", meta); - importing = false; - return finishedList; } /** @@ -164,7 +167,7 @@ public class GfsDataImport extends BaseTableOperation { * @param iTime 文件序号 * @param forecastHour 预报时效(相对于起报时间的小时数) */ - public ImportResult importFromNcFile(GridDataSetMeta meta, String file, int iTime, int forecastHour) { + public ImportFileInfo importFromNcFile(GridDataSetMeta meta, String file, int iTime, int forecastHour) { try (NetcdfFile ncFile = NetcdfFiles.open(file)) { GridDataWriter writer = tableStoreGrid.getDataWriter(meta); @@ -176,6 +179,7 @@ public class GfsDataImport extends BaseTableOperation { int ysize = meta.getySize(); for (String variableName : meta.getVariables()) { + // 风速在最后单独处理 if ("Wind_speed_gust_surface".equals(variableName) || "Wind_speed_isobaric".equals(variableName) || "Wind_direction_isobaric".equals(variableName) || "Wind_speed_height_above_ground".equals(variableName) || "Wind_direction_height_above_ground".equals(variableName)) { continue; @@ -183,7 +187,7 @@ public class GfsDataImport extends BaseTableOperation { Variable variable = ncFile.findVariable(variableName); if (variable != null) { int shapeLength = variable.getShape().length; - if (shapeLength== 4) { + if (shapeLength== 4) { // 存在垂直坐标,即变量随高度变化 // 气压坐标和高度坐标保持各自的索引 int zSize = variable.getShape(1); for (int z = 0; z < zSize; z++) { @@ -195,7 +199,7 @@ public class GfsDataImport extends BaseTableOperation { writer.writeGrid2D(variableName, iTime, z, grid2D); } } else if (shapeLength == 3) { - // DONE 2024/5/24: 导入不含高度坐标的数据 + // DONE 2024/5/24: 导入不含高度坐标的数据,如降水量,阵风风速 Array array = variable.read(new int[]{0, 0, 0}, new int[]{1,xsize, ysize}); transferUnit(variableName, array); Grid2D grid2D = new Grid2D(array.getDataAsByteBuffer(), variable.getDataType(), @@ -210,12 +214,13 @@ public class GfsDataImport extends BaseTableOperation { importWind(writer, meta, ncFile, iTime); } catch (Exception e) { log.error("[tablestore] 导入文件数据失败,_t={}: {}", forecastHour, file, e); - return new ImportResult(false, file, forecastHour, iTime); + return new ImportFileInfo(false, file, forecastHour, iTime); } log.info("[tablestore] 导入文件数据成功,_t={}: {}", forecastHour, file); - return new ImportResult(true, file, forecastHour, iTime); + return new ImportFileInfo(true, file, forecastHour, iTime); } + /**单位转换*/ private void transferUnit(String variableName, Array array) { if (GfsVariableIsobaricEnum.TEMP.getNameInFile().equals(variableName)) { MeteoUtils.kelvin2Celsius(array); @@ -322,13 +327,13 @@ public class GfsDataImport extends BaseTableOperation { } @Data - public static class ImportResult { + public static class ImportFileInfo { private boolean success; private String file; private int forecastHour; private int iTime; - public ImportResult(boolean success, String file, int forecastHour, int iTime) { + public ImportFileInfo(boolean success, String file, int forecastHour, int iTime) { this.success = success; this.file = file; this.forecastHour = forecastHour; diff --git a/weather-service/src/main/java/com/htfp/weather/schedule/GridDataProcessor.java b/weather-service/src/main/java/com/htfp/weather/schedule/GridDataProcessor.java index fd9994f..3dfea7e 100644 --- a/weather-service/src/main/java/com/htfp/weather/schedule/GridDataProcessor.java +++ b/weather-service/src/main/java/com/htfp/weather/schedule/GridDataProcessor.java @@ -1,11 +1,10 @@ package com.htfp.weather.schedule; -import com.htfp.weather.download.FileInfo; +import com.htfp.weather.download.DownLoadFileInfo; import com.htfp.weather.download.gfs.GfsDataConfig; import com.htfp.weather.download.gfs.GfsDownloader; -import com.htfp.weather.griddata.operation.GfsDataFetcher; import com.htfp.weather.griddata.operation.GfsDataImport; -import com.htfp.weather.griddata.operation.GfsDataImport.ImportResult; +import com.htfp.weather.griddata.operation.GfsDataImport.ImportFileInfo; import com.htfp.weather.info.Constant; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -60,16 +59,13 @@ public class GridDataProcessor { clearExpiredData(); } - public boolean download() { + private boolean download() { gfsDownloader.iniTimeSetting(); try { log.info("[schedule-start] 开始下载气象数据"); - List fileInfoList = gfsDownloader.getFilesInfo(); - // if (test){ - // throw new RuntimeException("测试异常"); - // } - List finishedList = gfsDownloader.downloadAll(fileInfoList); - List failedList = finishedList.stream().filter(fileInfo -> !fileInfo.isDownloadSuccess()).collect(Collectors.toList()); + List downLoadFileInfoList = gfsDownloader.getFilesInfo(); + List finishedList = gfsDownloader.downloadAll(downLoadFileInfoList); + List failedList = finishedList.stream().filter(fileInfo -> !fileInfo.isDownloadSuccess()).collect(Collectors.toList()); if (!failedList.isEmpty()) { log.error("[schedule] 下载失败文件列表: {}", failedList); downloadFailedReport(gfsDownloader.getRefTimeStr(), finishedList, failedList); @@ -84,16 +80,13 @@ public class GridDataProcessor { } } - public boolean importToTableStore(){ + private boolean importToTableStore(){ try { OffsetDateTime refTime = gfsDownloader.getRefTime(); log.info("[schedule-start] 开始将数据导入数据库, 起报时间: {}", refTime); - // if (test){ - // throw new RuntimeException("测试异常"); - // } - List importResults = gfsDataImport.importData(refTime); - List finishedList = importResults.stream().filter(result -> result.isSuccess()).collect(Collectors.toList()); - List failedList = importResults.stream().filter(result -> !result.isSuccess()).collect(Collectors.toList()); + List importFileInfos = gfsDataImport.importData(refTime); + List finishedList = importFileInfos.stream().filter(result -> result.isSuccess()).collect(Collectors.toList()); + List failedList = importFileInfos.stream().filter(result -> !result.isSuccess()).collect(Collectors.toList()); if (!failedList.isEmpty()) { log.error("[schedule-end] 气象数据导入数据库失败, 文件列表: {} ", failedList); importFailedReport(gfsDownloader.getRefTimeStr(), finishedList, failedList); @@ -132,7 +125,7 @@ public class GridDataProcessor { log.info("[schedule-end] 过期数据清理结束"); } - private void downloadFailedReport(String refTime, List finishedList, List failedList) { + private void downloadFailedReport(String refTime, List finishedList, List failedList) { // 邮件 Mail mail = new Mail(); mail.setSubject("【航天飞鹏-气象服务】数据下载失败通知"); @@ -185,7 +178,7 @@ public class GridDataProcessor { } } - private void importFailedReport(String refTime, List finishedList, List failedList) { + private void importFailedReport(String refTime, List finishedList, List failedList) { Mail mail = new Mail(); mail.setSubject("【航天飞鹏-气象服务】数据入库失败通知"); mail.setTos(mailReceiver); diff --git a/weather-service/src/main/java/com/htfp/weather/web/controller/ConfigController.java b/weather-service/src/main/java/com/htfp/weather/web/controller/ConfigController.java index 8b46988..212cc12 100644 --- a/weather-service/src/main/java/com/htfp/weather/web/controller/ConfigController.java +++ b/weather-service/src/main/java/com/htfp/weather/web/controller/ConfigController.java @@ -1,20 +1,23 @@ package com.htfp.weather.web.controller; -import com.htfp.weather.download.FileInfo; +import com.htfp.weather.download.DownLoadFileInfo; import com.htfp.weather.download.gfs.GfsDataConfig; -import com.htfp.weather.download.gfs.GfsDownloader; import com.htfp.weather.griddata.common.TableConfig; import com.htfp.weather.griddata.operation.GfsDataImport; -import com.htfp.weather.griddata.operation.GfsDataImport.ImportResult; +import com.htfp.weather.griddata.operation.GfsDataImport.ImportFileInfo; import com.htfp.weather.info.Constant; import com.htfp.weather.utils.DateTimeUtils; import com.htfp.weather.web.exception.AppException; import com.htfp.weather.web.exception.ErrorCode; import com.htfp.weather.web.pojo.request.GfsConfigUpdate; import com.htfp.weather.web.pojo.request.TableConfigUpdate; +import com.htfp.weather.web.pojo.response.DownloadResult; +import com.htfp.weather.web.pojo.response.ImportResult; import com.htfp.weather.web.pojo.response.Result; import com.htfp.weather.web.service.FileService; +import com.htfp.weather.web.service.GfsDataServiceImpl; import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.NotNull; import org.springframework.util.CollectionUtils; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; @@ -23,11 +26,11 @@ import javax.annotation.Resource; import java.io.File; import java.io.IOException; import java.time.OffsetDateTime; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; /** * @Author : shiyi @@ -43,11 +46,11 @@ public class ConfigController { @Resource GfsDataConfig gfsDataConfig; @Resource - GfsDownloader gfsDownloader; - @Resource GfsDataImport gfsDataImport; @Resource FileService fileService; + @Resource + GfsDataServiceImpl gfsDataService; private final String SECRET = "htfpweather"; private void validSecret(String secret) { @@ -121,14 +124,8 @@ public class ConfigController { validSecret(secret); // List allCompleted = false; try { - gfsDownloader.iniTimeSetting(); - List fileInfoList = gfsDownloader.getFilesInfo(); - List finishedList = gfsDownloader.downloadAll(fileInfoList); - List successList = finishedList.stream().filter(FileInfo::isDownloadSuccess).collect(Collectors.toList()); - List failedList = finishedList.stream().filter(fileInfo -> !fileInfo.isDownloadSuccess()).collect(Collectors.toList()); - Map> data = new HashMap<>(); - data.put("successList", successList); - data.put("failedList", failedList); + DownloadResult data = gfsDataService.downloadNowAllFile(); + List failedList = data.getFailedList(); if (CollectionUtils.isEmpty(failedList)) { return Result.success(data); } else { @@ -147,24 +144,8 @@ public class ConfigController { String secret = params.get("secret"); validSecret(secret); OffsetDateTime offsetDateTime = OffsetDateTime.parse(params.get("time"), Constant.API_TIME_FORMATTER); - String targetUtcStr = DateTimeUtils.getUTCDateTime(offsetDateTime).format(DateTimeFormatter.ofPattern(Constant.UTC_TIME_STRING)); - try { - gfsDownloader.iniTimeSetting(); - List fileInfoList = gfsDownloader.getFilesInfo(); // TODO 2024/6/12: 可以优化,不用生成24小时所有的文件信息 - for (FileInfo fileInfo : fileInfoList) { - if (fileInfo.getForecastUTCTimeStr().equals(targetUtcStr)) { - FileInfo downloadResult = gfsDownloader.download(fileInfo); - if (downloadResult.isDownloadSuccess()) { - return Result.success(downloadResult); - } else { - return Result.error(ErrorCode.DOWNLOAD_ERROR); - } - } - } - return Result.error(ErrorCode.QUERY_TIME_ERROR, ": 请选择当前时间之后24小时范围内的时刻"); - } catch (AppException e) { - return Result.error(ErrorCode.DOWNLOAD_START_ERROR, e.getMessage()); - } + DownLoadFileInfo downLoadFileInfo = gfsDataService.downloadSingleFile(offsetDateTime); + return Result.success(downLoadFileInfo); } /** @@ -175,17 +156,34 @@ public class ConfigController { String secret = params.get("secret"); validSecret(secret); OffsetDateTime time = OffsetDateTime.parse(params.get("time"), Constant.API_TIME_FORMATTER); + return getImportResponse(time); + } + + /** + * 下载并导入 + */ + @PostMapping("/downloadAndImport") + public Result downloadAndImport(@RequestBody Map params) { + Result downloadResponse = downloadAllFiles(params); + if (downloadResponse.isSuccess()) { + DownloadResult data = (DownloadResult)downloadResponse.getData(); + String refTimeStr = data.getSuccessList().get(0).getRefTimeStr(); + OffsetDateTime time = OffsetDateTime.of(LocalDateTime.parse(refTimeStr, DateTimeFormatter.ofPattern(Constant.UTC_TIME_STRING)), ZoneOffset.UTC); + return getImportResponse(time); + } else { + return downloadResponse; + } + } + + @NotNull + private Result getImportResponse(OffsetDateTime time) { try { - List finishedList = gfsDataImport.importData(time); - List successList = finishedList.stream().filter(result -> result.isSuccess()).collect(Collectors.toList()); - List failedList = finishedList.stream().filter(result -> !result.isSuccess()).collect(Collectors.toList()); - Map> data = new HashMap<>(); - data.put("successList", successList); - data.put("failedList", failedList); + ImportResult importResult = gfsDataService.importData(time); + List failedList = importResult.getFailedList(); if (CollectionUtils.isEmpty(failedList)) { - return Result.success(data); + return Result.success(importResult); } else { - return new Result(false, ErrorCode.TABLESTORE_IMPORT_ERROR.getCode(), ErrorCode.TABLESTORE_IMPORT_ERROR.getMsg() + ": 数据导入未全部完成", data); + return new Result(false, ErrorCode.TABLESTORE_IMPORT_ERROR.getCode(), ErrorCode.TABLESTORE_IMPORT_ERROR.getMsg() + ": 数据导入未全部完成", importResult); } } catch (AppException e) { return Result.error(e.getErrorCode()); diff --git a/weather-service/src/main/java/com/htfp/weather/web/pojo/response/DownloadResult.java b/weather-service/src/main/java/com/htfp/weather/web/pojo/response/DownloadResult.java new file mode 100644 index 0000000..34989da --- /dev/null +++ b/weather-service/src/main/java/com/htfp/weather/web/pojo/response/DownloadResult.java @@ -0,0 +1,17 @@ +package com.htfp.weather.web.pojo.response; + +import com.htfp.weather.download.DownLoadFileInfo; +import lombok.Data; + +import java.util.List; + +/** + * @Author : shiyi + * @Date : 2024/7/3 11:37 + * @Description : + */ +@Data +public class DownloadResult { + List successList; + List failedList; +} diff --git a/weather-service/src/main/java/com/htfp/weather/web/pojo/response/ImportResult.java b/weather-service/src/main/java/com/htfp/weather/web/pojo/response/ImportResult.java new file mode 100644 index 0000000..6a8533e --- /dev/null +++ b/weather-service/src/main/java/com/htfp/weather/web/pojo/response/ImportResult.java @@ -0,0 +1,18 @@ +package com.htfp.weather.web.pojo.response; + +import com.htfp.weather.griddata.operation.GfsDataImport; +import com.htfp.weather.griddata.operation.GfsDataImport.ImportFileInfo; +import lombok.Data; + +import java.util.List; + +/** + * @Author : shiyi + * @Date : 2024/7/3 11:38 + * @Description : + */ +@Data +public class ImportResult { + List successList; + List failedList; +} diff --git a/weather-service/src/main/java/com/htfp/weather/web/service/FileService.java b/weather-service/src/main/java/com/htfp/weather/web/service/FileService.java index cd09530..fbf0ca8 100644 --- a/weather-service/src/main/java/com/htfp/weather/web/service/FileService.java +++ b/weather-service/src/main/java/com/htfp/weather/web/service/FileService.java @@ -14,7 +14,7 @@ import java.util.List; /** * @Author : shiyi * @Date : 2024/6/12 11:25 - * @Description : 文件系统管理 + * @Description : 本地文件管理 */ @Component public class FileService { @@ -57,13 +57,7 @@ public class FileService { } return subDirList; } - public void uploadFile(String filePath){ - // 文件上传逻辑 - } - public void downloadFile(String filePath){ - // 文件下载逻辑 - } public void deleteFile(File file) throws IOException { if (file == null || !file.exists()) { diff --git a/weather-service/src/main/java/com/htfp/weather/web/service/GfsDataServiceImpl.java b/weather-service/src/main/java/com/htfp/weather/web/service/GfsDataServiceImpl.java index 92edd6e..73b2dd6 100644 --- a/weather-service/src/main/java/com/htfp/weather/web/service/GfsDataServiceImpl.java +++ b/weather-service/src/main/java/com/htfp/weather/web/service/GfsDataServiceImpl.java @@ -4,12 +4,15 @@ import com.aliyun.tablestore.grid.consts.AttributionEnum; import com.aliyun.tablestore.grid.model.GridDataSet; import com.aliyun.tablestore.grid.model.GridDataSetMeta; import com.aliyun.tablestore.grid.model.grid.Grid4D; +import com.htfp.weather.download.DownLoadFileInfo; +import com.htfp.weather.download.gfs.GfsDownloader; import com.htfp.weather.download.gfs.GfsLevelsEnum; import com.htfp.weather.download.gfs.GfsVariableHeightEnum; import com.htfp.weather.download.gfs.GfsVariableIsobaricEnum; import com.htfp.weather.griddata.common.TableConfig; import com.htfp.weather.griddata.common.ValueRange; import com.htfp.weather.griddata.operation.GfsDataFetcher; +import com.htfp.weather.griddata.operation.GfsDataImport; import com.htfp.weather.info.Constant; import com.htfp.weather.utils.DateTimeUtils; import com.htfp.weather.utils.MeteoUtils; @@ -27,7 +30,9 @@ import java.lang.reflect.Method; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.stream.Collectors; /** * @Author : shiyi @@ -41,9 +46,60 @@ public class GfsDataServiceImpl implements IDataService { GfsDataFetcher gfsDataFetcher; @Resource TableConfig tableConfig; + @Resource + GfsDownloader gfsDownloader; + @Resource + GfsDataImport gfsDataImport; + // 下载和导入数据 + public DownloadResult downloadNowAllFile() { + gfsDownloader.iniTimeSetting(); + List downLoadFileInfoList = gfsDownloader.getFilesInfo(); + List finishedList = gfsDownloader.downloadAll(downLoadFileInfoList); + List successList = finishedList.stream().filter(DownLoadFileInfo::isDownloadSuccess).collect(Collectors.toList()); + List failedList = finishedList.stream().filter(fileInfo -> !fileInfo.isDownloadSuccess()).collect(Collectors.toList()); + DownloadResult downloadResult = new DownloadResult(); + downloadResult.setSuccessList(successList); + downloadResult.setFailedList(failedList); + return downloadResult; + } + public DownLoadFileInfo downloadSingleFile(OffsetDateTime offsetDateTime) { + String targetUtcStr = DateTimeUtils.getUTCDateTime(offsetDateTime).format(DateTimeFormatter.ofPattern(Constant.UTC_TIME_STRING)); + try { + gfsDownloader.iniTimeSetting(); + List downLoadFileInfoList = gfsDownloader.getFilesInfo(); // TODO 2024/6/12: 可以优化,不用生成24小时所有的文件信息 + for (DownLoadFileInfo downLoadFileInfo : downLoadFileInfoList) { + if (downLoadFileInfo.getForecastUTCTimeStr().equals(targetUtcStr)) { + DownLoadFileInfo downloadResult = gfsDownloader.download(downLoadFileInfo); + if (downloadResult.isDownloadSuccess()) { + return downloadResult; + } else { + throw new AppException(ErrorCode.DOWNLOAD_ERROR); + } + } + } + throw new AppException(ErrorCode.QUERY_TIME_ERROR, ": 请选择当前时间之后24小时范围内的时刻"); + } catch (Exception e) { + throw new AppException(ErrorCode.DOWNLOAD_START_ERROR, e.getMessage()); + } + } + + public ImportResult importData(OffsetDateTime time) throws Exception { + List finishedList = gfsDataImport.importData(time); + List successList = finishedList.stream().filter(result -> result.isSuccess()).collect(Collectors.toList()); + List failedList = finishedList.stream().filter(result -> !result.isSuccess()).collect(Collectors.toList()); + ImportResult importResult = new ImportResult(); + importResult.setFailedList(failedList); + importResult.setSuccessList(successList); + return importResult; + + } + public void downloadAndImportData() { + + } + // 从tablestore获取数据 /** * 获取当前时刻的天气状况 * @@ -447,8 +503,7 @@ public class GfsDataServiceImpl implements IDataService { return result; } - @NotNull - private static String getVariableNameInApi(GfsLevelsEnum levelFlag, Map.Entry variable) { + private String getVariableNameInApi(GfsLevelsEnum levelFlag, Map.Entry variable) { String variableNameInFile = variable.getKey(); String variableNameInApi; if (GfsLevelsEnum.SURFACE.equals(levelFlag)) { diff --git a/weather-service/src/main/java/com/htfp/weather/web/service/surfaceapi/CmaServiceImpl.java b/weather-service/src/main/java/com/htfp/weather/web/service/surfaceapi/CmaServiceImpl.java index acc44dd..87d09c4 100644 --- a/weather-service/src/main/java/com/htfp/weather/web/service/surfaceapi/CmaServiceImpl.java +++ b/weather-service/src/main/java/com/htfp/weather/web/service/surfaceapi/CmaServiceImpl.java @@ -56,8 +56,12 @@ public class CmaServiceImpl implements ISurfaceDataService{ return getSurfaceWarningByCounty(countyId); } - public List getSurfaceWarningByCounty(String countyCode) throws Exception { - return warningCache.get(countyCode); + public List getSurfaceWarningByCounty(String countyCode) { + if (warningCache.containsKey(countyCode)) { + return warningCache.get(countyCode); + } else { + return new ArrayList<>(); + } // else { // // 如果缓存中没有,则先查一遍 // updateCmaWarning(countyCode); diff --git a/weather-service/src/test/java/com/htfp/weather/download/GfsDownloaderTest.java b/weather-service/src/test/java/com/htfp/weather/download/GfsDownloaderTest.java index 8eba0cf..4aeecf4 100644 --- a/weather-service/src/test/java/com/htfp/weather/download/GfsDownloaderTest.java +++ b/weather-service/src/test/java/com/htfp/weather/download/GfsDownloaderTest.java @@ -28,24 +28,24 @@ class GfsDownloaderTest { @Test void download() { gfsDownloader.iniTimeSetting(); - List fileInfoList = gfsDownloader.getFilesInfo(); - gfsDownloader.download(fileInfoList.get(0)); + List downLoadFileInfoList = gfsDownloader.getFilesInfo(); + gfsDownloader.download(downLoadFileInfoList.get(0)); } @Test void downloadAll() { gfsDownloader.iniTimeSetting(); - List fileInfoList = gfsDownloader.getFilesInfo(); - gfsDownloader.downloadAll(fileInfoList); + List downLoadFileInfoList = gfsDownloader.getFilesInfo(); + gfsDownloader.downloadAll(downLoadFileInfoList); } @Test void getFilesInfo() { gfsDownloader.iniTimeSetting(); - List fileInfoList = gfsDownloader.getFilesInfo(); - for (FileInfo fileInfo :fileInfoList) { - System.out.println(fileInfo); + List downLoadFileInfoList = gfsDownloader.getFilesInfo(); + for (DownLoadFileInfo downLoadFileInfo : downLoadFileInfoList) { + System.out.println(downLoadFileInfo); } }