Skip to content

Commit

Permalink
#622 init(distribution from indexrecord)
Browse files Browse the repository at this point in the history
  • Loading branch information
qifeng-bai authored and djtfmartin committed Mar 3, 2022
1 parent 7c8b3eb commit 2b2539f
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package au.org.ala.pipelines.interpreters;

import static org.gbif.pipelines.core.utils.ModelUtils.*;

import au.org.ala.pipelines.parser.CoordinatesParser;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.gbif.dwc.terms.DwcTerm;
import org.gbif.kvs.geocode.LatLng;
import org.gbif.pipelines.core.parsers.common.ParsedField;
import org.gbif.pipelines.io.avro.ALADistributionRecord;
import org.gbif.pipelines.io.avro.ExtendedRecord;
import org.gbif.pipelines.io.avro.IndexRecord;
import org.gbif.pipelines.io.avro.Record;

/*
* living atlases.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ALADistributionInterpreter {

public static void interpretOccurrenceID(IndexRecord ir, ALADistributionRecord dr) {
dr.setOccurrenceID(ir.getId());
}

public static void interpretLocation(IndexRecord ir, ALADistributionRecord dr) {
String latlng = ir.getLatLng();
String[] coordinates = latlng.split(",");
dr.setDecimalLatitude(Double.parseDouble(coordinates[0]));
dr.setDecimalLongitude(Double.parseDouble(coordinates[1]));
}

public static void interpretSpeciesId(IndexRecord ir, ALADistributionRecord dr) {
dr.setSpeciesID(ir.getTaxonID());
}

/*
* Interprete from verbatim
*/
public static void interpretOccurrenceID(ExtendedRecord er, ALADistributionRecord dr) {
String value = extractNullAwareValue(er, DwcTerm.occurrenceID);
if (!Strings.isNullOrEmpty(value)) {
dr.setOccurrenceID(value);
}
}

public static void interpretLocation(ExtendedRecord er, ALADistributionRecord dr) {
ParsedField<LatLng> parsedLatLon = CoordinatesParser.parseCoords(er);
addIssue(dr, parsedLatLon.getIssues());

if (parsedLatLon.isSuccessful()) {
LatLng latlng = parsedLatLon.getResult();
dr.setDecimalLatitude(latlng.getLatitude());
dr.setDecimalLongitude(latlng.getLongitude());
}
}

