
Commit

Merge branch '622_distribution' of https://github.com/gbif/pipelines into 622_distribution
qifeng-bai committed Feb 16, 2022
2 parents 8e193fc + bd31546, commit 241bd35
Showing 10 changed files with 136 additions and 242 deletions.
livingatlas/configs/la-pipelines.yaml (4 changes: 2 additions & 2 deletions)
@@ -171,7 +171,7 @@ sampling:
# class:au.org.ala.pipelines.beam.DistributionOutlierPipeline
outlier:
appName: Expert distribution outliers for {datasetId}
baseUrl: https://spatial.ala.org.au/ws/
baseUrl: https://spatial-test.ala.org.au/ws/
targetPath: '{fsPath}/pipelines-outlier'
allDatasetsInputPath: '{fsPath}/pipelines-all-datasets'
runner: SparkRunner
@@ -376,7 +376,7 @@ index-sh-args:
spark-embedded:
jvm: -Xmx8g -XX:+UseG1GC
spark-cluster:
conf: spark.default.parallelism=500
conf: spark.default.parallelism=48
num-executors: 6
executor-cores: 8
executor-memory: 20G
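
The baseUrl above is, assuming the usual yaml-to-options mapping, what DistributionOutlierPipelineOptions.getBaseUrl() returns and what the pipeline diff below hands to DistributionOutlierTransform, so this change redirects expert-distribution lookups to the test instance of the ALA spatial web service. A minimal reachability check, sketched with the JDK HTTP client against a hypothetical "distributions" path that this commit does not confirm:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SpatialServiceSmokeTest {
  public static void main(String[] args) throws Exception {
    // baseUrl as configured in la-pipelines.yaml above.
    String baseUrl = "https://spatial-test.ala.org.au/ws/";
    // "distributions" is an illustrative path only; the endpoints actually used
    // by DistributionOutlierTransform are not visible in this diff.
    HttpRequest request =
        HttpRequest.newBuilder(URI.create(baseUrl + "distributions")).GET().build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " from " + baseUrl);
  }
}

The lowered spark.default.parallelism (500 to 48) is unrelated to the URL change; it only reduces the default number of partitions the Spark cluster run uses.
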
@@ -1,40 +1,30 @@
package au.org.ala.pipelines.beam;

import static org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin;

import au.org.ala.pipelines.options.AllDatasetsPipelinesOptions;
import au.org.ala.pipelines.options.DistributionOutlierPipelineOptions;
import au.org.ala.pipelines.transforms.DistributionOutlierTransform;
import au.org.ala.pipelines.util.VersionInfo;
import au.org.ala.utils.ALAFsUtils;
import au.org.ala.utils.CombinedYamlConfiguration;
import java.io.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.metrics.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.gbif.pipelines.common.beam.metrics.MetricsHandler;
import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory;
import org.gbif.pipelines.core.factory.FileSystemFactory;
import org.gbif.pipelines.io.avro.*;
import org.jetbrains.annotations.NotNull;
import org.slf4j.MDC;

/**
@@ -74,19 +64,13 @@ public static void main(String[] args) throws Exception {

public static void run(DistributionOutlierPipelineOptions options) throws Exception {

Counter indexRecordCounter = Metrics.counter(IndexRecord.class, INDEX_RECORD_COUNTER);
Counter existingRecordCounter =
Metrics.counter(DistributionOutlierRecord.class, EXISTING_RECORD_COUNTER);
Counter newRecordCounter = Metrics.counter(IndexRecord.class, NEW_RECORD_COUNTER);
Counter existingRecordCounter = Metrics.counter(IndexRecord.class, EXISTING_RECORD_COUNTER);
Counter indexRecordCounter = Metrics.counter(IndexRecord.class, INDEX_RECORD_COUNTER);

// Create output path
// default: {fsPath}/pipelines-outlier
// or {fsPath}/pipelines-outlier/{datasetId}
String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options, false);

log.info("Adding step 1: Collecting index records");
Pipeline p = Pipeline.create(options);

log.info("Adding step 1: Read all index records");
PCollection<IndexRecord> indexRecords =
ALAFsUtils.loadIndexRecords(options, p)
.apply(
@@ -95,50 +79,54 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except
it ->
!StringUtils.isEmpty(it.getTaxonID())
&& !StringUtils.isEmpty(it.getLatLng())
&& !StringUtils.isEmpty(it.getId())))
.apply(
MapElements.via(
new SimpleFunction<IndexRecord, IndexRecord>() {
@Override
public IndexRecord apply(IndexRecord input) {
indexRecordCounter.inc();
return input;
}
}));
&& !StringUtils.isEmpty(it.getId())));

if (options.isAddDebugCounts()) {
log.info("Adding step 1a: Adding idnex record count metric");
indexRecords
.apply(Count.globally())
.apply(
MapElements.via(
new SimpleFunction<Long, Long>() {
@Override
public Long apply(Long input) {
indexRecordCounter.inc(input);
return input;
}
}));
}

DistributionOutlierTransform distributionTransform =
new DistributionOutlierTransform(options.getBaseUrl());

log.info("Adding step 2: Loading existing outliers records");
PCollection<DistributionOutlierRecord> existingOutliers =
loadExistingRecords(options, p, outputPath);
//Count existing records
existingOutliers.apply(
MapElements.via(
new SimpleFunction<DistributionOutlierRecord, DistributionOutlierRecord>() {
@Override
public DistributionOutlierRecord apply(DistributionOutlierRecord input) {
existingRecordCounter.inc();
return input;
}
}));

log.info("Adding step 3: Filtering out the records which already have outliers");
log.info("Adding step 2: Create UUID -> IndexRecords");
PCollection<KV<String, IndexRecord>> kvIndexRecords =
indexRecords.apply(
WithKeys.<String, IndexRecord>of(it -> it.getId())
.withKeyType(TypeDescriptors.strings()));

log.info("Adding step 3: Loading existing outliers records");
PCollection<DistributionOutlierRecord> existingOutliers = loadExistingRecords(options, p);

if (options.isAddDebugCounts()) {
log.info("Adding step 3a: Adding existing record count metric");
existingOutliers
.apply(Count.globally())
.apply(
MapElements.via(
new SimpleFunction<Long, Long>() {
@Override
public Long apply(Long input) {
existingRecordCounter.inc(input);
return input;
}
}));
}

log.info("Adding step 4: Create UUID -> Boolean");
PCollection<KV<String, Boolean>> kvExistingOutliers =
existingOutliers
.apply(
MapElements.via(
new SimpleFunction<DistributionOutlierRecord, String>() {
@Override
public String apply(DistributionOutlierRecord input) {
return input.getId();
}
}))
.apply(MapElements.into(TypeDescriptors.strings()).via(dor -> dor.getId()))
.apply(Distinct.create())
.apply(
MapElements.via(
@@ -149,9 +137,9 @@ public KV<String, Boolean> apply(String input) {
}
}));

log.info("Adding step 5: Generate list of records not marked as distribution outliers");
PCollection<IndexRecord> newAddedIndexRecords =
org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin(
kvIndexRecords, kvExistingOutliers, false)
leftOuterJoin(kvIndexRecords, kvExistingOutliers, false)
.apply(
Filter.by(
(SerializableFunction<KV<String, KV<IndexRecord, Boolean>>, Boolean>)
@@ -167,8 +155,7 @@ public void processElement(ProcessContext c) {
}
}));

log.info("Adding step 4: calculating outliers index");

log.info("Adding step 6: calculating outliers index");
PCollection<DistributionOutlierRecord> kvRecords =
newAddedIndexRecords
.apply("Key by species", distributionTransform.toKv())
@@ -178,118 +165,38 @@ public void processElement(ProcessContext c) {
distributionTransform.calculateOutlier())
.apply("Flatten records", Flatten.iterables());

DateFormat df = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
String ts = df.format(new Date());
log.info("Adding step 7: Join newly create DistributionOutlierRecord to existing ones");
PCollection<DistributionOutlierRecord> union =
PCollectionList.of(kvRecords).and(existingOutliers).apply(Flatten.pCollections());

String targetFile = outputPath + "/outlier_" + ts;
log.info("Adding step 5: writing to " + targetFile + ".avro");
kvRecords.apply(
// Create output path
// default: {fsPath}/pipelines-outlier
// or {fsPath}/pipelines-outlier/{datasetId}
String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options, false);
log.info("Adding step 8: Writing to " + outputPath + "/outlier-*.avro");
union.apply(
"Write to file",
AvroIO.write(DistributionOutlierRecord.class)
.to(targetFile)
.withoutSharding()
.to(outputPath + "/outlier")
.withSuffix(".avro"));
// Checking purpose.
if (System.getenv("test") != null) {
kvRecords
.apply("to String", distributionTransform.flatToString())
.apply(
"Write to text", TextIO.write().to(targetFile).withoutSharding().withSuffix(".txt"));
}

log.info("Running the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();

writeWorkLogs(
options,
outputPath,
getMetricCount(result, INDEX_RECORD_COUNTER),
getMetricCount(result, EXISTING_RECORD_COUNTER),
getMetricCount(result, NEW_RECORD_COUNTER),
targetFile);
}

/**
* Read count from counter
*
* @param result Pipelines
* @param counterName
* @return
*/
@NotNull
private static Double getMetricCount(PipelineResult result, String counterName) {
Iterator<MetricResult<Long>> iter =
result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters().iterator();
while (iter.hasNext()) {
MetricResult<Long> counter = iter.next();
if (counter.getName().getName().equalsIgnoreCase(counterName)) {
return counter.getAttempted().doubleValue();
}
}

return 0d;
}

/**
* todo: hdfs may not support file expending
* @param options
* @param outputPath
* @param numAllRecords
* @param numExistingRecords
* @param numNewRecords
* @param targetFile
*/
private static void writeWorkLogs(
AllDatasetsPipelinesOptions options,
String outputPath,
double numAllRecords,
double numExistingRecords,
double numNewRecords,
String targetFile) {
try {
List<String> WORKLOGS = new ArrayList();
WORKLOGS.add("Output file: " + targetFile);
WORKLOGS.add("Total indexed records: " + numAllRecords);
WORKLOGS.add("Records which outlier distances have been calculated: " + numExistingRecords);
WORKLOGS.add("Records will be calculated: " + numNewRecords);

WORKLOGS.forEach(it -> log.info(it));
WORKLOGS.add("------------------------------------------------\r\n");

FileSystem fs =
FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
.getFs(outputPath);
FSDataOutputStream os = null;
if (fs.exists(new Path(outputPath + "/work_logs.txt"))) {
os = fs.append(new Path(outputPath + "/work_logs.txt"));
} else {
os = fs.create(new Path(outputPath + "/work_logs.txt"));
}
InputStream is = new ByteArrayInputStream(String.join("\r\n", WORKLOGS).getBytes());
IOUtils.copyBytes(is, os, 4096, true);
} catch (Exception e) {
log.warn("Cannot write work history, appending file may not supported? ignored! ");
}
log.info("Writing metrics.....");
MetricsHandler.saveCountersToTargetPathFile(options, result.metrics());
log.info("Writing metrics written.");
}

private static PCollection<DistributionOutlierRecord> loadExistingRecords(
AllDatasetsPipelinesOptions options, Pipeline p, String outputPath) throws IOException {

FileSystem fs =
FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
.getFs(outputPath);
AllDatasetsPipelinesOptions options, Pipeline p) {

String outlierPath = ALAFsUtils.getOutlierTargetPath(options);
boolean hasOutlier = ALAFsUtils.hasAvro(fs, outlierPath, false);
log.debug("Try to Load existing outliers from {}", outlierPath);

if (hasOutlier) {
String samplingPath = String.join("/", outlierPath, "*.avro");
return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(samplingPath));
} else {
log.info("No existing outlier AVRO files under " + outlierPath);
return p.apply(Create.empty(AvroCoder.of(DistributionOutlierRecord.class)));
}
log.info("Existing outlier cache path {}", outlierPath);
return p.apply(
AvroIO.read(DistributionOutlierRecord.class)
.from(outlierPath)
.withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW));
}
}
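
Taken together, the rewrite above keys both the index records and the previously calculated outlier records by UUID, left-outer-joins them so only records without an existing result reach the outlier calculation, flattens new and existing results into one collection, and writes sharded outlier-*.avro files that the next run reads back via loadExistingRecords; reading with EmptyMatchTreatment.ALLOW is what lets that method drop the old hasAvro filesystem check, since an empty glob now just yields an empty PCollection. A stripped-down sketch of the join/filter step, with plain strings standing in for IndexRecord and DistributionOutlierRecord (class and step names here are illustrative only):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class LeftOuterJoinFilterSketch {

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // UUID -> payload, standing in for the KV<String, IndexRecord> built in step 2 above.
    PCollection<KV<String, String>> kvIndexRecords =
        p.apply("AllRecords", Create.of(KV.of("uuid-1", "rec-1"), KV.of("uuid-2", "rec-2")));

    // UUID -> true for records that already have an outlier distance (step 4 above).
    PCollection<KV<String, Boolean>> kvExistingOutliers =
        p.apply("Existing", Create.of(KV.of("uuid-1", true)));

    // Left outer join: keys missing on the right get the nullValue 'false', so
    // keeping only 'false' rows leaves exactly the records still to be calculated.
    PCollection<String> newRecords =
        Join.leftOuterJoin(kvIndexRecords, kvExistingOutliers, false)
            .apply(
                "OnlyNew",
                Filter.by(
                    (SerializableFunction<KV<String, KV<String, Boolean>>, Boolean>)
                        kv -> !kv.getValue().getValue()))
            .apply(
                "Payload",
                MapElements.into(TypeDescriptors.strings())
                    .via((KV<String, KV<String, Boolean>> kv) -> kv.getValue().getKey()));

    p.run().waitUntilFinish();
  }
}

Passing false as the join's nullValue gives unmatched records an explicit marker, which keeps the subsequent Filter a plain boolean test rather than a null check.
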
@@ -577,7 +577,9 @@ public void processElement(ProcessContext c) {
.setInts(indexRecord.getInts())
.build();
if (outlierRecord != null) {
ouputIR.getDoubles().put(DISTANCE_TO_EDL, outlierRecord.getDistanceOutOfEDL());
ouputIR
.getDoubles()
.put(DISTANCE_FROM_EXPERT_DISTRIBUTION, outlierRecord.getDistanceOutOfEDL());
}

c.output(KV.of(id, ouputIR));
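
This hunk stores the outlier distance on the index record under DISTANCE_FROM_EXPERT_DISTRIBUTION instead of DISTANCE_TO_EDL. A small sketch of how a consumer might read it back; the literal field name and the "positive means outside" reading are assumptions, since neither the constant's value nor the distance semantics are shown in this diff:

public class OutlierDistanceReader {

  // Assumed field name; the real value of DISTANCE_FROM_EXPERT_DISTRIBUTION is
  // defined elsewhere in the codebase and is not part of this hunk.
  static final String DISTANCE_FROM_EXPERT_DISTRIBUTION = "distanceFromExpertDistribution";

  static boolean isOutsideExpertDistribution(org.gbif.pipelines.io.avro.IndexRecord ir) {
    if (ir.getDoubles() == null) {
      return false; // record carries no double-typed fields at all
    }
    Double distance = ir.getDoubles().get(DISTANCE_FROM_EXPERT_DISTRIBUTION);
    // A strictly positive distance presumably places the occurrence outside the
    // expert distribution layer (the transform stores getDistanceOutOfEDL() here).
    return distance != null && distance > 0d;
  }
}
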
@@ -2,15 +2,9 @@

import static org.gbif.pipelines.core.utils.ModelUtils.*;

import au.org.ala.pipelines.parser.CoordinatesParser;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.gbif.dwc.terms.DwcTerm;
import org.gbif.kvs.geocode.LatLng;
import org.gbif.pipelines.core.parsers.common.ParsedField;
import org.gbif.pipelines.io.avro.DistributionOutlierRecord;
import org.gbif.pipelines.io.avro.ExtendedRecord;
import org.gbif.pipelines.io.avro.IndexRecord;

/*
@@ -33,32 +27,4 @@ public static void interpretLocation(IndexRecord ir, DistributionOutlierRecord d
public static void interpretSpeciesId(IndexRecord ir, DistributionOutlierRecord dr) {
dr.setSpeciesID(ir.getTaxonID());
}

/*
* Interprete from verbatim
*/
public static void interpretOccurrenceID(ExtendedRecord er, DistributionOutlierRecord dr) {
String value = extractNullAwareValue(er, DwcTerm.occurrenceID);
if (!Strings.isNullOrEmpty(value)) {
dr.setId(value);
}
}

public static void interpretLocation(ExtendedRecord er, DistributionOutlierRecord dr) {
ParsedField<LatLng> parsedLatLon = CoordinatesParser.parseCoords(er);
addIssue(dr, parsedLatLon.getIssues());

if (parsedLatLon.isSuccessful()) {
LatLng latlng = parsedLatLon.getResult();
dr.setDecimalLatitude(latlng.getLatitude());
dr.setDecimalLongitude(latlng.getLongitude());
}
}

public static void interpretSpeciesId(ExtendedRecord er, DistributionOutlierRecord dr) {
String value = extractNullAwareValue(er, DwcTerm.taxonConceptID);
if (!Strings.isNullOrEmpty(value)) {
dr.setSpeciesID(value);
}
}
}
