Skip to content

Commit

Permalink
fix: filter out records without location and taxon (#622)
Browse files Browse the repository at this point in the history
  • Loading branch information
qifeng-bai authored and djtfmartin committed Feb 3, 2022
1 parent 321cbf8 commit c1da691
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 53 deletions.
4 changes: 2 additions & 2 deletions livingatlas/configs/la-pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ sampling:
outlier:
appName: Expert distribution outliers for {datasetId}
baseUrl: https://spatial.ala.org.au/ws/
targetPath: &outlierTargetPath '{fsPath}/pipelines-outlier'
targetPath: '{fsPath}/pipelines-outlier'
allDatasetsInputPath: '{fsPath}/pipelines-all-datasets'
runner: SparkRunner

Expand Down Expand Up @@ -201,7 +201,7 @@ solr:
allDatasetsInputPath: '{fsPath}/pipelines-all-datasets'
jackKnifePath: "{fsPath}/pipelines-jackknife"
clusteringPath: "{fsPath}/pipelines-clustering"
outlierPath: *outlierTargetPath
outlierPath: '{fsPath}/pipelines-outlier'
solrCollection: biocache
includeSampling: false
includeJackKnife: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,4 @@ Call<List<DistributionLayer>> getLayersByLsid(
/**
 * Requests outlier distances for a set of occurrence points against the expert
 * distribution layer of the given taxon.
 *
 * @param lsid taxon LSID identifying the expert distribution layer
 * @param points map of occurrence id to a map of "decimalLatitude"/"decimalLongitude"
 *     values (see DistributionOutlierTransform, which builds this payload)
 * @return call yielding a map of occurrence id to distance out of the expert
 *     distribution layer
 */
@POST("distribution/outliers/{id}")
Call<Map<String, Double>> outliers(
@Path("id") String lsid, @Body Map<String, Map<String, Double>> points);

// @FormUrlEncoded
// @POST("distribution/outliers/{id}")
// Call<DistributionRequest> outliers(@Path(value="id",encoded = true) String lsid,
// @Field("pointsJson") Map<String, Map<String, Double>> points);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ private DistributionServiceImpl(String baseUrl) {
service = retrofit.create(DistributionService.class);
}

/**
 * Initialises the expert distribution outlier service.
 *
 * @param baseUrl base URL of the ALA spatial service (e.g. https://spatial.ala.org.au/ws/)
 * @return a new DistributionServiceImpl bound to {@code baseUrl}
 */
public static DistributionServiceImpl init(String baseUrl) {
  // set up sampling service
  return new DistributionServiceImpl(baseUrl);
}

public List<DistributionLayer> getLayers() throws IOException, ExpertDistributionException {
// Response<List<DistributionLayer>> response =
// distributionService.getLayersByLsid("urn:lsid:biodiversity.org.au:afd.taxon:4f3a5260-4f39-4393-a644-4d05b1c45f92", "false").execute();
Response<List<au.org.ala.distribution.DistributionLayer>> response =
service.getLayers().execute();
int code = response.code();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory;
import org.gbif.pipelines.io.avro.*;
import org.slf4j.MDC;
Expand Down Expand Up @@ -69,6 +71,13 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except
log.info("Adding step 2: calculating outliers index");
PCollection<DistributionOutlierRecord> kvRecords =
indexRecords
.apply(
"Filter out records without id/species/taxon/location",
Filter.by(
it ->
!StringUtils.isEmpty(it.getTaxonID())
&& !StringUtils.isEmpty(it.getLatLng())
&& !StringUtils.isEmpty(it.getId())))
.apply("Key by species", distributionTransform.toKv())
.apply("Grouping by species", GroupByKey.create())
.apply(
Expand All @@ -84,12 +93,14 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except
.to(outputPath + "/outlier")
.withoutSharding()
.withSuffix(".avro"));

// kvRecords
// .apply("to String", distributionTransform.flatToString())
// .apply(
// "Write to text",
// TextIO.write().to(outputPath+"/outlier").withoutSharding().withSuffix(".txt"));
// Checking purpose.
if (System.getenv("test") != null) {
kvRecords
.apply("to String", distributionTransform.flatToString())
.apply(
"Write to text",
TextIO.write().to(outputPath + "/outlier").withoutSharding().withSuffix(".txt"));
}

log.info("Running the pipeline");
PipelineResult result = p.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public class DistributionOutlierInterpreter {

/**
 * Copies the occurrence identifier from the index record onto the outlier record.
 *
 * @param ir source index record
 * @param dr target distribution outlier record
 */
public static void interpretOccurrenceID(IndexRecord ir, DistributionOutlierRecord dr) {
  // The schema's "id" field now carries the occurrence identifier
  // (the separate occurrenceID field was removed).
  dr.setId(ir.getId());
}

public static void interpretLocation(IndexRecord ir, DistributionOutlierRecord dr) {
Expand All @@ -40,7 +40,7 @@ public static void interpretSpeciesId(IndexRecord ir, DistributionOutlierRecord
/**
 * Copies the DwC occurrenceID term from the extended record onto the outlier record,
 * leaving the target untouched when the term is absent or empty.
 *
 * @param er source extended record
 * @param dr target distribution outlier record
 */
public static void interpretOccurrenceID(ExtendedRecord er, DistributionOutlierRecord dr) {
  String value = extractNullAwareValue(er, DwcTerm.occurrenceID);
  if (!Strings.isNullOrEmpty(value)) {
    // The schema's "id" field now carries the occurrence identifier.
    dr.setId(value);
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
import org.apache.commons.lang3.StringUtils;
import org.gbif.pipelines.core.functions.SerializableConsumer;
import org.gbif.pipelines.core.interpreters.Interpretation;
import org.gbif.pipelines.io.avro.*;
Expand Down Expand Up @@ -80,12 +81,13 @@ public Iterable<DistributionOutlierRecord> apply(
while (iter.hasNext()) {
IndexRecord record = iter.next();
DistributionOutlierRecord dr = convertToDistribution(record, distanceToEDL);
outputs.add(dr);

Map point = new HashMap();
point.put("decimalLatitude", dr.getDecimalLatitude());
point.put("decimalLongitude", dr.getDecimalLongitude());
points.put(dr.getOccurrenceID(), point);
if (dr != null) {
outputs.add(dr);
Map point = new HashMap();
point.put("decimalLatitude", dr.getDecimalLatitude());
point.put("decimalLongitude", dr.getDecimalLongitude());
points.put(dr.getId(), point);
}
}

if (hasEDL) {
Expand All @@ -94,7 +96,7 @@ public Iterable<DistributionOutlierRecord> apply(
while (iterator.hasNext()) {
Map.Entry<String, Double> entry = iterator.next();
StreamSupport.stream(outputs.spliterator(), false)
.filter(it -> it.getOccurrenceID().equalsIgnoreCase(entry.getKey()))
.filter(it -> it.getId().equalsIgnoreCase(entry.getKey()))
.forEach(it -> it.setDistanceOutOfEDL(entry.getValue()));
}
}
Expand Down Expand Up @@ -131,26 +133,34 @@ public MapElements<DistributionOutlierRecord, String> flatToString() {
*/
/**
 * Converts an index record into a distribution outlier record.
 *
 * <p>Records without an id, taxon id or lat/lng cannot be checked against expert
 * distribution layers, so they are dropped by returning {@code null}. Callers must
 * handle the {@code null} result (the transform skips such records).
 *
 * @param record source index record
 * @param distanceToEDL initial distance to the expert distribution layer
 *     (-1: no EDL, 0: inside EDL, &gt;0: outside EDL)
 * @return a populated DistributionOutlierRecord, or {@code null} when the record
 *     lacks id/taxon/location or its coordinates cannot be parsed
 */
private DistributionOutlierRecord convertToDistribution(
    IndexRecord record, double distanceToEDL) {
  try {
    if (!StringUtils.isEmpty(record.getId())
        && !StringUtils.isEmpty(record.getTaxonID())
        && !StringUtils.isEmpty(record.getLatLng())) {
      // LatLng is stored as "lat,lng"; guard against malformed values so the
      // catch below only has to deal with non-numeric coordinates.
      String[] coordinates = record.getLatLng().split(",");
      if (coordinates.length >= 2) {
        DistributionOutlierRecord newRecord =
            DistributionOutlierRecord.newBuilder()
                .setId(record.getId())
                .setSpeciesID(record.getTaxonID())
                .setDistanceOutOfEDL(distanceToEDL)
                .build();
        newRecord.setDecimalLatitude(Double.parseDouble(coordinates[0].trim()));
        newRecord.setDecimalLongitude(Double.parseDouble(coordinates[1].trim()));
        return newRecord;
      }
    }
  } catch (NumberFormatException ex) {
    // Best-effort filter: a record with unparsable coordinates is ignored, not fatal.
    log.debug(record.getId() + " does not have lat/lng or taxon. ignored..");
  }
  return null;
}

private String convertRecordToString(DistributionOutlierRecord record) {
return String.format(
"occurrenceId:%s, lat:%f, lng:%f, speciesId:%s, distanceToEDL:%f",
record.getOccurrenceID(),
record.getId(),
record.getDecimalLatitude(),
record.getDecimalLongitude(),
record.getSpeciesID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,14 +309,10 @@ public static PCollection<IndexRecord> loadIndexRecords(
if (dataResourceFolder == null || "all".equalsIgnoreCase(dataResourceFolder)) {
dataResourceFolder = "*";
}
return p.apply(
AvroIO.read(IndexRecord.class)
.from(
String.join(
"/",
options.getAllDatasetsInputPath(),
"index-record",
dataResourceFolder,
"*.avro")));
String dataSource =
String.join(
"/", options.getAllDatasetsInputPath(), "index-record", dataResourceFolder, "*.avro");
log.info("Loading index records from: " + dataSource);
return p.apply(AvroIO.read(IndexRecord.class).from(dataSource));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
"type":"record",
"doc":"ALA Expert Distribution Layer data information",
"fields":[
{"name": "id", "type": "string", "doc": "Pipelines identifier"},
{"name" : "occurrenceID", "type" : ["null", "string"], "default" : null, "doc" : "Occurrence identifier"},
{"name": "id", "type": "string", "doc": "Occurrence identifier"},
{"name" : "distanceOutOfEDL", "type" : "double", "default" : -1, "doc" : "Distance to the expert distribution layers. -1: no EDL, 0: in EDL, >0: out of EDL" },
{"name": "decimalLatitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLatitude"},
{"name": "decimalLongitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLongitude"},
{"name": "speciesID","type":"string", "default": "","doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"},
{"name": "speciesID","type": ["null", "string"], "default": null,"doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"},
{"name": "issues", "type": "IssueRecord", "default":{}}
]
}

0 comments on commit c1da691

Please sign in to comment.