From 55d43d22b52f9586a24be7626326d79a5267b52b Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 30 Sep 2024 12:04:22 -0700 Subject: [PATCH] Add more IT Signed-off-by: Chen Dai --- .../spark/FlintSparkTransactionSupport.scala | 6 -- .../spark/FlintSparkTransactionITSuite.scala | 83 ++++++++++++++++--- 2 files changed, 73 insertions(+), 16 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala index b82210642..6fef994ce 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala @@ -110,12 +110,6 @@ trait FlintSparkTransactionSupport extends Logging { isCorrupted } - /* - * If execution reaches this point, it indicates that the Flint index is corrupted. - * In such cases, clean up the metadata log, as the index data no longer exists. - * There is a very small possibility that users may recreate the index in the - * interim, but metadata log get deleted by this cleanup process. - */ private def cleanupCorruptedIndex(indexName: String): Unit = { flintMetadataLogService .startTransaction(indexName) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index d415c10a4..debb95370 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -7,6 +7,8 @@ package org.opensearch.flint.spark import java.util.Base64 +import scala.jdk.CollectionConverters.mapAsJavaMapConverter + import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization @@ -14,7 +16,10 @@ import org.opensearch.action.get.GetRequest import org.opensearch.client.RequestOptions import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.OpenSearchTransactionSuite +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.FAILED +import org.opensearch.flint.core.storage.FlintMetadataLogEntryOpenSearchConverter.constructLogEntry import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Matchers { @@ -218,7 +223,50 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match ("delete", () => flint.deleteIndex(testFlintIndex)), ("vacuum", () => flint.vacuumIndex(testFlintIndex)), ("recover", () => flint.recoverIndex(testFlintIndex))).foreach { case (opName, opAction) => - test(s"should clean up metadata log entry when $opName if index is corrupted") { + test(s"should clean up metadata log entry when $opName index corrupted") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + // Simulate user delete index data directly (move index state to deleted to assert index recreated) + flint.deleteIndex(testFlintIndex) + deleteIndex(testFlintIndex) + + // Expect that next API call will clean it up + latestLogEntry(testLatestId) should contain("state" -> "deleted") + opAction() + latestLogEntry(testLatestId) shouldBe empty + } + } + + test("should clean up metadata log entry and recreate when creating index corrupted") { + def createIndex(): Unit = { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + } + createIndex() + + // Simulate user delete index data directly (move index state to deleted to assert index recreated) + flint.deleteIndex(testFlintIndex) + deleteIndex(testFlintIndex) + + // Expect that create action will clean it up and then recreate + latestLogEntry(testLatestId) should contain("state" -> "deleted") + createIndex() + latestLogEntry(testLatestId) should contain("state" -> "active") + } + + Seq( + ("refresh", () => flint.refreshIndex(testFlintIndex)), + ("delete", () => flint.deleteIndex(testFlintIndex)), + ("vacuum", () => flint.vacuumIndex(testFlintIndex)), + ("recover", () => flint.recoverIndex(testFlintIndex))).foreach { case (opName, opAction) => + test(s"should clean up metadata log entry when $opName index corrupted and auto refreshing") { flint .skippingIndex() .onTable(testTable) @@ -227,18 +275,26 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() flint.refreshIndex(testFlintIndex) - // Simulate the situation that user delete index data directly and then refresh exits + // Simulate user delete index data directly and index monitor moves index state to failed spark.streams.active.find(_.name == testFlintIndex).get.stop() deleteIndex(testFlintIndex) - - // Index state is refreshing and expect recover API clean it up - latestLogEntry(testLatestId) should contain("state" -> "refreshing") + updateLatestLogEntry( + constructLogEntry( + testLatestId, + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + latestLogEntry(testLatestId).asJava), + FAILED) + + // Expect that next API call will clean it up + latestLogEntry(testLatestId) should contain("state" -> "failed") opAction() latestLogEntry(testLatestId) shouldBe empty } } - test("should clean up metadata log entry and create index if index is corrupted") { + test( + "should clean up metadata log entry and recreate when creating index corrupted and auto refreshing") { def createIndex(): Unit = { flint .skippingIndex() @@ -251,12 +307,19 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match createIndex() flint.refreshIndex(testFlintIndex) - // Simulate the situation that user delete index data directly and then refresh exits + // Simulate user delete index data directly and index monitor moves index state to failed spark.streams.active.find(_.name == testFlintIndex).get.stop() deleteIndex(testFlintIndex) - - // Index state is refreshing and expect create action clean it up - latestLogEntry(testLatestId) should contain("state" -> "refreshing") + updateLatestLogEntry( + constructLogEntry( + testLatestId, + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + latestLogEntry(testLatestId).asJava), + FAILED) + + // Expect that create action clean it up and then recreate + latestLogEntry(testLatestId) should contain("state" -> "failed") createIndex() latestLogEntry(testLatestId) should contain("state" -> "active") }