#622 impl(Only calculate newly added records)
qifeng-bai committed Jan 21, 2022
1 parent 378064e commit 81b891a
Showing 4 changed files with 217 additions and 26 deletions.
au/org/ala/pipelines/beam/DistributionOutlierPipeline.java
@@ -1,21 +1,36 @@
package au.org.ala.pipelines.beam;

import au.org.ala.pipelines.options.AllDatasetsPipelinesOptions;
import au.org.ala.pipelines.options.DistributionOutlierPipelineOptions;
import au.org.ala.pipelines.transforms.DistributionOutlierTransform;
import au.org.ala.pipelines.util.VersionInfo;
import au.org.ala.utils.ALAFsUtils;
import au.org.ala.utils.CombinedYamlConfiguration;
import java.io.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory;
import org.gbif.pipelines.core.factory.FileSystemFactory;
import org.gbif.pipelines.io.avro.*;
import org.slf4j.MDC;

@@ -34,6 +49,7 @@
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DistributionOutlierPipeline {
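  // Progress messages collected during the run; flushed to
  // {outputPath}/work_logs.txt by writeWorkLogs() once the pipeline finishes.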
static List<String> WORKLOGS = new ArrayList<>();

public static void main(String[] args) throws Exception {
VersionInfo.print();
@@ -55,52 +71,191 @@ public static void run(DistributionOutlierPipelineOptions options) throws Exception
// Create output path
// default: {fsPath}/pipelines-outlier
// or {fsPath}/pipelines-outlier/{datasetId}
String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options);
String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options, false);

log.info("Adding step 1: Collecting index records");
Pipeline p = Pipeline.create(options);

PCollection<IndexRecord> indexRecords = ALAFsUtils.loadIndexRecords(options, p);

DistributionOutlierTransform distributionTransform =
new DistributionOutlierTransform(options.getBaseUrl());

log.info("Adding step 2: calculating outliers index");
PCollection<DistributionOutlierRecord> kvRecords =
indexRecords
PCollection<IndexRecord> indexRecords =
ALAFsUtils.loadIndexRecords(options, p)
.apply(
"Filter out records without id/species/taxon/location",
Filter.by(
it ->
!StringUtils.isEmpty(it.getTaxonID())
&& !StringUtils.isEmpty(it.getLatLng())
&& !StringUtils.isEmpty(it.getId())))
&& !StringUtils.isEmpty(it.getId())));

indexRecords
.apply(Count.globally())
.apply(
MapElements.via(
new SimpleFunction<Long, Long>() {
@Override
public Long apply(Long input) {
log.info("Number of indexed records loaded: " + input);
WORKLOGS.add("Number of indexed records loaded: " + input);
return input;
}
}));
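// The Count.globally() + MapElements pattern here (and twice more below) exists
// only for its logging side effect; the returned counts are not consumed.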

DistributionOutlierTransform distributionTransform =
new DistributionOutlierTransform(options.getBaseUrl());

log.info("Adding step 2: Loading existing outliers records");
PCollection<DistributionOutlierRecord> existingOutliers =
loadExistingRecords(options, p, outputPath);

existingOutliers
.apply(Count.globally())
.apply(
MapElements.via(
new SimpleFunction<Long, Long>() {
@Override
public Long apply(Long input) {
log.info("Number of existing outlier records: " + input);
WORKLOGS.add("Number of existing outlier records: " + input);
return input;
}
}));

log.info("Adding step 3: Filtering out the records which already have outliers");
PCollection<KV<String, IndexRecord>> kvIndexRecords =
indexRecords.apply(
WithKeys.<String, IndexRecord>of(it -> it.getId())
.withKeyType(TypeDescriptors.strings()));

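// Reduce the existing outliers to a distinct set of (id, true) markers,
// used as the right-hand side of the join below.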
PCollection<KV<String, Boolean>> kvExistingOutliers =
existingOutliers
.apply(
MapElements.via(
new SimpleFunction<DistributionOutlierRecord, String>() {
@Override
public String apply(DistributionOutlierRecord input) {
return input.getId();
}
}))
.apply(Distinct.create())
.apply(
MapElements.via(
new SimpleFunction<String, KV<String, Boolean>>() {
@Override
public KV<String, Boolean> apply(String input) {
return KV.of(input, true);
}
}));

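// Left anti-join: index records without a match receive the default `false`
// from leftOuterJoin, so the Filter keeps only records whose outliers have
// never been calculated.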
PCollection<IndexRecord> newAddedIndexRecords =
org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin(
kvIndexRecords, kvExistingOutliers, false)
.apply(
Filter.by(
(SerializableFunction<KV<String, KV<IndexRecord, Boolean>>, Boolean>)
input -> !input.getValue().getValue())) // keep records with no existing outlier
.apply(
ParDo.of(
new DoFn<KV<String, KV<IndexRecord, Boolean>>, IndexRecord>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<IndexRecord, Boolean> kv = c.element().getValue();
c.output(kv.getKey());
}
}));

newAddedIndexRecords
.apply(Count.globally())
.apply(
MapElements.via(
new SimpleFunction<Long, Long>() {
@Override
public Long apply(Long input) {
log.info("Number of records to be calculated: " + input);
WORKLOGS.add("Number of records to be calculated: " + input);
return input;
}
}));

log.info("Adding step 4: calculating outliers index");

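// Key and group by species (taxon LSID) so each species' expert distribution
// layers are fetched from the spatial service once, not once per record.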
PCollection<DistributionOutlierRecord> kvRecords =
newAddedIndexRecords
.apply("Key by species", distributionTransform.toKv())
.apply("Grouping by species", GroupByKey.create())
.apply(
"Calculating outliers based on the species",
distributionTransform.calculateOutlier())
.apply("Flatten records", Flatten.iterables());

log.info("Adding step 3: writing to outliers");
DateFormat df = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
String ts = df.format(new Date());
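// Each run writes a new timestamped file; earlier results stay in place and
// are re-read by loadExistingRecords() on the next run.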

String targetFile = outputPath + "/outlier_" + ts;
WORKLOGS.add("Output: outlier_" + ts + ".avro");
log.info("Adding step 5: writing to " + targetFile + ".avro");
kvRecords.apply(
"Write to file",
AvroIO.write(DistributionOutlierRecord.class)
.to(outputPath + "/outlier")
.to(targetFile)
.withoutSharding()
.withSuffix(".avro"));
// For verification: optionally write a human-readable text copy alongside the Avro output.
if (System.getenv("test") != null) {
kvRecords
.apply("to String", distributionTransform.flatToString())
.apply(
"Write to text",
TextIO.write().to(outputPath + "/outlier").withoutSharding().withSuffix(".txt"));
"Write to text", TextIO.write().to(targetFile).withoutSharding().withSuffix(".txt"));
}

log.info("Running the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();

writeWorkLogs(options, outputPath);
}

/**
* Writes the collected work log to {outputPath}/work_logs.txt, appending when the underlying
* file system supports it.
*
* TODO: HDFS does not always support file appending.
*
* @param options pipeline options used to resolve the target file system
* @param outputPath directory that receives work_logs.txt
*/
private static void writeWorkLogs(AllDatasetsPipelinesOptions options, String outputPath) {
try {
WORKLOGS.add("-------------------------------\r\n");
FileSystem fs =
FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
.getFs(outputPath);
FSDataOutputStream os = null;
if (fs.exists(new Path(outputPath + "/work_logs.txt"))) {
os = fs.append(new Path(outputPath + "/work_logs.txt"));
} else {
os = fs.create(new Path(outputPath + "/work_logs.txt"));
}
InputStream is = new ByteArrayInputStream(String.join("\r\n", WORKLOGS).getBytes());
IOUtils.copyBytes(is, os, 4096, true);
} catch (Exception e) {
log.warn("Cannot write work history, appending file may not supported? ignored! ");
}
}

private static PCollection<DistributionOutlierRecord> loadExistingRecords(
AllDatasetsPipelinesOptions options, Pipeline p, String outputPath) throws IOException {

FileSystem fs =
FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
.getFs(outputPath);

String outlierPath = ALAFsUtils.getOutlierTargetPath(options);
boolean hasOutlier = ALAFsUtils.existsAndNonEmpty(fs, outlierPath);

log.info("Has outlier records from previous runs? {}", hasOutlier);

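// On the very first run no outlier files exist yet; an explicitly coded empty
// PCollection keeps the downstream join well-formed.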
if (hasOutlier) {
String samplingPath = String.join("/", outlierPath, "*.avro");
log.debug("Loading existing outliers from {}", samplingPath);
return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(samplingPath));
} else {
return p.apply(Create.empty(AvroCoder.of(DistributionOutlierRecord.class)));
}
}
}
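The core of this change is the left anti-join in step 3: every index record is paired with a marker for its id, and only the unmatched records go on to the expensive outlier calculation. Below is a minimal, self-contained sketch of the same pattern, assuming Beam's join-library extension and a local runner; the class name and sample ids are illustrative only, not part of the commit.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class LeftAntiJoinSketch { // hypothetical class, not part of the commit
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // All candidate records, keyed by id (stand-ins for IndexRecord).
    PCollection<KV<String, String>> records =
        p.apply("Records", Create.of(KV.of("r1", "a"), KV.of("r2", "b")));

    // Ids whose outliers already exist, marked with `true`.
    PCollection<KV<String, Boolean>> alreadyDone =
        p.apply("Done", Create.of(KV.of("r1", true)));

    // Unmatched left rows receive the supplied default `false`, so keeping
    // rows where the flag is false yields exactly the new records (only r2).
    PCollection<KV<String, KV<String, Boolean>>> newRecords =
        Join.leftOuterJoin(records, alreadyDone, false)
            .apply(
                "Keep unmatched",
                Filter.by(
                    (SerializableFunction<KV<String, KV<String, Boolean>>, Boolean>)
                        kv -> !kv.getValue().getValue()));

    p.run().waitUntilFinish();
  }
}

Run locally, only r2 survives the filter, mirroring how the pipeline skips records whose outliers were computed in earlier runs.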
au/org/ala/pipelines/transforms/DistributionOutlierTransform.java
@@ -69,7 +69,7 @@ public Iterable<DistributionOutlierRecord> apply(
Iterable<IndexRecord> records = input.getValue();
Iterator<IndexRecord> iter = records.iterator();
List<DistributionOutlierRecord> outputs = new ArrayList<>();
Map points = new HashMap();

try {
DistributionServiceImpl distributionService =
DistributionServiceImpl.init(spatialUrl);
Expand All @@ -78,9 +78,10 @@ public Iterable<DistributionOutlierRecord> apply(

boolean hasEDL = edl.size() > 0;
double distanceToEDL = hasEDL ? 0 : -1; // 0 -inside, -1: no EDL
// Available EDLD of this species
// Available EDLs of this species

if (hasEDL) {
Map points = new HashMap();
while (iter.hasNext()) {
IndexRecord record = iter.next();
DistributionOutlierRecord dr = convertToDistribution(record, distanceToEDL);
@@ -103,10 +104,17 @@ public Iterable<DistributionOutlierRecord> apply(
.filter(it -> it.getId().equalsIgnoreCase(entry.getKey()))
.forEach(it -> it.setDistanceOutOfEDL(entry.getValue()));
}
} else {
while (iter.hasNext()) {
IndexRecord record = iter.next();
DistributionOutlierRecord dr = convertToDistribution(record, distanceToEDL);
if (dr != null) {
outputs.add(dr);
}
}
}
} catch (ExpertDistributionException e) {
log.error("Error in processing the species: " + lsid + " . Ignored");
log.error("Points: " + points);
log.error(e.getMessage());
} catch (Exception e) {
log.error("Runtime error in processing the species: " + lsid + " . Ignored");
au/org/ala/utils/ALAFsUtils.java
@@ -72,10 +72,40 @@ public static String buildPathSamplingUsingTargetPath(AllDatasetsPipelinesOptions
}

/**
* Build a path to outlier records. {fsPath}/pipelines-outlier/{datasetId}
* Build a path to outlier records: {fsPath}/pipelines-outlier/{datasetId} or
* {fsPath}/pipelines-outlier/all. NOTE: when {@code delete} is true, the existing folder is
* removed first.
*/
public static String buildPathOutlierUsingTargetPath(
AllDatasetsPipelinesOptions options, boolean delete) throws IOException {
// default: {fsPath}/pipelines-outlier
FileSystem fs =
FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
.getFs(options.getTargetPath());

String outputPath = PathBuilder.buildPath(options.getTargetPath()).toString();

// {fsPath}/pipelines-outlier/{datasetId}
if (options.getDatasetId() != null && !"all".equalsIgnoreCase(options.getDatasetId())) {
outputPath = PathBuilder.buildPath(outputPath, options.getDatasetId()).toString();
} else {
// {fsPath}/pipelines-outlier/all
outputPath = PathBuilder.buildPath(outputPath, "all").toString();
}
// delete previous runs
if (delete) {
FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath);
} else if (!exists(fs, outputPath)) {
ALAFsUtils.createDirectory(fs, outputPath);
}
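// With delete=false (the incremental path used by DistributionOutlierPipeline),
// earlier outlier_{timestamp}.avro files are preserved; the directory is only
// created if missing.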

return outputPath;
}

/**
* Get the output path for outlier records: {fsPath}/pipelines-outlier/{datasetId} or
* {fsPath}/pipelines-outlier/all
*/
public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions options)
public static String getOutlierTargetPath(AllDatasetsPipelinesOptions options)
throws IOException {
// default: {fsPath}/pipelines-outlier
FileSystem fs =
@@ -91,9 +121,7 @@ public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions
// {fsPath}/pipelines-outlier/all
outputPath = PathBuilder.buildPath(outputPath, "all").toString();
}
// delete previous runs
FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath);
ALAFsUtils.createDirectory(fs, outputPath);

return outputPath;
}

(JUnit test for DistributionServiceImpl outliers)
@@ -87,13 +87,13 @@ public void url_as_id_outliers() {
inPoint.put("decimalLongitude", 142.854663);
points.put("e512c707-fe92-492c-b869-799c57388c45", inPoint);

//Map<String, Double> results = impl.outliers(URLEncoder.encode("urn:lsid:biodiversity.org.au:afd.taxon:0c3e2403-05c4-4a43-8019-30e6d657a283", StandardCharsets.UTF_8.toString()), points);
Map<String, Double> results = impl.outliers("https://id.biodiversity.org.au/node/apni/2908371", points);
// Map<String, Double> results =
// impl.outliers(URLEncoder.encode("urn:lsid:biodiversity.org.au:afd.taxon:0c3e2403-05c4-4a43-8019-30e6d657a283", StandardCharsets.UTF_8.toString()), points);
Map<String, Double> results =
impl.outliers("https://id.biodiversity.org.au/node/apni/2908371", points);
assertSame(0, results.size());
} catch (Exception e) {
e.printStackTrace();
}
}


}
