Skip to content

Commit

Permalink
#622 fix(loading avro from an empty folder)
Browse files Browse the repository at this point in the history
  • Loading branch information
qifeng-bai committed Jan 23, 2022
1 parent 81b891a commit 3bb724b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,14 @@ private static PCollection<DistributionOutlierRecord> loadExistingRecords(
.getFs(outputPath);

String outlierPath = ALAFsUtils.getOutlierTargetPath(options);
boolean hasOutlier = ALAFsUtils.existsAndNonEmpty(fs, outlierPath);

log.info("Has outlier records from previous runs? {}", hasOutlier);
boolean hasOutlier = ALAFsUtils.hasAvro(fs, outlierPath, false);
log.debug("Try to Load existing outliers from {}", outlierPath);

if (hasOutlier) {
String samplingPath = String.join("/", outlierPath, "*.avro");
log.debug("Loading existing outliers from {}", samplingPath);
return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(samplingPath));
} else {
log.info("No existing outlier AVRO files under " + outlierPath);
return p.apply(Create.empty(AvroCoder.of(DistributionOutlierRecord.class)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public Iterable<DistributionOutlierRecord> apply(
Map.Entry<String, Double> entry = iterator.next();
StreamSupport.stream(outputs.spliterator(), false)
.filter(it -> it.getId().equalsIgnoreCase(entry.getKey()))
.forEach(it -> it.setDistanceOutOfEDL(entry.getValue()));
.findFirst()
.ifPresent(it -> it.setDistanceOutOfEDL(entry.getValue()));
}
} else {
while (iter.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static String getOutlierTargetPath(AllDatasetsPipelinesOptions options)
}

/**
* Removes a directory with content if the folder exists
* Check if a directory exists and is not empty
*
* @param directoryPath path to some directory
*/
Expand All @@ -151,6 +151,32 @@ public static boolean existsAndNonEmpty(FileSystem fs, String directoryPath) {
}
}

/**
 * Checks whether the target directory contains at least one AVRO file.
 *
 * <p>A file counts as AVRO when its name ends with {@code .avro}, compared case-insensitively.
 *
 * @param fs file system used to check and list the directory
 * @param directoryPath path to some directory
 * @param recurse forwarded unchanged to {@code fs.listFiles(path, recurse)}; presumably enables
 *     searching nested directories (confirm against the FileSystem implementation in use)
 * @return {@code true} as soon as a matching file is found; {@code false} if the path does not
 *     exist, no file name matches, or an {@link IOException} is raised while checking/listing
 */
public static boolean hasAvro(FileSystem fs, String directoryPath, boolean recurse) {
  Path path = PathBuilder.buildPath(directoryPath);
  try {
    if (!fs.exists(path)) {
      return false;
    }
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(path, recurse);
    while (it.hasNext()) {
      String fileName = it.next().getPath().getName();
      // Locale.ROOT keeps the extension check independent of the JVM's default locale.
      if (fileName.toLowerCase(java.util.Locale.ROOT).endsWith(".avro")) {
        return true;
      }
    }
    return false;
  } catch (IOException ignored) {
    // Deliberate best effort: a directory that cannot be inspected is reported as having no
    // AVRO files, so callers take their "nothing to load" path instead of failing.
    return false;
  }
}

/**
* Removes a directory with content if the folder exists
*
Expand Down

0 comments on commit 3bb724b

Please sign in to comment.