diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionPipeline.java
index 9820df88a4..865bcc4a49 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionPipeline.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionPipeline.java
@@ -56,20 +56,11 @@ public static void main(String[] args) throws Exception {
 
   public static void run(DistributionPipelineOptions options) throws Exception {
 
-    FileSystem fs =
-        FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
-            .getFs(options.getTargetPath());
-
-    String outputPath =
-        PathBuilder.buildDatasetAttemptPath(options, "interpreted/distribution", false);
-    if (options.getDatasetId() == null || "all".equalsIgnoreCase(options.getDatasetId())) {
-      outputPath =
-          PathBuilder.buildPath(options.getAllDatasetsInputPath(), "distribution").toString();
-    }
+    // Create the output path (also deletes any previous run and recreates the directory):
+    //   default:      {targetPath}              e.g. {fsPath}/pipelines-outlier
+    //   per dataset:  {targetPath}/{datasetId}
+    String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options);
 
-    // delete previous runs
-    FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath);
-
-    ALAFsUtils.createDirectory(fs, outputPath);
 
     log.info("Adding step 1: Collecting index records");
     Pipeline p = Pipeline.create(options);
@@ -89,13 +80,7 @@ public static void run(DistributionPipelineOptions options) throws Exception {
             distributionTransform.calculateOutlier())
         .apply("Flatten records", Flatten.iterables());
 
-    // kvRecords.apply( "Write to file",AvroIO.write(ALADistributionRecord.class).to(outputPath+"/interpreted").withoutSharding().withSuffix(".avro"));
-    kvRecords
-        .apply("to String", distributionTransform.flatToString())
-        .apply(
-            "Write to text",
-            TextIO.write().to(outputPath + "/interpreted").withoutSharding().withSuffix(".text"));
+    // Persist outlier records as Avro so IndexRecordToSolrPipeline can join them back by id.
+    kvRecords.apply(
+        "Write to file",
+        AvroIO.write(ALADistributionRecord.class)
+            .to(outputPath + "/outliers")
+            .withoutSharding()
+            .withSuffix(".avro"));
 
     log.info("Running the pipeline");
     PipelineResult result = p.run();
diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java
index d455544a37..5f35fde749 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java
@@ -49,6 +49,8 @@ public class IndexRecordToSolrPipeline {
       JackKnifeOutlierRecord.newBuilder().setId(EMPTY).setItems(new ArrayList<>()).build();
   static final Relationships nullClustering =
       Relationships.newBuilder().setId(EMPTY).setRelationships(new ArrayList<>()).build();
+  // Placeholder right side for the leftOuterJoin when a record has no outlier information.
+  static final ALADistributionRecord nullOutlier =
+      ALADistributionRecord.newBuilder().setId(EMPTY).build();
 
   public static void main(String[] args) throws Exception {
     VersionInfo.print();
@@ -96,6 +98,13 @@ public static void run(SolrPipelineOptions options) {
       log.info("Skipping adding clustering to the index");
     }
 
+    if (options.getIncludeOutlier()) {
+      log.info("Adding outlier to the index");
+      indexRecordsCollection = addOutlierInfo(options, pipeline, indexRecordsCollection);
+    } else {
+      log.info("Skipping adding outlier to the index");
+    }
+
     PCollection<KV<String, IndexRecord>> recordsWithoutCoordinates = null;
 
     if (options.getIncludeSampling()) {
@@ -183,6 +192,8 @@ public KV<String, IndexRecord> apply(KV<String, IndexRecord> input) {
       writeToSolr(options, indexRecordsCollection, conn, schemaFields, dynamicFieldPrefixes);
     }
 
+
+
     log.info("Starting pipeline");
     pipeline.run(options).waitUntilFinish();
 
@@ -273,6 +284,26 @@ private static PCollection<KV<String, IndexRecord>> addClusteringInfo(
     return indexRecordJoinClustering.apply(ParDo.of(addClusteringInfo()));
   }
 
+  /** Left-joins outlier (expert distribution) records onto the index records, keyed on id. */
+  private static PCollection<KV<String, IndexRecord>> addOutlierInfo(
+      SolrPipelineOptions options,
+      Pipeline pipeline,
+      PCollection<KV<String, IndexRecord>> indexRecords) {
+
+    // Load outlier records, keyed on ID
+    PCollection<KV<String, ALADistributionRecord>> outlierRecords =
+        loadOutlierRecords(options, pipeline);
+    PCollection<KV<String, KV<IndexRecord, ALADistributionRecord>>> indexRecordJoinOutlier =
+        Join.leftOuterJoin(indexRecords, outlierRecords, nullOutlier);
+
+    // Add outlier (distance to expert distribution layer) information
+    return indexRecordJoinOutlier.apply(ParDo.of(addOutlierInfo()));
+  }
+
   private static void writeToSolr(
       SolrPipelineOptions options,
       PCollection<KV<String, IndexRecord>> kvIndexRecords,
@@ -526,6 +557,28 @@ public void processElement(ProcessContext c) {
     };
   }
 
+  /** Copies distanceOutOfEDL onto the index record's doubles map as {@code distanceToEDL}. */
+  private static DoFn<KV<String, KV<IndexRecord, ALADistributionRecord>>, KV<String, IndexRecord>>
+      addOutlierInfo() {
+
+    return new DoFn<KV<String, KV<IndexRecord, ALADistributionRecord>>, KV<String, IndexRecord>>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+
+        KV<String, KV<IndexRecord, ALADistributionRecord>> e = c.element();
+
+        IndexRecord indexRecord = e.getValue().getKey();
+        String id = indexRecord.getId();
+
+        ALADistributionRecord outlierRecord = e.getValue().getValue();
+        // Skip the nullOutlier placeholder (id == EMPTY) produced by the left outer join,
+        // so unmatched records are not indexed with the placeholder default distance.
+        if (!EMPTY.equals(outlierRecord.getId())) {
+          indexRecord.getDoubles().put(DISTANCE_TO_EDL, outlierRecord.getDistanceOutOfEDL());
+        }
+
+        c.output(KV.of(id, indexRecord));
+      }
+    };
+  }
+
   /** Load index records from AVRO. */
   private static PCollection<KV<String, IndexRecord>> loadIndexRecords(
       SolrPipelineOptions options, Pipeline p) {
@@ -592,4 +645,23 @@ public KV<String, Relationships> apply(Relationships input) {
                 }));
   }
+
+  /** Load outlier records from AVRO produced by the distribution pipeline, keyed on id. */
+  private static PCollection<KV<String, ALADistributionRecord>> loadOutlierRecords(
+      SolrPipelineOptions options, Pipeline p) {
+    // NOTE(review): DistributionPipeline writes {outputPath}/outliers.avro — confirm this glob
+    // ({allDatasetsInputPath}/distribution/distribution*.avro) matches the writer's location.
+    String path =
+        PathBuilder.buildPath(
+                options.getAllDatasetsInputPath() + "/distribution/",
+                "distribution*" + AVRO_EXTENSION)
+            .toString();
+    log.info("Loading outlier from {}", path);
+
+    return p.apply(AvroIO.read(ALADistributionRecord.class).from(path))
+        .apply(
+            MapElements.via(
+                new SimpleFunction<ALADistributionRecord, KV<String, ALADistributionRecord>>() {
+                  @Override
+                  public KV<String, ALADistributionRecord> apply(ALADistributionRecord input) {
+                    return KV.of(input.getId(), input);
+                  }
+                }));
+  }
 }
diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/SolrPipelineOptions.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/SolrPipelineOptions.java
index 346ebb5f7e..6772ec6941 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/SolrPipelineOptions.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/SolrPipelineOptions.java
@@ -59,10 +59,15 @@ public interface SolrPipelineOptions extends IndexingPipelineOptions {
 
   void setIncludeClustering(Boolean includeClustering);
 
+  @Description("Include distance to expert distribution layers")
+  @Default.Boolean(false)
+  Boolean getIncludeOutlier();
+
+  void setIncludeOutlier(Boolean includeOutlier);
+
   @Description("Path to clustering avro files")
   @Default.String("/data/pipelines-clustering")
   String getClusteringPath();
-
   void setClusteringPath(String clusteringPath);
 
   @Description("Number of partitions to use")
diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexFields.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexFields.java
index 4f6f337dc3..32079da7bd 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexFields.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/IndexFields.java
@@ -18,6 +18,7 @@ public interface IndexFields {
   String DATE_PRECISION = "datePrecision";
   String DECADE = "decade";
   String DEFAULT_VALUES_USED = "defaultValuesUsed";
+  String DISTANCE_TO_EDL = "distanceToEDL";
   String DUPLICATE_JUSTIFICATION = "duplicateJustification";
   String DUPLICATE_STATUS = "duplicateStatus";
   String DUPLICATE_TYPE = "duplicateType";
diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java
index dd63a8f24c..f7e2cf04d9 100644
--- a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java
+++ b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java
@@ -71,6 +71,28 @@ public static String buildPathSamplingUsingTargetPath(AllDatasetsPipelinesOption
     return PathBuilder.buildDatasetAttemptPath(options, "sampling", false);
   }
 
+  /**
+   * Builds (and resets) the output path for outlier records: {targetPath} by default, or
+   * {targetPath}/{datasetId} for a single-dataset run. Deletes any previous run at that location
+   * and recreates the directory.
+   *
+   * <p>NOTE(review): callers document the default as {fsPath}/pipelines-outlier — confirm
+   * targetPath is configured to point at the pipelines-outlier root, otherwise an "all datasets"
+   * run deletes the entire target path.
+   */
+  public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions options)
+      throws IOException {
+    FileSystem fs =
+        FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
+            .getFs(options.getTargetPath());
+
+    String outputPath = PathBuilder.buildPath(options.getTargetPath()).toString();
+
+    // {targetPath}/{datasetId}
+    if (options.getDatasetId() != null && !"all".equalsIgnoreCase(options.getDatasetId())) {
+      outputPath = PathBuilder.buildPath(outputPath, options.getDatasetId()).toString();
+    }
+    // delete previous runs
+    FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath);
+    ALAFsUtils.createDirectory(fs, outputPath);
+
+    return outputPath;
+  }
+
   /**
    * Removes a directory with content if the folder exists
    *
diff --git a/sdks/models/src/main/avro/specific/ala-distribution-record.avsc b/sdks/models/src/main/avro/specific/ala-distribution-record.avsc
index 3fa613ed8f..7c52a0c521 100644
--- a/sdks/models/src/main/avro/specific/ala-distribution-record.avsc
+++ b/sdks/models/src/main/avro/specific/ala-distribution-record.avsc
@@ -6,10 +6,10 @@
   "fields":[
     {"name": "id", "type": "string", "doc": "Pipelines identifier"},
     {"name" : "occurrenceID", "type" : ["null", "string"], "default" : null, "doc" : "Occurrence identifier"},
-    {"name" : "distanceOutOfEDL", "type" : [ "double"], "default" : -1, ",doc" : "Distance to the expert distribution layers. -1: no EDL, 0: in EDL, >0 =out of EDL" },
+    {"name" : "distanceOutOfEDL", "type" : "double", "default" : -1, "doc" : "Distance to the expert distribution layers. -1: no EDL, 0: in EDL, >0 = out of EDL" },
     {"name": "decimalLatitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLatitude"},
     {"name": "decimalLongitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLongitude"},
-    {"name": "speciesID","type":"string", "doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"},
+    {"name": "speciesID","type":"string", "default": "","doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"},
     {"name": "issues", "type": "IssueRecord", "default":{}}
   ]
 }