diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 779b7e013..309877fdd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -48,7 +48,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w IGNORE_DOC_ID_COLUMN.optionKey -> "true").asJava) /** Flint client for low-level index operation */ - private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions()) + override protected val flintClient: FlintClient = + FlintClientBuilder.build(flintSparkConf.flintOptions()) private val flintIndexMetadataService: FlintIndexMetadataService = { FlintIndexMetadataServiceBuilder.build(flintSparkConf.flintOptions()) @@ -170,7 +171,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w } }) .commit(_ => indexRefresh.start(spark, flintSparkConf)) - } + }.flatten /** * Describe all Flint indexes whose name matches the given pattern. @@ -242,7 +243,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w case (true, false) => updateIndexManualToAuto(index, tx) case (false, false) => updateIndexAutoToManual(index, tx) } - } + }.flatten } /** @@ -276,7 +277,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w logInfo("Flint index to be deleted doesn't exist") false } - } + }.getOrElse(false) /** * Delete a Flint index physically. @@ -319,7 +320,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w logInfo("Flint index to vacuum doesn't exist") false } - } + }.getOrElse(false) /** * Recover index job. @@ -356,24 +357,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w true }) } else { - logInfo("Index to be recovered either doesn't exist or not auto refreshed") - if (index.isEmpty) { - /* - * If execution reaches this point, it indicates that the Flint index is corrupted. - * In such cases, clean up the metadata log, as the index data no longer exists. - * There is a very small possibility that users may recreate the index in the - * interim, but metadata log get deleted by this cleanup process. - */ - logWarning("Cleaning up metadata log as index data has been deleted") - tx - .initialLog(_ => true) - .finalLog(_ => NO_LOG_ENTRY) - .commit(_ => { false }) - } else { - false - } + logInfo("Index to be recovered is not auto refreshed") + false } - } + }.getOrElse(false) /** * Build data frame for querying the given index. This is mostly for unit test convenience. 
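Aside: the FlintSpark.scala changes above turn `withTransaction` into an `Option`-returning call, so each public API collapses the "bypassed because the index is corrupted" case explicitly: `.getOrElse(false)` for the Boolean-returning operations and `.flatten` where the operation block itself returns an `Option` (such as a refresh job ID). Below is a minimal, self-contained sketch of that calling convention; the `Tx` class and the `indexIntact` parameter are hypothetical stand-ins for illustration, not the actual Flint API.

```scala
object OptionReturnSketch extends App {
  // Hypothetical stand-in for OptimisticTransaction[T] (illustration only).
  class Tx[T]

  // Simplified withTransaction: None models "operation bypassed, index corrupted".
  def withTransaction[T](indexIntact: Boolean)(opBlock: Tx[T] => T): Option[T] =
    if (indexIntact) Some(opBlock(new Tx[T])) else None

  // Caller in the style of deleteIndex/vacuumIndex: a bypassed operation reads as false.
  def deleteIndex(indexIntact: Boolean): Boolean =
    withTransaction[Boolean](indexIntact) { _ => true }.getOrElse(false)

  // Caller in the style of refreshIndex, whose block itself returns Option[String]
  // (a job ID): the Option[Option[String]] is collapsed with .flatten.
  def refreshIndex(indexIntact: Boolean): Option[String] =
    withTransaction[Option[String]](indexIntact) { _ => Some("job-id") }.flatten

  assert(deleteIndex(indexIntact = true) && !deleteIndex(indexIntact = false))
  assert(refreshIndex(indexIntact = false).isEmpty)
}
```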
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index 2eb99ef34..144b58404 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -159,18 +159,22 @@ class FlintSparkIndexMonitor(
     override def run(): Unit = {
       logInfo(s"Scheduler trigger index monitor task for $indexName")
       try {
-        if (isStreamingJobActive(indexName)) {
-          logInfo("Streaming job is still active")
-          flintMetadataLogService.recordHeartbeat(indexName)
+        val isJobActive = isStreamingJobActive(indexName)
+        val indexExists = flintClient.exists(indexName)
 
-          if (!flintClient.exists(indexName)) {
-            logWarning("Streaming job is active but data is deleted")
+        (isJobActive, indexExists) match {
+          case (true, true) =>
+            logInfo("Streaming job is active and index exists")
+            flintMetadataLogService.recordHeartbeat(indexName)
+
+          case (true, false) =>
+            logWarning("Streaming job is active but index is deleted")
             stopStreamingJobAndMonitor(indexName)
-          }
-        } else {
-          logError("Streaming job is not active. Cancelling monitor task")
-          stopMonitor(indexName)
-          logInfo("Index monitor task is cancelled")
+
+          case (false, _) =>
+            logError("Streaming job is not active. Cancelling monitor task")
+            stopMonitor(indexName)
+            logInfo("Index monitor task is cancelled")
         }
         errorCnt = 0 // Reset counter if no error
       } catch {
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala
index 045c527c5..6fef994ce 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala
@@ -6,6 +6,9 @@
 package org.opensearch.flint.spark
 
 import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
+import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{CREATING, EMPTY, VACUUMING}
+import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
+import org.opensearch.flint.core.FlintClient
 
 import org.apache.spark.internal.Logging
 
@@ -13,11 +16,13 @@ import org.apache.spark.internal.Logging
 * Provides transaction support with proper error handling and logging capabilities.
 *
 * @note
- *   This trait requires the mixing class to extend Spark's `Logging` to utilize its logging
- *   functionalities. Meanwhile it needs to provide `FlintClient` and data source name so this
- *   trait can help create transaction context.
+ *   This trait requires the mixing class to provide both `FlintClient` and
+ *   `FlintMetadataLogService` so that this trait can create the transaction context.
 */
-trait FlintSparkTransactionSupport { self: Logging =>
+trait FlintSparkTransactionSupport extends Logging {
+
+  /** Flint client defined in the mixing class */
+  protected def flintClient: FlintClient
 
   /** Flint metadata log service defined in the mixing class */
   protected def flintMetadataLogService: FlintMetadataLogService
@@ -25,7 +30,9 @@ trait FlintSparkTransactionSupport { self: Logging =>
   /**
    * Executes a block of code within a transaction context, handling and logging errors
   * appropriately. This method logs the start and completion of the transaction and captures any
-   * exceptions that occur, enriching them with detailed error messages before re-throwing.
+   * exceptions that occur, enriching them with detailed error messages before re-throwing. If the
+   * index data is missing (excluding index creation actions), the operation is bypassed, and any
+   * dangling metadata log entries are cleaned up.
    *
    * @param indexName
    *   the name of the index on which the operation is performed
@@ -39,19 +46,31 @@ trait FlintSparkTransactionSupport { self: Logging =>
    * @tparam T
    *   the type of the result produced by the operation block
    * @return
-   *   the result of the operation block
+   *   Some(result) of the operation block if the operation is executed, or None if the operation
+   *   execution is bypassed because the index is corrupted
    */
  def withTransaction[T](indexName: String, opName: String, forceInit: Boolean = false)(
-      opBlock: OptimisticTransaction[T] => T): T = {
+      opBlock: OptimisticTransaction[T] => T): Option[T] = {
     logInfo(s"Starting index operation [$opName $indexName] with forceInit=$forceInit")
     try {
-      // Create transaction (only have side effect if forceInit is true)
-      val tx: OptimisticTransaction[T] =
-        flintMetadataLogService.startTransaction(indexName, forceInit)
+      val isCorrupted = isIndexCorrupted(indexName)
+      if (isCorrupted) {
+        cleanupCorruptedIndex(indexName)
+      }
+
+      // Execute the action if this is a create index action (indicated by forceInit) or the index is not corrupted
+      if (forceInit || !isCorrupted) {
 
-      val result = opBlock(tx)
-      logInfo(s"Index operation [$opName $indexName] complete")
-      result
+        // Create transaction (only have side effect if forceInit is true)
+        val tx: OptimisticTransaction[T] =
+          flintMetadataLogService.startTransaction(indexName, forceInit)
+        val result = opBlock(tx)
+        logInfo(s"Index operation [$opName $indexName] complete")
+        Some(result)
+      } else {
+        logWarning(s"Bypassing index operation [$opName $indexName]")
+        None
+      }
     } catch {
       case e: Exception =>
         logError(s"Failed to execute index operation [$opName $indexName]", e)
@@ -60,4 +79,42 @@ trait FlintSparkTransactionSupport { self: Logging =>
         throw e
     }
   }
+
+  /**
+   * Determines if the index is corrupted, meaning the metadata log entry exists but the
+   * corresponding data index does not. For indexes being created or vacuumed, the corruption
+   * check is skipped to reduce the possibility of a race condition, because the index may be in a
+   * transitional phase where the data index is temporarily missing before the process completes.
+   */
+  private def isIndexCorrupted(indexName: String): Boolean = {
+    val logEntry =
+      flintMetadataLogService
+        .getIndexMetadataLog(indexName)
+        .flatMap(_.getLatest)
+    val logEntryExists = logEntry.isPresent
+    val dataIndexExists = flintClient.exists(indexName)
+    val isCreatingOrVacuuming =
+      logEntry
+        .filter(e => e.state == EMPTY || e.state == CREATING || e.state == VACUUMING)
+        .isPresent
+    val isCorrupted = logEntryExists && !dataIndexExists && !isCreatingOrVacuuming
+
+    if (isCorrupted) {
+      logWarning(s"""
+           | Detected corrupted index:
+           | - logEntryExists [$logEntryExists]
+           | - dataIndexExists [$dataIndexExists]
+           | - isCreatingOrVacuuming [$isCreatingOrVacuuming]
+           |""".stripMargin)
+    }
+    isCorrupted
+  }
+
+  private def cleanupCorruptedIndex(indexName: String): Unit = {
+    flintMetadataLogService
+      .startTransaction(indexName)
+      .initialLog(_ => true)
+      .finalLog(_ => NO_LOG_ENTRY)
+      .commit(_ => {})
+  }
 }
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala
index 7646dde60..bf8d92c87 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala
@@ -5,40 +5,135 @@ package org.opensearch.flint.spark
 
-import org.mockito.Mockito.{times, verify}
-import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
+import java.util.Optional
+import java.util.function.{Function, Predicate}
+
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.opensearch.flint.common.metadata.log.{FlintMetadataLog, FlintMetadataLogEntry, FlintMetadataLogService, OptimisticTransaction}
+import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
+import org.opensearch.flint.core.FlintClient
 import org.scalatest.matchers.should.Matchers
 import org.scalatestplus.mockito.MockitoSugar.mock
 
 import org.apache.spark.FlintSuite
-import org.apache.spark.internal.Logging
 
 class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
 
+  private val mockFlintClient: FlintClient = mock[FlintClient]
   private val mockFlintMetadataLogService: FlintMetadataLogService = mock[FlintMetadataLogService]
+  private val mockTransaction = mock[OptimisticTransaction[_]]
+  private val mockLogEntry = mock[FlintMetadataLogEntry]
   private val testIndex = "test_index"
   private val testOpName = "test operation"
 
   /** Creating a fake FlintSparkTransactionSupport subclass for test */
-  private val transactionSupport = new FlintSparkTransactionSupport with Logging {
+  private val transactionSupport = new FlintSparkTransactionSupport {
+    override protected def flintClient: FlintClient = mockFlintClient
     override protected def flintMetadataLogService: FlintMetadataLogService =
       mockFlintMetadataLogService
   }
 
-  test("with transaction without force initialization") {
-    transactionSupport.withTransaction[Unit](testIndex, testOpName) { _ => }
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+
+    val logEntry = mock[FlintMetadataLog[FlintMetadataLogEntry]]
+    when(logEntry.getLatest).thenReturn(Optional.of(mockLogEntry))
+    when(mockFlintMetadataLogService.getIndexMetadataLog(testIndex))
+      .thenReturn(Optional.of(logEntry))
+
+    when(mockFlintMetadataLogService.startTransaction(any[String]))
+      .thenAnswer((_: InvocationOnMock) => mockTransaction)
+
+    // Mock transaction method chain
+    when(mockTransaction.initialLog(any[Predicate[FlintMetadataLogEntry]]))
+      .thenAnswer((_: InvocationOnMock) => mockTransaction)
+    when(mockTransaction.finalLog(any[Function[FlintMetadataLogEntry, FlintMetadataLogEntry]]))
+      .thenAnswer((_: InvocationOnMock) => mockTransaction)
+  }
+
+  override protected def afterEach(): Unit = {
+    reset(mockFlintClient, mockFlintMetadataLogService, mockTransaction, mockLogEntry)
+    super.afterEach()
+  }
+
+  test("execute transaction") {
+    assertIndexOperation()
+      .withForceInit(false)
+      .withResult("test")
+      .whenIndexDataExists()
+      .expectResult("test")
+      .verifyTransaction(forceInit = false)
+      .verifyLogEntryCleanup(false)
+  }
+
+  test("execute force init transaction") {
+    assertIndexOperation()
+      .withForceInit(true)
+      .withResult("test")
+      .whenIndexDataExists()
+      .expectResult("test")
+      .verifyTransaction(forceInit = true)
+      .verifyLogEntryCleanup(false)
+  }
 
-    verify(mockFlintMetadataLogService, times(1)).startTransaction(testIndex, false)
+  Seq(EMPTY, CREATING, VACUUMING).foreach { indexState =>
+    test(s"execute transaction when corrupted index in $indexState") {
+      assertIndexOperation()
+        .withForceInit(false)
+        .withResult("test")
+        .withIndexState(indexState)
+        .whenIndexDataNotExist()
+        .expectResult("test")
+        .verifyTransaction(forceInit = false)
+        .verifyLogEntryCleanup(false)
+    }
   }
 
-  test("with transaction with force initialization") {
-    transactionSupport.withTransaction[Unit](testIndex, testOpName, forceInit = true) { _ => }
+  Seq(EMPTY, CREATING, VACUUMING).foreach { indexState =>
+    test(s"execute force init transaction when corrupted index in $indexState") {
+      assertIndexOperation()
+        .withForceInit(true)
+        .withResult("test")
+        .withIndexState(indexState)
+        .whenIndexDataNotExist()
+        .expectResult("test")
+        .verifyTransaction(forceInit = true)
+        .verifyLogEntryCleanup(false)
+    }
+  }
 
-    verify(mockFlintMetadataLogService, times(1)).startTransaction(testIndex, true)
+  Seq(ACTIVE, UPDATING, REFRESHING, DELETING, DELETED, RECOVERING, FAILED).foreach { indexState =>
+    test(s"clean up log entry and bypass transaction when corrupted index in $indexState") {
+      assertIndexOperation()
+        .withForceInit(false)
+        .withResult("test")
+        .withIndexState(indexState)
+        .whenIndexDataNotExist()
+        .expectNoResult()
+        .verifyLogEntryCleanup(true)
+    }
   }
 
-  test("should throw original exception") {
+  Seq(ACTIVE, UPDATING, REFRESHING, DELETING, DELETED, RECOVERING, FAILED).foreach { indexState =>
+    test(
+      s"clean up log entry and execute force init transaction when corrupted index in $indexState") {
+      assertIndexOperation()
+        .withForceInit(true)
+        .withResult("test")
+        .withIndexState(indexState)
+        .whenIndexDataNotExist()
+        .expectResult("test")
+        .verifyLogEntryCleanup(true)
+        .verifyTransaction(forceInit = true)
+        .verifyLogEntryCleanup(true)
+    }
+  }
+
+  test("propagate original exception thrown within transaction") {
     the[RuntimeException] thrownBy {
+      when(mockFlintClient.exists(testIndex)).thenReturn(true)
+
       transactionSupport.withTransaction[Unit](testIndex, testOpName) { _ =>
         val rootCause = new IllegalArgumentException("Fake root cause")
         val cause = new RuntimeException("Fake cause", rootCause)
@@ -46,4 +141,63 @@ class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
       }
     } should have message "Fake cause"
   }
+
+  private def assertIndexOperation(): FlintIndexAssertion = new FlintIndexAssertion
+
+  class FlintIndexAssertion {
+    private var forceInit: Boolean = false
+    private var expectedResult: Option[String] = None
+
+    def withForceInit(forceInit: Boolean): FlintIndexAssertion = {
+      this.forceInit = forceInit
+      this
+    }
+
+    def withResult(expectedResult: String): FlintIndexAssertion = {
+      this.expectedResult = Some(expectedResult)
+      this
+    }
+
+    def withIndexState(expectedState: IndexState): FlintIndexAssertion = {
+      when(mockLogEntry.state).thenReturn(expectedState)
+      this
+    }
+
+    def whenIndexDataExists(): FlintIndexAssertion = {
+      when(mockFlintClient.exists(testIndex)).thenReturn(true)
+      this
+    }
+
+    def whenIndexDataNotExist(): FlintIndexAssertion = {
+      when(mockFlintClient.exists(testIndex)).thenReturn(false)
+      this
+    }
+
+    def verifyLogEntryCleanup(cleanup: Boolean): FlintIndexAssertion = {
+      verify(mockTransaction, if (cleanup) times(1) else never())
+        .commit(any())
+      this
+    }
+
+    def verifyTransaction(forceInit: Boolean): FlintIndexAssertion = {
+      verify(mockFlintMetadataLogService, times(1)).startTransaction(testIndex, forceInit)
+      this
+    }
+
+    def expectResult(expectedResult: String): FlintIndexAssertion = {
+      val result = transactionSupport.withTransaction[String](testIndex, testOpName, forceInit) {
+        _ => expectedResult
+      }
+      result shouldBe Some(expectedResult)
+      this
+    }
+
+    def expectNoResult(): FlintIndexAssertion = {
+      val result = transactionSupport.withTransaction[String](testIndex, testOpName, forceInit) {
+        _ => expectedResult.getOrElse("")
+      }
+      result shouldBe None
+      this
+    }
+  }
 }
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
index b543ba87c..debb95370 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
@@ -7,6 +7,8 @@ package org.opensearch.flint.spark
 
 import java.util.Base64
 
+import scala.jdk.CollectionConverters.mapAsJavaMapConverter
+
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods.parse
 import org.json4s.native.Serialization
@@ -14,7 +16,10 @@ import org.opensearch.action.get.GetRequest
 import org.opensearch.client.RequestOptions
 import org.opensearch.client.indices.GetIndexRequest
 import org.opensearch.flint.OpenSearchTransactionSuite
+import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.FAILED
+import org.opensearch.flint.core.storage.FlintMetadataLogEntryOpenSearchConverter.constructLogEntry
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
+import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO}
 import org.scalatest.matchers.should.Matchers
 
 class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
@@ -213,22 +218,109 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
       s"Flint index $testFlintIndex already exists"
   }
 
-  test("should clean up metadata log entry if index data has been deleted") {
-    flint
-      .skippingIndex()
-      .onTable(testTable)
-      .addPartitions("year", "month")
-      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex)
-      .create()
+  Seq(
+    ("refresh", () => flint.refreshIndex(testFlintIndex)),
+    ("delete", () => flint.deleteIndex(testFlintIndex)),
+    ("vacuum", () => flint.vacuumIndex(testFlintIndex)),
+    ("recover", () => flint.recoverIndex(testFlintIndex))).foreach { case (opName, opAction) =>
+    test(s"should clean up metadata log entry when $opName index corrupted") {
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addPartitions("year", "month")
+        .create()
+
+      // Simulate user deleting index data directly (move index state to deleted first, then remove the data)
+      flint.deleteIndex(testFlintIndex)
+      deleteIndex(testFlintIndex)
+
+      // Expect that the next API call will clean it up
+      latestLogEntry(testLatestId) should contain("state" -> "deleted")
+      opAction()
+      latestLogEntry(testLatestId) shouldBe empty
+    }
+  }
+
+  test("should clean up metadata log entry and recreate when creating index corrupted") {
+    def createIndex(): Unit = {
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addPartitions("year", "month")
+        .create()
+    }
+    createIndex()
+
+    // Simulate user deleting index data directly (move index state to deleted so we can assert the index is recreated)
+    flint.deleteIndex(testFlintIndex)
+    deleteIndex(testFlintIndex)
+
+    // Expect that the create action will clean it up and then recreate the index
+    latestLogEntry(testLatestId) should contain("state" -> "deleted")
+    createIndex()
+    latestLogEntry(testLatestId) should contain("state" -> "active")
+  }
+
+  Seq(
+    ("refresh", () => flint.refreshIndex(testFlintIndex)),
+    ("delete", () => flint.deleteIndex(testFlintIndex)),
+    ("vacuum", () => flint.vacuumIndex(testFlintIndex)),
+    ("recover", () => flint.recoverIndex(testFlintIndex))).foreach { case (opName, opAction) =>
+    test(s"should clean up metadata log entry when $opName index corrupted and auto refreshing") {
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addPartitions("year", "month")
+        .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex)
+        .create()
+      flint.refreshIndex(testFlintIndex)
+
+      // Simulate user deleting index data directly and the index monitor moving index state to failed
+      spark.streams.active.find(_.name == testFlintIndex).get.stop()
+      deleteIndex(testFlintIndex)
+      updateLatestLogEntry(
+        constructLogEntry(
+          testLatestId,
+          UNASSIGNED_SEQ_NO,
+          UNASSIGNED_PRIMARY_TERM,
+          latestLogEntry(testLatestId).asJava),
+        FAILED)
+
+      // Expect that the next API call will clean it up
+      latestLogEntry(testLatestId) should contain("state" -> "failed")
+      opAction()
+      latestLogEntry(testLatestId) shouldBe empty
+    }
+  }
+
+  test(
+    "should clean up metadata log entry and recreate when creating index corrupted and auto refreshing") {
+    def createIndex(): Unit = {
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addPartitions("year", "month")
+        .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex)
+        .create()
+    }
+
+    createIndex()
     flint.refreshIndex(testFlintIndex)
 
-    // Simulate the situation that user delete index data directly and then refresh exits
+    // Simulate user deleting index data directly and the index monitor moving index state to failed
     spark.streams.active.find(_.name == testFlintIndex).get.stop()
     deleteIndex(testFlintIndex)
-
-    // Index state is refreshing and expect recover API clean it up
-    latestLogEntry(testLatestId) should contain("state" -> "refreshing")
-    flint.recoverIndex(testFlintIndex)
-    latestLogEntry(testLatestId) shouldBe empty
+    updateLatestLogEntry(
+      constructLogEntry(
+        testLatestId,
+        UNASSIGNED_SEQ_NO,
+        UNASSIGNED_PRIMARY_TERM,
+        latestLogEntry(testLatestId).asJava),
+      FAILED)
+
+    // Expect that the create action cleans it up and then recreates the index
+    latestLogEntry(testLatestId) should contain("state" -> "failed")
+    createIndex()
+    latestLogEntry(testLatestId) should contain("state" -> "active")
   }
 }