From 6a0444ddd16cc2365e74fca0402caa030d27ed88 Mon Sep 17 00:00:00 2001 From: Qifeng Date: Fri, 26 Nov 2021 11:01:22 +1100 Subject: [PATCH] #622 refactor(Use DistributionOutlier) --- .../pipelines/beam/DistributionPipeline.java | 89 ----------- .../ALADistributionInterpreter.java | 64 -------- .../transforms/ALADistributionTransform.java | 150 ------------------ .../specific/ala-distribution-record.avsc | 15 -- 4 files changed, 318 deletions(-) delete mode 100644 livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionPipeline.java delete mode 100644 livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/ALADistributionInterpreter.java delete mode 100644 livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/ALADistributionTransform.java delete mode 100644 sdks/models/src/main/avro/specific/ala-distribution-record.avsc diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionPipeline.java deleted file mode 100644 index 865bcc4a49..0000000000 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionPipeline.java +++ /dev/null @@ -1,89 +0,0 @@ -package au.org.ala.pipelines.beam; - -import au.org.ala.pipelines.options.DistributionPipelineOptions; -import au.org.ala.pipelines.transforms.ALADistributionTransform; -import au.org.ala.pipelines.util.VersionInfo; -import au.org.ala.utils.ALAFsUtils; -import au.org.ala.utils.CombinedYamlConfiguration; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.*; -import org.apache.beam.sdk.values.PCollection; -import org.apache.hadoop.fs.FileSystem; -import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory; -import org.gbif.pipelines.common.beam.utils.PathBuilder; -import org.gbif.pipelines.core.factory.FileSystemFactory; -import org.gbif.pipelines.core.utils.FsUtils; -import org.gbif.pipelines.io.avro.*; -import org.slf4j.MDC; - -/** - * A pipeline that calculate distance to the expert distribution layers (EDL) - * - *

distanceOutOfELD: 0 -> inside of EDL, -1: -> No EDLs. >0 -> out of EDL - * - *

* --datasetId=0057a720-17c9-4658-971e-9578f3577cf5 * --attempt=1 * --runner=SparkRunner * - * --targetPath=/some/path/to/output/ * - * --inputPath=/some/path/to/output/0057a720-17c9-4658-971e-9578f3577cf5/1/verbatim.avro - * - *

