From e1e9b89e3fc52ac3db0e8f2119093420407c1d9e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 30 Oct 2024 20:15:18 +0000 Subject: [PATCH] Add checkpoint.delete.processingTime metric (#817) Signed-off-by: Tomoyuki Morita (cherry picked from commit 341eacb50233936db99551937e337d850cb3f81e) Signed-off-by: github-actions[bot] --- .../flint/core/metrics/MetricConstants.java | 7 ++++++- .../flint/core/metrics/MetricsUtil.java | 16 ++++++++++++++++ .../flint/spark/FlintSparkCheckpoint.scala | 5 ++++- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 427fab9fe..48adbc3d1 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -137,7 +137,12 @@ public final class MetricConstants { */ public static final String OUTPUT_TOTAL_RECORDS_WRITTEN = "output.totalRecordsWritten.count"; + /** + * Metric for tracking the latency of checkpoint deletion + */ + public static final String CHECKPOINT_DELETE_TIME_METRIC = "checkpoint.delete.processingTime"; + private MetricConstants() { // Private constructor to prevent instantiation } -} \ No newline at end of file +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java index 511c18664..5a0f0f5ad 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java @@ -9,6 +9,7 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import java.util.function.Supplier; import org.apache.spark.SparkEnv; import org.apache.spark.metrics.source.FlintMetricSource; import org.apache.spark.metrics.source.FlintIndexMetricSource; @@ -133,6 +134,21 @@ public static void addHistoricGauge(String metricName, final long value) { } } + /** + * Automatically emit latency metric as Historic Gauge for the execution of supplier + * @param supplier the lambda to be metered + * @param metricName name of the metric + * @return value returned by supplier + */ + public static T withLatencyAsHistoricGauge(Supplier supplier, String metricName) { + long startTime = System.currentTimeMillis(); + try { + return supplier.get(); + } finally { + addHistoricGauge(metricName, System.currentTimeMillis() - startTime); + } + } + private static HistoricGauge getOrCreateHistoricGauge(String metricName) { MetricRegistry metricRegistry = getMetricRegistry(false); return metricRegistry != null ? metricRegistry.gauge(metricName, HistoricGauge::new) : null; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala index 4c18fea77..6ae55bd33 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark import java.util.UUID import org.apache.hadoop.fs.{FSDataOutputStream, Path} +import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -75,7 +76,9 @@ class FlintSparkCheckpoint(spark: SparkSession, val checkpointLocation: String) */ def delete(): Unit = { try { - checkpointManager.delete(checkpointRootDir) + MetricsUtil.withLatencyAsHistoricGauge( + () => checkpointManager.delete(checkpointRootDir), + MetricConstants.CHECKPOINT_DELETE_TIME_METRIC) logInfo(s"Checkpoint directory $checkpointRootDir deleted") } catch { case e: Exception =>