Skip to content

Commit

Permalink
Add checkpoint.delete.processingTime metric
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed Oct 25, 2024
1 parent 7bc0927 commit e46a0c6
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,12 @@ public final class MetricConstants {
*/
public static final String OUTPUT_TOTAL_RECORDS_WRITTEN = "output.totalRecordsWritten.count";

/**
* Metric for tracking the latency of checkpoint deletion
*/
public static final String CHECKPOINT_DELETE_TIME_METRIC = "checkpoint.delete.processingTime";

private MetricConstants() {
// Private constructor to prevent instantiation
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.util.function.Supplier;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.FlintIndexMetricSource;
Expand Down Expand Up @@ -133,6 +134,21 @@ public static void addHistoricGauge(String metricName, final long value) {
}
}

/**
* Automatically emit latency metric as Historic Gauge for the execution of supplier
* @param supplier the lambda to be metered
* @param metricName name of the metric
* @return value returned by supplier
*/
public static <T> T withLatencyAsHistoricGauge(Supplier<T> supplier, String metricName) {
long startTime = System.currentTimeMillis();
try {
return supplier.get();
} finally {
addHistoricGauge(metricName, System.currentTimeMillis() - startTime);
}
}

private static HistoricGauge getOrCreateHistoricGauge(String metricName) {
MetricRegistry metricRegistry = getMetricRegistry(false);
return metricRegistry != null ? metricRegistry.gauge(metricName, HistoricGauge::new) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.flint.spark
import java.util.UUID

import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
Expand Down Expand Up @@ -75,7 +76,9 @@ class FlintSparkCheckpoint(spark: SparkSession, val checkpointLocation: String)
*/
def delete(): Unit = {
try {
checkpointManager.delete(checkpointRootDir)
MetricsUtil.withLatencyAsHistoricGauge(
() => checkpointManager.delete(checkpointRootDir),
MetricConstants.CHECKPOINT_DELETE_TIME_METRIC)
logInfo(s"Checkpoint directory $checkpointRootDir deleted")
} catch {
case e: Exception =>
Expand Down

0 comments on commit e46a0c6

Please sign in to comment.