java -jar /data/pipelines-data/avro-tools-1.11.0.jar tojson distribution-00000-of-00005.avro - * >5.json java -jar /data/pipelines-data/avro-tools-1.11.0.jar fromjson --schema-file schema.avsc - * ./verbatim.json > verbatim.avro - */ -@Slf4j -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public class DistributionPipeline { - - public static void main(String[] args) throws Exception { - VersionInfo.print(); - CombinedYamlConfiguration conf = new CombinedYamlConfiguration(args); - String[] combinedArgs = conf.toArgs("general", "distribution"); - - DistributionPipelineOptions options = - PipelinesOptionsFactory.create(DistributionPipelineOptions.class, combinedArgs); - MDC.put("datasetId", options.getDatasetId()); - MDC.put("attempt", options.getAttempt().toString()); - MDC.put("step", "DISTRIBUTION"); - PipelinesOptionsFactory.registerHdfs(options); - run(options); - System.exit(0); - } - - public static void run(DistributionPipelineOptions options) throws Exception { - - //Create output path - // default: {fsPath}/pipelines-outlier - // or {fsPath}/pipelines-outlier/{datasetId} - String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options); - - - log.info("Adding step 1: Collecting index records"); - Pipeline p = Pipeline.create(options); - - PCollection indexRecords = ALAFsUtils.loadIndexRecords(options, p); - - ALADistributionTransform distributionTransform = - new ALADistributionTransform(options.getBaseUrl()); - - log.info("Adding step 2: calculating outliers index"); - PCollection kvRecords = - indexRecords - .apply("Key by species", distributionTransform.toKv()) - .apply("Grouping by species", GroupByKey.create()) - .apply( - "Calculating outliers based on the species", - distributionTransform.calculateOutlier()) - .apply("Flatten records", Flatten.iterables()); - - kvRecords.apply( "Write to file",AvroIO.write(ALADistributionRecord.class).to(outputPath+"/outliers").withoutSharding().withSuffix(".avro")); - - log.info("Running the pipeline"); - PipelineResult result = p.run(); - result.waitUntilFinish(); - } -} diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/ALADistributionInterpreter.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/ALADistributionInterpreter.java deleted file mode 100644 index 8c98ef1ad5..0000000000 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/ALADistributionInterpreter.java +++ /dev/null @@ -1,64 +0,0 @@ -package au.org.ala.pipelines.interpreters; - -import static org.gbif.pipelines.core.utils.ModelUtils.*; - -import au.org.ala.pipelines.parser.CoordinatesParser; -import com.google.common.base.Strings; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import org.gbif.dwc.terms.DwcTerm; -import org.gbif.kvs.geocode.LatLng; -import org.gbif.pipelines.core.parsers.common.ParsedField; -import org.gbif.pipelines.io.avro.ALADistributionRecord; -import org.gbif.pipelines.io.avro.ExtendedRecord; -import org.gbif.pipelines.io.avro.IndexRecord; - -/* - * living atlases. - */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public class ALADistributionInterpreter { - - public static void interpretOccurrenceID(IndexRecord ir, ALADistributionRecord dr) { - dr.setOccurrenceID(ir.getId()); - } - - public static void interpretLocation(IndexRecord ir, ALADistributionRecord dr) { - String latlng = ir.getLatLng(); - String[] coordinates = latlng.split(","); - dr.setDecimalLatitude(Double.parseDouble(coordinates[0])); - dr.setDecimalLongitude(Double.parseDouble(coordinates[1])); - } - - public static void interpretSpeciesId(IndexRecord ir, ALADistributionRecord dr) { - dr.setSpeciesID(ir.getTaxonID()); - } - - /* - * Interprete from verbatim - */ - public static void interpretOccurrenceID(ExtendedRecord er, ALADistributionRecord dr) { - String value = extractNullAwareValue(er, DwcTerm.occurrenceID); - if (!Strings.isNullOrEmpty(value)) { - dr.setOccurrenceID(value); - } - } - - public static void interpretLocation(ExtendedRecord er, ALADistributionRecord dr) { - ParsedField parsedLatLon = CoordinatesParser.parseCoords(er); - addIssue(dr, parsedLatLon.getIssues()); - - if (parsedLatLon.isSuccessful()) { - LatLng latlng = parsedLatLon.getResult(); - dr.setDecimalLatitude(latlng.getLatitude()); - dr.setDecimalLongitude(latlng.getLongitude()); - } - } - - public static void interpretSpeciesId(ExtendedRecord er, ALADistributionRecord dr) { - String value = extractNullAwareValue(er, DwcTerm.taxonConceptID); - if (!Strings.isNullOrEmpty(value)) { - dr.setSpeciesID(value); - } - } -} diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/ALADistributionTransform.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/ALADistributionTransform.java deleted file mode 100644 index b8e009303b..0000000000 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/ALADistributionTransform.java +++ /dev/null @@ -1,150 +0,0 @@ -package au.org.ala.pipelines.transforms; - -import au.org.ala.distribution.DistributionLayer; -import au.org.ala.distribution.DistributionServiceImpl; -import au.org.ala.distribution.ExpertDistributionException; -import au.org.ala.pipelines.common.ALARecordTypes; -import au.org.ala.pipelines.interpreters.ALADistributionInterpreter; -import java.util.*; -import java.util.stream.StreamSupport; -import org.apache.beam.sdk.transforms.*; -import org.apache.beam.sdk.values.*; -import org.gbif.pipelines.core.functions.SerializableConsumer; -import org.gbif.pipelines.core.interpreters.Interpretation; -import org.gbif.pipelines.io.avro.*; -import org.gbif.pipelines.transforms.Transform; - -public class ALADistributionTransform extends Transform { - - private String spatialUrl; - - public ALADistributionTransform(String spatialUrl) { - super( - ALADistributionRecord.class, - ALARecordTypes.ALA_DISTRIBUTION, - ALADistributionTransform.class.getName(), - "alaDistributionCount"); - this.spatialUrl = spatialUrl; - } - - /** Beam @Setup initializes resources */ - @Setup - public void setup() {} - - public ALADistributionTransform counterFn(SerializableConsumer counterFn) { - setCounterFn(counterFn); - return this; - } - - @Override - public Optional convert(IndexRecord source) { - ALADistributionRecord dr = - ALADistributionRecord.newBuilder().setId(source.getId()).setSpeciesID("").build(); - return Interpretation.from(source) - .to(dr) - .via(ALADistributionInterpreter::interpretOccurrenceID) - .via(ALADistributionInterpreter::interpretLocation) - .via(ALADistributionInterpreter::interpretSpeciesId) - .getOfNullable(); - } - - public MapElements> toKv() { - return MapElements.into(new TypeDescriptor>() {}) - .via((IndexRecord ir) -> KV.of(ir.getTaxonID(), ir)); - } - - public MapElements>, Iterable> - calculateOutlier() { - return MapElements.via( - (new SimpleFunction>, Iterable>() { - @Override - public Iterable apply(KV> input) { - String lsid = input.getKey(); - Iterable records = input.getValue(); - Iterator iter = records.iterator(); - List outputs = new ArrayList(); - - try { - DistributionServiceImpl distributionService = - DistributionServiceImpl.init(spatialUrl); - List edl = distributionService.findLayersByLsid(lsid); - // Available EDLD of this species - Map points = new HashMap(); - while (iter.hasNext()) { - IndexRecord record = iter.next(); - ALADistributionRecord dr = convertToDistribution(record); - outputs.add(dr); - - Map point = new HashMap(); - point.put("decimalLatitude", dr.getDecimalLatitude()); - point.put("decimalLongitude", dr.getDecimalLongitude()); - points.put(dr.getOccurrenceID(), point); - } - - if (edl.size() > 0) { - Map results = distributionService.outliers(lsid, points); - Iterator> iterator = results.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - StreamSupport.stream(outputs.spliterator(), false) - .filter(it -> it.getOccurrenceID().equalsIgnoreCase(entry.getKey())) - .forEach(it -> it.setDistanceOutOfEDL(entry.getValue())); - } - } - - } catch (ExpertDistributionException e) { - throw new RuntimeException( - "Expert distribution service returns error: " + e.getMessage()); - } catch (Exception e) { - throw new RuntimeException("Runtime error: " + e.getMessage()); - } - - return outputs; - } - })); - } - - /** - * Stringify {@Link ALADistributionRecord} - * - * @return - */ - public MapElements flatToString() { - return MapElements.into(new TypeDescriptor() {}) - .via((ALADistributionRecord dr) -> this.convertRecordToString(dr)); - } - - /** - * Only can be used when EDL exists - which means the record can only in/out EDL Force to reset - * distanceOutOfEDL 0 because Spatial EDL service only return distance of outlier records - * - * @param record - * @return - */ - private ALADistributionRecord convertToDistribution(IndexRecord record) { - ALADistributionRecord newRecord = - ALADistributionRecord.newBuilder() - .setId(record.getId()) - .setOccurrenceID(record.getId()) - .setDistanceOutOfEDL(0.0d) - .setSpeciesID(record.getTaxonID()) - .build(); - - String latlng = record.getLatLng(); - String[] coordinates = latlng.split(","); - newRecord.setDecimalLatitude(Double.parseDouble(coordinates[0])); - newRecord.setDecimalLongitude(Double.parseDouble(coordinates[1])); - - return newRecord; - } - - private String convertRecordToString(ALADistributionRecord record) { - return String.format( - "occurrenceId:%s, lat:%f, lng:%f, speciesId:%s, distanceToEDL:%f", - record.getOccurrenceID(), - record.getDecimalLatitude(), - record.getDecimalLongitude(), - record.getSpeciesID(), - record.getDistanceOutOfEDL()); - } -} diff --git a/sdks/models/src/main/avro/specific/ala-distribution-record.avsc b/sdks/models/src/main/avro/specific/ala-distribution-record.avsc deleted file mode 100644 index 7c52a0c521..0000000000 --- a/sdks/models/src/main/avro/specific/ala-distribution-record.avsc +++ /dev/null @@ -1,15 +0,0 @@ -{ - "name":"ALADistributionRecord", - "namespace":"org.gbif.pipelines.io.avro", - "type":"record", - "doc":"ALA Expert Distribution Layer data information", - "fields":[ - {"name": "id", "type": "string", "doc": "Pipelines identifier"}, - {"name" : "occurrenceID", "type" : ["null", "string"], "default" : null, "doc" : "Occurrence identifier"}, - {"name" : "distanceOutOfEDL", "type" : "double", "default" : -1, ",doc" : "Distance to the expert distribution layers. -1: no EDL, 0: in EDL, >0 =out of EDL" }, - {"name": "decimalLatitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLatitude"}, - {"name": "decimalLongitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLongitude"}, - {"name": "speciesID","type":"string", "default": "","doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"}, - {"name": "issues", "type": "IssueRecord", "default":{}} - ] -}