diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index fe2f68333..e9913ccd7 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -58,7 +58,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
 
   /** Flint Spark index monitor */
   val flintIndexMonitor: FlintSparkIndexMonitor =
-    new FlintSparkIndexMonitor(spark, flintMetadataLogService)
+    new FlintSparkIndexMonitor(spark, flintClient, flintMetadataLogService)
 
   /**
    * Create index builder for creating index with fluent API.
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index 29acaea6b..32e55613d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -18,6 +18,7 @@ import dev.failsafe.event.ExecutionAttemptedEvent
 import dev.failsafe.function.CheckedRunnable
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
+import org.opensearch.flint.core.FlintClient
 import org.opensearch.flint.core.logging.ExceptionMessages.extractRootCause
 import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
 
@@ -36,6 +37,7 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
  */
 class FlintSparkIndexMonitor(
     spark: SparkSession,
+    flintClient: FlintClient,
     flintMetadataLogService: FlintMetadataLogService)
     extends Logging {
 
@@ -157,7 +159,13 @@
       try {
         if (isStreamingJobActive(indexName)) {
           logInfo("Streaming job is still active")
-          flintMetadataLogService.recordHeartbeat(indexName)
+
+          if (flintClient.exists(indexName)) {
+            flintMetadataLogService.recordHeartbeat(indexName)
+          } else {
+            logInfo("Streaming job is active but data is deleted")
+            stopStreamingJobAndMonitor(indexName)
+          }
         } else {
           logError("Streaming job is not active. Cancelling monitor task")
           stopMonitor(indexName)
@@ -172,10 +180,7 @@
 
           // Stop streaming job and its monitor if max retry limit reached
           if (errorCnt >= MAX_ERROR_COUNT) {
-            logInfo(s"Terminating streaming job and index monitor for $indexName")
-            stopStreamingJob(indexName)
-            stopMonitor(indexName)
-            logInfo(s"Streaming job and index monitor terminated")
+            stopStreamingJobAndMonitor(indexName)
           }
         }
       }
@@ -184,6 +189,13 @@
   private def isStreamingJobActive(indexName: String): Boolean =
     spark.streams.active.exists(_.name == indexName)
 
+  private def stopStreamingJobAndMonitor(indexName: String): Unit = {
+    logInfo(s"Terminating streaming job and index monitor for $indexName")
+    stopStreamingJob(indexName)
+    stopMonitor(indexName)
+    logInfo(s"Streaming job and index monitor terminated")
+  }
+
   private def stopStreamingJob(indexName: String): Unit = {
     val job = spark.streams.active.find(_.name == indexName)
     if (job.isDefined) {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
index ad5029fcb..a16aac624 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
@@ -9,9 +9,11 @@ import java.util.Base64
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
 
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doAnswer, spy}
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
 import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.opensearch.client.RequestOptions
 import org.opensearch.flint.OpenSearchTransactionSuite
@@ -148,12 +150,25 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
     spark.streams.active.exists(_.name == testFlintIndex) shouldBe false
   }
 
+  test("monitor task and streaming job should terminate if data index is deleted") {
+    val task = FlintSparkIndexMonitor.indexMonitorTracker(testFlintIndex)
+    async {
+      openSearchClient
+        .indices()
+        .delete(new DeleteIndexRequest(testFlintIndex), RequestOptions.DEFAULT)
+    }
+
+    // Wait for index monitor execution and assert
+    Thread.sleep(5000)
+    task.isCancelled shouldBe true
+    spark.streams.active.exists(_.name == testFlintIndex) shouldBe false
+  }
+
   test("await monitor terminated without exception should stay refreshing state") {
     // Setup a timer to terminate the streaming job
-    new Thread(() => {
-      Thread.sleep(3000L)
+    asyncAfter(3.seconds) {
       spark.streams.active.find(_.name == testFlintIndex).get.stop()
-    }).start()
+    }
 
     // Await until streaming job terminated
     flint.flintIndexMonitor.awaitMonitor()
@@ -164,9 +179,7 @@
   }
 
   test("await monitor terminated with exception should update index state to failed with error") {
-    new Thread(() => {
-      Thread.sleep(3000L)
-
+    asyncAfter(3.seconds) {
       // Set Flint index readonly to simulate streaming job exception
       val settings = Map("index.blocks.write" -> true)
       val request = new UpdateSettingsRequest(testFlintIndex).settings(settings.asJava)
@@ -178,7 +191,7 @@
           | PARTITION (year=2023, month=6)
          | VALUES ('Test', 35, 'Vancouver')
          | """.stripMargin)
-    }).start()
+    }
 
     // Await until streaming job terminated
     flint.flintIndexMonitor.awaitMonitor()
@@ -204,6 +217,19 @@
     latestLog should not contain "error"
   }
 
+  private def async(block: => Unit): Unit = {
+    new Thread(() => {
+      block
+    }).start()
+  }
+
+  private def asyncAfter(delay: FiniteDuration)(block: => Unit): Unit = {
+    new Thread(() => {
+      Thread.sleep(delay.toMillis)
+      block
+    }).start()
+  }
+
   private def getLatestTimestamp: (Long, Long) = {
     val latest = latestLogEntry(testLatestId)
     (latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])