diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index d20985d82..532bd8e60 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -579,7 +579,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w val indexName = index.name val indexLogEntry = index.latestLogEntry.get val internalSchedulingService = - new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor) + new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor) val externalSchedulingService = new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index 053245fe3..9b58a696c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -256,7 +256,7 @@ object FlintSparkIndexOptions { .externalSchedulerIntervalThreshold()) case (false, _, Some("external")) => throw new IllegalArgumentException( - "spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode") + "spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external") case _ => updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.INTERNAL.toString) } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala index d22eff2c9..8928357c7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf */ class FlintSparkJobInternalSchedulingService( spark: SparkSession, + flintSparkConf: FlintSparkConf, flintIndexMonitor: FlintSparkIndexMonitor) extends FlintSparkJobSchedulingService with Logging { @@ -55,12 +56,9 @@ class FlintSparkJobInternalSchedulingService( index: FlintSparkIndex, action: AsyncQuerySchedulerAction): Option[String] = { val indexName = index.name() - action match { case AsyncQuerySchedulerAction.SCHEDULE => None // No-op case AsyncQuerySchedulerAction.UPDATE => - logInfo("Scheduling index state monitor") - flintIndexMonitor.startMonitor(indexName) startRefreshingJob(index) case AsyncQuerySchedulerAction.UNSCHEDULE => logInfo("Stopping index state monitor") @@ -81,7 +79,17 @@ class FlintSparkJobInternalSchedulingService( private def startRefreshingJob(index: FlintSparkIndex): Option[String] = { logInfo(s"Starting refreshing job for index ${index.name()}") val indexRefresh = FlintSparkIndexRefresh.create(index.name(), index) - indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava)) + val jobId = indexRefresh.start(spark, flintSparkConf) + + // NOTE: Resolution for previous concurrency issue + // This code addresses a previously identified concurrency issue with recoverIndex + // where scheduled FlintSparkIndexMonitorTask couldn't detect the active Spark streaming job ID. The issue + // was caused by starting the FlintSparkIndexMonitor before the Spark streaming job was fully + // initialized. In this fixed version, we start the monitor after the streaming job has been + // initiated, ensuring that the job ID is available for detection. + logInfo("Scheduling index state monitor") + flintIndexMonitor.startMonitor(index.name()) + jobId } /** diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala index 6e25d8a8c..b813c7dd0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala @@ -70,7 +70,7 @@ object FlintSparkJobSchedulingService { if (isExternalSchedulerEnabled(index)) { new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf) } else { - new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor) + new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor) } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala index ec2606ca0..80b788253 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala @@ -208,7 +208,7 @@ class FlintSparkIndexBuilderSuite None, None, Some( - "spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")), + "spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")), ( "set external mode when interval below threshold and no mode specified", true,