public static void interpretSpeciesId(ExtendedRecord er, ALADistributionRecord dr) {
String value = extractNullAwareValue(er, DwcTerm.taxonConceptID);
if (!Strings.isNullOrEmpty(value)) {
dr.setSpeciesID(value);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package au.org.ala.pipelines.options;

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

/**
* Options for pipelines that run against spatial service for Expert distribution layer calculation.
*/
public interface DistributionPipelineOptions extends AllDatasetsPipelinesOptions {

@Description("Base URL for Spatial service")
@Default.String("https://spatial.ala.org.au/ws/")
String getBaseUrl();

void setBaseUrl(String baseUrl);

@Description("Default batch size")
@Default.Integer(25000)
Integer getBatchSize();

void setBatchSize(Integer batchSize);

@Description("Keep download sampling CSVs")
@Default.Integer(1000)
Integer getBatchStatusSleepTime();

void setBatchStatusSleepTime(Integer batchStatusSleepTime);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package au.org.ala.pipelines.transforms;

import au.org.ala.distribution.DistributionServiceImpl;
import au.org.ala.distribution.DistributionLayer;
import au.org.ala.distribution.ExpertDistributionException;
import au.org.ala.pipelines.common.ALARecordTypes;
import au.org.ala.pipelines.interpreters.ALADistributionInterpreter;
import java.util.*;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
import org.gbif.pipelines.core.functions.SerializableConsumer;
import org.gbif.pipelines.core.interpreters.Interpretation;
import org.gbif.pipelines.io.avro.*;
import org.gbif.pipelines.transforms.Transform;

public class ALADistributionTransform extends Transform<IndexRecord, ALADistributionRecord> {

private String spatialUrl;

public ALADistributionTransform(String spatialUrl) {
super(
ALADistributionRecord.class,
ALARecordTypes.ALA_DISTRIBUTION,
ALADistributionTransform.class.getName(),
"alaDistributionCount");
this.spatialUrl = spatialUrl;
}

/** Beam @Setup initializes resources */
@Setup
public void setup() {}

public ALADistributionTransform counterFn(SerializableConsumer<String> counterFn) {
setCounterFn(counterFn);
return this;
}

@Override
public Optional<ALADistributionRecord> convert(IndexRecord source) {
ALADistributionRecord dr =
ALADistributionRecord.newBuilder().setId(source.getId()).setSpeciesID("").build();
return Interpretation.from(source)
.to(dr)
.via(ALADistributionInterpreter::interpretOccurrenceID)
.via(ALADistributionInterpreter::interpretLocation)
.via(ALADistributionInterpreter::interpretSpeciesId)
.getOfNullable();
}


/**
* Maps {@link ALADistributionRecord} to key value, where key is {@link
* ALADistributionRecord#getSpeciesID()}
*/
public MapElements<ALADistributionRecord, KV<String, ALADistributionRecord>> toKv() {
return MapElements.into(new TypeDescriptor<KV<String, ALADistributionRecord>>() {})
.via((ALADistributionRecord dr) -> KV.of(dr.getSpeciesID(), dr));
}

public MapElements<KV<String, Iterable<ALADistributionRecord>>, Iterable<ALADistributionRecord>>
calculateOutlier() {
return MapElements.via(
(new SimpleFunction<
KV<String, Iterable<ALADistributionRecord>>, Iterable<ALADistributionRecord>>() {
@Override
public Iterable<ALADistributionRecord> apply(
KV<String, Iterable<ALADistributionRecord>> input) {
String lsid = input.getKey();
Iterable<ALADistributionRecord> records = input.getValue();
Iterator<ALADistributionRecord> iter = records.iterator();

try {
DistributionServiceImpl distributionService = DistributionServiceImpl.init(spatialUrl);
List<DistributionLayer> edl = distributionService.findLayersByLsid(lsid);
// Available EDLD of this species
if (edl.size() > 0) {
// Duplicate records because Transform does not allow change input
List<ALADistributionRecord> outputs = new ArrayList();
Map points = new HashMap();
while (iter.hasNext()) {
ALADistributionRecord record = iter.next();
outputs.add(copy(record));

Map point = new HashMap();
point.put("decimalLatitude", record.getDecimalLatitude());
point.put("decimalLongitude", record.getDecimalLongitude());
points.put(record.getOccurrenceID(), point);
}

Map<String, Double> results = distributionService.outliers(lsid, points);
Iterator<Map.Entry<String, Double>> iterator = results.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Double> entry = iterator.next();
StreamSupport.stream(outputs.spliterator(), false)
.filter(it -> it.getOccurrenceID().equalsIgnoreCase(entry.getKey()))
.forEach(it -> it.setDistanceOutOfEDL(entry.getValue()));
}
return outputs;
}

} catch (ExpertDistributionException e) {
throw new RuntimeException(
"Expert distribution service returns error: " + e.getMessage());
} catch (Exception e) {
throw new RuntimeException("Runtime error: " + e.getMessage());
}

return records;
}
}));
}

/**
* Stringify {@Link ALADistributionRecord}
*
* @return
*/
public MapElements<ALADistributionRecord, String> flatToString() {
return MapElements.into(new TypeDescriptor<String>() {})
.via((ALADistributionRecord dr) -> this.convertRecordToString(dr));
}

/**
* Only can be used when EDL exists - which means the record can only in/out EDL Force to reset
* distanceOutOfEDL 0 because Spatial EDL service only return distance of outlier records
*
* @param record
* @return
*/
private ALADistributionRecord copy(ALADistributionRecord record) {
ALADistributionRecord newRecord = new ALADistributionRecord();
newRecord.setId(record.getId());
newRecord.setOccurrenceID(record.getOccurrenceID());
newRecord.setDistanceOutOfEDL(0.0d);
newRecord.setDecimalLongitude(record.getDecimalLongitude());
newRecord.setDecimalLatitude(record.getDecimalLatitude());
newRecord.setSpeciesID(record.getSpeciesID());
IssueRecord ir =
IssueRecord.newBuilder().setIssueList(record.getIssues().getIssueList()).build();
newRecord.setIssues(ir);
return newRecord;
}

private String convertRecordToString(ALADistributionRecord record) {
return String.format(
"occurrenceId:%s, lat:%f, lng:%f, speciesId:%s, distanceToEDL:%f",
record.getOccurrenceID(), record.getDecimalLatitude(), record.getDecimalLongitude(), record.getSpeciesID(), record.getDistanceOutOfEDL());
}
}
15 changes: 15 additions & 0 deletions sdks/models/src/main/avro/specific/ala-distribution-record.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name":"ALADistributionRecord",
"namespace":"org.gbif.pipelines.io.avro",
"type":"record",
"doc":"ALA Expert Distribution Layer data information",
"fields":[
{"name": "id", "type": "string", "doc": "Pipelines identifier"},
{"name" : "occurrenceID", "type" : ["null", "string"], "default" : null, "doc" : "Occurrence identifier"},
{"name" : "distanceOutOfEDL", "type" : [ "double"], "default" : -1, ",doc" : "Distance to the expert distribution layers. -1: no EDL, 0: in EDL, >0 =out of EDL" },
{"name": "decimalLatitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLatitude"},
{"name": "decimalLongitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLongitude"},
{"name": "speciesID","type":"string", "doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"},
{"name": "issues", "type": "IssueRecord", "default":{}}
]
}

0 comments on commit 2b2539f

Please sign in to comment.