Skip to content

Commit

Permalink
Merge pull request #681 from gbif/622_distribution_dm
Browse files: browse the repository at this point in the history
Reverted back to deltas for expert distribution
  • Loading branch information
qifeng-bai authored Mar 2, 2022
2 parents dd59900 + 5b657fa commit d05e9fb
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 22 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -8,6 +8,9 @@
import au.org.ala.pipelines.util.VersionInfo;
import au.org.ala.utils.ALAFsUtils;
import au.org.ala.utils.CombinedYamlConfiguration;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -19,7 +22,6 @@
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.StringUtils;
import org.gbif.pipelines.common.beam.metrics.MetricsHandler;
Expand Down Expand Up @@ -165,20 +167,18 @@ public void processElement(ProcessContext c) {
distributionTransform.calculateOutlier())
.apply("Flatten records", Flatten.iterables());

log.info("Adding step 7: Join newly create DistributionOutlierRecord to existing ones");
PCollection<DistributionOutlierRecord> union =
PCollectionList.of(kvRecords).and(existingOutliers).apply(Flatten.pCollections());
DateFormat df = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
String ts = df.format(new Date());

// Create output path
// default: {fsPath}/pipelines-outlier
// or {fsPath}/pipelines-outlier/{datasetId}
String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options, false);
log.info("Adding step 8: Writing to " + outputPath + "/outlier-*.avro");
union.apply(
log.info("Adding step 8: Writing to " + outputPath + "/outlier_" + ts + "*.avro");
kvRecords.apply(
"Write to file",
AvroIO.write(DistributionOutlierRecord.class)
.to(outputPath + "/outlier")
.withoutSharding()
.to(outputPath + "/outlier_" + ts)
.withSuffix(".avro"));

log.info("Running the pipeline");
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -542,6 +542,7 @@ public void processElement(ProcessContext c) {
.setInts(indexRecord.getInts())
.setStrings(stringsToPersist)
.setDoubles(doublesToPersist)
.setDynamicProperties(indexRecord.getDynamicProperties())
.build();

c.output(KV.of(indexRecord.getId(), ir));
Expand All @@ -563,26 +564,15 @@ public void processElement(ProcessContext c) {
String id = e.getKey();

DistributionOutlierRecord outlierRecord = e.getValue().getValue();

IndexRecord indexRecord = e.getValue().getKey();
IndexRecord ouputIR =
IndexRecord.newBuilder()
.setId(indexRecord.getId())
.setTaxonID(indexRecord.getTaxonID())
.setLatLng(indexRecord.getLatLng())
.setMultiValues(indexRecord.getMultiValues())
.setDates(indexRecord.getDates())
.setLongs(indexRecord.getLongs())
.setBooleans(indexRecord.getBooleans())
.setInts(indexRecord.getInts())
.build();

if (outlierRecord != null) {
ouputIR
indexRecord
.getDoubles()
.put(DISTANCE_FROM_EXPERT_DISTRIBUTION, outlierRecord.getDistanceOutOfEDL());
}

c.output(KV.of(id, ouputIR));
c.output(KV.of(id, indexRecord));
}
};
}
Expand Down

0 comments on commit d05e9fb

Please sign in to comment.