diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index fe2f68333..e9913ccd7 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -58,7 +58,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport with Logging {
 
   /** Flint Spark index monitor */
   val flintIndexMonitor: FlintSparkIndexMonitor =
-    new FlintSparkIndexMonitor(spark, flintMetadataLogService)
+    new FlintSparkIndexMonitor(spark, flintClient, flintMetadataLogService)
 
   /**
    * Create index builder for creating index with fluent API.
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index 29acaea6b..2eb99ef34 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -18,6 +18,7 @@ import dev.failsafe.event.ExecutionAttemptedEvent
 import dev.failsafe.function.CheckedRunnable
 
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
+import org.opensearch.flint.core.FlintClient
 import org.opensearch.flint.core.logging.ExceptionMessages.extractRootCause
 import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
@@ -31,11 +32,14 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
  *
  * @param spark
  *   Spark session
+ * @param flintClient
+ *   Flint client
  * @param flintMetadataLogService
  *   Flint metadata log service
  */
 class FlintSparkIndexMonitor(
     spark: SparkSession,
+    flintClient: FlintClient,
     flintMetadataLogService: FlintMetadataLogService)
     extends Logging {
 
@@ -158,6 +162,11 @@ class FlintSparkIndexMonitor(
         if (isStreamingJobActive(indexName)) {
           logInfo("Streaming job is still active")
           flintMetadataLogService.recordHeartbeat(indexName)
+
+          if (!flintClient.exists(indexName)) {
+            logWarning("Streaming job is active but data is deleted")
+            stopStreamingJobAndMonitor(indexName)
+          }
         } else {
           logError("Streaming job is not active. Cancelling monitor task")
           stopMonitor(indexName)
@@ -172,10 +181,7 @@ class FlintSparkIndexMonitor(
 
           // Stop streaming job and its monitor if max retry limit reached
           if (errorCnt >= MAX_ERROR_COUNT) {
-            logInfo(s"Terminating streaming job and index monitor for $indexName")
-            stopStreamingJob(indexName)
-            stopMonitor(indexName)
-            logInfo(s"Streaming job and index monitor terminated")
+            stopStreamingJobAndMonitor(indexName)
           }
       }
     }
@@ -184,6 +190,13 @@ class FlintSparkIndexMonitor(
   private def isStreamingJobActive(indexName: String): Boolean =
     spark.streams.active.exists(_.name == indexName)
 
+  private def stopStreamingJobAndMonitor(indexName: String): Unit = {
+    logInfo(s"Terminating streaming job and index monitor for $indexName")
+    stopStreamingJob(indexName)
+    stopMonitor(indexName)
+    logInfo(s"Streaming job and index monitor terminated")
+  }
+
   private def stopStreamingJob(indexName: String): Unit = {
     val job = spark.streams.active.find(_.name == indexName)
     if (job.isDefined) {
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
index ad5029fcb..8d8311b11 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
@@ -9,9 +9,11 @@ import java.util.Base64
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
 
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doAnswer, spy}
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
 import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.opensearch.client.RequestOptions
 import org.opensearch.flint.OpenSearchTransactionSuite
@@ -148,25 +150,38 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matchers {
     spark.streams.active.exists(_.name == testFlintIndex) shouldBe false
   }
 
+  test("monitor task and streaming job should terminate if data index is deleted") {
+    val task = FlintSparkIndexMonitor.indexMonitorTracker(testFlintIndex)
+    openSearchClient
+      .indices()
+      .delete(new DeleteIndexRequest(testFlintIndex), RequestOptions.DEFAULT)
+
+    // Wait for index monitor execution and assert
+    waitForMonitorTaskRun()
+    task.isCancelled shouldBe true
+    spark.streams.active.exists(_.name == testFlintIndex) shouldBe false
+
+    // Assert index state is still refreshing
+    val latestLog = latestLogEntry(testLatestId)
+    latestLog should contain("state" -> "refreshing")
+  }
+
   test("await monitor terminated without exception should stay refreshing state") {
     // Setup a timer to terminate the streaming job
-    new Thread(() => {
-      Thread.sleep(3000L)
+    asyncAfter(3.seconds) {
       spark.streams.active.find(_.name == testFlintIndex).get.stop()
-    }).start()
+    }
 
     // Await until streaming job terminated
     flint.flintIndexMonitor.awaitMonitor()
 
-    // Assert index state is active now
+    // Assert index state is still refreshing
     val latestLog = latestLogEntry(testLatestId)
     latestLog should contain("state" -> "refreshing")
   }
 
   test("await monitor terminated with exception should update index state to failed with error") {
-    new Thread(() => {
-      Thread.sleep(3000L)
-
+    asyncAfter(3.seconds) {
       // Set Flint index readonly to simulate streaming job exception
       val settings = Map("index.blocks.write" -> true)
       val request = new UpdateSettingsRequest(testFlintIndex).settings(settings.asJava)
@@ -178,7 +193,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matchers {
           | PARTITION (year=2023, month=6)
           | VALUES ('Test', 35, 'Vancouver')
           | """.stripMargin)
-    }).start()
+    }
 
     // Await until streaming job terminated
     flint.flintIndexMonitor.awaitMonitor()
@@ -204,6 +219,19 @@
     latestLog should not contain "error"
   }
 
+  private def async(block: => Unit): Unit = {
+    new Thread(() => {
+      block
+    }).start()
+  }
+
+  private def asyncAfter(delay: FiniteDuration)(block: => Unit): Unit = {
+    new Thread(() => {
+      Thread.sleep(delay.toMillis)
+      block
+    }).start()
+  }
+
   private def getLatestTimestamp: (Long, Long) = {
     val latest = latestLogEntry(testLatestId)
     (latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index 7e0b68376..751647149 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -69,7 +69,8 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite with StreamTest {
         flint.deleteIndex(testIndex)
         flint.vacuumIndex(testIndex)
       } catch {
-        case _: IllegalStateException =>
+        // Forcefully delete index data and log entry in case of any errors, such as version conflict
+        case _: Exception =>
           if (openSearchClient
               .indices()
               .exists(new GetIndexRequest(testIndex), RequestOptions.DEFAULT)) {
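For context, here is a minimal, self-contained sketch of the monitor's new per-tick decision, with Spark and the Flint client stubbed out as plain functions. The stub names (`MonitorTickSketch`, `dataIndexExists`, `monitorTick`) are illustrative and do not appear in the PR; the real logic lives in `FlintSparkIndexMonitor` as shown in the hunks above:

```scala
// Sketch only: collaborators stand in for spark.streams, flintClient
// and flintMetadataLogService from the real FlintSparkIndexMonitor.
object MonitorTickSketch extends App {
  def isStreamingJobActive(indexName: String): Boolean = true
  def dataIndexExists(indexName: String): Boolean = false // simulate deleted data index
  def recordHeartbeat(indexName: String): Unit = println(s"heartbeat: $indexName")
  def stopStreamingJobAndMonitor(indexName: String): Unit = println(s"stopping job and monitor: $indexName")
  def stopMonitor(indexName: String): Unit = println(s"cancelling monitor: $indexName")

  def monitorTick(indexName: String): Unit = {
    if (isStreamingJobActive(indexName)) {
      recordHeartbeat(indexName)
      // New in this change: an active streaming job whose backing data
      // index is gone is orphaned, so terminate both the job and monitor.
      if (!dataIndexExists(indexName)) {
        stopStreamingJobAndMonitor(indexName)
      }
    } else {
      stopMonitor(indexName)
    }
  }

  monitorTick("flint_test_index")
}
```

Note that, as in the hunk above, the heartbeat is recorded before the existence check, so the tick that detects the deleted index still refreshes the metadata log heartbeat first.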
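The `asyncAfter` test helper replaces the raw `new Thread`/`Thread.sleep` boilerplate with a curried by-name parameter. A runnable sketch of the same pattern outside the suite (`AsyncAfterDemo` is a hypothetical object, not part of the PR):

```scala
import scala.concurrent.duration.{DurationInt, FiniteDuration}

object AsyncAfterDemo extends App {
  // Same shape as the suite's helper: run `block` on a background thread
  // after `delay`. The by-name parameter defers evaluation of `block`
  // until the spawned thread invokes it.
  def asyncAfter(delay: FiniteDuration)(block: => Unit): Unit = {
    new Thread(() => {
      Thread.sleep(delay.toMillis)
      block
    }).start()
  }

  asyncAfter(1.second) {
    println(s"ran after one second on ${Thread.currentThread().getName}")
  }
  println("main thread continues immediately")
  Thread.sleep(1500L) // keep the JVM alive long enough to see the output
}
```

The curried by-name block gives call sites brace syntax, `asyncAfter(3.seconds) { ... }`, which is why the updated tests read like built-in control flow.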