Skip to content

Commit

Permalink
Resolution for previous concurrency issue
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Oct 23, 2024
1 parent 0b6da30 commit f35a78c
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ object FlintSparkIndexOptions {
.externalSchedulerIntervalThreshold())
case (false, _, Some("external")) =>
throw new IllegalArgumentException(
"spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")
"spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")
case _ =>
updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.INTERNAL.toString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,9 @@ class FlintSparkJobInternalSchedulingService(
index: FlintSparkIndex,
action: AsyncQuerySchedulerAction): Option[String] = {
val indexName = index.name()

action match {
case AsyncQuerySchedulerAction.SCHEDULE => None // No-op
case AsyncQuerySchedulerAction.UPDATE =>
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(indexName)
startRefreshingJob(index)
case AsyncQuerySchedulerAction.UNSCHEDULE =>
logInfo("Stopping index state monitor")
Expand All @@ -81,7 +78,17 @@ class FlintSparkJobInternalSchedulingService(
private def startRefreshingJob(index: FlintSparkIndex): Option[String] = {
logInfo(s"Starting refreshing job for index ${index.name()}")
val indexRefresh = FlintSparkIndexRefresh.create(index.name(), index)
indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava))
val jobId = indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava))

// NOTE: Resolution for previous concurrency issue
// This code addresses a previously identified concurrency issue with recoverIndex
// where scheduled FlintSparkIndexMonitorTask couldn't detect the active Spark streaming job ID. The issue
// was caused by starting the FlintSparkIndexMonitor before the Spark streaming job was fully
// initialized. In this fixed version, we start the monitor after the streaming job has been
// initiated, ensuring that the job ID is available for detection.
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(index.name())
jobId
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class FlintSparkIndexBuilderSuite
None,
None,
Some(
"spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")),
"spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")),
(
"set external mode when interval above threshold and no mode specified",
true,
Expand Down

0 comments on commit f35a78c

Please sign in to comment.