Skip to content

Commit

Permalink
Skip corrupt check for creating and vacuuming index
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Sep 30, 2024
1 parent ed0d3d9 commit a422c3f
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ class FlintSparkIndexMonitor(
logInfo(s"Scheduler trigger index monitor task for $indexName")
try {
if (isStreamingJobActive(indexName)) {
logInfo("Streaming job is still active")
flintMetadataLogService.recordHeartbeat(indexName)

if (!flintClient.exists(indexName)) {
if (flintClient.exists(indexName)) {
logInfo("Streaming job is still active")
flintMetadataLogService.recordHeartbeat(indexName)
} else {
logWarning("Streaming job is active but data is deleted")
stopStreamingJobAndMonitor(indexName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.flint.spark

import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{CREATING, EMPTY, VACUUMING}
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.FlintClient

Expand Down Expand Up @@ -54,7 +55,6 @@ trait FlintSparkTransactionSupport extends Logging {
try {
val isCorrupted = isIndexCorrupted(indexName)
if (isCorrupted) {
logWarning(s"Cleaning up for index operation [$opName $indexName] as index is corrupted")
cleanupCorruptedIndex(indexName)
}

Expand Down Expand Up @@ -82,18 +82,32 @@ trait FlintSparkTransactionSupport extends Logging {

/**
* Determines if the index is corrupted, meaning metadata log entry exists but the corresponding
* data index does not. There is no race condition with index creation, as it always creates the
* data index first. However, there is a very small chance with the vacuum operation, which
* deletes the data index before removing the metadata log entry.
* data index does not. For indexes whose latest log entry is in the EMPTY, CREATING, or VACUUMING
* state, the corruption check is skipped to reduce the possibility of a race condition, because
* the index may be in a transitional phase where the data index is temporarily missing.
*/
private def isIndexCorrupted(indexName: String): Boolean = {
  // Latest metadata log entry for the index, if any (a java.util.Optional)
  val logEntry =
    flintMetadataLogService
      .getIndexMetadataLog(indexName)
      .flatMap(_.getLatest)
  val logEntryExists = logEntry.isPresent
  val dataIndexExists = flintClient.exists(indexName)

  // EMPTY, CREATING and VACUUMING are treated as transitional states: a missing data index
  // is not reported as corruption while the latest log entry is in one of them.
  val isCreatingOrVacuuming =
    logEntry
      .filter(e => e.state == EMPTY || e.state == CREATING || e.state == VACUUMING)
      .isPresent
  val isCorrupted = logEntryExists && !dataIndexExists && !isCreatingOrVacuuming

  if (isCorrupted) {
    // Include the index name so the warning can be correlated with a specific index
    logWarning(s"""
         | Cleaning up corrupted index [$indexName]:
         | - logEntryExists [$logEntryExists]
         | - dataIndexExists [$dataIndexExists]
         | - isCreatingOrVacuuming [$isCreatingOrVacuuming]
         |""".stripMargin)
  }
  isCorrupted
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
package org.opensearch.flint.spark

import java.util.Optional
import java.util.function.{Function, Predicate}

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.opensearch.flint.common.metadata.log.{FlintMetadataLog, FlintMetadataLogEntry, FlintMetadataLogService}
import org.mockito.invocation.InvocationOnMock
import org.opensearch.flint.common.metadata.log.{FlintMetadataLog, FlintMetadataLogEntry, FlintMetadataLogService, OptimisticTransaction}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.FlintClient
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock
Expand All @@ -18,8 +22,9 @@ import org.apache.spark.FlintSuite
class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {

private val mockFlintClient: FlintClient = mock[FlintClient]
private val mockFlintMetadataLogService: FlintMetadataLogService =
mock[FlintMetadataLogService](RETURNS_DEEP_STUBS)
private val mockFlintMetadataLogService: FlintMetadataLogService = mock[FlintMetadataLogService]
private val mockTransaction = mock[OptimisticTransaction[_]]
private val mockLogEntry = mock[FlintMetadataLogEntry]
private val testIndex = "test_index"
private val testOpName = "test operation"

Expand All @@ -34,48 +39,95 @@ class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
super.beforeEach()

val logEntry = mock[FlintMetadataLog[FlintMetadataLogEntry]]
when(logEntry.getLatest).thenReturn(Optional.of(mock[FlintMetadataLogEntry]))
when(logEntry.getLatest).thenReturn(Optional.of(mockLogEntry))
when(mockFlintMetadataLogService.getIndexMetadataLog(testIndex))
.thenReturn(Optional.of(logEntry))
when(mockFlintMetadataLogService.startTransaction(any[String]))
.thenAnswer((_: InvocationOnMock) => mockTransaction)

// Mock transaction method chain
when(mockTransaction.initialLog(any[Predicate[FlintMetadataLogEntry]]))
.thenAnswer((_: InvocationOnMock) => mockTransaction)
when(mockTransaction.finalLog(any[Function[FlintMetadataLogEntry, FlintMetadataLogEntry]]))
.thenAnswer((_: InvocationOnMock) => mockTransaction)
}

// Resets the shared mocks after every test so that stubbing and recorded invocations
// from one test are not visible to the next one.
override protected def afterEach(): Unit = {
reset(mockFlintClient, mockFlintMetadataLogService)
reset(mockFlintClient, mockFlintMetadataLogService, mockTransaction, mockLogEntry)
super.afterEach()
}

test("execute transaction without force initialization") {
// Data index exists: the operation result is returned, the transaction is started with
// forceInit = false, and no commit is observed on the mocked transaction.
test("execute transaction") {
assertIndexOperation()
.withForceInit(false)
.withResult("test")
.whenIndexDataExists()
.expectResult("test")
.verifyTransaction(forceInit = false)
.verifyLogEntryCleanup(false)
}

test("execute transaction with force initialization") {
// Data index exists: the operation result is returned, the transaction is started with
// forceInit = true, and no commit is observed on the mocked transaction.
test("execute force init transaction") {
  assertIndexOperation()
    .withForceInit(true)
    .withResult("test")
    .whenIndexDataExists()
    .expectResult("test")
    .verifyTransaction(forceInit = true)
    .verifyLogEntryCleanup(false)
}

test("bypass transaction without force initialization when index corrupted") {
assertIndexOperation()
.withForceInit(false)
.withResult("test")
.whenIndexDataNotExist()
.expectNoResult()
// Data index is missing but the latest log entry is EMPTY, CREATING or VACUUMING:
// the operation still executes (forceInit = false) and no commit is observed on the
// mocked transaction, i.e. no cleanup is expected for these states.
Seq(EMPTY, CREATING, VACUUMING).foreach { indexState =>
test(s"execute transaction when corrupted index in $indexState") {
assertIndexOperation()
.withForceInit(false)
.withResult("test")
.withIndexState(indexState)
.whenIndexDataNotExist()
.expectResult("test")
.verifyTransaction(forceInit = false)
.verifyLogEntryCleanup(false)
}
}

test("execute transaction with force initialization even if index corrupted") {
assertIndexOperation()
.withForceInit(true)
.withResult("test")
.whenIndexDataNotExist()
.expectResult("test")
// Same scenario as the non-force-init variant, but with forceInit = true: for EMPTY,
// CREATING and VACUUMING the operation executes and no commit is observed on the
// mocked transaction.
Seq(EMPTY, CREATING, VACUUMING).foreach { indexState =>
test(s"execute force init transaction if corrupted index in $indexState") {
assertIndexOperation()
.withForceInit(true)
.withResult("test")
.withIndexState(indexState)
.whenIndexDataNotExist()
.expectResult("test")
.verifyTransaction(forceInit = true)
.verifyLogEntryCleanup(false)
}
}

// Data index is missing and the latest log entry is in any of the listed states:
// without force init, withTransaction is expected to yield None and exactly one commit
// is expected on the mocked transaction.
Seq(ACTIVE, UPDATING, REFRESHING, DELETING, DELETED, RECOVERING, FAILED).foreach { indexState =>
test(s"clean up log entry and bypass transaction when corrupted index in $indexState") {
assertIndexOperation()
.withForceInit(false)
.withResult("test")
.withIndexState(indexState)
.whenIndexDataNotExist()
.expectNoResult()
.verifyLogEntryCleanup(true)
}
}

// Data index is missing and the latest log entry is in any of the listed states:
// with force init, exactly one commit is expected on the mocked transaction and the
// operation still executes in a transaction started with forceInit = true.
Seq(ACTIVE, UPDATING, REFRESHING, DELETING, DELETED, RECOVERING, FAILED).foreach { indexState =>
  test(
    s"clean up log entry and execute force init transaction when corrupted index in $indexState") {
    assertIndexOperation()
      .withForceInit(true)
      .withResult("test")
      .withIndexState(indexState)
      .whenIndexDataNotExist()
      .expectResult("test")
      .verifyLogEntryCleanup(true)
      .verifyTransaction(forceInit = true)
  }
}

test("propagate original exception thrown within transaction") {
Expand Down Expand Up @@ -106,6 +158,11 @@ class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
this
}

/**
 * Stubs the mocked log entry so that its `state` returns the given index state.
 *
 * @param expectedState
 *   index state the mocked log entry should report
 * @return
 *   this assertion, for call chaining
 */
def withIndexState(expectedState: IndexState): FlintIndexAssertion = {
when(mockLogEntry.state).thenReturn(expectedState)
this
}

def whenIndexDataExists(): FlintIndexAssertion = {
when(mockFlintClient.exists(testIndex)).thenReturn(true)
this
Expand All @@ -116,6 +173,12 @@ class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
this
}

/**
 * Verifies how often `commit` was invoked on the mocked transaction: exactly once when
 * `cleanup` is true, never when it is false.
 *
 * NOTE(review): presumably the commit originates from the corrupted-index cleanup path;
 * confirm against `cleanupCorruptedIndex`.
 *
 * @param cleanup
 *   whether a commit on the mocked transaction is expected
 * @return
 *   this assertion, for call chaining
 */
def verifyLogEntryCleanup(cleanup: Boolean): FlintIndexAssertion = {
verify(mockTransaction, if (cleanup) times(1) else never())
.commit(any())
this
}

def verifyTransaction(forceInit: Boolean): FlintIndexAssertion = {
verify(mockFlintMetadataLogService, times(1)).startTransaction(testIndex, forceInit)
this
Expand All @@ -129,11 +192,12 @@ class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
this
}

def expectNoResult(): Unit = {
/**
 * Runs the operation through `withTransaction` for the test index and asserts that the
 * call yields `None`.
 *
 * @return
 *   this assertion, so that further verifications can be chained
 */
def expectNoResult(): FlintIndexAssertion = {
val result = transactionSupport.withTransaction[String](testIndex, testOpName, forceInit) {
_ => expectedResult.getOrElse("")
}
result shouldBe None
this
}
}
}

0 comments on commit a422c3f

Please sign in to comment.