diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java similarity index 81% rename from livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionPipeline.java rename to livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java index 865bcc4a49..c17c62456a 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionPipeline.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java @@ -1,7 +1,7 @@ package au.org.ala.pipelines.beam; import au.org.ala.pipelines.options.DistributionPipelineOptions; -import au.org.ala.pipelines.transforms.ALADistributionTransform; +import au.org.ala.pipelines.transforms.DistributionOutlierTransform; import au.org.ala.pipelines.util.VersionInfo; import au.org.ala.utils.ALAFsUtils; import au.org.ala.utils.CombinedYamlConfiguration; @@ -11,14 +11,9 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.values.PCollection; -import org.apache.hadoop.fs.FileSystem; import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory; -import org.gbif.pipelines.common.beam.utils.PathBuilder; -import org.gbif.pipelines.core.factory.FileSystemFactory; -import org.gbif.pipelines.core.utils.FsUtils; import org.gbif.pipelines.io.avro.*; import org.slf4j.MDC; @@ -37,7 +32,7 @@ */ @Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) -public class DistributionPipeline { +public class DistributionOutlierPipeline { public static void main(String[] args) throws Exception { VersionInfo.print(); @@ -67,11 +62,11 @@ public static void run(DistributionPipelineOptions options) throws Exception { PCollection indexRecords = ALAFsUtils.loadIndexRecords(options, p); - ALADistributionTransform distributionTransform = - new ALADistributionTransform(options.getBaseUrl()); + DistributionOutlierTransform distributionTransform = + new DistributionOutlierTransform(options.getBaseUrl()); log.info("Adding step 2: calculating outliers index"); - PCollection kvRecords = + PCollection kvRecords = indexRecords .apply("Key by species", distributionTransform.toKv()) .apply("Grouping by species", GroupByKey.create()) @@ -80,7 +75,7 @@ public static void run(DistributionPipelineOptions options) throws Exception { distributionTransform.calculateOutlier()) .apply("Flatten records", Flatten.iterables()); - kvRecords.apply( "Write to file",AvroIO.write(ALADistributionRecord.class).to(outputPath+"/outliers").withoutSharding().withSuffix(".avro")); + kvRecords.apply( "Write to file",AvroIO.write(DistributionOutlierRecord.class).to(outputPath+"/outliers").withoutSharding().withSuffix(".avro")); log.info("Running the pipeline"); PipelineResult result = p.run(); diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java index 5f35fde749..a9b3338f87 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java @@ -49,8 +49,8 @@ public class IndexRecordToSolrPipeline { JackKnifeOutlierRecord.newBuilder().setId(EMPTY).setItems(new ArrayList<>()).build(); static final Relationships nullClustering = Relationships.newBuilder().setId(EMPTY).setRelationships(new ArrayList<>()).build(); - static final ALADistributionRecord nullOutlier = - ALADistributionRecord.newBuilder().setId(EMPTY).build(); + static final DistributionOutlierRecord nullOutlier = + DistributionOutlierRecord.newBuilder().setId(EMPTY).build(); public static void main(String[] args) throws Exception { VersionInfo.print(); @@ -291,9 +291,9 @@ private static PCollection> addOutlierInfo( PCollection> indexRecords) { // Load outlier records, keyed on ID - PCollection> outlierRecords = + PCollection> outlierRecords = loadOutlierRecords(options, pipeline); - PCollection>> + PCollection>> indexRecordJoinOurlier = Join.leftOuterJoin(indexRecords, outlierRecords, nullOutlier); @@ -557,19 +557,19 @@ public void processElement(ProcessContext c) { }; } - private static DoFn>, KV> + private static DoFn>, KV> addOutlierInfo() { - return new DoFn>, KV>() { + return new DoFn>, KV>() { @ProcessElement public void processElement(ProcessContext c) { - KV> e = c.element(); + KV> e = c.element(); IndexRecord indexRecord = e.getValue().getKey(); String id = indexRecord.getId(); - ALADistributionRecord outlierRecord = e.getValue().getValue(); + DistributionOutlierRecord outlierRecord = e.getValue().getValue(); indexRecord.getDoubles().put(DISTANCE_TO_EDL, outlierRecord.getDistanceOutOfEDL()); @@ -646,7 +646,7 @@ public KV apply(Relationships input) { })); } - private static PCollection> loadOutlierRecords( + private static PCollection> loadOutlierRecords( SolrPipelineOptions options, Pipeline p) { String path = PathBuilder.buildPath( @@ -654,12 +654,12 @@ private static PCollection> loadOutlierRecords .toString(); log.info("Loading outlier from {}", path); - return p.apply(AvroIO.read(ALADistributionRecord.class).from(path)) + return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(path)) .apply( MapElements.via( - new SimpleFunction>() { + new SimpleFunction>() { @Override - public KV apply(ALADistributionRecord input) { + public KV apply(DistributionOutlierRecord input) { return KV.of(input.getId(), input); } })); diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/ALADistributionInterpreter.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/DistributionOutlierInterpreter.java similarity index 70% rename from livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/ALADistributionInterpreter.java rename to livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/DistributionOutlierInterpreter.java index 8c98ef1ad5..ed7b524b52 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/ALADistributionInterpreter.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/interpreters/DistributionOutlierInterpreter.java @@ -9,7 +9,7 @@ import org.gbif.dwc.terms.DwcTerm; import org.gbif.kvs.geocode.LatLng; import org.gbif.pipelines.core.parsers.common.ParsedField; -import org.gbif.pipelines.io.avro.ALADistributionRecord; +import org.gbif.pipelines.io.avro.DistributionOutlierRecord; import org.gbif.pipelines.io.avro.ExtendedRecord; import org.gbif.pipelines.io.avro.IndexRecord; @@ -17,34 +17,34 @@ * living atlases. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) -public class ALADistributionInterpreter { +public class DistributionOutlierInterpreter { - public static void interpretOccurrenceID(IndexRecord ir, ALADistributionRecord dr) { + public static void interpretOccurrenceID(IndexRecord ir, DistributionOutlierRecord dr) { dr.setOccurrenceID(ir.getId()); } - public static void interpretLocation(IndexRecord ir, ALADistributionRecord dr) { + public static void interpretLocation(IndexRecord ir, DistributionOutlierRecord dr) { String latlng = ir.getLatLng(); String[] coordinates = latlng.split(","); dr.setDecimalLatitude(Double.parseDouble(coordinates[0])); dr.setDecimalLongitude(Double.parseDouble(coordinates[1])); } - public static void interpretSpeciesId(IndexRecord ir, ALADistributionRecord dr) { + public static void interpretSpeciesId(IndexRecord ir, DistributionOutlierRecord dr) { dr.setSpeciesID(ir.getTaxonID()); } /* * Interprete from verbatim */ - public static void interpretOccurrenceID(ExtendedRecord er, ALADistributionRecord dr) { + public static void interpretOccurrenceID(ExtendedRecord er, DistributionOutlierRecord dr) { String value = extractNullAwareValue(er, DwcTerm.occurrenceID); if (!Strings.isNullOrEmpty(value)) { dr.setOccurrenceID(value); } } - public static void interpretLocation(ExtendedRecord er, ALADistributionRecord dr) { + public static void interpretLocation(ExtendedRecord er, DistributionOutlierRecord dr) { ParsedField parsedLatLon = CoordinatesParser.parseCoords(er); addIssue(dr, parsedLatLon.getIssues()); @@ -55,7 +55,7 @@ public static void interpretLocation(ExtendedRecord er, ALADistributionRecord dr } } - public static void interpretSpeciesId(ExtendedRecord er, ALADistributionRecord dr) { + public static void interpretSpeciesId(ExtendedRecord er, DistributionOutlierRecord dr) { String value = extractNullAwareValue(er, DwcTerm.taxonConceptID); if (!Strings.isNullOrEmpty(value)) { dr.setSpeciesID(value); diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/ALADistributionTransform.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java similarity index 73% rename from livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/ALADistributionTransform.java rename to livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java index b8e009303b..6b1ccbc476 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/ALADistributionTransform.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/transforms/DistributionOutlierTransform.java @@ -4,7 +4,7 @@ import au.org.ala.distribution.DistributionServiceImpl; import au.org.ala.distribution.ExpertDistributionException; import au.org.ala.pipelines.common.ALARecordTypes; -import au.org.ala.pipelines.interpreters.ALADistributionInterpreter; +import au.org.ala.pipelines.interpreters.DistributionOutlierInterpreter; import java.util.*; import java.util.stream.StreamSupport; import org.apache.beam.sdk.transforms.*; @@ -14,15 +14,15 @@ import org.gbif.pipelines.io.avro.*; import org.gbif.pipelines.transforms.Transform; -public class ALADistributionTransform extends Transform { +public class DistributionOutlierTransform extends Transform { private String spatialUrl; - public ALADistributionTransform(String spatialUrl) { + public DistributionOutlierTransform(String spatialUrl) { super( - ALADistributionRecord.class, + DistributionOutlierRecord.class, ALARecordTypes.ALA_DISTRIBUTION, - ALADistributionTransform.class.getName(), + DistributionOutlierTransform.class.getName(), "alaDistributionCount"); this.spatialUrl = spatialUrl; } @@ -31,20 +31,20 @@ public ALADistributionTransform(String spatialUrl) { @Setup public void setup() {} - public ALADistributionTransform counterFn(SerializableConsumer counterFn) { + public DistributionOutlierTransform counterFn(SerializableConsumer counterFn) { setCounterFn(counterFn); return this; } @Override - public Optional convert(IndexRecord source) { - ALADistributionRecord dr = - ALADistributionRecord.newBuilder().setId(source.getId()).setSpeciesID("").build(); + public Optional convert(IndexRecord source) { + DistributionOutlierRecord dr = + DistributionOutlierRecord.newBuilder().setId(source.getId()).setSpeciesID("").build(); return Interpretation.from(source) .to(dr) - .via(ALADistributionInterpreter::interpretOccurrenceID) - .via(ALADistributionInterpreter::interpretLocation) - .via(ALADistributionInterpreter::interpretSpeciesId) + .via(DistributionOutlierInterpreter::interpretOccurrenceID) + .via(DistributionOutlierInterpreter::interpretLocation) + .via(DistributionOutlierInterpreter::interpretSpeciesId) .getOfNullable(); } @@ -53,16 +53,16 @@ public MapElements> toKv() { .via((IndexRecord ir) -> KV.of(ir.getTaxonID(), ir)); } - public MapElements>, Iterable> + public MapElements>, Iterable> calculateOutlier() { return MapElements.via( - (new SimpleFunction>, Iterable>() { + (new SimpleFunction>, Iterable>() { @Override - public Iterable apply(KV> input) { + public Iterable apply(KV> input) { String lsid = input.getKey(); Iterable records = input.getValue(); Iterator iter = records.iterator(); - List outputs = new ArrayList(); + List outputs = new ArrayList(); try { DistributionServiceImpl distributionService = @@ -72,7 +72,7 @@ public Iterable apply(KV> i Map points = new HashMap(); while (iter.hasNext()) { IndexRecord record = iter.next(); - ALADistributionRecord dr = convertToDistribution(record); + DistributionOutlierRecord dr = convertToDistribution(record); outputs.add(dr); Map point = new HashMap(); @@ -105,13 +105,13 @@ public Iterable apply(KV> i } /** - * Stringify {@Link ALADistributionRecord} + * Stringify {@Link DistributionOutlierRecord} * * @return */ - public MapElements flatToString() { + public MapElements flatToString() { return MapElements.into(new TypeDescriptor() {}) - .via((ALADistributionRecord dr) -> this.convertRecordToString(dr)); + .via((DistributionOutlierRecord dr) -> this.convertRecordToString(dr)); } /** @@ -121,9 +121,9 @@ public MapElements flatToString() { * @param record * @return */ - private ALADistributionRecord convertToDistribution(IndexRecord record) { - ALADistributionRecord newRecord = - ALADistributionRecord.newBuilder() + private DistributionOutlierRecord convertToDistribution(IndexRecord record) { + DistributionOutlierRecord newRecord = + DistributionOutlierRecord.newBuilder() .setId(record.getId()) .setOccurrenceID(record.getId()) .setDistanceOutOfEDL(0.0d) @@ -138,7 +138,7 @@ private ALADistributionRecord convertToDistribution(IndexRecord record) { return newRecord; } - private String convertRecordToString(ALADistributionRecord record) { + private String convertRecordToString(DistributionOutlierRecord record) { return String.format( "occurrenceId:%s, lat:%f, lng:%f, speciesId:%s, distanceToEDL:%f", record.getOccurrenceID(), diff --git a/sdks/models/src/main/avro/specific/ala-distribution-record.avsc b/sdks/models/src/main/avro/specific/distribution-outlier-record.avsc similarity index 96% rename from sdks/models/src/main/avro/specific/ala-distribution-record.avsc rename to sdks/models/src/main/avro/specific/distribution-outlier-record.avsc index 7c52a0c521..9e868f13a0 100644 --- a/sdks/models/src/main/avro/specific/ala-distribution-record.avsc +++ b/sdks/models/src/main/avro/specific/distribution-outlier-record.avsc @@ -1,5 +1,5 @@ { - "name":"ALADistributionRecord", + "name":"DistributionOutlierRecord", "namespace":"org.gbif.pipelines.io.avro", "type":"record", "doc":"ALA Expert Distribution Layer data information",