From 2b2539f85e294644bc13c21199c345013d3cebf1 Mon Sep 17 00:00:00 2001 From: Qifeng Date: Thu, 25 Nov 2021 09:24:43 +1100 Subject: [PATCH] #622 init(distribution from indexrecord) --- .../ALADistributionInterpreter.java | 65 ++++++++ .../options/DistributionPipelineOptions.java | 28 ++++ .../transforms/ALADistributionTransform.java | 150 ++++++++++++++++++ .../specific/ala-distribution-record.avsc | 15 ++ 4 files changed, 258 insertions(+) create mode 100644 livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/ALADistributionInterpreter.java create mode 100644 livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DistributionPipelineOptions.java create mode 100644 livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/ALADistributionTransform.java create mode 100644 sdks/models/src/main/avro/specific/ala-distribution-record.avsc diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/ALADistributionInterpreter.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/ALADistributionInterpreter.java new file mode 100644 index 0000000000..eddd2abe32 --- /dev/null +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/ALADistributionInterpreter.java @@ -0,0 +1,65 @@ +package au.org.ala.pipelines.interpreters; + +import static org.gbif.pipelines.core.utils.ModelUtils.*; + +import au.org.ala.pipelines.parser.CoordinatesParser; +import com.google.common.base.Strings; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.gbif.dwc.terms.DwcTerm; +import org.gbif.kvs.geocode.LatLng; +import org.gbif.pipelines.core.parsers.common.ParsedField; +import org.gbif.pipelines.io.avro.ALADistributionRecord; +import org.gbif.pipelines.io.avro.ExtendedRecord; +import org.gbif.pipelines.io.avro.IndexRecord; +import org.gbif.pipelines.io.avro.Record; + +/* + * living atlases. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ALADistributionInterpreter { + + public static void interpretOccurrenceID(IndexRecord ir, ALADistributionRecord dr) { + dr.setOccurrenceID(ir.getId()); + } + + public static void interpretLocation(IndexRecord ir, ALADistributionRecord dr) { + String latlng = ir.getLatLng(); + String[] coordinates = latlng.split(","); + dr.setDecimalLatitude(Double.parseDouble(coordinates[0])); + dr.setDecimalLongitude(Double.parseDouble(coordinates[1])); + } + + public static void interpretSpeciesId(IndexRecord ir, ALADistributionRecord dr) { + dr.setSpeciesID(ir.getTaxonID()); + } + + /* + * Interprete from verbatim + */ + public static void interpretOccurrenceID(ExtendedRecord er, ALADistributionRecord dr) { + String value = extractNullAwareValue(er, DwcTerm.occurrenceID); + if (!Strings.isNullOrEmpty(value)) { + dr.setOccurrenceID(value); + } + } + + public static void interpretLocation(ExtendedRecord er, ALADistributionRecord dr) { + ParsedField parsedLatLon = CoordinatesParser.parseCoords(er); + addIssue(dr, parsedLatLon.getIssues()); + + if (parsedLatLon.isSuccessful()) { + LatLng latlng = parsedLatLon.getResult(); + dr.setDecimalLatitude(latlng.getLatitude()); + dr.setDecimalLongitude(latlng.getLongitude()); + } + } + + public static void interpretSpeciesId(ExtendedRecord er, ALADistributionRecord dr) { + String value = extractNullAwareValue(er, DwcTerm.taxonConceptID); + if (!Strings.isNullOrEmpty(value)) { + dr.setSpeciesID(value); + } + } +} diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DistributionPipelineOptions.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DistributionPipelineOptions.java new file mode 100644 index 0000000000..ae04f8715b --- /dev/null +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/options/DistributionPipelineOptions.java @@ -0,0 +1,28 @@ +package au.org.ala.pipelines.options; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; + +/** + * Options for pipelines that run against spatial service for Expert distribution layer calculation. + */ +public interface DistributionPipelineOptions extends AllDatasetsPipelinesOptions { + + @Description("Base URL for Spatial service") + @Default.String("https://spatial.ala.org.au/ws/") + String getBaseUrl(); + + void setBaseUrl(String baseUrl); + + @Description("Default batch size") + @Default.Integer(25000) + Integer getBatchSize(); + + void setBatchSize(Integer batchSize); + + @Description("Keep download sampling CSVs") + @Default.Integer(1000) + Integer getBatchStatusSleepTime(); + + void setBatchStatusSleepTime(Integer batchStatusSleepTime); +} diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/ALADistributionTransform.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/ALADistributionTransform.java new file mode 100644 index 0000000000..7410619ee0 --- /dev/null +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/ALADistributionTransform.java @@ -0,0 +1,150 @@ +package au.org.ala.pipelines.transforms; + +import au.org.ala.distribution.DistributionServiceImpl; +import au.org.ala.distribution.DistributionLayer; +import au.org.ala.distribution.ExpertDistributionException; +import au.org.ala.pipelines.common.ALARecordTypes; +import au.org.ala.pipelines.interpreters.ALADistributionInterpreter; +import java.util.*; +import java.util.stream.StreamSupport; +import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.values.*; +import org.gbif.pipelines.core.functions.SerializableConsumer; +import org.gbif.pipelines.core.interpreters.Interpretation; +import org.gbif.pipelines.io.avro.*; +import org.gbif.pipelines.transforms.Transform; + +public class ALADistributionTransform extends Transform { + + private String spatialUrl; + + public ALADistributionTransform(String spatialUrl) { + super( + ALADistributionRecord.class, + ALARecordTypes.ALA_DISTRIBUTION, + ALADistributionTransform.class.getName(), + "alaDistributionCount"); + this.spatialUrl = spatialUrl; + } + + /** Beam @Setup initializes resources */ + @Setup + public void setup() {} + + public ALADistributionTransform counterFn(SerializableConsumer counterFn) { + setCounterFn(counterFn); + return this; + } + + @Override + public Optional convert(IndexRecord source) { + ALADistributionRecord dr = + ALADistributionRecord.newBuilder().setId(source.getId()).setSpeciesID("").build(); + return Interpretation.from(source) + .to(dr) + .via(ALADistributionInterpreter::interpretOccurrenceID) + .via(ALADistributionInterpreter::interpretLocation) + .via(ALADistributionInterpreter::interpretSpeciesId) + .getOfNullable(); + } + + + /** + * Maps {@link ALADistributionRecord} to key value, where key is {@link + * ALADistributionRecord#getSpeciesID()} + */ + public MapElements> toKv() { + return MapElements.into(new TypeDescriptor>() {}) + .via((ALADistributionRecord dr) -> KV.of(dr.getSpeciesID(), dr)); + } + + public MapElements>, Iterable> + calculateOutlier() { + return MapElements.via( + (new SimpleFunction< + KV>, Iterable>() { + @Override + public Iterable apply( + KV> input) { + String lsid = input.getKey(); + Iterable records = input.getValue(); + Iterator iter = records.iterator(); + + try { + DistributionServiceImpl distributionService = DistributionServiceImpl.init(spatialUrl); + List edl = distributionService.findLayersByLsid(lsid); + // Available EDLD of this species + if (edl.size() > 0) { + // Duplicate records because Transform does not allow change input + List outputs = new ArrayList(); + Map points = new HashMap(); + while (iter.hasNext()) { + ALADistributionRecord record = iter.next(); + outputs.add(copy(record)); + + Map point = new HashMap(); + point.put("decimalLatitude", record.getDecimalLatitude()); + point.put("decimalLongitude", record.getDecimalLongitude()); + points.put(record.getOccurrenceID(), point); + } + + Map results = distributionService.outliers(lsid, points); + Iterator> iterator = results.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + StreamSupport.stream(outputs.spliterator(), false) + .filter(it -> it.getOccurrenceID().equalsIgnoreCase(entry.getKey())) + .forEach(it -> it.setDistanceOutOfEDL(entry.getValue())); + } + return outputs; + } + + } catch (ExpertDistributionException e) { + throw new RuntimeException( + "Expert distribution service returns error: " + e.getMessage()); + } catch (Exception e) { + throw new RuntimeException("Runtime error: " + e.getMessage()); + } + + return records; + } + })); + } + + /** + * Stringify {@Link ALADistributionRecord} + * + * @return + */ + public MapElements flatToString() { + return MapElements.into(new TypeDescriptor() {}) + .via((ALADistributionRecord dr) -> this.convertRecordToString(dr)); + } + + /** + * Only can be used when EDL exists - which means the record can only in/out EDL Force to reset + * distanceOutOfEDL 0 because Spatial EDL service only return distance of outlier records + * + * @param record + * @return + */ + private ALADistributionRecord copy(ALADistributionRecord record) { + ALADistributionRecord newRecord = new ALADistributionRecord(); + newRecord.setId(record.getId()); + newRecord.setOccurrenceID(record.getOccurrenceID()); + newRecord.setDistanceOutOfEDL(0.0d); + newRecord.setDecimalLongitude(record.getDecimalLongitude()); + newRecord.setDecimalLatitude(record.getDecimalLatitude()); + newRecord.setSpeciesID(record.getSpeciesID()); + IssueRecord ir = + IssueRecord.newBuilder().setIssueList(record.getIssues().getIssueList()).build(); + newRecord.setIssues(ir); + return newRecord; + } + + private String convertRecordToString(ALADistributionRecord record) { + return String.format( + "occurrenceId:%s, lat:%f, lng:%f, speciesId:%s, distanceToEDL:%f", + record.getOccurrenceID(), record.getDecimalLatitude(), record.getDecimalLongitude(), record.getSpeciesID(), record.getDistanceOutOfEDL()); + } +} diff --git a/sdks/models/src/main/avro/specific/ala-distribution-record.avsc b/sdks/models/src/main/avro/specific/ala-distribution-record.avsc new file mode 100644 index 0000000000..3fa613ed8f --- /dev/null +++ b/sdks/models/src/main/avro/specific/ala-distribution-record.avsc @@ -0,0 +1,15 @@ +{ + "name":"ALADistributionRecord", + "namespace":"org.gbif.pipelines.io.avro", + "type":"record", + "doc":"ALA Expert Distribution Layer data information", + "fields":[ + {"name": "id", "type": "string", "doc": "Pipelines identifier"}, + {"name" : "occurrenceID", "type" : ["null", "string"], "default" : null, "doc" : "Occurrence identifier"}, + {"name" : "distanceOutOfEDL", "type" : [ "double"], "default" : -1, ",doc" : "Distance to the expert distribution layers. -1: no EDL, 0: in EDL, >0 =out of EDL" }, + {"name": "decimalLatitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLatitude"}, + {"name": "decimalLongitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLongitude"}, + {"name": "speciesID","type":"string", "doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"}, + {"name": "issues", "type": "IssueRecord", "default":{}} + ] +}