Skip to content

Commit

Permalink
Detect streaming job state in update task
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 1, 2023
1 parent 9e6eaa9 commit 49831a9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.flint.spark

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -288,15 +288,28 @@ class FlintSpark(val spark: SparkSession) extends Logging {
spark.streams.active.exists(_.name == indexName)

private def scheduleIndexStateUpdate(indexName: String): Unit = {
executor.scheduleAtFixedRate(
var task: ScheduledFuture[_] = null // avoid forward reference compile error at task.cancel()
task = executor.scheduleAtFixedRate(
() => {
logInfo("Scheduler triggers index log entry update")
logInfo(s"Scheduler triggers index log entry update for $indexName")
try {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest) // timestamp will update automatically
.commit(latest => logInfo("Updating log entry to " + latest))
if (isIncrementalRefreshing(indexName)) {
logInfo("Streaming job is still active")
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest) // timestamp will update automatically
.commit(_ => {})
} else {
logError("Streaming job is not active. Cancelling update task")
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(_ => true)
.finalLog(latest => latest.copy(state = FAILED))
.commit(_ => {})
task.cancel(true)
logInfo("Update task is cancelled")
}
} catch {
case e: Exception =>
logError("Failed to update index log entry", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.Row

/**
 * This suite does not enable transactions, in order to avoid side effects from the scheduled update task.
*/
class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers {

private val testTable = "spark_catalog.default.index_job_test"
Expand Down

0 comments on commit 49831a9

Please sign in to comment.