diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java index c17c62456a..79971ab91e 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java @@ -1,6 +1,6 @@ package au.org.ala.pipelines.beam; -import au.org.ala.pipelines.options.DistributionPipelineOptions; +import au.org.ala.pipelines.options.DistributionOutlierPipelineOptions; import au.org.ala.pipelines.transforms.DistributionOutlierTransform; import au.org.ala.pipelines.util.VersionInfo; import au.org.ala.utils.ALAFsUtils; @@ -37,10 +37,10 @@ public class DistributionOutlierPipeline { public static void main(String[] args) throws Exception { VersionInfo.print(); CombinedYamlConfiguration conf = new CombinedYamlConfiguration(args); - String[] combinedArgs = conf.toArgs("general", "distribution"); + String[] combinedArgs = conf.toArgs("general", "outlier"); - DistributionPipelineOptions options = - PipelinesOptionsFactory.create(DistributionPipelineOptions.class, combinedArgs); + DistributionOutlierPipelineOptions options = + PipelinesOptionsFactory.create(DistributionOutlierPipelineOptions.class, combinedArgs); MDC.put("datasetId", options.getDatasetId()); MDC.put("attempt", options.getAttempt().toString()); MDC.put("step", "DISTRIBUTION"); @@ -49,14 +49,13 @@ public static void main(String[] args) throws Exception { System.exit(0); } - public static void run(DistributionPipelineOptions options) throws Exception { + public static void run(DistributionOutlierPipelineOptions options) throws Exception { - //Create output path + // Create output path // default: {fsPath}/pipelines-outlier // or {fsPath}/pipelines-outlier/{datasetId} String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options); - log.info("Adding step 1: Collecting index records"); Pipeline p = Pipeline.create(options); @@ -75,7 +74,12 @@ public static void run(DistributionPipelineOptions options) throws Exception { distributionTransform.calculateOutlier()) .apply("Flatten records", Flatten.iterables()); - kvRecords.apply( "Write to file",AvroIO.write(DistributionOutlierRecord.class).to(outputPath+"/outliers").withoutSharding().withSuffix(".avro")); + kvRecords.apply( + "Write to file", + AvroIO.write(DistributionOutlierRecord.class) + .to(outputPath + "/outliers") + .withoutSharding() + .withSuffix(".avro")); log.info("Running the pipeline"); PipelineResult result = p.run(); diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java index a9b3338f87..8225691b87 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java @@ -50,7 +50,7 @@ public class IndexRecordToSolrPipeline { static final Relationships nullClustering = Relationships.newBuilder().setId(EMPTY).setRelationships(new ArrayList<>()).build(); static final DistributionOutlierRecord nullOutlier = - DistributionOutlierRecord.newBuilder().setId(EMPTY).build(); + DistributionOutlierRecord.newBuilder().setId(EMPTY).build(); public static void main(String[] args) throws Exception { VersionInfo.print(); @@ -192,8 +192,6 @@ public KV apply(KV input) { writeToSolr(options, indexRecordsCollection, conn, schemaFields, dynamicFieldPrefixes); } - - log.info("Starting pipeline"); pipeline.run(options).waitUntilFinish(); @@ -284,26 +282,21 @@ private static PCollection> addClusteringInfo( return indexRecordJoinClustering.apply(ParDo.of(addClusteringInfo())); } - private static PCollection> addOutlierInfo( - SolrPipelineOptions options, - Pipeline pipeline, - PCollection> indexRecords) { + SolrPipelineOptions options, + Pipeline pipeline, + PCollection> indexRecords) { // Load outlier records, keyed on ID PCollection> outlierRecords = - loadOutlierRecords(options, pipeline); - PCollection>> - indexRecordJoinOurlier = - Join.leftOuterJoin(indexRecords, outlierRecords, nullOutlier); - + loadOutlierRecords(options, pipeline); + PCollection>> indexRecordJoinOurlier = + Join.leftOuterJoin(indexRecords, outlierRecords, nullOutlier); // Add Jackknife information return indexRecordJoinOurlier.apply(ParDo.of(addOutlierInfo())); } - - private static void writeToSolr( SolrPipelineOptions options, PCollection> kvIndexRecords, @@ -557,10 +550,12 @@ public void processElement(ProcessContext c) { }; } - private static DoFn>, KV> - addOutlierInfo() { + private static DoFn< + KV>, KV> + addOutlierInfo() { - return new DoFn>, KV>() { + return new DoFn< + KV>, KV>() { @ProcessElement public void processElement(ProcessContext c) { @@ -578,7 +573,6 @@ public void processElement(ProcessContext c) { }; } - /** Load index records from AVRO. */ private static PCollection> loadIndexRecords( SolrPipelineOptions options, Pipeline p) { @@ -647,21 +641,20 @@ public KV apply(Relationships input) { } private static PCollection> loadOutlierRecords( - SolrPipelineOptions options, Pipeline p) { - String path = - PathBuilder.buildPath( - options.getAllDatasetsInputPath() + "/distribution/", "distribution*" + AVRO_EXTENSION) - .toString(); + SolrPipelineOptions options, Pipeline p) { + String path = PathBuilder.buildPath(options.getOutlierPath(), "*" + AVRO_EXTENSION).toString(); log.info("Loading outlier from {}", path); return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(path)) - .apply( - MapElements.via( - new SimpleFunction>() { - @Override - public KV apply(DistributionOutlierRecord input) { - return KV.of(input.getId(), input); - } - })); + .apply( + MapElements.via( + new SimpleFunction< + DistributionOutlierRecord, KV>() { + @Override + public KV apply( + DistributionOutlierRecord input) { + return KV.of(input.getId(), input); + } + })); } } diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DistributionPipelineOptions.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DistributionOutlierPipelineOptions.java similarity index 51% rename from livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DistributionPipelineOptions.java rename to livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DistributionOutlierPipelineOptions.java index ae04f8715b..381e7abdbe 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DistributionPipelineOptions.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DistributionOutlierPipelineOptions.java @@ -6,23 +6,11 @@ /** * Options for pipelines that run against spatial service for Expert distribution layer calculation. */ -public interface DistributionPipelineOptions extends AllDatasetsPipelinesOptions { +public interface DistributionOutlierPipelineOptions extends AllDatasetsPipelinesOptions { @Description("Base URL for Spatial service") @Default.String("https://spatial.ala.org.au/ws/") String getBaseUrl(); void setBaseUrl(String baseUrl); - - @Description("Default batch size") - @Default.Integer(25000) - Integer getBatchSize(); - - void setBatchSize(Integer batchSize); - - @Description("Keep download sampling CSVs") - @Default.Integer(1000) - Integer getBatchStatusSleepTime(); - - void setBatchStatusSleepTime(Integer batchStatusSleepTime); } diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/SolrPipelineOptions.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/SolrPipelineOptions.java index 6772ec6941..b9f689d553 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/SolrPipelineOptions.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/SolrPipelineOptions.java @@ -68,8 +68,15 @@ public interface SolrPipelineOptions extends IndexingPipelineOptions { @Description("Path to clustering avro files") @Default.String("/data/pipelines-clustering") String getClusteringPath(); + void setClusteringPath(String clusteringPath); + @Description("Path to outlier avro files") + @Default.String("/data/pipelines-outlier") + String getOutlierPath(); + + void setOutlierPath(String outlierPath); + @Description("Number of partitions to use") @Default.Integer(1) Integer getNumOfPartitions(); diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java index 6b1ccbc476..44ca5771ef 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java @@ -14,7 +14,8 @@ import org.gbif.pipelines.io.avro.*; import org.gbif.pipelines.transforms.Transform; -public class DistributionOutlierTransform extends Transform { +public class DistributionOutlierTransform + extends Transform { private String spatialUrl; @@ -56,9 +57,11 @@ public MapElements> toKv() { public MapElements>, Iterable> calculateOutlier() { return MapElements.via( - (new SimpleFunction>, Iterable>() { + (new SimpleFunction< + KV>, Iterable>() { @Override - public Iterable apply(KV> input) { + public Iterable apply( + KV> input) { String lsid = input.getKey(); Iterable records = input.getValue(); Iterator iter = records.iterator(); diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java index f7e2cf04d9..16deb1b6a9 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java @@ -72,18 +72,18 @@ public static String buildPathSamplingUsingTargetPath(AllDatasetsPipelinesOption } /** Build a path to outlier records. */ - public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions options) throws IOException { + public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions options) + throws IOException { // default: {fsPath}/pipelines-outlier FileSystem fs = - FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig()) - .getFs(options.getTargetPath()); + FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig()) + .getFs(options.getTargetPath()); - String outputPath = PathBuilder.buildPath(options.getTargetPath()).toString(); + String outputPath = PathBuilder.buildPath(options.getTargetPath()).toString(); // {fsPath}/pipelines-outlier/{datasetId} if (options.getDatasetId() != null && !"all".equalsIgnoreCase(options.getDatasetId())) { - outputPath = - PathBuilder.buildPath(outputPath, options.getDatasetId()).toString(); + outputPath = PathBuilder.buildPath(outputPath, options.getDatasetId()).toString(); } // delete previous runs FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath); @@ -92,7 +92,6 @@ public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions return outputPath; } - /** * Removes a directory with content if the folder exists * diff --git a/livingatlas/pipelines/src/test/java/au/org/ala/distribution/DistributionTest.java b/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java similarity index 94% rename from livingatlas/pipelines/src/test/java/au/org/ala/distribution/DistributionTest.java rename to livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java index c344a89390..e586e8832e 100644 --- a/livingatlas/pipelines/src/test/java/au/org/ala/distribution/DistributionTest.java +++ b/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java @@ -1,7 +1,9 @@ -package au.org.ala.distribution; +package au.org.ala.outlier; import static org.junit.Assert.*; +import au.org.ala.distribution.DistributionLayer; +import au.org.ala.distribution.DistributionServiceImpl; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -11,7 +13,7 @@ * Tests ported from * https://github.com/AtlasOfLivingAustralia/biocache-store/blob/master/src/test/scala/au/org/ala/biocache/DistanceRangeParserTest.scala */ -public class DistributionTest { +public class DistributionOutlierTest { String spatial_url = "http://devt.ala.org.au:8080/ws/"; // String spatial_url = "https://spatial.ala.org.au/ws/";