From 0f3a01fe6e2859b4e6a91b838ef37a130fc8397a Mon Sep 17 00:00:00 2001 From: Simeon Widdis Date: Thu, 14 Nov 2024 22:05:29 -0800 Subject: [PATCH] Add metrics for successful/failed Spark index creation (#837) * Add index creation metrics Signed-off-by: Simeon Widdis * Update emit metric function location Signed-off-by: Simeon Widdis * Apply scalafmt Signed-off-by: Simeon Widdis * revert FlintSparkIndexFactory metrics emission Signed-off-by: Simeon Widdis * Update OS client to emit metrics Signed-off-by: Simeon Widdis * Fix index kind detection Signed-off-by: Simeon Widdis * Remove cumbersome with() method Signed-off-by: Simeon Widdis * Revert the void change Signed-off-by: Simeon Widdis * Make the code branchless Signed-off-by: Simeon Widdis * Add default case to switches Signed-off-by: Simeon Widdis * Apply PR feedback Signed-off-by: Simeon Widdis --------- Signed-off-by: Simeon Widdis --- .../flint/core/metrics/MetricConstants.java | 15 ++++++++ .../core/storage/FlintOpenSearchClient.java | 34 ++++++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 978950b3c..79e70b8c2 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -171,6 +171,21 @@ public final class MetricConstants { */ public static final String OUTPUT_TOTAL_RECORDS_WRITTEN = "output.totalRecordsWritten.count"; + /** + * Metric group related to skipping indices, such as create success and failure + */ + public static final String CREATE_SKIPPING_INDICES = "query.execution.index.skipping"; + + /** + * Metric group related to covering indices, such as create success and failure + */ + public static final String CREATE_COVERING_INDICES = "query.execution.index.covering"; + + /** + * Metric group related to materialized view indices, such as create success and failure + */ + public static final String CREATE_MV_INDICES = "query.execution.index.mv"; + /** * Metric for tracking the latency of checkpoint deletion */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index da22e3751..2bc097bba 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -16,6 +16,8 @@ import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; +import org.opensearch.flint.core.metrics.MetricConstants; +import org.opensearch.flint.core.metrics.MetricsUtil; import scala.Option; import java.io.IOException; @@ -40,7 +42,13 @@ public FlintOpenSearchClient(FlintOptions options) { @Override public void createIndex(String indexName, FlintMetadata metadata) { LOG.info("Creating Flint index " + indexName + " with metadata " + metadata); - createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings()); + try { + createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings()); + emitIndexCreationSuccessMetric(metadata.kind()); + } catch (IllegalStateException ex) { + emitIndexCreationFailureMetric(metadata.kind()); + throw ex; + } } protected void createIndex(String indexName, String mapping, Option settings) { @@ -122,4 +130,28 @@ public IRestHighLevelClient createClient() { private String sanitizeIndexName(String indexName) { return OpenSearchClientUtils.sanitizeIndexName(indexName); } + + private void emitIndexCreationSuccessMetric(String indexKind) { + emitIndexCreationMetric(indexKind, "success"); + } + + private void emitIndexCreationFailureMetric(String indexKind) { + emitIndexCreationMetric(indexKind, "failed"); + } + + private void emitIndexCreationMetric(String indexKind, String status) { + switch (indexKind) { + case "skipping": + MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_SKIPPING_INDICES, status), 1); + break; + case "covering": + MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_COVERING_INDICES, status), 1); + break; + case "mv": + MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_MV_INDICES, status), 1); + break; + default: + break; + } + } }