From b02e57456234e5f4cf699a2ac41c75e9ccdea3d6 Mon Sep 17 00:00:00 2001 From: Simeon Widdis Date: Wed, 30 Oct 2024 00:09:05 +0000 Subject: [PATCH] Add index creation metrics Signed-off-by: Simeon Widdis --- .../flint/core/metrics/MetricConstants.java | 15 +++++++++++++++ .../flint/spark/FlintSparkIndexFactory.scala | 19 ++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 427fab9fe..796186f20 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -137,6 +137,21 @@ public final class MetricConstants { */ public static final String OUTPUT_TOTAL_RECORDS_WRITTEN = "output.totalRecordsWritten.count"; + /** + * Metric group related to skipping indices, such as create success and failure + */ + public static final String CREATE_SKIPPING_INDICES = "query.execution.index.skipping"; + + /** + * Metric group related to covering indices, such as create success and failure + */ + public static final String CREATE_COVERING_INDICES = "query.execution.index.covering"; + + /** + * Metric group related to materialized view indices, such as create success and failure + */ + public static final String CREATE_MV_INDICES = "query.execution.index.mv"; + private MetricConstants() { // Private constructor to prevent instantiation } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index 78636d992..ab4eef0ae 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -11,6 +11,8 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.collection.JavaConverters.mapAsScalaMapConverter import org.opensearch.flint.common.metadata.FlintMetadata +import org.opensearch.flint.core.metrics.MetricConstants +import org.opensearch.flint.core.metrics.MetricsUtil import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE import org.opensearch.flint.spark.mv.FlintSparkMaterializedView @@ -26,6 +28,18 @@ import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy import org.apache.spark.internal.Logging +def emitIndexCreationStatusMetric(metadata: FlintMetadata, success: Boolean): Unit = { + val successSuffix = if (success) ".create_success" else ".create_failed" + metadata.kind match { + case SKIPPING_INDEX_TYPE => + MetricsUtil.addHistoricGauge(MetricConstants.CREATE_SKIPPING_INDICES + successSuffix + ".count", 1) + case COVERING_INDEX_TYPE => + MetricsUtil.addHistoricGauge(MetricConstants.CREATE_COVERING_INDICES + successSuffix + ".count", 1) + case MV_INDEX_TYPE => + MetricsUtil.addHistoricGauge(MetricConstants.CREATE_MV_INDICES + successSuffix + ".count", 1) + } +} + /** * Flint Spark index factory that encapsulates specific Flint index instance creation. This is for * internal code use instead of user facing API. @@ -42,9 +56,12 @@ object FlintSparkIndexFactory extends Logging { */ def create(metadata: FlintMetadata): Option[FlintSparkIndex] = { try { - Some(doCreate(metadata)) + val result = doCreate(metadata) + emitIndexCreationStatusMetric(metadata, success=true) + Some(result) } catch { case e: Exception => + emitIndexCreationStatusMetric(metadata, success=false) logWarning(s"Failed to create Flint index from metadata $metadata", e) None }