From 3bb724b7fd83062dc2c0e1013bed482d2784bae8 Mon Sep 17 00:00:00 2001
From: Qifeng
Date: Mon, 24 Jan 2022 10:22:45 +1100
Subject: [PATCH] #622 fix(loading avro from an empty folder)

---
Reviewer note: the added lines of ALAFsUtils.hasAvro and the new log.info call
were revised during review (typed RemoteIterator, unused local removed,
parameterized logging, fuller Javadoc). The commit id above and the "index"
blob ids below are those of the original revision and were not recomputed.
Leading whitespace of context lines was reconstructed; apply with
--ignore-whitespace if a hunk is rejected.

 .../beam/DistributionOutlierPipeline.java     |  7 ++---
 .../DistributionOutlierTransform.java         |  3 +-
 .../java/au/org/ala/utils/ALAFsUtils.java     | 30 ++++++++++++++++++-
 3 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java
index 4e4ee771a4..ef5ff37163 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java
@@ -246,15 +246,14 @@ private static PCollection loadExistingRecords(
             .getFs(outputPath);
 
     String outlierPath = ALAFsUtils.getOutlierTargetPath(options);
-    boolean hasOutlier = ALAFsUtils.existsAndNonEmpty(fs, outlierPath);
-
-    log.info("Has outlier records from previous runs? {}", hasOutlier);
+    boolean hasOutlier = ALAFsUtils.hasAvro(fs, outlierPath, false);
+    log.debug("Try to Load existing outliers from {}", outlierPath);
 
     if (hasOutlier) {
       String samplingPath = String.join("/", outlierPath, "*.avro");
-      log.debug("Loading existing outliers from {}", samplingPath);
       return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(samplingPath));
     } else {
+      log.info("No existing outlier AVRO files under {}", outlierPath);
       return p.apply(Create.empty(AvroCoder.of(DistributionOutlierRecord.class)));
     }
   }
diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java
index 6163e63cc0..3761e984db 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java
@@ -102,7 +102,8 @@ public Iterable apply(
           Map.Entry entry = iterator.next();
           StreamSupport.stream(outputs.spliterator(), false)
               .filter(it -> it.getId().equalsIgnoreCase(entry.getKey()))
-              .forEach(it -> it.setDistanceOutOfEDL(entry.getValue()));
+              .findFirst()
+              .ifPresent(it -> it.setDistanceOutOfEDL(entry.getValue()));
         }
       } else {
         while (iter.hasNext()) {
diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java
index 1e1bc5f161..990b6a8560 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java
@@ -126,7 +126,7 @@ public static String getOutlierTargetPath(AllDatasetsPipelinesOptions options)
   }
 
   /**
-   * Removes a directory with content if the folder exists
+   * Check if a directory exists and is not empty
    *
    * @param directoryPath path to some directory
    */
@@ -151,6 +151,34 @@ public static boolean existsAndNonEmpty(FileSystem fs, String directoryPath) {
     }
   }
 
+  /**
+   * Check if avro files exist in the target folder
+   *
+   * @param fs file system used to inspect the directory
+   * @param directoryPath path to some directory
+   * @param recurse passed to {@code fs.listFiles}; whether sub-directories are searched too
+   * @return true if at least one file whose name ends with ".avro" (case-insensitive) is found;
+   *     false if the directory is missing, holds no such file, or cannot be read
+   */
+  public static boolean hasAvro(FileSystem fs, String directoryPath, boolean recurse) {
+    Path path = PathBuilder.buildPath(directoryPath);
+    try {
+      if (!fs.exists(path)) {
+        return false;
+      }
+      RemoteIterator<LocatedFileStatus> it = fs.listFiles(path, recurse);
+      while (it.hasNext()) {
+        if (it.next().getPath().getName().toLowerCase().endsWith(".avro")) {
+          return true;
+        }
+      }
+      return false;
+    } catch (IOException ex) {
+      // Best effort: an unreadable directory is reported as "no avro files"
+      return false;
+    }
+  }
+
   /**
    * Removes a directory with content if the folder exists
    *