From 0a5ef42ef829a804337c88019b343f454561cad3 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 31 Oct 2024 05:00:06 +0000 Subject: [PATCH] Add index state metrics (#825) * Add index state metrics Signed-off-by: Tomoyuki Morita * Change metric name Signed-off-by: Tomoyuki Morita * Add tests for metrics Signed-off-by: Tomoyuki Morita * Fix test Signed-off-by: Tomoyuki Morita --------- Signed-off-by: Tomoyuki Morita (cherry picked from commit d03cc8c8ae5cd6706a5f00779c7abfa623bb44b9) Signed-off-by: github-actions[bot] --- .../flint/core/metrics/MetricConstants.java | 10 +++ .../log/DefaultOptimisticTransaction.java | 12 ++++ ...efaultOptimisticTransactionMetricTest.java | 68 +++++++++++++++++++ 3 files changed, 90 insertions(+) create mode 100644 flint-core/src/test/java/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransactionMetricTest.java diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 35297de6a..733b656e7 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -160,6 +160,16 @@ public final class MetricConstants { */ public static final String CHECKPOINT_DELETE_TIME_METRIC = "checkpoint.delete.processingTime"; + /** + * Metric prefix for tracking the index state transitions + */ + public static final String INDEX_STATE_UPDATED_TO_PREFIX = "indexState.updatedTo."; + + /** + * Metric for tracking the index state transitions + */ + public static final String INITIAL_CONDITION_CHECK_FAILED_PREFIX = "initialConditionCheck.failed."; + private MetricConstants() { // Private constructor to prevent instantiation } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index ec1eabf14..466787c81 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -16,6 +16,8 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLog; import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry; import org.opensearch.flint.common.metadata.log.OptimisticTransaction; +import org.opensearch.flint.core.metrics.MetricConstants; +import org.opensearch.flint.core.metrics.MetricsUtil; /** * Default optimistic transaction implementation that captures the basic workflow for @@ -73,6 +75,7 @@ public T commit(Function operation) { // Perform initial log check if (!initialCondition.test(latest)) { LOG.warning("Initial log entry doesn't satisfy precondition " + latest); + emitConditionCheckFailedMetric(latest); throw new IllegalStateException( String.format("Index state [%s] doesn't satisfy precondition", latest.state())); } @@ -104,6 +107,7 @@ public T commit(Function operation) { metadataLog.purge(); } else { metadataLog.add(finalLog); + emitFinalLogStateMetric(finalLog); } return result; } catch (Exception e) { @@ -119,4 +123,12 @@ public T commit(Function operation) { throw new IllegalStateException("Failed to commit transaction operation", e); } } + + private void emitConditionCheckFailedMetric(FlintMetadataLogEntry latest) { + MetricsUtil.addHistoricGauge(MetricConstants.INITIAL_CONDITION_CHECK_FAILED_PREFIX + latest.state() + ".count", 1); + } + + private void emitFinalLogStateMetric(FlintMetadataLogEntry finalLog) { + MetricsUtil.addHistoricGauge(MetricConstants.INDEX_STATE_UPDATED_TO_PREFIX + finalLog.state() + ".count", 1); + } } diff --git a/flint-core/src/test/java/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransactionMetricTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransactionMetricTest.java new file mode 100644 index 000000000..838e9978c --- /dev/null +++ b/flint-core/src/test/java/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransactionMetricTest.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata.log; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.flint.common.metadata.log.FlintMetadataLog; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry; + +import java.util.Optional; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$; +import org.opensearch.flint.core.metrics.MetricsTestUtil; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class DefaultOptimisticTransactionMetricTest { + + @Mock + private FlintMetadataLog metadataLog; + + @Mock + private FlintMetadataLogEntry logEntry; + + @InjectMocks + private DefaultOptimisticTransaction transaction; + + @Test + void testCommitWithValidInitialCondition() throws Exception { + MetricsTestUtil.withMetricEnv(verifier -> { + when(metadataLog.getLatest()).thenReturn(Optional.of(logEntry)); + when(metadataLog.add(any(FlintMetadataLogEntry.class))).thenReturn(logEntry); + when(logEntry.state()).thenReturn(IndexState$.MODULE$.ACTIVE()); + + transaction.initialLog(entry -> true) + .transientLog(entry -> logEntry) + .finalLog(entry -> logEntry) + .commit(entry -> "Success"); + + verify(metadataLog, times(2)).add(logEntry); + verifier.assertHistoricGauge("indexState.updatedTo.active.count", 1); + }); + } + + @Test + void testConditionCheckFailed() throws Exception { + MetricsTestUtil.withMetricEnv(verifier -> { + when(metadataLog.getLatest()).thenReturn(Optional.of(logEntry)); + when(logEntry.state()).thenReturn(IndexState$.MODULE$.DELETED()); + + transaction.initialLog(entry -> false) + .finalLog(entry -> logEntry); + + assertThrows(IllegalStateException.class, () -> { + transaction.commit(entry -> "Should Fail"); + }); + verifier.assertHistoricGauge("initialConditionCheck.failed.deleted.count", 1); + }); + } +}