From 81b891a37fecc7375e4847e905dc3e492b9fb060 Mon Sep 17 00:00:00 2001
From: Qifeng
Date: Fri, 21 Jan 2022 15:18:21 +1100
Subject: [PATCH] #622 impl(Only calculate newly added records)

---
 .../beam/DistributionOutlierPipeline.java     | 183 ++++++++++++++++--
 .../DistributionOutlierTransform.java         |  14 +-
 .../java/au/org/ala/utils/ALAFsUtils.java     |  38 +++-
 .../ala/outlier/DistributionOutlierTest.java  |   8 +-
 4 files changed, 217 insertions(+), 26 deletions(-)

diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java
index 00db01ae5f..4e4ee771a4 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java
@@ -1,21 +1,36 @@
 package au.org.ala.pipelines.beam;
 
+import au.org.ala.pipelines.options.AllDatasetsPipelinesOptions;
 import au.org.ala.pipelines.options.DistributionOutlierPipelineOptions;
 import au.org.ala.pipelines.transforms.DistributionOutlierTransform;
 import au.org.ala.pipelines.util.VersionInfo;
 import au.org.ala.utils.ALAFsUtils;
 import au.org.ala.utils.CombinedYamlConfiguration;
+import java.io.*;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory;
+import org.gbif.pipelines.core.factory.FileSystemFactory;
 import org.gbif.pipelines.io.avro.*;
 import org.slf4j.MDC;
@@ -34,6 +49,7 @@
 @Slf4j
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class DistributionOutlierPipeline {
+  static List<String> WORKLOGS = new ArrayList<>();
 
   public static void main(String[] args) throws Exception {
     VersionInfo.print();
@@ -55,26 +71,114 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except
     // Create output path
     // default: {fsPath}/pipelines-outlier
     // or {fsPath}/pipelines-outlier/{datasetId}
-    String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options);
+    String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options, false);
 
     log.info("Adding step 1: Collecting index records");
     Pipeline p = Pipeline.create(options);
-    PCollection<IndexRecord> indexRecords = ALAFsUtils.loadIndexRecords(options, p);
-
-    DistributionOutlierTransform distributionTransform =
-        new DistributionOutlierTransform(options.getBaseUrl());
-
-    log.info("Adding step 2: calculating outliers index");
-    PCollection<DistributionOutlierRecord> kvRecords =
-        indexRecords
+    PCollection<IndexRecord> indexRecords =
+        ALAFsUtils.loadIndexRecords(options, p)
             .apply(
                 "Filter out records without id/species/taxon/location",
                 Filter.by(
                     it ->
                         !StringUtils.isEmpty(it.getTaxonID())
                             && !StringUtils.isEmpty(it.getLatLng())
-                            && !StringUtils.isEmpty(it.getId())))
+                            && !StringUtils.isEmpty(it.getId())));
+
+    indexRecords
+        .apply(Count.globally())
+        .apply(
+            MapElements.via(
+                new SimpleFunction<Long, Long>() {
+                  @Override
+                  public Long apply(Long input) {
+                    log.info("Number of indexed records loaded: " + input);
+                    WORKLOGS.add("Number of indexed records loaded: " + input);
+                    return input;
+                  }
+                }));
+
+    DistributionOutlierTransform distributionTransform =
+        new DistributionOutlierTransform(options.getBaseUrl());
+
+    log.info("Adding step 2: Loading existing outlier records");
+    PCollection<DistributionOutlierRecord> existingOutliers =
+        loadExistingRecords(options, p, outputPath);
+
+    existingOutliers
+        .apply(Count.globally())
+        .apply(
+            MapElements.via(
+                new SimpleFunction<Long, Long>() {
+                  @Override
+                  public Long apply(Long input) {
+                    log.info("Number of existing outlier records: " + input);
+                    WORKLOGS.add("Number of existing outlier records: " + input);
+                    return input;
+                  }
+                }));
+
+    log.info("Adding step 3: Filtering out the records which already have outliers");
+    PCollection<KV<String, IndexRecord>> kvIndexRecords =
+        indexRecords.apply(
+            WithKeys.of(it -> it.getId()).withKeyType(TypeDescriptors.strings()));
+
+    PCollection<KV<String, Boolean>> kvExistingOutliers =
+        existingOutliers
+            .apply(
+                MapElements.via(
+                    new SimpleFunction<DistributionOutlierRecord, String>() {
+                      @Override
+                      public String apply(DistributionOutlierRecord input) {
+                        return input.getId();
+                      }
+                    }))
+            .apply(Distinct.create())
+            .apply(
+                MapElements.via(
+                    new SimpleFunction<String, KV<String, Boolean>>() {
+                      @Override
+                      public KV<String, Boolean> apply(String input) {
+                        return KV.of(input, true);
+                      }
+                    }));
+
+    PCollection<IndexRecord> newAddedIndexRecords =
+        org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin(
+                kvIndexRecords, kvExistingOutliers, false)
+            .apply(
+                Filter.by(
+                    (SerializableFunction<KV<String, KV<IndexRecord, Boolean>>, Boolean>)
+                        input -> !input.getValue().getValue())) // keep records without an existing outlier
+            .apply(
+                ParDo.of(
+                    new DoFn<KV<String, KV<IndexRecord, Boolean>>, IndexRecord>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        KV<IndexRecord, Boolean> kv = c.element().getValue();
+                        c.output(kv.getKey());
+                      }
+                    }));
+
+    newAddedIndexRecords
+        .apply(Count.globally())
+        .apply(
+            MapElements.via(
+                new SimpleFunction<Long, Long>() {
+                  @Override
+                  public Long apply(Long input) {
+                    log.info("Number of records to be calculated: " + input);
+                    WORKLOGS.add("Number of records to be calculated: " + input);
+                    return input;
+                  }
+                }));
+
+    log.info("Adding step 4: calculating outliers index");
+
+    PCollection<DistributionOutlierRecord> kvRecords =
+        newAddedIndexRecords
             .apply("Key by species", distributionTransform.toKv())
             .apply("Grouping by species", GroupByKey.create())
             .apply(
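A note on the anti-join built in step 3 above: the join-library's leftOuterJoin pairs every kvIndexRecords element with either its matching right-side value or the supplied default (false), so keeping only the pairs that carry the default isolates the records with no existing outlier. Below is a minimal, self-contained sketch of the same idiom; the class name and sample data are illustrative, not from this codebase.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class AntiJoinSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // All records keyed by id; "r2" already has an outlier from a previous run.
    PCollection<KV<String, String>> allRecords =
        p.apply(
            "All records",
            Create.of(KV.of("r1", "recA"), KV.of("r2", "recB"), KV.of("r3", "recC"))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
    PCollection<KV<String, Boolean>> alreadyProcessed =
        p.apply(
            "Already processed",
            Create.of(KV.of("r2", true))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), BooleanCoder.of())));

    // Ids missing on the right side get the default value `false`, so
    // filtering on !flag keeps exactly the unprocessed records (r1 and r3).
    PCollection<KV<String, KV<String, Boolean>>> joined =
        Join.leftOuterJoin(allRecords, alreadyProcessed, false);
    joined.apply(
        "Keep new records",
        Filter.by(
            (SerializableFunction<KV<String, KV<String, Boolean>>, Boolean>)
                e -> !e.getValue().getValue()));

    p.run().waitUntilFinish();
  }
}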
@@ -82,12 +186,16 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except
             distributionTransform.calculateOutlier())
         .apply("Flatten records", Flatten.iterables());
 
-    log.info("Adding step 3: writing to outliers");
+    DateFormat df = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
+    String ts = df.format(new Date());
+    String targetFile = outputPath + "/outlier_" + ts;
+    WORKLOGS.add("Output: outlier_" + ts + ".avro");
+    log.info("Adding step 5: writing to " + targetFile + ".avro");
 
     kvRecords.apply(
         "Write to file",
         AvroIO.write(DistributionOutlierRecord.class)
-            .to(outputPath + "/outlier")
+            .to(targetFile)
             .withoutSharding()
             .withSuffix(".avro"));
     // Checking purpose.
@@ -95,12 +203,59 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except
       kvRecords
           .apply("to String", distributionTransform.flatToString())
           .apply(
-              "Write to text",
-              TextIO.write().to(outputPath + "/outlier").withoutSharding().withSuffix(".txt"));
+              "Write to text", TextIO.write().to(targetFile).withoutSharding().withSuffix(".txt"));
     }
 
     log.info("Running the pipeline");
     PipelineResult result = p.run();
     result.waitUntilFinish();
+
+    writeWorkLogs(options, outputPath);
   }
+
+  /**
+   * TODO: HDFS may not support file appending.
+   *
+   * @param options pipeline options
+   * @param outputPath the outlier output folder
+   */
+  private static void writeWorkLogs(AllDatasetsPipelinesOptions options, String outputPath) {
+    try {
+      WORKLOGS.add("-------------------------------\r\n");
+      FileSystem fs =
+          FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
+              .getFs(outputPath);
+      FSDataOutputStream os = null;
+      if (fs.exists(new Path(outputPath + "/work_logs.txt"))) {
+        os = fs.append(new Path(outputPath + "/work_logs.txt"));
+      } else {
+        os = fs.create(new Path(outputPath + "/work_logs.txt"));
+      }
+      InputStream is = new ByteArrayInputStream(String.join("\r\n", WORKLOGS).getBytes());
+      IOUtils.copyBytes(is, os, 4096, true);
+    } catch (Exception e) {
+      log.warn("Cannot write work history; file appending may not be supported. Ignored.");
+    }
+  }
+
+  private static PCollection<DistributionOutlierRecord> loadExistingRecords(
+      AllDatasetsPipelinesOptions options, Pipeline p, String outputPath) throws IOException {
+
+    FileSystem fs =
+        FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
+            .getFs(outputPath);
+
+    String outlierPath = ALAFsUtils.getOutlierTargetPath(options);
+    boolean hasOutlier = ALAFsUtils.existsAndNonEmpty(fs, outlierPath);
+
+    log.info("Has outlier records from previous runs? {}", hasOutlier);
+
+    if (hasOutlier) {
+      String existingOutlierFiles = String.join("/", outlierPath, "*.avro");
+      log.debug("Loading existing outliers from {}", existingOutlierFiles);
+      return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(existingOutlierFiles));
+    } else {
+      return p.apply(Create.empty(AvroCoder.of(DistributionOutlierRecord.class)));
+    }
+  }
 }
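The TODO on writeWorkLogs is worth keeping: FileSystem.append is not supported by every Hadoop FileSystem implementation or configuration, which is what the catch block above silently absorbs. One possible fallback, sketched here under stated assumptions (it is not part of the patch, assumes the work log stays small, and assumes commons-io is on the classpath), is to rewrite the whole file instead of appending:

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendOrRewriteSketch {

  /** Appends text to a file by rewriting it, so FileSystem.append() is never needed. */
  static void appendOrRewrite(FileSystem fs, Path file, String text) throws IOException {
    byte[] previous = new byte[0];
    if (fs.exists(file)) {
      try (InputStream in = fs.open(file)) {
        previous = IOUtils.toByteArray(in); // read the current content first
      }
    }
    try (FSDataOutputStream out = fs.create(file, true)) { // true = overwrite
      out.write(previous);
      out.write(text.getBytes(StandardCharsets.UTF_8));
    }
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    appendOrRewrite(fs, new Path("/tmp/work_logs.txt"), "Output: outlier_<timestamp>.avro\n");
  }
}

The trade-off is a full read-and-rewrite on every run, which is acceptable only because the work log holds a handful of summary lines per run.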
diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java
index 0e63329748..6163e63cc0 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java
@@ -69,7 +69,7 @@ public Iterable<DistributionOutlierRecord> apply(
           Iterable<IndexRecord> records = input.getValue();
           Iterator<IndexRecord> iter = records.iterator();
           List<DistributionOutlierRecord> outputs = new ArrayList<>();
-          Map<String, Map<String, Double>> points = new HashMap<>();
+
           try {
             DistributionServiceImpl distributionService =
                 DistributionServiceImpl.init(spatialUrl);
@@ -78,9 +78,10 @@ public Iterable<DistributionOutlierRecord> apply(
             boolean hasEDL = edl.size() > 0 ? true : false;
             double distanceToEDL = hasEDL ? 0 : -1; // 0: inside, -1: no EDL
 
-            // Available EDLD of this species
+            // Available EDLs of this species
             if (hasEDL) {
+              Map<String, Map<String, Double>> points = new HashMap<>();
               while (iter.hasNext()) {
                 IndexRecord record = iter.next();
                 DistributionOutlierRecord dr = convertToDistribution(record, distanceToEDL);
@@ -103,10 +104,17 @@ public Iterable<DistributionOutlierRecord> apply(
                     .filter(it -> it.getId().equalsIgnoreCase(entry.getKey()))
                     .forEach(it -> it.setDistanceOutOfEDL(entry.getValue()));
               }
+            } else {
+              while (iter.hasNext()) {
+                IndexRecord record = iter.next();
+                DistributionOutlierRecord dr = convertToDistribution(record, distanceToEDL);
+                if (dr != null) {
+                  outputs.add(dr);
+                }
+              }
             }
           } catch (ExpertDistributionException e) {
             log.error("Error in processing the species: " + lsid + " . Ignored");
-            log.error("Points: " + points);
             log.error(e.getMessage());
           } catch (Exception e) {
             log.error("Runtime error in processing the species: " + lsid + " . Ignored");
diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java
index 6eec743a1c..1e1bc5f161 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java
@@ -72,10 +72,40 @@ public static String buildPathSamplingUsingTargetPath(AllDatasetsPipelinesOption
   }
 
   /**
-   * Build a path to outlier records. {fsPath}/pipelines-outlier/{datasetId}
+   * Build a path to outlier records: {fsPath}/pipelines-outlier/{datasetId} or
+   * {fsPath}/pipelines-outlier/all. NOTE: it deletes the existing folder when delete is true.
+   */
+  public static String buildPathOutlierUsingTargetPath(
+      AllDatasetsPipelinesOptions options, boolean delete) throws IOException {
+    // default: {fsPath}/pipelines-outlier
+    FileSystem fs =
+        FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
+            .getFs(options.getTargetPath());
+
+    String outputPath = PathBuilder.buildPath(options.getTargetPath()).toString();
+
+    // {fsPath}/pipelines-outlier/{datasetId}
+    if (options.getDatasetId() != null && !"all".equalsIgnoreCase(options.getDatasetId())) {
+      outputPath = PathBuilder.buildPath(outputPath, options.getDatasetId()).toString();
+    } else {
+      // {fsPath}/pipelines-outlier/all
+      outputPath = PathBuilder.buildPath(outputPath, "all").toString();
+    }
+    // delete previous runs
+    if (delete)
+      FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath);
+    else {
+      if (!exists(fs, outputPath)) ALAFsUtils.createDirectory(fs, outputPath);
+    }
+
+    return outputPath;
+  }
+
+  /**
+   * Get the output path of outlier records. {fsPath}/pipelines-outlier/{datasetId} or
    * {fsPath}/pipelines-outlier/all
    */
-  public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions options)
+  public static String getOutlierTargetPath(AllDatasetsPipelinesOptions options)
       throws IOException {
     // default: {fsPath}/pipelines-outlier
     FileSystem fs =
@@ -91,9 +121,7 @@ public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOption
       // {fsPath}/pipelines-outlier/all
       outputPath = PathBuilder.buildPath(outputPath, "all").toString();
     }
-    // delete previous runs
-    FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath);
-    ALAFsUtils.createDirectory(fs, outputPath);
+
 
     return outputPath;
   }
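After this change ALAFsUtils exposes two resolvers with different side effects: buildPathOutlierUsingTargetPath(options, delete) can delete or create the folder, while getOutlierTargetPath(options) only computes the path. A usage sketch of the three call patterns, assuming an already-populated options object (the method and class names here are illustrative wrappers, the three ALAFsUtils calls are from this patch):

import au.org.ala.pipelines.options.AllDatasetsPipelinesOptions;
import au.org.ala.utils.ALAFsUtils;
import java.io.IOException;

class OutlierPathUsageSketch {
  static void resolvePaths(AllDatasetsPipelinesOptions options) throws IOException {
    // Wipe previous runs and start clean (the pre-patch behaviour):
    String freshDir = ALAFsUtils.buildPathOutlierUsingTargetPath(options, true);

    // Keep previous runs; create the folder only when it is missing
    // (what DistributionOutlierPipeline now does, enabling incremental runs):
    String incrementalDir = ALAFsUtils.buildPathOutlierUsingTargetPath(options, false);

    // Resolve the same path without touching the filesystem
    // (used by loadExistingRecords before globbing *.avro):
    String readOnlyDir = ALAFsUtils.getOutlierTargetPath(options);
  }
}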
diff --git a/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java b/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java
index f51edcdbd2..ef111258db 100644
--- a/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java
+++ b/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java
@@ -87,13 +87,13 @@ public void url_as_id_outliers() {
       inPoint.put("decimalLongitude", 142.854663);
       points.put("e512c707-fe92-492c-b869-799c57388c45", inPoint);
 
-      //Map results = impl.outliers(URLEncoder.encode("urn:lsid:biodiversity.org.au:afd.taxon:0c3e2403-05c4-4a43-8019-30e6d657a283", StandardCharsets.UTF_8.toString()), points);
-      Map results = impl.outliers("https://id.biodiversity.org.au/node/apni/2908371", points);
+      // Map results =
+      // impl.outliers(URLEncoder.encode("urn:lsid:biodiversity.org.au:afd.taxon:0c3e2403-05c4-4a43-8019-30e6d657a283", StandardCharsets.UTF_8.toString()), points);
+      Map results =
+          impl.outliers("https://id.biodiversity.org.au/node/apni/2908371", points);
 
       assertSame(0, results.size());
     } catch (Exception e) {
       e.printStackTrace();
     }
   }
-
-
 }
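For reference, the points argument handed to DistributionServiceImpl.outliers(...) in the test above maps an occurrence id to its decimal coordinates. A minimal reconstruction of that payload shape follows; the latitude value is illustrative, since the test's corresponding line is not shown in this hunk:

import java.util.HashMap;
import java.util.Map;

class OutliersPayloadSketch {
  static Map<String, Map<String, Double>> samplePoints() {
    Map<String, Double> point = new HashMap<>();
    point.put("decimalLatitude", -28.840886); // illustrative value only
    point.put("decimalLongitude", 142.854663); // from the test above
    Map<String, Map<String, Double>> points = new HashMap<>();
    points.put("e512c707-fe92-492c-b869-799c57388c45", point); // occurrence id -> coordinates
    return points;
  }
}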