diff --git a/livingatlas/configs/la-pipelines.yaml b/livingatlas/configs/la-pipelines.yaml index d7b96579de..eea06647e1 100644 --- a/livingatlas/configs/la-pipelines.yaml +++ b/livingatlas/configs/la-pipelines.yaml @@ -172,7 +172,7 @@ sampling: outlier: appName: Expert distribution outliers for {datasetId} baseUrl: https://spatial.ala.org.au/ws/ - targetPath: &outlierTargetPath '{fsPath}/pipelines-outlier' + targetPath: '{fsPath}/pipelines-outlier' allDatasetsInputPath: '{fsPath}/pipelines-all-datasets' runner: SparkRunner @@ -201,7 +201,7 @@ solr: allDatasetsInputPath: '{fsPath}/pipelines-all-datasets' jackKnifePath: "{fsPath}/pipelines-jackknife" clusteringPath: "{fsPath}/pipelines-clustering" - outlierPath: *outlierTargetPath + outlierPath: '{fsPath}/pipelines-outlier' solrCollection: biocache includeSampling: false includeJackKnife: false diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/distribution/DistributionService.java b/livingatlas/pipelines/src/main/java/au/org/ala/distribution/DistributionService.java index cd4c641a79..7f46738044 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/distribution/DistributionService.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/distribution/DistributionService.java @@ -24,10 +24,4 @@ Call> getLayersByLsid( @POST("distribution/outliers/{id}") Call> outliers( @Path("id") String lsid, @Body Map> points); - - // @FormUrlEncoded - // @POST("distribution/outliers/{id}") - // Call outliers(@Path(value="id",encoded = true) String lsid, - // @Field("pointsJson") Map> points); - } diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/distribution/DistributionServiceImpl.java b/livingatlas/pipelines/src/main/java/au/org/ala/distribution/DistributionServiceImpl.java index b0b3271e0c..e6d24d0f9e 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/distribution/DistributionServiceImpl.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/distribution/DistributionServiceImpl.java @@ -25,14 +25,17 @@ private DistributionServiceImpl(String baseUrl) { service = retrofit.create(DistributionService.class); } + /** + * Init Distribution outlier service + * + * @param baseUrl Url of Spatial service + * @return + */ public static DistributionServiceImpl init(String baseUrl) { - // set up sampling service return new DistributionServiceImpl(baseUrl); } public List getLayers() throws IOException, ExpertDistributionException { - // Response> response = - // distributionService.getLayersByLsid("urn:lsid:biodiversity.org.au:afd.taxon:4f3a5260-4f39-4393-a644-4d05b1c45f92", "false").execute(); Response> response = service.getLayers().execute(); int code = response.code(); diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java index b59141f55b..5f56d5fd2a 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java @@ -11,8 +11,10 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.lang3.StringUtils; import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory; import org.gbif.pipelines.io.avro.*; import org.slf4j.MDC; @@ -69,6 +71,13 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except log.info("Adding step 2: calculating outliers index"); PCollection kvRecords = indexRecords + .apply( + "Filter out records without id/species/taxon/location", + Filter.by( + it -> + !StringUtils.isEmpty(it.getTaxonID()) + && !StringUtils.isEmpty(it.getLatLng()) + && !StringUtils.isEmpty(it.getId()))) .apply("Key by species", distributionTransform.toKv()) .apply("Grouping by species", GroupByKey.create()) .apply( @@ -84,12 +93,14 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except .to(outputPath + "/outlier") .withoutSharding() .withSuffix(".avro")); - - // kvRecords - // .apply("to String", distributionTransform.flatToString()) - // .apply( - // "Write to text", - // TextIO.write().to(outputPath+"/outlier").withoutSharding().withSuffix(".txt")); + // Checking purpose. + if (System.getenv("test") != null) { + kvRecords + .apply("to String", distributionTransform.flatToString()) + .apply( + "Write to text", + TextIO.write().to(outputPath + "/outlier").withoutSharding().withSuffix(".txt")); + } log.info("Running the pipeline"); PipelineResult result = p.run(); diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/DistributionOutlierInterpreter.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/DistributionOutlierInterpreter.java index ed7b524b52..f4376c2464 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/DistributionOutlierInterpreter.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/DistributionOutlierInterpreter.java @@ -20,7 +20,7 @@ public class DistributionOutlierInterpreter { public static void interpretOccurrenceID(IndexRecord ir, DistributionOutlierRecord dr) { - dr.setOccurrenceID(ir.getId()); + dr.setId(ir.getId()); } public static void interpretLocation(IndexRecord ir, DistributionOutlierRecord dr) { @@ -40,7 +40,7 @@ public static void interpretSpeciesId(IndexRecord ir, DistributionOutlierRecord public static void interpretOccurrenceID(ExtendedRecord er, DistributionOutlierRecord dr) { String value = extractNullAwareValue(er, DwcTerm.occurrenceID); if (!Strings.isNullOrEmpty(value)) { - dr.setOccurrenceID(value); + dr.setId(value); } } diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java index 5a8b9a5e00..be5f2aedca 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java @@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.values.*; +import org.apache.commons.lang3.StringUtils; import org.gbif.pipelines.core.functions.SerializableConsumer; import org.gbif.pipelines.core.interpreters.Interpretation; import org.gbif.pipelines.io.avro.*; @@ -80,12 +81,13 @@ public Iterable apply( while (iter.hasNext()) { IndexRecord record = iter.next(); DistributionOutlierRecord dr = convertToDistribution(record, distanceToEDL); - outputs.add(dr); - - Map point = new HashMap(); - point.put("decimalLatitude", dr.getDecimalLatitude()); - point.put("decimalLongitude", dr.getDecimalLongitude()); - points.put(dr.getOccurrenceID(), point); + if (dr != null) { + outputs.add(dr); + Map point = new HashMap(); + point.put("decimalLatitude", dr.getDecimalLatitude()); + point.put("decimalLongitude", dr.getDecimalLongitude()); + points.put(dr.getId(), point); + } } if (hasEDL) { @@ -94,7 +96,7 @@ public Iterable apply( while (iterator.hasNext()) { Map.Entry entry = iterator.next(); StreamSupport.stream(outputs.spliterator(), false) - .filter(it -> it.getOccurrenceID().equalsIgnoreCase(entry.getKey())) + .filter(it -> it.getId().equalsIgnoreCase(entry.getKey())) .forEach(it -> it.setDistanceOutOfEDL(entry.getValue())); } } @@ -131,26 +133,34 @@ public MapElements flatToString() { */ private DistributionOutlierRecord convertToDistribution( IndexRecord record, double distanceToEDL) { - DistributionOutlierRecord newRecord = - DistributionOutlierRecord.newBuilder() - .setId(record.getId()) - .setOccurrenceID(record.getId()) - .setSpeciesID(record.getTaxonID()) - .setDistanceOutOfEDL(distanceToEDL) - .build(); - - String latlng = record.getLatLng(); - String[] coordinates = latlng.split(","); - newRecord.setDecimalLatitude(Double.parseDouble(coordinates[0])); - newRecord.setDecimalLongitude(Double.parseDouble(coordinates[1])); - - return newRecord; + try { + if (!StringUtils.isEmpty(record.getId()) + && !StringUtils.isEmpty(record.getTaxonID()) + && !StringUtils.isEmpty(record.getLatLng())) { + DistributionOutlierRecord newRecord = + DistributionOutlierRecord.newBuilder() + .setId(record.getId()) + .setSpeciesID(record.getTaxonID()) + .setDistanceOutOfEDL(distanceToEDL) + .build(); + + String latlng = record.getLatLng(); + String[] coordinates = latlng.split(","); + newRecord.setDecimalLatitude(Double.parseDouble(coordinates[0])); + newRecord.setDecimalLongitude(Double.parseDouble(coordinates[1])); + + return newRecord; + } + } catch (Exception ex) { + log.debug(record.getId() + " does not have lat/lng or taxon. ignored.."); + } + return null; } private String convertRecordToString(DistributionOutlierRecord record) { return String.format( "occurrenceId:%s, lat:%f, lng:%f, speciesId:%s, distanceToEDL:%f", - record.getOccurrenceID(), + record.getId(), record.getDecimalLatitude(), record.getDecimalLongitude(), record.getSpeciesID(), diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java index aaa5630f7f..6eec743a1c 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java @@ -309,14 +309,10 @@ public static PCollection loadIndexRecords( if (dataResourceFolder == null || "all".equalsIgnoreCase(dataResourceFolder)) { dataResourceFolder = "*"; } - return p.apply( - AvroIO.read(IndexRecord.class) - .from( - String.join( - "/", - options.getAllDatasetsInputPath(), - "index-record", - dataResourceFolder, - "*.avro"))); + String dataSource = + String.join( + "/", options.getAllDatasetsInputPath(), "index-record", dataResourceFolder, "*.avro"); + log.info("Loading index records from: " + dataSource); + return p.apply(AvroIO.read(IndexRecord.class).from(dataSource)); } } diff --git a/sdks/models/src/main/avro/specific/distribution-outlier-record.avsc b/sdks/models/src/main/avro/specific/distribution-outlier-record.avsc index 9e868f13a0..322efef95d 100644 --- a/sdks/models/src/main/avro/specific/distribution-outlier-record.avsc +++ b/sdks/models/src/main/avro/specific/distribution-outlier-record.avsc @@ -4,12 +4,11 @@ "type":"record", "doc":"ALA Expert Distribution Layer data information", "fields":[ - {"name": "id", "type": "string", "doc": "Pipelines identifier"}, - {"name" : "occurrenceID", "type" : ["null", "string"], "default" : null, "doc" : "Occurrence identifier"}, + {"name": "id", "type": "string", "doc": "Occurrence identifier"}, {"name" : "distanceOutOfEDL", "type" : "double", "default" : -1, ",doc" : "Distance to the expert distribution layers. -1: no EDL, 0: in EDL, >0 =out of EDL" }, {"name": "decimalLatitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLatitude"}, {"name": "decimalLongitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLongitude"}, - {"name": "speciesID","type":"string", "default": "","doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"}, + {"name": "speciesID","type": ["null", "string"], "default": null,"doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"}, {"name": "issues", "type": "IssueRecord", "default":{}} ] }