
#622 refactor: rename ALADistribution* classes to DistributionOutlier*
qifeng-bai authored and djtfmartin committed Jan 31, 2022
1 parent a4f7600 commit cceddbf
Showing 5 changed files with 51 additions and 56 deletions.
au/org/ala/pipelines/beam/DistributionOutlierPipeline.java
@@ -1,7 +1,7 @@
 package au.org.ala.pipelines.beam;
 
 import au.org.ala.pipelines.options.DistributionPipelineOptions;
-import au.org.ala.pipelines.transforms.ALADistributionTransform;
+import au.org.ala.pipelines.transforms.DistributionOutlierTransform;
 import au.org.ala.pipelines.util.VersionInfo;
 import au.org.ala.utils.ALAFsUtils;
 import au.org.ala.utils.CombinedYamlConfiguration;
@@ -11,14 +11,9 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.hadoop.fs.FileSystem;
 import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory;
-import org.gbif.pipelines.common.beam.utils.PathBuilder;
-import org.gbif.pipelines.core.factory.FileSystemFactory;
-import org.gbif.pipelines.core.utils.FsUtils;
 import org.gbif.pipelines.io.avro.*;
 import org.slf4j.MDC;

@@ -37,7 +32,7 @@
  */
 @Slf4j
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class DistributionPipeline {
+public class DistributionOutlierPipeline {
 
   public static void main(String[] args) throws Exception {
     VersionInfo.print();
@@ -67,11 +62,11 @@ public static void run(DistributionPipelineOptions options) throws Exception {

     PCollection<IndexRecord> indexRecords = ALAFsUtils.loadIndexRecords(options, p);
 
-    ALADistributionTransform distributionTransform =
-        new ALADistributionTransform(options.getBaseUrl());
+    DistributionOutlierTransform distributionTransform =
+        new DistributionOutlierTransform(options.getBaseUrl());
 
     log.info("Adding step 2: calculating outliers index");
-    PCollection<ALADistributionRecord> kvRecords =
+    PCollection<DistributionOutlierRecord> kvRecords =
         indexRecords
             .apply("Key by species", distributionTransform.toKv())
             .apply("Grouping by species", GroupByKey.create())
@@ -80,7 +75,7 @@ public static void run(DistributionPipelineOptions options) throws Exception {
                 distributionTransform.calculateOutlier())
             .apply("Flatten records", Flatten.iterables());
 
-    kvRecords.apply("Write to file", AvroIO.write(ALADistributionRecord.class).to(outputPath + "/outliers").withoutSharding().withSuffix(".avro"));
+    kvRecords.apply("Write to file", AvroIO.write(DistributionOutlierRecord.class).to(outputPath + "/outliers").withoutSharding().withSuffix(".avro"));
 
     log.info("Running the pipeline");
     PipelineResult result = p.run();
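For orientation, a minimal sketch (not part of this commit) of a downstream Beam job that reads back the outliers Avro written above; the input glob and output prefix are illustrative assumptions.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.gbif.pipelines.io.avro.DistributionOutlierRecord;

public class ReadOutliersSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(
            "Read outliers", // glob mirrors the write above; the actual path is an assumption
            AvroIO.read(DistributionOutlierRecord.class).from("/data/outliers*.avro"))
        .apply(
            "Format as CSV",
            MapElements.into(TypeDescriptors.strings())
                .via((DistributionOutlierRecord r) -> r.getId() + "," + r.getDistanceOutOfEDL()))
        .apply("Write CSV", TextIO.write().to("/tmp/outliers").withSuffix(".csv"));
    p.run().waitUntilFinish();
  }
}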
IndexRecordToSolrPipeline.java
@@ -49,8 +49,8 @@ public class IndexRecordToSolrPipeline {
       JackKnifeOutlierRecord.newBuilder().setId(EMPTY).setItems(new ArrayList<>()).build();
   static final Relationships nullClustering =
       Relationships.newBuilder().setId(EMPTY).setRelationships(new ArrayList<>()).build();
-  static final ALADistributionRecord nullOutlier =
-      ALADistributionRecord.newBuilder().setId(EMPTY).build();
+  static final DistributionOutlierRecord nullOutlier =
+      DistributionOutlierRecord.newBuilder().setId(EMPTY).build();

   public static void main(String[] args) throws Exception {
     VersionInfo.print();
@@ -291,9 +291,9 @@ private static PCollection<KV<String, IndexRecord>> addOutlierInfo(
       PCollection<KV<String, IndexRecord>> indexRecords) {
 
     // Load outlier records, keyed on ID
-    PCollection<KV<String, ALADistributionRecord>> outlierRecords =
+    PCollection<KV<String, DistributionOutlierRecord>> outlierRecords =
         loadOutlierRecords(options, pipeline);
-    PCollection<KV<String, KV<IndexRecord, ALADistributionRecord>>>
+    PCollection<KV<String, KV<IndexRecord, DistributionOutlierRecord>>>
         indexRecordJoinOurlier =
             Join.leftOuterJoin(indexRecords, outlierRecords, nullOutlier);
@@ -557,19 +557,19 @@ public void processElement(ProcessContext c) {
     };
   }
 
-  private static DoFn<KV<String, KV<IndexRecord, ALADistributionRecord>>, KV<String, IndexRecord>>
+  private static DoFn<KV<String, KV<IndexRecord, DistributionOutlierRecord>>, KV<String, IndexRecord>>
       addOutlierInfo() {
 
-    return new DoFn<KV<String, KV<IndexRecord, ALADistributionRecord>>, KV<String, IndexRecord>>() {
+    return new DoFn<KV<String, KV<IndexRecord, DistributionOutlierRecord>>, KV<String, IndexRecord>>() {
       @ProcessElement
       public void processElement(ProcessContext c) {
 
-        KV<String, KV<IndexRecord, ALADistributionRecord>> e = c.element();
+        KV<String, KV<IndexRecord, DistributionOutlierRecord>> e = c.element();
 
         IndexRecord indexRecord = e.getValue().getKey();
         String id = indexRecord.getId();
 
-        ALADistributionRecord outlierRecord = e.getValue().getValue();
+        DistributionOutlierRecord outlierRecord = e.getValue().getValue();
 
         indexRecord.getDoubles().put(DISTANCE_TO_EDL, outlierRecord.getDistanceOutOfEDL());

@@ -646,20 +646,20 @@ public KV<String, Relationships> apply(Relationships input) {
         }));
   }
 
-  private static PCollection<KV<String, ALADistributionRecord>> loadOutlierRecords(
+  private static PCollection<KV<String, DistributionOutlierRecord>> loadOutlierRecords(
       SolrPipelineOptions options, Pipeline p) {
     String path =
         PathBuilder.buildPath(
                 options.getAllDatasetsInputPath() + "/distribution/", "distribution*" + AVRO_EXTENSION)
            .toString();
     log.info("Loading outlier from {}", path);
 
-    return p.apply(AvroIO.read(ALADistributionRecord.class).from(path))
+    return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(path))
        .apply(
            MapElements.via(
-                new SimpleFunction<ALADistributionRecord, KV<String, ALADistributionRecord>>() {
+                new SimpleFunction<DistributionOutlierRecord, KV<String, DistributionOutlierRecord>>() {
                  @Override
-                  public KV<String, ALADistributionRecord> apply(ALADistributionRecord input) {
+                  public KV<String, DistributionOutlierRecord> apply(DistributionOutlierRecord input) {
                    return KV.of(input.getId(), input);
                  }
                }));
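A note on the join in addOutlierInfo above: Join.leftOuterJoin keeps every IndexRecord and substitutes the supplied default (nullOutlier) wherever no outlier record shares the key, so downstream code never sees a null. A self-contained sketch of that pattern with plain strings (all names illustrative):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class LeftOuterJoinSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<KV<String, String>> indexSide =
        p.apply("Index", Create.of(KV.of("r1", "index-1"), KV.of("r2", "index-2")));
    PCollection<KV<String, String>> outlierSide =
        p.apply("Outliers", Create.of(KV.of("r1", "outlier-1"))); // no entry for r2
    // r1 pairs with "outlier-1"; r2 pairs with the default "NO_OUTLIER",
    // just as nullOutlier stands in for records without an outlier result.
    PCollection<KV<String, KV<String, String>>> joined =
        Join.leftOuterJoin(indexSide, outlierSide, "NO_OUTLIER");
    p.run().waitUntilFinish();
  }
}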
au/org/ala/pipelines/interpreters/DistributionOutlierInterpreter.java
@@ -9,42 +9,42 @@
 import org.gbif.dwc.terms.DwcTerm;
 import org.gbif.kvs.geocode.LatLng;
 import org.gbif.pipelines.core.parsers.common.ParsedField;
-import org.gbif.pipelines.io.avro.ALADistributionRecord;
+import org.gbif.pipelines.io.avro.DistributionOutlierRecord;
 import org.gbif.pipelines.io.avro.ExtendedRecord;
 import org.gbif.pipelines.io.avro.IndexRecord;
 
 /*
  * living atlases.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class ALADistributionInterpreter {
+public class DistributionOutlierInterpreter {
 
-  public static void interpretOccurrenceID(IndexRecord ir, ALADistributionRecord dr) {
+  public static void interpretOccurrenceID(IndexRecord ir, DistributionOutlierRecord dr) {
     dr.setOccurrenceID(ir.getId());
   }
 
-  public static void interpretLocation(IndexRecord ir, ALADistributionRecord dr) {
+  public static void interpretLocation(IndexRecord ir, DistributionOutlierRecord dr) {
     String latlng = ir.getLatLng();
     String[] coordinates = latlng.split(",");
     dr.setDecimalLatitude(Double.parseDouble(coordinates[0]));
     dr.setDecimalLongitude(Double.parseDouble(coordinates[1]));
   }
 
-  public static void interpretSpeciesId(IndexRecord ir, ALADistributionRecord dr) {
+  public static void interpretSpeciesId(IndexRecord ir, DistributionOutlierRecord dr) {
     dr.setSpeciesID(ir.getTaxonID());
   }
 
   /*
    * Interpret from verbatim
    */
-  public static void interpretOccurrenceID(ExtendedRecord er, ALADistributionRecord dr) {
+  public static void interpretOccurrenceID(ExtendedRecord er, DistributionOutlierRecord dr) {
     String value = extractNullAwareValue(er, DwcTerm.occurrenceID);
     if (!Strings.isNullOrEmpty(value)) {
       dr.setOccurrenceID(value);
     }
   }
 
-  public static void interpretLocation(ExtendedRecord er, ALADistributionRecord dr) {
+  public static void interpretLocation(ExtendedRecord er, DistributionOutlierRecord dr) {
     ParsedField<LatLng> parsedLatLon = CoordinatesParser.parseCoords(er);
     addIssue(dr, parsedLatLon.getIssues());

@@ -55,7 +55,7 @@ public static void interpretLocation(ExtendedRecord er, ALADistributionRecord dr) {
     }
   }
 
-  public static void interpretSpeciesId(ExtendedRecord er, ALADistributionRecord dr) {
+  public static void interpretSpeciesId(ExtendedRecord er, DistributionOutlierRecord dr) {
     String value = extractNullAwareValue(er, DwcTerm.taxonConceptID);
     if (!Strings.isNullOrEmpty(value)) {
       dr.setSpeciesID(value);
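interpretLocation(IndexRecord, ...) above assumes IndexRecord.getLatLng() always yields a "lat,lng" string. A standalone sketch of that parse; the null/shape guard is an addition for illustration, not part of the commit:

public class LatLngParseSketch {
  public static void main(String[] args) {
    String latlng = "-35.28,149.13"; // same "lat,lng" shape the interpreter splits
    if (latlng != null && latlng.contains(",")) { // guard added here for safety
      String[] coordinates = latlng.split(",");
      double decimalLatitude = Double.parseDouble(coordinates[0]);
      double decimalLongitude = Double.parseDouble(coordinates[1]);
      System.out.printf("lat=%f lng=%f%n", decimalLatitude, decimalLongitude);
    }
  }
}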
au/org/ala/pipelines/transforms/DistributionOutlierTransform.java
@@ -4,7 +4,7 @@
 import au.org.ala.distribution.DistributionServiceImpl;
 import au.org.ala.distribution.ExpertDistributionException;
 import au.org.ala.pipelines.common.ALARecordTypes;
-import au.org.ala.pipelines.interpreters.ALADistributionInterpreter;
+import au.org.ala.pipelines.interpreters.DistributionOutlierInterpreter;
 import java.util.*;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.transforms.*;
@@ -14,15 +14,15 @@
 import org.gbif.pipelines.io.avro.*;
 import org.gbif.pipelines.transforms.Transform;
 
-public class ALADistributionTransform extends Transform<IndexRecord, ALADistributionRecord> {
+public class DistributionOutlierTransform extends Transform<IndexRecord, DistributionOutlierRecord> {
 
   private String spatialUrl;
 
-  public ALADistributionTransform(String spatialUrl) {
+  public DistributionOutlierTransform(String spatialUrl) {
     super(
-        ALADistributionRecord.class,
+        DistributionOutlierRecord.class,
         ALARecordTypes.ALA_DISTRIBUTION,
-        ALADistributionTransform.class.getName(),
+        DistributionOutlierTransform.class.getName(),
         "alaDistributionCount");
     this.spatialUrl = spatialUrl;
   }
@@ -31,20 +31,20 @@ public ALADistributionTransform(String spatialUrl) {
   @Setup
   public void setup() {}
 
-  public ALADistributionTransform counterFn(SerializableConsumer<String> counterFn) {
+  public DistributionOutlierTransform counterFn(SerializableConsumer<String> counterFn) {
     setCounterFn(counterFn);
     return this;
   }
 
   @Override
-  public Optional<ALADistributionRecord> convert(IndexRecord source) {
-    ALADistributionRecord dr =
-        ALADistributionRecord.newBuilder().setId(source.getId()).setSpeciesID("").build();
+  public Optional<DistributionOutlierRecord> convert(IndexRecord source) {
+    DistributionOutlierRecord dr =
+        DistributionOutlierRecord.newBuilder().setId(source.getId()).setSpeciesID("").build();
     return Interpretation.from(source)
         .to(dr)
-        .via(ALADistributionInterpreter::interpretOccurrenceID)
-        .via(ALADistributionInterpreter::interpretLocation)
-        .via(ALADistributionInterpreter::interpretSpeciesId)
+        .via(DistributionOutlierInterpreter::interpretOccurrenceID)
+        .via(DistributionOutlierInterpreter::interpretLocation)
+        .via(DistributionOutlierInterpreter::interpretSpeciesId)
         .getOfNullable();
   }
@@ -53,16 +53,16 @@ public MapElements<IndexRecord, KV<String, IndexRecord>> toKv() {
         .via((IndexRecord ir) -> KV.of(ir.getTaxonID(), ir));
   }
 
-  public MapElements<KV<String, Iterable<IndexRecord>>, Iterable<ALADistributionRecord>>
+  public MapElements<KV<String, Iterable<IndexRecord>>, Iterable<DistributionOutlierRecord>>
       calculateOutlier() {
     return MapElements.via(
-        (new SimpleFunction<KV<String, Iterable<IndexRecord>>, Iterable<ALADistributionRecord>>() {
+        (new SimpleFunction<KV<String, Iterable<IndexRecord>>, Iterable<DistributionOutlierRecord>>() {
           @Override
-          public Iterable<ALADistributionRecord> apply(KV<String, Iterable<IndexRecord>> input) {
+          public Iterable<DistributionOutlierRecord> apply(KV<String, Iterable<IndexRecord>> input) {
            String lsid = input.getKey();
            Iterable<IndexRecord> records = input.getValue();
            Iterator<IndexRecord> iter = records.iterator();
-            List<ALADistributionRecord> outputs = new ArrayList();
+            List<DistributionOutlierRecord> outputs = new ArrayList<>();
 
            try {
              DistributionServiceImpl distributionService =
@@ -72,7 +72,7 @@ public Iterable<ALADistributionRecord> apply(KV<String, Iterable<IndexRecord>> input) {
               Map points = new HashMap();
               while (iter.hasNext()) {
                 IndexRecord record = iter.next();
-                ALADistributionRecord dr = convertToDistribution(record);
+                DistributionOutlierRecord dr = convertToDistribution(record);
                 outputs.add(dr);
 
                 Map point = new HashMap();
@@ -105,13 +105,13 @@ public Iterable<ALADistributionRecord> apply(KV<String, Iterable<IndexRecord>> input) {
   }
 
   /**
-   * Stringify {@Link ALADistributionRecord}
+   * Stringify {@link DistributionOutlierRecord}
    *
    * @return
    */
-  public MapElements<ALADistributionRecord, String> flatToString() {
+  public MapElements<DistributionOutlierRecord, String> flatToString() {
     return MapElements.into(new TypeDescriptor<String>() {})
-        .via((ALADistributionRecord dr) -> this.convertRecordToString(dr));
+        .via((DistributionOutlierRecord dr) -> this.convertRecordToString(dr));
   }
 
   /**
@@ -121,9 +121,9 @@ public MapElements<ALADistributionRecord, String> flatToString() {
    * @param record
    * @return
    */
-  private ALADistributionRecord convertToDistribution(IndexRecord record) {
-    ALADistributionRecord newRecord =
-        ALADistributionRecord.newBuilder()
+  private DistributionOutlierRecord convertToDistribution(IndexRecord record) {
+    DistributionOutlierRecord newRecord =
+        DistributionOutlierRecord.newBuilder()
             .setId(record.getId())
             .setOccurrenceID(record.getId())
             .setDistanceOutOfEDL(0.0d)
@@ -138,7 +138,7 @@ private ALADistributionRecord convertToDistribution(IndexRecord record) {
     return newRecord;
   }
 
-  private String convertRecordToString(ALADistributionRecord record) {
+  private String convertRecordToString(DistributionOutlierRecord record) {
     return String.format(
         "occurrenceId:%s, lat:%f, lng:%f, speciesId:%s, distanceToEDL:%f",
         record.getOccurrenceID(),
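To show what convertRecordToString produces, a sketch that builds a DistributionOutlierRecord by hand and applies the same format string; it assumes the generated builder's remaining fields are nullable with defaults, and the species ID value is made up:

import org.gbif.pipelines.io.avro.DistributionOutlierRecord;

public class RecordFormatSketch {
  public static void main(String[] args) {
    DistributionOutlierRecord record =
        DistributionOutlierRecord.newBuilder()
            .setId("occ-1")
            .setOccurrenceID("occ-1")
            .setSpeciesID("urn:lsid:example:taxon:1234") // illustrative value
            .setDecimalLatitude(-35.28)
            .setDecimalLongitude(149.13)
            .setDistanceOutOfEDL(0.0d)
            .build();
    // Same format string as convertRecordToString above
    System.out.println(
        String.format(
            "occurrenceId:%s, lat:%f, lng:%f, speciesId:%s, distanceToEDL:%f",
            record.getOccurrenceID(),
            record.getDecimalLatitude(),
            record.getDecimalLongitude(),
            record.getSpeciesID(),
            record.getDistanceOutOfEDL()));
  }
}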
Avro schema (DistributionOutlierRecord, .avsc)
@@ -1,5 +1,5 @@
 {
-  "name":"ALADistributionRecord",
+  "name":"DistributionOutlierRecord",
   "namespace":"org.gbif.pipelines.io.avro",
   "type":"record",
   "doc":"ALA Expert Distribution Layer data information",
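Only the record's name line changes in this hunk; the field list is not shown. For reference, a sketch of what the schema plausibly declares, inferred from the getters and setters exercised in the diffs above (the real file may differ):

{
  "name": "DistributionOutlierRecord",
  "namespace": "org.gbif.pipelines.io.avro",
  "type": "record",
  "doc": "ALA Expert Distribution Layer data information",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "occurrenceID", "type": ["null", "string"], "default": null},
    {"name": "speciesID", "type": ["null", "string"], "default": null},
    {"name": "decimalLatitude", "type": ["null", "double"], "default": null},
    {"name": "decimalLongitude", "type": ["null", "double"], "default": null},
    {"name": "distanceOutOfEDL", "type": ["null", "double"], "default": null}
  ]
}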
