From 2a781090ce16beb86e2b4017d2d462793058e57b Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 14 Nov 2024 22:19:48 -0800 Subject: [PATCH] Add metrics for successful/failed Spark index creation (#837) (#917) * Add index creation metrics * Update emit metric function location * Apply scalafmt * revert FlintSparkIndexFactory metrics emission * Update OS client to emit metrics * Fix index kind detection * Remove cumbersome with() method * Revert the void change * Make the code branchless * Add default case to switches * Apply PR feedback --------- (cherry picked from commit 6f37b2d180793bc2ca32548086ccb20633b30ac5) Signed-off-by: Simeon Widdis Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../flint/core/metrics/MetricConstants.java | 15 ++++++++ .../core/storage/FlintOpenSearchClient.java | 34 ++++++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 978950b3c..79e70b8c2 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -171,6 +171,21 @@ public final class MetricConstants { */ public static final String OUTPUT_TOTAL_RECORDS_WRITTEN = "output.totalRecordsWritten.count"; + /** + * Metric group related to skipping indices, such as create success and failure + */ + public static final String CREATE_SKIPPING_INDICES = "query.execution.index.skipping"; + + /** + * Metric group related to covering indices, such as create success and failure + */ + public static final String CREATE_COVERING_INDICES = "query.execution.index.covering"; + + /** + * Metric group related to materialized view indices, such as create success and failure + */ + public static final String CREATE_MV_INDICES = "query.execution.index.mv"; + /** * Metric for tracking the latency of checkpoint deletion */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index da22e3751..2bc097bba 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -16,6 +16,8 @@ import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; +import org.opensearch.flint.core.metrics.MetricConstants; +import org.opensearch.flint.core.metrics.MetricsUtil; import scala.Option; import java.io.IOException; @@ -40,7 +42,13 @@ public FlintOpenSearchClient(FlintOptions options) { @Override public void createIndex(String indexName, FlintMetadata metadata) { LOG.info("Creating Flint index " + indexName + " with metadata " + metadata); - createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings()); + try { + createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings()); + emitIndexCreationSuccessMetric(metadata.kind()); + } catch (IllegalStateException ex) { + emitIndexCreationFailureMetric(metadata.kind()); + throw ex; + } } protected void createIndex(String indexName, String mapping, Option settings) { @@ -122,4 +130,28 @@ public IRestHighLevelClient createClient() { private String sanitizeIndexName(String indexName) { return OpenSearchClientUtils.sanitizeIndexName(indexName); } + + private void emitIndexCreationSuccessMetric(String indexKind) { + emitIndexCreationMetric(indexKind, "success"); + } + + private void emitIndexCreationFailureMetric(String indexKind) { + emitIndexCreationMetric(indexKind, "failed"); + } + + private void emitIndexCreationMetric(String indexKind, String status) { + switch (indexKind) { + case "skipping": + MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_SKIPPING_INDICES, status), 1); + break; + case "covering": + MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_COVERING_INDICES, status), 1); + break; + case "mv": + MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_MV_INDICES, status), 1); + break; + default: + break; + } + } }