Skip to content

Commit

Permalink
#622 improve(counters)
Browse files Browse the repository at this point in the history
  • Loading branch information
qifeng-bai committed Feb 2, 2022
1 parent 3bb724b commit eda9310
Showing 1 changed file with 79 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
Expand All @@ -20,6 +21,7 @@
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -32,6 +34,7 @@
import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory;
import org.gbif.pipelines.core.factory.FileSystemFactory;
import org.gbif.pipelines.io.avro.*;
import org.jetbrains.annotations.NotNull;
import org.slf4j.MDC;

/**
Expand All @@ -49,7 +52,10 @@
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DistributionOutlierPipeline {
static List<String> WORKLOGS = new ArrayList();

static String INDEX_RECORD_COUNTER = "IndexRecordCounter";
static String EXISTING_RECORD_COUNTER = "ExistingRecordCounter";
static String NEW_RECORD_COUNTER = "NewRecordCounter";

public static void main(String[] args) throws Exception {
VersionInfo.print();
Expand All @@ -68,6 +74,11 @@ public static void main(String[] args) throws Exception {

public static void run(DistributionOutlierPipelineOptions options) throws Exception {

Counter indexRecordCounter = Metrics.counter(IndexRecord.class, INDEX_RECORD_COUNTER);
Counter existingRecordCounter =
Metrics.counter(DistributionOutlierRecord.class, EXISTING_RECORD_COUNTER);
Counter newRecordCounter = Metrics.counter(IndexRecord.class, NEW_RECORD_COUNTER);

// Create output path
// default: {fsPath}/pipelines-outlier
// or {fsPath}/pipelines-outlier/{datasetId}
Expand All @@ -84,40 +95,33 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except
it ->
!StringUtils.isEmpty(it.getTaxonID())
&& !StringUtils.isEmpty(it.getLatLng())
&& !StringUtils.isEmpty(it.getId())));

indexRecords
.apply(Count.globally())
.apply(
MapElements.via(
new SimpleFunction<Long, Long>() {
@Override
public Long apply(Long input) {
log.info("Number of indexed records loaded: " + input);
WORKLOGS.add("Number of indexed records loaded: " + input);
return input;
}
}));
&& !StringUtils.isEmpty(it.getId())))
.apply(
MapElements.via(
new SimpleFunction<IndexRecord, IndexRecord>() {
@Override
public IndexRecord apply(IndexRecord input) {
indexRecordCounter.inc();
return input;
}
}));

DistributionOutlierTransform distributionTransform =
new DistributionOutlierTransform(options.getBaseUrl());

log.info("Adding step 2: Loading existing outliers records");
PCollection<DistributionOutlierRecord> exitedOutliers =
PCollection<DistributionOutlierRecord> existingOutliers =
loadExistingRecords(options, p, outputPath);

exitedOutliers
.apply(Count.globally())
.apply(
MapElements.via(
new SimpleFunction<Long, Long>() {
@Override
public Long apply(Long input) {
log.info("Number of existing outlier records: " + input);
WORKLOGS.add("Number of existing outlier records: " + input);
return input;
}
}));
//Count existing records
existingOutliers.apply(
MapElements.via(
new SimpleFunction<DistributionOutlierRecord, DistributionOutlierRecord>() {
@Override
public DistributionOutlierRecord apply(DistributionOutlierRecord input) {
existingRecordCounter.inc();
return input;
}
}));

log.info("Adding step 3: Filtering out the records which already have outliers");
PCollection<KV<String, IndexRecord>> kvIndexRecords =
Expand All @@ -126,7 +130,7 @@ public Long apply(Long input) {
.withKeyType(TypeDescriptors.strings()));

PCollection<KV<String, Boolean>> kvExistingOutliers =
exitedOutliers
existingOutliers
.apply(
MapElements.via(
new SimpleFunction<DistributionOutlierRecord, String>() {
Expand Down Expand Up @@ -157,24 +161,12 @@ public KV<String, Boolean> apply(String input) {
new DoFn<KV<String, KV<IndexRecord, Boolean>>, IndexRecord>() {
@ProcessElement
public void processElement(ProcessContext c) {
newRecordCounter.inc();
KV<IndexRecord, Boolean> kv = c.element().getValue();
c.output(kv.getKey());
}
}));

newAddedIndexRecords
.apply(Count.globally())
.apply(
MapElements.via(
new SimpleFunction<Long, Long>() {
@Override
public Long apply(Long input) {
log.info("Number of records to be calculated: " + input);
WORKLOGS.add("Number of records to be calculated: " + input);
return input;
}
}));

log.info("Adding step 4: calculating outliers index");

PCollection<DistributionOutlierRecord> kvRecords =
Expand All @@ -190,7 +182,6 @@ public Long apply(Long input) {
String ts = df.format(new Date());

String targetFile = outputPath + "/outlier_" + ts;
WORKLOGS.add("Output: outlier_" + ts + ".avro");
log.info("Adding step 5: writing to " + targetFile + ".avro");
kvRecords.apply(
"Write to file",
Expand All @@ -210,7 +201,34 @@ public Long apply(Long input) {
PipelineResult result = p.run();
result.waitUntilFinish();

writeWorkLogs(options, outputPath);
writeWorkLogs(
options,
outputPath,
getMetricCount(result, INDEX_RECORD_COUNTER),
getMetricCount(result, EXISTING_RECORD_COUNTER),
getMetricCount(result, NEW_RECORD_COUNTER),
targetFile);
}

/**
 * Reads the attempted value of a named counter from a finished pipeline result.
 *
 * <p>Beam reports metric results per pipeline step, so the same logical counter may appear
 * more than once in {@code queryMetrics}. All results whose counter name matches
 * (case-insensitively, namespace ignored) are summed, rather than only the first match
 * being returned.
 *
 * @param result the finished {@link PipelineResult} to query metrics from
 * @param counterName the counter name to look up (case-insensitive)
 * @return the summed attempted count as a double, or {@code 0d} if the counter was never reported
 */
@NotNull
private static Double getMetricCount(PipelineResult result, String counterName) {
  double total = 0d;
  for (MetricResult<Long> counter :
      result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters()) {
    if (counter.getName().getName().equalsIgnoreCase(counterName)) {
      // Sum across steps; the attempted value is available on all runners, unlike committed.
      total += counter.getAttempted().doubleValue();
    }
  }
  return total;
}

/**
Expand All @@ -219,9 +237,23 @@ public Long apply(Long input) {
* @param options
* @param outputPath
*/
private static void writeWorkLogs(AllDatasetsPipelinesOptions options, String outputPath) {
private static void writeWorkLogs(
AllDatasetsPipelinesOptions options,
String outputPath,
double numAllRecords,
double numExistingRecords,
double numNewRecords,
String targetFile) {
try {
WORKLOGS.add("-------------------------------\r\n");
List<String> WORKLOGS = new ArrayList();
WORKLOGS.add("Output file: " + targetFile);
WORKLOGS.add("Total indexed records: " + numAllRecords);
WORKLOGS.add("Records which outlier distances have been calculated: " + numExistingRecords);
WORKLOGS.add("Records will be calculated: " + numNewRecords);

WORKLOGS.forEach(it -> log.info(it));
WORKLOGS.add("------------------------------------------------\r\n");

FileSystem fs =
FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
.getFs(outputPath);
Expand Down

0 comments on commit eda9310

Please sign in to comment.