Fix issue where a failed index cannot be deleted
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed May 31, 2024
1 parent 95116b3 commit a176115
Showing 3 changed files with 42 additions and 9 deletions.
File 1 of 3: class FlintSpark

@@ -250,7 +250,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     try {
       flintClient
         .startTransaction(indexName, dataSourceName)
-        .initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING)
+        .initialLog(latest =>
+          latest.state == ACTIVE || latest.state == REFRESHING || latest.state == FAILED)
         .transientLog(latest => latest.copy(state = DELETING))
         .finalLog(latest => latest.copy(state = DELETED))
         .commit(_ => {
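The one-line predicate change above is the heart of the fix: the delete transaction's initialLog precondition now accepts FAILED in addition to ACTIVE and REFRESHING, so an index whose refresh job crashed can move through DELETING to DELETED instead of being rejected up front. A minimal sketch of the before/after precondition, using the IndexState values imported in these files (the Set formulation is illustrative, not the commit's code):

    import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._

    // Old precondition: a FAILED index could never enter the delete path
    val deletableBefore = Set(ACTIVE, REFRESHING)
    // New precondition: FAILED is a legal starting state for deletion
    val deletableAfter = Set(ACTIVE, REFRESHING, FAILED)

    assert(!deletableBefore(FAILED)) // DELETE INDEX used to abort here
    assert(deletableAfter(FAILED))   // DELETE INDEX now proceeds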
File 2 of 3: class FlintSparkIndexMonitor

@@ -5,11 +5,16 @@

 package org.opensearch.flint.spark

+import java.time.Duration
+import java.util.Collections.singletonList
 import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

 import scala.collection.concurrent.{Map, TrieMap}
 import scala.sys.addShutdownHook

+import dev.failsafe.{Failsafe, RetryPolicy}
+import dev.failsafe.event.ExecutionAttemptedEvent
+import dev.failsafe.function.CheckedRunnable
 import org.opensearch.flint.core.FlintClient
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
 import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
@@ -122,11 +127,13 @@ class FlintSparkIndexMonitor(
      * ```
      */
     logError(s"Streaming job $name terminated with exception", e)
-    flintClient
-      .startTransaction(name, dataSourceName)
-      .initialLog(latest => latest.state == REFRESHING)
-      .finalLog(latest => latest.copy(state = FAILED))
-      .commit(_ => {})
+    retry {
+      flintClient
+        .startTransaction(name, dataSourceName)
+        .initialLog(latest => latest.state == REFRESHING)
+        .finalLog(latest => latest.copy(state = FAILED))
+        .commit(_ => {})
+    }
   }
 } else {
   logInfo(s"Index monitor for [$indexName] not found")
@@ -189,6 +196,25 @@
       logWarning("Refreshing job not found")
     }
   }
+
+  private def retry(operation: => Unit): Unit = {
+    // Retry policy for 3 times every 1 second
+    val retryPolicy = RetryPolicy
+      .builder[Unit]()
+      .handle(classOf[Throwable])
+      .withDelay(Duration.ofSeconds(1))
+      .withMaxRetries(3)
+      .onFailedAttempt((event: ExecutionAttemptedEvent[Unit]) =>
+        logError("Attempt to update index state failed: " + event))
+      .build()
+
+    // Use the retry policy with Failsafe
+    Failsafe
+      .`with`(singletonList(retryPolicy))
+      .run(new CheckedRunnable {
+        override def run(): Unit = operation
+      })
+  }
 }

 object FlintSparkIndexMonitor extends Logging {
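For illustration, a hedged sketch of how the new retry helper behaves, built only from the Failsafe calls that appear in the diff; the failing operation and the attempt counter are hypothetical. Note that Failsafe's `with` must be written with backticks from Scala (it is a keyword), and the policy is passed as a list, which is why the commit imports singletonList:

    import java.time.Duration
    import java.util.Collections.singletonList

    import dev.failsafe.{Failsafe, RetryPolicy}
    import dev.failsafe.function.CheckedRunnable

    object RetrySketch extends App {
      var attempts = 0 // hypothetical counter, only to observe retries

      val retryPolicy = RetryPolicy
        .builder[Unit]()
        .handle(classOf[Throwable])
        .withDelay(Duration.ofSeconds(1))
        .withMaxRetries(3)
        .build()

      Failsafe
        .`with`(singletonList(retryPolicy))
        .run(new CheckedRunnable {
          override def run(): Unit = {
            attempts += 1
            // Simulated transient failure on the first two attempts
            if (attempts < 3) throw new RuntimeException("transient failure")
          }
        })

      println(s"Succeeded on attempt $attempts") // prints: Succeeded on attempt 3
    }

With maxRetries set to 3, a persistently failing state update is attempted at most four times (one initial attempt plus three retries) before the exception propagates.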
File 3 of 3: class FlintJobITSuite

@@ -42,15 +42,18 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
     super.beforeAll()
     // initialized after the container is started
     osClient = new OSClient(new FlintOptions(openSearchOptions.asJava))
+  }
+
+  protected override def beforeEach(): Unit = {
+    super.beforeEach()
     createPartitionedMultiRowAddressTable(testTable)
   }

   protected override def afterEach(): Unit = {
     super.afterEach()

+    deleteTestIndex(testIndex)
+
     waitJobStop(threadLocalFuture.get())
     sql(s"DROP TABLE $testTable")

     threadLocalFuture.remove()
   }
@@ -160,8 +163,10 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
     threadLocalFuture.set(startJob(query, jobRunId))

     // Waiting from streaming job start and complete current batch
     Thread.sleep(5000L)
     pollForResultAndAssert(_ => true, jobRunId)
     val activeJob = spark.streams.active.find(_.name == testIndex)
+    activeJob shouldBe defined
+    awaitStreamingComplete(activeJob.get.id.toString)

     try {
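The added assertions make the test wait deterministically for the streaming job rather than relying on the sleep alone: it checks that a streaming query named after the index is active, then blocks until its current batch completes. A rough sketch of what awaitStreamingComplete (a JobTest helper) could look like on top of Spark's StreamingQueryManager; the body below is a guess, not the commit's implementation:

    // Hypothetical stand-in for the JobTest helper used above
    def awaitStreamingComplete(queryId: String): Unit = {
      val query = spark.streams.active.find(_.id.toString == queryId)
      assert(query.isDefined, s"streaming query $queryId not found")
      query.get.processAllAvailable() // blocks until pending data is processed
    }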
@@ -181,7 +186,8 @@
       case _: Exception => // expected
     }

-    // Assert Flint index transitioned to FAILED state
+    // Assert Flint index transitioned to FAILED state after waiting seconds
+    Thread.sleep(2000L)
     val latestId = Base64.getEncoder.encodeToString(testIndex.getBytes)
     latestLogEntry(latestId) should contain("state" -> "failed")
   } finally {
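As an aside, the latestId computed in this test is the document id of the index's latest metadata log entry, which is simply the Base64-encoded Flint index name; a small self-contained example with an illustrative name:

    import java.util.Base64

    val indexName = "flint_mycatalog_default_test_skipping_index" // illustrative
    val latestId = Base64.getEncoder.encodeToString(indexName.getBytes)
    // latestId addresses the latest metadata log entry for this index in OpenSearch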
