diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 648258853..461011ce5 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -47,7 +47,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
   implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer
 
   /** Scheduler for updating index state regularly as needed, such as incremental refreshing */
-  private val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
+  var executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
 
   /**
    * Data source name. Assign empty string in case of backward compatibility. TODO: remove this in
@@ -251,7 +251,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     try {
       flintClient
         .startTransaction(indexName, dataSourceName)
-        .initialLog(_ => true) // bypass state check and recover anyway
+        .initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state))
         .transientLog(latest => latest.copy(state = RECOVERING))
         .finalLog(latest => {
           scheduleIndexStateUpdate(indexName)
diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala
index 1e7077799..35b6688e5 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala
@@ -6,6 +6,7 @@
 package org.opensearch.flint
 
 import java.util.Collections
+import java.util.concurrent.ScheduledExecutorService
 
 import scala.collection.JavaConverters.mapAsScalaMapConverter
 
@@ -20,6 +21,7 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
 import org.opensearch.flint.core.storage.FlintOpenSearchClient._
 import org.opensearch.flint.spark.FlintSparkSuite
+import org.scalatestplus.mockito.MockitoSugar.mock
 
 /**
  * Transaction test base suite that creates the metadata log index which enables transaction
@@ -33,6 +35,9 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite {
   override def beforeAll(): Unit = {
     super.beforeAll()
     spark.conf.set("spark.flint.datasource.name", testDataSourceName)
+
+    // Replace executor to avoid impact on IT
+    flint.executor = mock[ScheduledExecutorService]
   }
 
   override def beforeEach(): Unit = {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala
index f6e23ef83..27693c63b 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala
@@ -11,8 +11,9 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.opensearch.flint.OpenSearchTransactionSuite
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
-import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.FAILED
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
+import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO}
 import org.scalatest.matchers.should.Matchers
 
 class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers {
@@ -28,7 +29,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers
   }
 
   override def afterEach(): Unit = {
-    super.afterEach()
+    super.afterEach() // must clean up metadata log first and then delete
     flint.deleteIndex(testIndex)
   }
 
@@ -58,20 +59,49 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers
     latestLogEntry(latestId) should contain("state" -> "refreshing")
   }
 
-  test("recover should succeed even if index is in failed state") {
-    flint
-      .skippingIndex()
-      .onTable(testTable)
-      .addPartitions("year")
-      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
-      .create()
+  Seq(EMPTY, CREATING, DELETING, DELETED, RECOVERING, UNKNOWN).foreach { state =>
+    test(s"recover should fail if index is in $state state") {
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addPartitions("year")
+        .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+        .create()
 
-    updateLatestLogEntry(
-      new FlintMetadataLogEntry(latestId, 1, 1, latestLogEntry(latestId).asJava),
-      FAILED)
+      updateLatestLogEntry(
+        new FlintMetadataLogEntry(
+          latestId,
+          UNASSIGNED_SEQ_NO,
+          UNASSIGNED_PRIMARY_TERM,
+          latestLogEntry(latestId).asJava),
+        state)
 
-    flint.recoverIndex(testIndex) shouldBe true
-    spark.streams.active.exists(_.name == testIndex)
-    latestLogEntry(latestId) should contain("state" -> "refreshing")
+      assertThrows[IllegalStateException] {
+        flint.recoverIndex(testIndex) shouldBe true
+      }
+    }
+  }
+
+  Seq(ACTIVE, REFRESHING, FAILED).foreach { state =>
+    test(s"recover should succeed if index is in $state state") {
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addPartitions("year")
+        .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+        .create()
+
+      updateLatestLogEntry(
+        new FlintMetadataLogEntry(
+          latestId,
+          UNASSIGNED_SEQ_NO,
+          UNASSIGNED_PRIMARY_TERM,
+          latestLogEntry(latestId).asJava),
+        state)
+
+      flint.recoverIndex(testIndex) shouldBe true
+      spark.streams.active.exists(_.name == testIndex)
+      latestLogEntry(latestId) should contain("state" -> "refreshing")
+    }
   }
 }