From 053736cf5f65bcf07ca229ffa538d7c7e6da711c Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 22 May 2024 15:46:27 -0700 Subject: [PATCH] Add IT for index state error Signed-off-by: Chen Dai --- .../metadata/log/DefaultOptimisticTransaction.java | 2 +- .../flint/spark/FlintSparkTransactionSupport.scala | 2 +- .../spark/FlintSparkTransactionSupportSuite.scala | 10 +++++----- .../flint/spark/FlintSparkTransactionITSuite.scala | 13 +++++++++++++ 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index f3ef364b3..54308bf80 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -81,7 +81,7 @@ public T commit(Function operation) { if (!initialCondition.test(latest)) { LOG.warning("Initial log entry doesn't satisfy precondition " + latest); throw new IllegalStateException( - "Transaction failed due to initial log precondition not satisfied"); + String.format("Index state [%s] doesn't satisfy precondition", latest.state())); } // Append optional transient log diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala index 0af04bed3..2257074da 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala @@ -36,7 +36,7 @@ trait FlintSparkTransactionSupport { self: Logging => * @param opName * the name of the operation, used for logging * @param forceInit - * a boolean flag indicating whether to force the initialization of the transaction + * a boolean flag indicating whether to force the initialization of the metadata log * @param opBlock * the operation block to execute within the transaction context, which takes an * `OptimisticTransaction` and returns a value of type `T` diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala index 9120b33e8..155d706c1 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala @@ -20,7 +20,7 @@ class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers { private val testIndex = "test_index" private val testOpName = "test operation" - // Creating a fake FlintSparkTransactionSupport for test + /** Creating a fake FlintSparkTransactionSupport subclass for test */ private val transactionSupport = new FlintSparkTransactionSupport with Logging { override protected def flintClient: FlintClient = mockFlintClient override protected def dataSourceName: String = testDataSource @@ -41,20 +41,20 @@ class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers { test("should throw exception with nested exception message") { the[IllegalStateException] thrownBy { transactionSupport.withTransaction[Unit](testIndex, testOpName) { _ => - throw new IllegalArgumentException("fake exception") + throw new IllegalArgumentException("Fake exception") } } should have message - "Failed to execute index operation [test operation] caused by: fake exception" + "Failed to execute index operation [test operation] caused by: Fake exception" } test("should throw exception with root cause exception message") { the[IllegalStateException] thrownBy { transactionSupport.withTransaction[Unit](testIndex, testOpName) { _ => - val rootCause = new IllegalArgumentException("fake root cause") + val rootCause = new IllegalArgumentException("Fake root cause") val cause = new RuntimeException("message ignored", rootCause) throw new IllegalStateException("message ignored", cause) } } should have message - "Failed to execute index operation [test operation] caused by: fake root cause" + "Failed to execute index operation [test operation] caused by: Fake root cause" } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index 3bef0a45f..fbed33c81 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -177,6 +177,19 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match RequestOptions.DEFAULT) shouldBe false } + test("should fail to vacuum index if index is not logically deleted") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + the[IllegalStateException] thrownBy { + flint.vacuumIndex(testFlintIndex) + } should have message + s"Failed to execute index operation [Vacuum Flint index $testFlintIndex] caused by: Index state [active] doesn't satisfy precondition" + } + test("should not recreate index if index data still exists") { flint .skippingIndex()