diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java index ef5ff37163..69bdaf881b 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java @@ -11,6 +11,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; +import java.util.Iterator; import java.util.List; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -20,6 +21,7 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.metrics.*; import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -32,6 +34,7 @@ import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory; import org.gbif.pipelines.core.factory.FileSystemFactory; import org.gbif.pipelines.io.avro.*; +import org.jetbrains.annotations.NotNull; import org.slf4j.MDC; /** @@ -49,7 +52,10 @@ @Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) public class DistributionOutlierPipeline { - static List WORKLOGS = new ArrayList(); + + static String INDEX_RECORD_COUNTER = "IndexRecordCounter"; + static String EXISTING_RECORD_COUNTER = "ExistingRecordCounter"; + static String NEW_RECORD_COUNTER = "NewRecordCounter"; public static void main(String[] args) throws Exception { VersionInfo.print(); @@ -68,6 +74,11 @@ public static void main(String[] args) throws Exception { public static void run(DistributionOutlierPipelineOptions options) throws Exception { + Counter indexRecordCounter = Metrics.counter(IndexRecord.class, INDEX_RECORD_COUNTER); + Counter existingRecordCounter = + Metrics.counter(DistributionOutlierRecord.class, EXISTING_RECORD_COUNTER); + Counter newRecordCounter = Metrics.counter(IndexRecord.class, NEW_RECORD_COUNTER); + // Create output path // default: {fsPath}/pipelines-outlier // or {fsPath}/pipelines-outlier/{datasetId} @@ -84,40 +95,33 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except it -> !StringUtils.isEmpty(it.getTaxonID()) && !StringUtils.isEmpty(it.getLatLng()) - && !StringUtils.isEmpty(it.getId()))); - - indexRecords - .apply(Count.globally()) - .apply( - MapElements.via( - new SimpleFunction() { - @Override - public Long apply(Long input) { - log.info("Number of indexed records loaded: " + input); - WORKLOGS.add("Number of indexed records loaded: " + input); - return input; - } - })); + && !StringUtils.isEmpty(it.getId()))) + .apply( + MapElements.via( + new SimpleFunction() { + @Override + public IndexRecord apply(IndexRecord input) { + indexRecordCounter.inc(); + return input; + } + })); DistributionOutlierTransform distributionTransform = new DistributionOutlierTransform(options.getBaseUrl()); log.info("Adding step 2: Loading existing outliers records"); - PCollection exitedOutliers = + PCollection existingOutliers = loadExistingRecords(options, p, outputPath); - - exitedOutliers - .apply(Count.globally()) - .apply( - MapElements.via( - new SimpleFunction() { - @Override - public Long apply(Long input) { - log.info("Number of existing outlier records: " + input); - WORKLOGS.add("Number of existing outlier records: " + input); - return input; - } - })); + //Count existing records + existingOutliers.apply( + MapElements.via( + new SimpleFunction() { + @Override + public DistributionOutlierRecord apply(DistributionOutlierRecord input) { + existingRecordCounter.inc(); + return input; + } + })); log.info("Adding step 3: Filtering out the records which already have outliers"); PCollection> kvIndexRecords = @@ -126,7 +130,7 @@ public Long apply(Long input) { .withKeyType(TypeDescriptors.strings())); PCollection> kvExistingOutliers = - exitedOutliers + existingOutliers .apply( MapElements.via( new SimpleFunction() { @@ -157,24 +161,12 @@ public KV apply(String input) { new DoFn>, IndexRecord>() { @ProcessElement public void processElement(ProcessContext c) { + newRecordCounter.inc(); KV kv = c.element().getValue(); c.output(kv.getKey()); } })); - newAddedIndexRecords - .apply(Count.globally()) - .apply( - MapElements.via( - new SimpleFunction() { - @Override - public Long apply(Long input) { - log.info("Number of records to be calculated: " + input); - WORKLOGS.add("Number of records to be calculated: " + input); - return input; - } - })); - log.info("Adding step 4: calculating outliers index"); PCollection kvRecords = @@ -190,7 +182,6 @@ public Long apply(Long input) { String ts = df.format(new Date()); String targetFile = outputPath + "/outlier_" + ts; - WORKLOGS.add("Output: outlier_" + ts + ".avro"); log.info("Adding step 5: writing to " + targetFile + ".avro"); kvRecords.apply( "Write to file", @@ -210,7 +201,34 @@ public Long apply(Long input) { PipelineResult result = p.run(); result.waitUntilFinish(); - writeWorkLogs(options, outputPath); + writeWorkLogs( + options, + outputPath, + getMetricCount(result, INDEX_RECORD_COUNTER), + getMetricCount(result, EXISTING_RECORD_COUNTER), + getMetricCount(result, NEW_RECORD_COUNTER), + targetFile); + } + + /** + * Read count from counter + * + * @param result Pipelines + * @param counterName + * @return + */ + @NotNull + private static Double getMetricCount(PipelineResult result, String counterName) { + Iterator> iter = + result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters().iterator(); + while (iter.hasNext()) { + MetricResult counter = iter.next(); + if (counter.getName().getName().equalsIgnoreCase(counterName)) { + return counter.getAttempted().doubleValue(); + } + } + + return 0d; } /** @@ -219,9 +237,23 @@ public Long apply(Long input) { * @param options * @param outputPath */ - private static void writeWorkLogs(AllDatasetsPipelinesOptions options, String outputPath) { + private static void writeWorkLogs( + AllDatasetsPipelinesOptions options, + String outputPath, + double numAllRecords, + double numExistingRecords, + double numNewRecords, + String targetFile) { try { - WORKLOGS.add("-------------------------------\r\n"); + List WORKLOGS = new ArrayList(); + WORKLOGS.add("Output file: " + targetFile); + WORKLOGS.add("Total indexed records: " + numAllRecords); + WORKLOGS.add("Records which outlier distances have been calculated: " + numExistingRecords); + WORKLOGS.add("Records will be calculated: " + numNewRecords); + + WORKLOGS.forEach(it -> log.info(it)); + WORKLOGS.add("------------------------------------------------\r\n"); + FileSystem fs = FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig()) .getFs(outputPath);