From a176115d25e2126e9c22ad590e5d4f616291aefd Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Thu, 30 May 2024 17:44:12 -0700
Subject: [PATCH] Fix failed index cannot be deleted issue

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSpark.scala   |  3 +-
 .../flint/spark/FlintSparkIndexMonitor.scala  | 36 ++++++++++++++++---
 .../apache/spark/sql/FlintJobITSuite.scala    | 12 +++++--
 3 files changed, 42 insertions(+), 9 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 083aa5d16..848bbe61f 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -250,7 +250,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     try {
       flintClient
         .startTransaction(indexName, dataSourceName)
-        .initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING)
+        .initialLog(latest =>
+          latest.state == ACTIVE || latest.state == REFRESHING || latest.state == FAILED)
         .transientLog(latest => latest.copy(state = DELETING))
         .finalLog(latest => latest.copy(state = DELETED))
         .commit(_ => {
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index 0a325f7b0..9a89e9637 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -5,11 +5,16 @@
 
 package org.opensearch.flint.spark
 
+import java.time.Duration
+import java.util.Collections.singletonList
 import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
 
 import scala.collection.concurrent.{Map, TrieMap}
 import scala.sys.addShutdownHook
 
+import dev.failsafe.{Failsafe, RetryPolicy}
+import dev.failsafe.event.ExecutionAttemptedEvent
+import dev.failsafe.function.CheckedRunnable
 import org.opensearch.flint.core.FlintClient
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
 import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
@@ -122,11 +127,13 @@ class FlintSparkIndexMonitor(
            *  ```
            */
           logError(s"Streaming job $name terminated with exception", e)
-          flintClient
-            .startTransaction(name, dataSourceName)
-            .initialLog(latest => latest.state == REFRESHING)
-            .finalLog(latest => latest.copy(state = FAILED))
-            .commit(_ => {})
+          retry {
+            flintClient
+              .startTransaction(name, dataSourceName)
+              .initialLog(latest => latest.state == REFRESHING)
+              .finalLog(latest => latest.copy(state = FAILED))
+              .commit(_ => {})
+          }
         }
       } else {
         logInfo(s"Index monitor for [$indexName] not found")
@@ -189,6 +196,25 @@ class FlintSparkIndexMonitor(
         logWarning("Refreshing job not found")
       }
     }
+
+  private def retry(operation: => Unit): Unit = {
+    // Retry policy: up to 3 retries with a 1 second delay between attempts
+    val retryPolicy = RetryPolicy
+      .builder[Unit]()
+      .handle(classOf[Throwable])
+      .withDelay(Duration.ofSeconds(1))
+      .withMaxRetries(3)
+      .onFailedAttempt((event: ExecutionAttemptedEvent[Unit]) =>
+        logError("Attempt to update index state failed: " + event))
+      .build()
+
+    // Run the operation under the retry policy with Failsafe
+    Failsafe
+      .`with`(singletonList(retryPolicy))
+      .run(new CheckedRunnable {
+        override def run(): Unit = operation
+      })
+  }
 }
 
 object FlintSparkIndexMonitor extends Logging {
diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
index 4c52dfdc0..4ff43d347 100644
--- a/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
+++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
@@ -42,6 +42,10 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
     super.beforeAll()
     // initialized after the container is started
     osClient = new OSClient(new FlintOptions(openSearchOptions.asJava))
+  }
+
+  protected override def beforeEach(): Unit = {
+    super.beforeEach()
     createPartitionedMultiRowAddressTable(testTable)
   }
 
@@ -49,8 +53,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
     super.afterEach()
 
     deleteTestIndex(testIndex)
-
-    waitJobStop(threadLocalFuture.get())
+    sql(s"DROP TABLE $testTable")
 
     threadLocalFuture.remove()
   }
@@ -160,8 +163,10 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
     threadLocalFuture.set(startJob(query, jobRunId))
 
     // Waiting from streaming job start and complete current batch
+    Thread.sleep(5000L)
     pollForResultAndAssert(_ => true, jobRunId)
     val activeJob = spark.streams.active.find(_.name == testIndex)
+    activeJob shouldBe defined
     awaitStreamingComplete(activeJob.get.id.toString)
 
     try {
@@ -181,7 +186,8 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
         case _: Exception => // expected
       }
 
-      // Assert Flint index transitioned to FAILED state
+      // Assert Flint index transitioned to FAILED state after a short wait
+      Thread.sleep(2000L)
       val latestId = Base64.getEncoder.encodeToString(testIndex.getBytes)
       latestLogEntry(latestId) should contain("state" -> "failed")
     } finally {
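
Reviewer note, not part of the patch: with Failsafe, withMaxRetries(3) allows up to four executions in total, the initial attempt plus three retries, each separated by the one-second delay, so a transient failure while writing the FAILED state no longer drops the transition on the first error. Below is a minimal self-contained sketch of the same retry pattern for reference; the RetryDemo object and its simulated failure are illustrative assumptions, not code from this change.

    import java.time.Duration
    import java.util.Collections.singletonList

    import dev.failsafe.{Failsafe, RetryPolicy}
    import dev.failsafe.function.CheckedRunnable

    object RetryDemo extends App {
      // Same policy shape as the patch: handle any Throwable, wait 1 second
      // between attempts, and allow at most 3 retries after the first attempt.
      val retryPolicy = RetryPolicy
        .builder[Unit]()
        .handle(classOf[Throwable])
        .withDelay(Duration.ofSeconds(1))
        .withMaxRetries(3)
        .build()

      var attempts = 0
      Failsafe
        .`with`(singletonList(retryPolicy))
        .run(new CheckedRunnable {
          override def run(): Unit = {
            attempts += 1
            // Simulated transient failure: the first two attempts throw,
            // the third succeeds within the retry budget.
            if (attempts < 3) throw new RuntimeException(s"transient failure #$attempts")
            println(s"Succeeded on attempt $attempts")
          }
        })
    }

Because the policy handles Throwable, any exception from the commit, including OpenSearch connectivity errors, triggers a retry; errors that persist past the last retry still propagate to the caller.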