diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index d199eb7b2..648258853 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -5,7 +5,7 @@
 
 package org.opensearch.flint.spark
 
-import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}
 
 import scala.collection.JavaConverters._
 
@@ -288,15 +288,28 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     spark.streams.active.exists(_.name == indexName)
 
   private def scheduleIndexStateUpdate(indexName: String): Unit = {
-    executor.scheduleAtFixedRate(
+    var task: ScheduledFuture[_] = null // avoid forward reference compile error at task.cancel()
+    task = executor.scheduleAtFixedRate(
       () => {
-        logInfo("Scheduler triggers index log entry update")
+        logInfo(s"Scheduler triggers index log entry update for $indexName")
         try {
-          flintClient
-            .startTransaction(indexName, dataSourceName)
-            .initialLog(latest => latest.state == REFRESHING)
-            .finalLog(latest => latest) // timestamp will update automatically
-            .commit(latest => logInfo("Updating log entry to " + latest))
+          if (isIncrementalRefreshing(indexName)) {
+            logInfo("Streaming job is still active")
+            flintClient
+              .startTransaction(indexName, dataSourceName)
+              .initialLog(latest => latest.state == REFRESHING)
+              .finalLog(latest => latest) // timestamp will update automatically
+              .commit(_ => {})
+          } else {
+            logError("Streaming job is not active. Cancelling update task")
+            flintClient
+              .startTransaction(indexName, dataSourceName)
+              .initialLog(_ => true)
+              .finalLog(latest => latest.copy(state = FAILED))
+              .commit(_ => {})
+            task.cancel(true)
+            logInfo("Update task is cancelled")
+          }
         } catch {
           case e: Exception =>
             logError("Failed to update index log entry", e)
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala
index 09fad9107..ddbfeeb16 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala
@@ -14,6 +14,9 @@ import org.scalatest.matchers.should.Matchers
 
 import org.apache.spark.sql.Row
 
+/**
+ * This suite doesn't enable transaction to avoid side effect of scheduled update task.
+ */
 class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers {
 
   private val testTable = "spark_catalog.default.index_job_test"