diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
index ecd8c009b..35297de6a 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java
@@ -155,7 +155,12 @@ public final class MetricConstants {
    */
   public static final String OUTPUT_TOTAL_RECORDS_WRITTEN = "output.totalRecordsWritten.count";
 
+  /**
+   * Metric for tracking the latency of checkpoint deletion
+   */
+  public static final String CHECKPOINT_DELETE_TIME_METRIC = "checkpoint.delete.processingTime";
+
   private MetricConstants() {
     // Private constructor to prevent instantiation
   }
-}
\ No newline at end of file
+}
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java
index 511c18664..5a0f0f5ad 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java
@@ -9,6 +9,7 @@
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
+import java.util.function.Supplier;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.metrics.source.FlintMetricSource;
 import org.apache.spark.metrics.source.FlintIndexMetricSource;
@@ -133,6 +134,21 @@ public static void addHistoricGauge(String metricName, final long value) {
     }
   }
 
+  /**
+   * Automatically emit latency metric as Historic Gauge for the execution of supplier
+   * @param supplier the lambda to be metered
+   * @param metricName name of the metric
+   * @return value returned by supplier
+   */
+  public static <T> T withLatencyAsHistoricGauge(Supplier<T> supplier, String metricName) {
+    long startTime = System.currentTimeMillis();
+    try {
+      return supplier.get();
+    } finally {
+      addHistoricGauge(metricName, System.currentTimeMillis() - startTime);
+    }
+  }
+
   private static HistoricGauge getOrCreateHistoricGauge(String metricName) {
     MetricRegistry metricRegistry = getMetricRegistry(false);
     return metricRegistry != null ? metricRegistry.gauge(metricName, HistoricGauge::new) : null;
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala
index 4c18fea77..6ae55bd33 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark
 import java.util.UUID
 
 import org.apache.hadoop.fs.{FSDataOutputStream, Path}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
@@ -75,7 +76,9 @@ class FlintSparkCheckpoint(spark: SparkSession, val checkpointLocation: String)
    */
   def delete(): Unit = {
     try {
-      checkpointManager.delete(checkpointRootDir)
+      MetricsUtil.withLatencyAsHistoricGauge(
+        () => checkpointManager.delete(checkpointRootDir),
+        MetricConstants.CHECKPOINT_DELETE_TIME_METRIC)
       logInfo(s"Checkpoint directory $checkpointRootDir deleted")
     } catch {
       case e: Exception =>
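
For context, a minimal usage sketch of the new helper introduced by this diff, not part of the change itself: it shows the call shape of MetricsUtil.withLatencyAsHistoricGauge from plain Java. The class name, the wrapped operation, and the metric name below are hypothetical and chosen only for illustration.

// Hypothetical caller, outside this PR: any operation can be wrapped so that its
// wall-clock latency is reported as a historic gauge under a caller-chosen metric name.
import org.opensearch.flint.core.metrics.MetricsUtil;

public class LatencyGaugeExample {

  public static void main(String[] args) {
    // The helper returns whatever the supplier returns; the elapsed time is recorded in a
    // finally block, so the gauge is emitted even if the supplier throws.
    boolean deleted = MetricsUtil.withLatencyAsHistoricGauge(
        () -> pretendDelete(),                    // hypothetical operation to be metered
        "example.delete.processingTime");         // hypothetical metric name
    System.out.println("deleted=" + deleted);
  }

  private static boolean pretendDelete() {
    // Stand-in for a real checkpoint deletion; only illustrates the call shape.
    return true;
  }
}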