Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 0.5-nexus] Resolution for previous concurrency issue #808

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
val internalSchedulingService =
new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor)
new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor)
val externalSchedulingService =
new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ object FlintSparkIndexOptions {
.externalSchedulerIntervalThreshold())
case (false, _, Some("external")) =>
throw new IllegalArgumentException(
"spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")
"spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")
case _ =>
updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.INTERNAL.toString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
*/
class FlintSparkJobInternalSchedulingService(
spark: SparkSession,
flintSparkConf: FlintSparkConf,
flintIndexMonitor: FlintSparkIndexMonitor)
extends FlintSparkJobSchedulingService
with Logging {
Expand All @@ -55,12 +56,9 @@ class FlintSparkJobInternalSchedulingService(
index: FlintSparkIndex,
action: AsyncQuerySchedulerAction): Option[String] = {
val indexName = index.name()

action match {
case AsyncQuerySchedulerAction.SCHEDULE => None // No-op
case AsyncQuerySchedulerAction.UPDATE =>
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(indexName)
startRefreshingJob(index)
case AsyncQuerySchedulerAction.UNSCHEDULE =>
logInfo("Stopping index state monitor")
Expand All @@ -81,7 +79,17 @@ class FlintSparkJobInternalSchedulingService(
/**
 * Starts the refreshing job for the given index and then starts the index state monitor.
 *
 * @param index
 *   the Flint index to refresh; its name is also passed to the index monitor
 * @return
 *   the result of `indexRefresh.start(spark, flintSparkConf)`, held locally as `jobId`
 */
private def startRefreshingJob(index: FlintSparkIndex): Option[String] = {
logInfo(s"Starting refreshing job for index ${index.name()}")
val indexRefresh = FlintSparkIndexRefresh.create(index.name(), index)
// NOTE(review): this call and the one below look like the removed and added sides of a diff
// hunk whose +/- markers were lost; presumably only the `flintSparkConf` variant is meant to
// remain -- confirm against the merged source.
indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava))
val jobId = indexRefresh.start(spark, flintSparkConf)

// NOTE: Resolution for a previous concurrency issue with recoverIndex.
// The scheduled FlintSparkIndexMonitorTask could not detect the active Spark streaming job ID
// because the FlintSparkIndexMonitor was started before the Spark streaming job was fully
// initialized. The monitor is therefore started only after the streaming job has been
// initiated, so that the job ID is available for detection.
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(index.name())
jobId
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object FlintSparkJobSchedulingService {
if (isExternalSchedulerEnabled(index)) {
new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf)
} else {
new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor)
new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class FlintSparkIndexBuilderSuite
None,
None,
Some(
"spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")),
"spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")),
(
"set external mode when interval above threshold and no mode specified",
true,
Expand Down
Loading