From 6487eb6316ec5396dc6539966c18498ea326c69b Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Mon, 30 Sep 2024 09:34:01 -0700
Subject: [PATCH] Skip corrupt check for creating and vacuuming index

Signed-off-by: Chen Dai
---
 .../flint/spark/FlintSparkIndexMonitor.scala  |   8 +-
 .../spark/FlintSparkTransactionSupport.scala  |  28 +++--
 .../FlintSparkTransactionSupportSuite.scala   | 104 ++++++++++++++----
 3 files changed, 109 insertions(+), 31 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index 2eb99ef34..1295e3f1d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -160,10 +160,10 @@ class FlintSparkIndexMonitor(
       logInfo(s"Scheduler trigger index monitor task for $indexName")
       try {
         if (isStreamingJobActive(indexName)) {
-          logInfo("Streaming job is still active")
-          flintMetadataLogService.recordHeartbeat(indexName)
-
-          if (!flintClient.exists(indexName)) {
+          if (flintClient.exists(indexName)) {
+            logInfo("Streaming job is still active")
+            flintMetadataLogService.recordHeartbeat(indexName)
+          } else {
             logWarning("Streaming job is active but data is deleted")
             stopStreamingJobAndMonitor(indexName)
           }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala
index 87e8b60f9..b82210642 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala
@@ -6,6 +6,7 @@
 package org.opensearch.flint.spark
 
 import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
+import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{CREATING, EMPTY, VACUUMING}
 import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
 import org.opensearch.flint.core.FlintClient
 
@@ -54,7 +55,6 @@ trait FlintSparkTransactionSupport extends Logging {
     try {
       val isCorrupted = isIndexCorrupted(indexName)
       if (isCorrupted) {
-        logWarning(s"Cleaning up for index operation [$opName $indexName] as index is corrupted")
         cleanupCorruptedIndex(indexName)
       }
 
@@ -82,18 +82,32 @@ trait FlintSparkTransactionSupport extends Logging {
 
   /**
    * Determines if the index is corrupted, meaning metadata log entry exists but the corresponding
-   * data index does not. There is no race condition with index creation, as it always creates the
-   * data index first. However, there is a very small chance with the vacuum operation, which
-   * deletes the data index before removing the metadata log entry.
+   * data index does not. For indexes being created or vacuumed, the check for a corrupted index is
+   * skipped to reduce the possibility of a race condition. This is because the index may be in a
+   * transitional phase where the data index is temporarily missing before the process completes.
    */
   private def isIndexCorrupted(indexName: String): Boolean = {
-    val logEntryExists =
+    val logEntry =
       flintMetadataLogService
         .getIndexMetadataLog(indexName)
         .flatMap(_.getLatest)
-        .isPresent
+    val logEntryExists = logEntry.isPresent
     val dataIndexExists = flintClient.exists(indexName)
-    logEntryExists && !dataIndexExists
+    val isCreatingOrVacuuming =
+      logEntry
+        .filter(e => e.state == EMPTY || e.state == CREATING || e.state == VACUUMING)
+        .isPresent
+    val isCorrupted = logEntryExists && !dataIndexExists && !isCreatingOrVacuuming
+
+    if (isCorrupted) {
+      logWarning(s"""
+           | Cleaning up corrupted index:
+           | - logEntryExists [$logEntryExists]
+           | - dataIndexExists [$dataIndexExists]
+           | - isCreatingOrVacuuming [$isCreatingOrVacuuming]
+           |""".stripMargin)
+    }
+    isCorrupted
   }
 
   /*
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala
index 93a7a8bce..bf8d92c87 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala
@@ -6,9 +6,13 @@
 package org.opensearch.flint.spark
 
 import java.util.Optional
+import java.util.function.{Function, Predicate}
 
+import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
-import org.opensearch.flint.common.metadata.log.{FlintMetadataLog, FlintMetadataLogEntry, FlintMetadataLogService}
+import org.mockito.invocation.InvocationOnMock
+import org.opensearch.flint.common.metadata.log.{FlintMetadataLog, FlintMetadataLogEntry, FlintMetadataLogService, OptimisticTransaction}
+import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
 import org.opensearch.flint.core.FlintClient
 import org.scalatest.matchers.should.Matchers
 import org.scalatestplus.mockito.MockitoSugar.mock
@@ -18,8 +22,9 @@ import org.apache.spark.FlintSuite
 class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
 
   private val mockFlintClient: FlintClient = mock[FlintClient]
-  private val mockFlintMetadataLogService: FlintMetadataLogService =
-    mock[FlintMetadataLogService](RETURNS_DEEP_STUBS)
+  private val mockFlintMetadataLogService: FlintMetadataLogService = mock[FlintMetadataLogService]
+  private val mockTransaction = mock[OptimisticTransaction[_]]
+  private val mockLogEntry = mock[FlintMetadataLogEntry]
   private val testIndex = "test_index"
   private val testOpName = "test operation"
 
@@ -34,48 +39,95 @@ class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
     super.beforeEach()
 
     val logEntry = mock[FlintMetadataLog[FlintMetadataLogEntry]]
-    when(logEntry.getLatest).thenReturn(Optional.of(mock[FlintMetadataLogEntry]))
+    when(logEntry.getLatest).thenReturn(Optional.of(mockLogEntry))
     when(mockFlintMetadataLogService.getIndexMetadataLog(testIndex))
       .thenReturn(Optional.of(logEntry))
+    when(mockFlintMetadataLogService.startTransaction(any[String]))
+      .thenAnswer((_: InvocationOnMock) => mockTransaction)
+
+    // Mock transaction method chain
+    when(mockTransaction.initialLog(any[Predicate[FlintMetadataLogEntry]]))
+      .thenAnswer((_: InvocationOnMock) => mockTransaction)
+    when(mockTransaction.finalLog(any[Function[FlintMetadataLogEntry, FlintMetadataLogEntry]]))
+      .thenAnswer((_: InvocationOnMock) => mockTransaction)
   }
 
   override protected def afterEach(): Unit = {
-    reset(mockFlintClient, mockFlintMetadataLogService)
+    reset(mockFlintClient, mockFlintMetadataLogService, mockTransaction, mockLogEntry)
     super.afterEach()
   }
 
-  test("execute transaction without force initialization") {
+  test("execute transaction") {
     assertIndexOperation()
       .withForceInit(false)
       .withResult("test")
       .whenIndexDataExists()
       .expectResult("test")
       .verifyTransaction(forceInit = false)
+      .verifyLogEntryCleanup(false)
   }
 
-  test("execute transaction with force initialization") {
+  test("execute force init transaction") {
     assertIndexOperation()
       .withForceInit(true)
       .withResult("test")
       .whenIndexDataExists()
       .expectResult("test")
       .verifyTransaction(forceInit = true)
+      .verifyLogEntryCleanup(false)
   }
 
-  test("bypass transaction without force initialization when index corrupted") {
-    assertIndexOperation()
-      .withForceInit(false)
-      .withResult("test")
-      .whenIndexDataNotExist()
-      .expectNoResult()
+  Seq(EMPTY, CREATING, VACUUMING).foreach { indexState =>
+    test(s"execute transaction when corrupted index in $indexState") {
+      assertIndexOperation()
+        .withForceInit(false)
+        .withResult("test")
+        .withIndexState(indexState)
+        .whenIndexDataNotExist()
+        .expectResult("test")
+        .verifyTransaction(forceInit = false)
+        .verifyLogEntryCleanup(false)
+    }
   }
 
-  test("execute transaction with force initialization even if index corrupted") {
-    assertIndexOperation()
-      .withForceInit(true)
-      .withResult("test")
-      .whenIndexDataNotExist()
-      .expectResult("test")
+  Seq(EMPTY, CREATING, VACUUMING).foreach { indexState =>
+    test(s"execute force init transaction if corrupted index in $indexState") {
+      assertIndexOperation()
+        .withForceInit(true)
+        .withResult("test")
+        .withIndexState(indexState)
+        .whenIndexDataNotExist()
+        .expectResult("test")
+        .verifyTransaction(forceInit = true)
+        .verifyLogEntryCleanup(false)
+    }
+  }
+
+  Seq(ACTIVE, UPDATING, REFRESHING, DELETING, DELETED, RECOVERING, FAILED).foreach { indexState =>
+    test(s"clean up log entry and bypass transaction when corrupted index in $indexState") {
+      assertIndexOperation()
+        .withForceInit(false)
+        .withResult("test")
+        .withIndexState(indexState)
+        .whenIndexDataNotExist()
+        .expectNoResult()
+        .verifyLogEntryCleanup(true)
+    }
+  }
+
+  Seq(ACTIVE, UPDATING, REFRESHING, DELETING, DELETED, RECOVERING, FAILED).foreach { indexState =>
+    test(
+      s"clean up log entry and execute force init transaction when corrupted index in $indexState") {
+      assertIndexOperation()
+        .withForceInit(true)
+        .withResult("test")
+        .withIndexState(indexState)
+        .whenIndexDataNotExist()
+        .expectResult("test")
+        .verifyLogEntryCleanup(true)
+        .verifyTransaction(forceInit = true)
+        .verifyLogEntryCleanup(true)
+    }
   }
 
   test("propagate original exception thrown within transaction") {
@@ -106,6 +158,11 @@ class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
       this
     }
 
+    def withIndexState(expectedState: IndexState): FlintIndexAssertion = {
+      when(mockLogEntry.state).thenReturn(expectedState)
+      this
+    }
+
     def whenIndexDataExists(): FlintIndexAssertion = {
       when(mockFlintClient.exists(testIndex)).thenReturn(true)
       this
@@ -116,6 +173,12 @@ class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
       this
     }
 
+    def verifyLogEntryCleanup(cleanup: Boolean): FlintIndexAssertion = {
+      verify(mockTransaction, if (cleanup) times(1) else never())
+        .commit(any())
+      this
+    }
+
     def verifyTransaction(forceInit: Boolean): FlintIndexAssertion = {
       verify(mockFlintMetadataLogService, times(1)).startTransaction(testIndex, forceInit)
       this
@@ -129,11 +192,12 @@ class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
       this
     }
 
-    def expectNoResult(): Unit = {
+    def expectNoResult(): FlintIndexAssertion = {
       val result = transactionSupport.withTransaction[String](testIndex, testOpName, forceInit) { _ =>
         expectedResult.getOrElse("")
       }
       result shouldBe None
+      this
     }
   }
 }
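Note (not part of the patch): the sketch below is a minimal, self-contained Scala illustration of the decision the reworked isIndexCorrupted() makes. The IndexState values and the LogEntry case class here are simplified, hypothetical stand-ins for the Flint metadata log classes; only the rule mirrors the change above, namely that an index is treated as corrupted when a metadata log entry exists, the data index is missing, and the entry is not in a transitional EMPTY, CREATING, or VACUUMING state.

object CorruptionCheckSketch {

  // Hypothetical, simplified stand-ins for Flint's IndexState and FlintMetadataLogEntry.
  sealed trait IndexState
  case object EMPTY extends IndexState
  case object CREATING extends IndexState
  case object VACUUMING extends IndexState
  case object REFRESHING extends IndexState

  final case class LogEntry(state: IndexState)

  // Corrupted = a log entry exists, the data index is missing, and the entry is not in a
  // transitional state that legitimately has no data index yet (EMPTY/CREATING/VACUUMING).
  def isCorrupted(latestLogEntry: Option[LogEntry], dataIndexExists: Boolean): Boolean = {
    val isCreatingOrVacuuming = latestLogEntry.exists(e =>
      e.state == EMPTY || e.state == CREATING || e.state == VACUUMING)
    latestLogEntry.isDefined && !dataIndexExists && !isCreatingOrVacuuming
  }

  def main(args: Array[String]): Unit = {
    // Index still being created: data index not written yet, so no cleanup is triggered.
    assert(!isCorrupted(Some(LogEntry(CREATING)), dataIndexExists = false))
    // Refreshing index whose data index disappeared: treated as corrupted, cleanup expected.
    assert(isCorrupted(Some(LogEntry(REFRESHING)), dataIndexExists = false))
    // No log entry at all: nothing to clean up.
    assert(!isCorrupted(None, dataIndexExists = false))
  }
}

Skipping the transitional states trades a little cleanup eagerness for not racing against create and vacuum, both of which legitimately pass through a window where the log entry exists but the data index does not.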