From 88bd4130eafb855af39a475f14018c98ccae0675 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Tue, 8 Oct 2024 08:52:22 -0700 Subject: [PATCH] Add index metrics Signed-off-by: Louis Chu --- .../flint/core/metrics/MetricConstants.java | 17 ++- .../flint/core/metrics/MetricsUtil.java | 90 ++++++++++---- .../metrics/source/FlintMetricSource.scala | 14 ++- .../opensearch/flint/core/FlintOptions.java | 6 +- .../flint/core/metrics/MetricsUtilTest.java | 114 +++++++++++++----- .../refresh/IncrementalIndexRefresh.scala | 41 +++++-- .../refresh/util/RefreshMetricsHelper.scala | 75 ++++++++++++ ...intSparkJobExternalSchedulingService.scala | 4 + 8 files changed, 289 insertions(+), 72 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/util/RefreshMetricsHelper.scala diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 4d0e53184..a62a2ce67 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -108,19 +108,24 @@ public final class MetricConstants { public static final String STREAMING_SUCCESS_METRIC = "streaming.success.count"; /** - * Metric for tracking the count of successful refresh operations. + * Metric for tracking the count of requests to external scheduler. */ - public static final String REFRESH_SUCCESS_METRIC = "refresh.success.count"; + public static final String EXTERNAL_SCHEDULER_REQUEST_CNT_METRIC = "externalScheduler.request.count"; /** - * Metric for tracking the count of failed refresh operations. + * Metric for tracking the count of successful incremental refresh operations. */ - public static final String REFRESH_FAILED_METRIC = "refresh.failed.count"; + public static final String INCREMENTAL_REFRESH_SUCCESS_METRIC = "incrementalRefresh.success.count"; /** - * Metric for tracking the processing time of refresh operations. + * Metric for tracking the count of failed incremental refresh operations. */ - public static final String REFRESH_PROCESSING_TIME_METRIC = "refresh.processingTime"; + public static final String INCREMENTAL_REFRESH_FAILED_METRIC = "incrementalRefresh.failed.count"; + + /** + * Metric for tracking the processing time of incremental refresh operations. + */ + public static final String INCREMENTAL_REFRESH_PROCESSING_TIME_METRIC = "incrementalRefresh.processingTime"; /** * Metric for tracking the count of failed heartbeat signals in streaming jobs. diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java index 8e63992f5..81a482d5e 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java @@ -11,6 +11,7 @@ import com.codahale.metrics.Timer; import org.apache.spark.SparkEnv; import org.apache.spark.metrics.source.FlintMetricSource; +import org.apache.spark.metrics.source.FlintIndexMetricSource; import org.apache.spark.metrics.source.Source; import scala.collection.Seq; @@ -33,10 +34,20 @@ private MetricsUtil() { * If the counter does not exist, it is created before being incremented. * * @param metricName The name of the metric for which the counter is incremented. - * This name is used to retrieve or create the counter. */ public static void incrementCounter(String metricName) { - Counter counter = getOrCreateCounter(metricName); + incrementCounter(metricName, false); + } + + /** + * Increments the Counter metric associated with the given metric name. + * If the counter does not exist, it is created before being incremented. + * + * @param metricName The name of the metric for which the counter is incremented. + * @param isIndexMetric Whether this metric is an index-specific metric. + */ + public static void incrementCounter(String metricName, boolean isIndexMetric) { + Counter counter = getOrCreateCounter(metricName, isIndexMetric); if (counter != null) { counter.inc(); } @@ -48,7 +59,17 @@ public static void incrementCounter(String metricName) { * @param metricName The name of the metric counter to be decremented. */ public static void decrementCounter(String metricName) { - Counter counter = getOrCreateCounter(metricName); + decrementCounter(metricName, false); + } + + /** + * Decrements the value of the specified metric counter by one, if the counter exists and its current count is greater than zero. + * + * @param metricName The name of the metric counter to be decremented. + * @param isIndexMetric Whether this metric is an index-specific metric. + */ + public static void decrementCounter(String metricName, boolean isIndexMetric) { + Counter counter = getOrCreateCounter(metricName, isIndexMetric); if (counter != null && counter.getCount() > 0) { counter.dec(); } @@ -56,21 +77,30 @@ public static void decrementCounter(String metricName) { /** * Retrieves a {@link Timer.Context} for the specified metric name, creating a new timer if one does not already exist. - * This context can be used to measure the duration of a particular operation or event. * * @param metricName The name of the metric timer to retrieve the context for. * @return A {@link Timer.Context} instance for timing operations, or {@code null} if the timer could not be created or retrieved. */ public static Timer.Context getTimerContext(String metricName) { - Timer timer = getOrCreateTimer(metricName); + return getTimerContext(metricName, false); + } + + /** + * Retrieves a {@link Timer.Context} for the specified metric name, creating a new timer if one does not already exist. + * + * @param metricName The name of the metric timer to retrieve the context for. + * @param isIndexMetric Whether this metric is an index-specific metric. + * @return A {@link Timer.Context} instance for timing operations, or {@code null} if the timer could not be created or retrieved. + */ + public static Timer.Context getTimerContext(String metricName, boolean isIndexMetric) { + Timer timer = getOrCreateTimer(metricName, isIndexMetric); return timer != null ? timer.time() : null; } /** - * Stops the timer associated with the given {@link Timer.Context}, effectively recording the elapsed time since the timer was started - * and returning the duration. If the context is {@code null}, this method does nothing and returns {@code null}. + * Stops the timer associated with the given {@link Timer.Context}. * - * @param context The {@link Timer.Context} to stop. May be {@code null}, in which case this method has no effect and returns {@code null}. + * @param context The {@link Timer.Context} to stop. May be {@code null}. * @return The elapsed time in nanoseconds since the timer was started, or {@code null} if the context was {@code null}. */ public static Long stopTimer(Timer.Context context) { @@ -79,13 +109,23 @@ public static Long stopTimer(Timer.Context context) { /** * Registers a gauge metric with the provided name and value. - * The gauge will reflect the current value of the AtomicInteger provided. * * @param metricName The name of the gauge metric to register. - * @param value The AtomicInteger whose current value should be reflected by the gauge. + * @param value The AtomicInteger whose current value should be reflected by the gauge. */ public static void registerGauge(String metricName, final AtomicInteger value) { - MetricRegistry metricRegistry = getMetricRegistry(); + registerGauge(metricName, value, false); + } + + /** + * Registers a gauge metric with the provided name and value. + * + * @param metricName The name of the gauge metric to register. + * @param value The AtomicInteger whose current value should be reflected by the gauge. + * @param isIndexMetric Whether this metric is an index-specific metric. + */ + public static void registerGauge(String metricName, final AtomicInteger value, boolean isIndexMetric) { + MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric); if (metricRegistry == null) { LOG.warning("MetricRegistry not available, cannot register gauge: " + metricName); return; @@ -93,39 +133,37 @@ public static void registerGauge(String metricName, final AtomicInteger value) { metricRegistry.register(metricName, (Gauge) value::get); } - // Retrieves or creates a new counter for the given metric name - private static Counter getOrCreateCounter(String metricName) { - MetricRegistry metricRegistry = getMetricRegistry(); + private static Counter getOrCreateCounter(String metricName, boolean isIndexMetric) { + MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric); return metricRegistry != null ? metricRegistry.counter(metricName) : null; } - // Retrieves or creates a new Timer for the given metric name - private static Timer getOrCreateTimer(String metricName) { - MetricRegistry metricRegistry = getMetricRegistry(); + private static Timer getOrCreateTimer(String metricName, boolean isIndexMetric) { + MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric); return metricRegistry != null ? metricRegistry.timer(metricName) : null; } - // Retrieves the MetricRegistry from the current Spark environment. - private static MetricRegistry getMetricRegistry() { + private static MetricRegistry getMetricRegistry(boolean isIndexMetric) { SparkEnv sparkEnv = SparkEnv.get(); if (sparkEnv == null) { LOG.warning("Spark environment not available, cannot access MetricRegistry."); return null; } - FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv); - return flintMetricSource.metricRegistry(); + Source metricSource = isIndexMetric ? + getOrInitMetricSource(sparkEnv, FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME(), FlintIndexMetricSource::new) : + getOrInitMetricSource(sparkEnv, FlintMetricSource.FLINT_METRIC_SOURCE_NAME(), FlintMetricSource::new); + return metricSource.metricRegistry(); } - // Gets or initializes the FlintMetricSource - private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) { - Seq metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()); + private static Source getOrInitMetricSource(SparkEnv sparkEnv, String sourceName, java.util.function.Supplier sourceSupplier) { + Seq metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(sourceName); if (metricSourceSeq == null || metricSourceSeq.isEmpty()) { - FlintMetricSource metricSource = new FlintMetricSource(); + Source metricSource = sourceSupplier.get(); sparkEnv.metricsSystem().registerSource(metricSource); return metricSource; } - return (FlintMetricSource) metricSourceSeq.head(); + return metricSourceSeq.head(); } } diff --git a/flint-core/src/main/scala/apache/spark/metrics/source/FlintMetricSource.scala b/flint-core/src/main/scala/apache/spark/metrics/source/FlintMetricSource.scala index d5f241572..7bdfa11e6 100644 --- a/flint-core/src/main/scala/apache/spark/metrics/source/FlintMetricSource.scala +++ b/flint-core/src/main/scala/apache/spark/metrics/source/FlintMetricSource.scala @@ -7,13 +7,25 @@ package org.apache.spark.metrics.source import com.codahale.metrics.MetricRegistry -class FlintMetricSource() extends Source { +/** + * Metric source for general Flint metrics. + */ +class FlintMetricSource extends Source { // Implementing the Source trait override val sourceName: String = FlintMetricSource.FLINT_METRIC_SOURCE_NAME override val metricRegistry: MetricRegistry = new MetricRegistry } +/** + * Metric source for Flint index-specific metrics. + */ +class FlintIndexMetricSource extends Source { + override val sourceName: String = FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME + override val metricRegistry: MetricRegistry = new MetricRegistry +} + object FlintMetricSource { val FLINT_METRIC_SOURCE_NAME = "Flint" // Default source name + val FLINT_INDEX_METRIC_SOURCE_NAME = "FlintIndex" // Index specific source name } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 563f75eff..6ddc6ae9c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -105,6 +105,8 @@ public class FlintOptions implements Serializable { public static final String DEFAULT_SUPPORT_SHARD = "true"; + private static final String UNKNOWN = "UNKNOWN"; + public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE = "bulkRequestRateLimitPerNode"; public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE = "0"; public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes"; @@ -186,9 +188,9 @@ public String getDataSourceName() { * @return the AWS accountId */ public String getAWSAccountId() { - String clusterName = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", ""); + String clusterName = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN); String[] parts = clusterName.split(":"); - return parts.length == 2 ? parts[0] : ""; + return parts.length == 2 ? parts[0] : UNKNOWN; } public String getSystemIndexName() { diff --git a/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java index 3b8940536..b54269ce0 100644 --- a/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java +++ b/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java @@ -5,6 +5,8 @@ import com.codahale.metrics.Timer; import org.apache.spark.SparkEnv; import org.apache.spark.metrics.source.FlintMetricSource; +import org.apache.spark.metrics.source.FlintIndexMetricSource; +import org.apache.spark.metrics.source.Source; import org.junit.Test; import org.junit.jupiter.api.Assertions; import org.mockito.MockedStatic; @@ -26,55 +28,73 @@ public class MetricsUtilTest { @Test public void testIncrementDecrementCounter() { + testIncrementDecrementCounterHelper(false); + } + + @Test + public void testIncrementDecrementCounterForIndexMetrics() { + testIncrementDecrementCounterHelper(true); + } + + private void testIncrementDecrementCounterHelper(boolean isIndexMetric) { try (MockedStatic sparkEnvMock = mockStatic(SparkEnv.class)) { // Mock SparkEnv SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS); sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv); - // Mock FlintMetricSource - FlintMetricSource flintMetricSource = Mockito.spy(new FlintMetricSource()); - when(sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()).head()) - .thenReturn(flintMetricSource); + // Mock appropriate MetricSource + String sourceName = isIndexMetric ? FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME() : FlintMetricSource.FLINT_METRIC_SOURCE_NAME(); + Source metricSource = isIndexMetric ? Mockito.spy(new FlintIndexMetricSource()) : Mockito.spy(new FlintMetricSource()); + when(sparkEnv.metricsSystem().getSourcesByName(sourceName).head()).thenReturn(metricSource); // Test the methods String testMetric = "testPrefix.2xx.count"; - MetricsUtil.incrementCounter(testMetric); - MetricsUtil.incrementCounter(testMetric); - MetricsUtil.decrementCounter(testMetric); + MetricsUtil.incrementCounter(testMetric, isIndexMetric); + MetricsUtil.incrementCounter(testMetric, isIndexMetric); + MetricsUtil.decrementCounter(testMetric, isIndexMetric); // Verify interactions verify(sparkEnv.metricsSystem(), times(0)).registerSource(any()); - verify(flintMetricSource, times(3)).metricRegistry(); - Counter counter = flintMetricSource.metricRegistry().getCounters().get(testMetric); + verify(metricSource, times(3)).metricRegistry(); + Counter counter = metricSource.metricRegistry().getCounters().get(testMetric); Assertions.assertNotNull(counter); - Assertions.assertEquals(counter.getCount(), 1); + Assertions.assertEquals(1, counter.getCount()); } } @Test public void testStartStopTimer() { + testStartStopTimerHelper(false); + } + + @Test + public void testStartStopTimerForIndexMetrics() { + testStartStopTimerHelper(true); + } + + private void testStartStopTimerHelper(boolean isIndexMetric) { try (MockedStatic sparkEnvMock = mockStatic(SparkEnv.class)) { // Mock SparkEnv SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS); sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv); - // Mock FlintMetricSource - FlintMetricSource flintMetricSource = Mockito.spy(new FlintMetricSource()); - when(sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()).head()) - .thenReturn(flintMetricSource); + // Mock appropriate MetricSource + String sourceName = isIndexMetric ? FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME() : FlintMetricSource.FLINT_METRIC_SOURCE_NAME(); + Source metricSource = isIndexMetric ? Mockito.spy(new FlintIndexMetricSource()) : Mockito.spy(new FlintMetricSource()); + when(sparkEnv.metricsSystem().getSourcesByName(sourceName).head()).thenReturn(metricSource); // Test the methods String testMetric = "testPrefix.processingTime"; - Timer.Context context = MetricsUtil.getTimerContext(testMetric); + Timer.Context context = MetricsUtil.getTimerContext(testMetric, isIndexMetric); TimeUnit.MILLISECONDS.sleep(500); MetricsUtil.stopTimer(context); // Verify interactions verify(sparkEnv.metricsSystem(), times(0)).registerSource(any()); - verify(flintMetricSource, times(1)).metricRegistry(); - Timer timer = flintMetricSource.metricRegistry().getTimers().get(testMetric); + verify(metricSource, times(1)).metricRegistry(); + Timer timer = metricSource.metricRegistry().getTimers().get(testMetric); Assertions.assertNotNull(timer); - Assertions.assertEquals(timer.getCount(), 1L); + Assertions.assertEquals(1L, timer.getCount()); assertEquals(1.9, timer.getMeanRate(), 0.1); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -82,33 +102,71 @@ public void testStartStopTimer() { } @Test - public void testRegisterGaugeWhenMetricRegistryIsAvailable() { + public void testRegisterGauge() { + testRegisterGaugeHelper(false); + } + + @Test + public void testRegisterGaugeForIndexMetrics() { + testRegisterGaugeHelper(true); + } + + private void testRegisterGaugeHelper(boolean isIndexMetric) { try (MockedStatic sparkEnvMock = mockStatic(SparkEnv.class)) { // Mock SparkEnv SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS); sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv); - // Mock FlintMetricSource - FlintMetricSource flintMetricSource = Mockito.spy(new FlintMetricSource()); - when(sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()).head()) - .thenReturn(flintMetricSource); + // Mock appropriate MetricSource + String sourceName = isIndexMetric ? FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME() : FlintMetricSource.FLINT_METRIC_SOURCE_NAME(); + Source metricSource = isIndexMetric ? Mockito.spy(new FlintIndexMetricSource()) : Mockito.spy(new FlintMetricSource()); + when(sparkEnv.metricsSystem().getSourcesByName(sourceName).head()).thenReturn(metricSource); // Setup gauge AtomicInteger testValue = new AtomicInteger(1); String gaugeName = "test.gauge"; - MetricsUtil.registerGauge(gaugeName, testValue); + MetricsUtil.registerGauge(gaugeName, testValue, isIndexMetric); verify(sparkEnv.metricsSystem(), times(0)).registerSource(any()); - verify(flintMetricSource, times(1)).metricRegistry(); + verify(metricSource, times(1)).metricRegistry(); - Gauge gauge = flintMetricSource.metricRegistry().getGauges().get(gaugeName); + Gauge gauge = metricSource.metricRegistry().getGauges().get(gaugeName); Assertions.assertNotNull(gauge); - Assertions.assertEquals(gauge.getValue(), 1); + Assertions.assertEquals(1, gauge.getValue()); testValue.incrementAndGet(); testValue.incrementAndGet(); testValue.decrementAndGet(); - Assertions.assertEquals(gauge.getValue(), 2); + Assertions.assertEquals(2, gauge.getValue()); + } + } + + @Test + public void testDefaultBehavior() { + try (MockedStatic sparkEnvMock = mockStatic(SparkEnv.class)) { + // Mock SparkEnv + SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS); + sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv); + + // Mock FlintMetricSource + FlintMetricSource flintMetricSource = Mockito.spy(new FlintMetricSource()); + when(sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()).head()) + .thenReturn(flintMetricSource); + + // Test default behavior (non-index metrics) + String testCountMetric = "testDefault.count"; + String testTimerMetric = "testDefault.time"; + String testGaugeMetric = "testDefault.gauge"; + MetricsUtil.incrementCounter(testCountMetric); + MetricsUtil.getTimerContext(testTimerMetric); + MetricsUtil.registerGauge(testGaugeMetric, new AtomicInteger(0), false); + + // Verify interactions + verify(sparkEnv.metricsSystem(), times(0)).registerSource(any()); + verify(flintMetricSource, times(3)).metricRegistry(); + Assertions.assertNotNull(flintMetricSource.metricRegistry().getCounters().get(testCountMetric)); + Assertions.assertNotNull(flintMetricSource.metricRegistry().getTimers().get(testTimerMetric)); + Assertions.assertNotNull(flintMetricSource.metricRegistry().getGauges().get(testGaugeMetric)); } } } \ No newline at end of file diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index 98f0d838f..382b6d68e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -5,8 +5,11 @@ package org.opensearch.flint.spark.refresh +import org.opensearch.flint.core.metrics.MetricConstants +import org.opensearch.flint.core.metrics.MetricsUtil import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkValidationHelper} import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{INCREMENTAL, RefreshMode} +import org.opensearch.flint.spark.refresh.util.RefreshMetricsHelper import org.apache.spark.sql.SparkSession import org.apache.spark.sql.flint.config.FlintSparkConf @@ -43,15 +46,35 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = { logInfo(s"Start refreshing index $indexName in incremental mode") - // Reuse auto refresh which uses AvailableNow trigger and will stop once complete - val jobId = - new AutoIndexRefresh(indexName, index) - .start(spark, flintSparkConf) + val clientId = flintSparkConf.flintOptions().getAWSAccountId() + val dataSource = flintSparkConf.flintOptions().getDataSourceName() - // Blocks the calling thread until the streaming query finishes - spark.streams - .get(jobId.get) - .awaitTermination() - None + val refreshMetricsHelper = new RefreshMetricsHelper(clientId, dataSource, indexName) + + // Start timer for processing time metric + val timerContext = refreshMetricsHelper.getTimerContext( + MetricConstants.INCREMENTAL_REFRESH_PROCESSING_TIME_METRIC) + + try { + // Reuse auto refresh which uses AvailableNow trigger and will stop once complete + val jobId = + new AutoIndexRefresh(indexName, index) + .start(spark, flintSparkConf) + + // Blocks the calling thread until the streaming query finishes + spark.streams + .get(jobId.get) + .awaitTermination() + + refreshMetricsHelper.incrementCounter(MetricConstants.INCREMENTAL_REFRESH_SUCCESS_METRIC) + None + } catch { + case e: Exception => + refreshMetricsHelper.incrementCounter(MetricConstants.INCREMENTAL_REFRESH_FAILED_METRIC) + throw e + } finally { + // Stop timer and record processing time + MetricsUtil.stopTimer(timerContext) + } } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/util/RefreshMetricsHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/util/RefreshMetricsHelper.scala new file mode 100644 index 000000000..4b91b0be2 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/util/RefreshMetricsHelper.scala @@ -0,0 +1,75 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.refresh.util + +import com.codahale.metrics.Timer +import org.opensearch.flint.core.metrics.MetricsUtil + +/** + * Helper class for constructing dimensioned metric names used in refresh operations. + */ +class RefreshMetricsHelper(clientId: String, dataSource: String, indexName: String) { + private val isIndexMetric = true + + /** + * Increments a counter metric with the specified dimensioned name. + * + * @param metricName + * The name of the metric to increment + */ + def incrementCounter(metricName: String): Unit = { + MetricsUtil.incrementCounter( + RefreshMetricsHelper.constructDimensionedMetricName( + metricName, + clientId, + dataSource, + indexName), + isIndexMetric) + } + + /** + * Gets a timer context for the specified metric name. + * + * @param metricName + * The name of the metric + * @return + * A Timer.Context object + */ + def getTimerContext(metricName: String): Timer.Context = { + MetricsUtil.getTimerContext( + RefreshMetricsHelper.constructDimensionedMetricName( + metricName, + clientId, + dataSource, + indexName), + isIndexMetric) + } +} + +object RefreshMetricsHelper { + + /** + * Constructs a dimensioned metric name for external scheduler request count. + * + * @param metricName + * The name of the metric + * @param clientId + * The ID of the client making the request + * @param dataSource + * The data source being used + * @param indexName + * The name of the index being refreshed + * @return + * A formatted string representing the dimensioned metric name + */ + private def constructDimensionedMetricName( + metricName: String, + clientId: String, + dataSource: String, + indexName: String): String = { + s"${metricName}[clientId##${clientId},dataSource##${dataSource},indexName##${indexName}]" + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala index 8aac60aee..23e965724 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala @@ -10,8 +10,10 @@ import java.time.Instant import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState import org.opensearch.flint.common.scheduler.AsyncQueryScheduler import org.opensearch.flint.common.scheduler.model.{AsyncQuerySchedulerRequest, LangType} +import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} import org.opensearch.flint.core.storage.OpenSearchClientUtils import org.opensearch.flint.spark.FlintSparkIndex +import org.opensearch.flint.spark.refresh.util.RefreshMetricsHelper import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction import org.opensearch.flint.spark.scheduler.util.RefreshQueryGenerator @@ -49,6 +51,7 @@ class FlintSparkJobExternalSchedulingService( val clientId = flintSparkConf.flintOptions().getAWSAccountId() // This is to make sure jobId is consistent with the index name val indexName = OpenSearchClientUtils.sanitizeIndexName(index.name()) + val refreshMetricsHelper = new RefreshMetricsHelper(clientId, dataSource, indexName) logInfo(s"handleAsyncQueryScheduler invoked: $action") @@ -80,6 +83,7 @@ class FlintSparkJobExternalSchedulingService( case _ => throw new IllegalArgumentException(s"Unsupported action: $action") } + refreshMetricsHelper.incrementCounter(MetricConstants.EXTERNAL_SCHEDULER_REQUEST_CNT_METRIC) None // Return None for all cases } }