Skip to content

Commit

Permalink
Add more IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 15, 2024
1 parent 6487eb6 commit c2ab09f
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,6 @@ trait FlintSparkTransactionSupport extends Logging {
isCorrupted
}

/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but the metadata log still gets deleted by this cleanup process.
*/
private def cleanupCorruptedIndex(indexName: String): Unit = {
flintMetadataLogService
.startTransaction(indexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ package org.opensearch.flint.spark

import java.util.Base64

import scala.jdk.CollectionConverters.mapAsJavaMapConverter

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.action.get.GetRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.flint.OpenSearchTransactionSuite
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.FAILED
import org.opensearch.flint.core.storage.FlintMetadataLogEntryOpenSearchConverter.constructLogEntry
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO}
import org.scalatest.matchers.should.Matchers

class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
Expand Down Expand Up @@ -218,7 +223,50 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
("delete", () => flint.deleteIndex(testFlintIndex)),
("vacuum", () => flint.vacuumIndex(testFlintIndex)),
("recover", () => flint.recoverIndex(testFlintIndex))).foreach { case (opName, opAction) =>
test(s"should clean up metadata log entry when $opName if index is corrupted") {
test(s"should clean up metadata log entry when $opName index corrupted") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()

// Simulate user delete index data directly (move index state to deleted to assert index recreated)
flint.deleteIndex(testFlintIndex)
deleteIndex(testFlintIndex)

// Expect that next API call will clean it up
latestLogEntry(testLatestId) should contain("state" -> "deleted")
opAction()
latestLogEntry(testLatestId) shouldBe empty
}
}

test("should clean up metadata log entry and recreate when creating index corrupted") {
  // Helper that builds the skipping index on the test table; invoked once to set up
  // the index and once more after the corruption has been simulated.
  def createIndex(): Unit =
    flint.skippingIndex().onTable(testTable).addPartitions("year", "month").create()

  createIndex()

  // Simulate the user deleting index data directly. The Flint delete API is called first
  // so the log entry is in deleted state, which lets the final assertion on the active
  // state show that the index was recreated.
  flint.deleteIndex(testFlintIndex)
  deleteIndex(testFlintIndex)

  // Precondition: the latest log entry is in deleted state
  latestLogEntry(testLatestId) should contain("state" -> "deleted")

  // Expect that the create action cleans up the stale entry and then recreates the index
  createIndex()
  latestLogEntry(testLatestId) should contain("state" -> "active")
}

Seq(
("refresh", () => flint.refreshIndex(testFlintIndex)),
("delete", () => flint.deleteIndex(testFlintIndex)),
("vacuum", () => flint.vacuumIndex(testFlintIndex)),
("recover", () => flint.recoverIndex(testFlintIndex))).foreach { case (opName, opAction) =>
test(s"should clean up metadata log entry when $opName index corrupted and auto refreshing") {
flint
.skippingIndex()
.onTable(testTable)
Expand All @@ -227,18 +275,26 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
.create()
flint.refreshIndex(testFlintIndex)

// Simulate the situation that user delete index data directly and then refresh exits
// Simulate the user deleting index data directly and the index monitor moving index state to failed
spark.streams.active.find(_.name == testFlintIndex).get.stop()
deleteIndex(testFlintIndex)

// Index state is refreshing and expect recover API clean it up
latestLogEntry(testLatestId) should contain("state" -> "refreshing")
updateLatestLogEntry(
constructLogEntry(
testLatestId,
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
latestLogEntry(testLatestId).asJava),
FAILED)

// Expect that next API call will clean it up
latestLogEntry(testLatestId) should contain("state" -> "failed")
opAction()
latestLogEntry(testLatestId) shouldBe empty
}
}

test("should clean up metadata log entry and create index if index is corrupted") {
test(
"should clean up metadata log entry and recreate when creating index corrupted and auto refreshing") {
def createIndex(): Unit = {
flint
.skippingIndex()
Expand All @@ -251,12 +307,19 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
createIndex()
flint.refreshIndex(testFlintIndex)

// Simulate the situation that user delete index data directly and then refresh exits
// Simulate the user deleting index data directly and the index monitor moving index state to failed
spark.streams.active.find(_.name == testFlintIndex).get.stop()
deleteIndex(testFlintIndex)

// Index state is refreshing and expect create action clean it up
latestLogEntry(testLatestId) should contain("state" -> "refreshing")
updateLatestLogEntry(
constructLogEntry(
testLatestId,
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
latestLogEntry(testLatestId).asJava),
FAILED)

// Expect that create action clean it up and then recreate
latestLogEntry(testLatestId) should contain("state" -> "failed")
createIndex()
latestLogEntry(testLatestId) should contain("state" -> "active")
}
Expand Down

0 comments on commit c2ab09f

Please sign in to comment.