From 6cf9a45c952dba64765a78f42b2dfda487924589 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Wed, 23 Oct 2024 11:54:44 -0700 Subject: [PATCH 1/2] Resolution for previous concurrency issue Signed-off-by: Louis Chu --- .../flint/spark/FlintSparkIndexOptions.scala | 2 +- .../FlintSparkJobInternalSchedulingService.scala | 15 +++++++++++---- .../flint/spark/FlintSparkIndexBuilderSuite.scala | 2 +- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index 4bfc50c55..fc1a611ef 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -257,7 +257,7 @@ object FlintSparkIndexOptions { .externalSchedulerIntervalThreshold()) case (false, _, Some("external")) => throw new IllegalArgumentException( - "spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode") + "spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external") case _ => updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.INTERNAL.toString) } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala index d22eff2c9..c6a8fc12b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala @@ -55,12 +55,9 @@ class FlintSparkJobInternalSchedulingService( index: FlintSparkIndex, action: AsyncQuerySchedulerAction): Option[String] = { val indexName = index.name() - action match { case AsyncQuerySchedulerAction.SCHEDULE => None // No-op case AsyncQuerySchedulerAction.UPDATE => - logInfo("Scheduling index state monitor") - flintIndexMonitor.startMonitor(indexName) startRefreshingJob(index) case AsyncQuerySchedulerAction.UNSCHEDULE => logInfo("Stopping index state monitor") @@ -81,7 +78,17 @@ class FlintSparkJobInternalSchedulingService( private def startRefreshingJob(index: FlintSparkIndex): Option[String] = { logInfo(s"Starting refreshing job for index ${index.name()}") val indexRefresh = FlintSparkIndexRefresh.create(index.name(), index) - indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava)) + val jobId = indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava)) + + // NOTE: Resolution for previous concurrency issue + // This code addresses a previously identified concurrency issue with recoverIndex + // where scheduled FlintSparkIndexMonitorTask couldn't detect the active Spark streaming job ID. The issue + // was caused by starting the FlintSparkIndexMonitor before the Spark streaming job was fully + // initialized. In this fixed version, we start the monitor after the streaming job has been + // initiated, ensuring that the job ID is available for detection. + logInfo("Scheduling index state monitor") + flintIndexMonitor.startMonitor(index.name()) + jobId } /** diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala index 063c32074..4ff5b5adb 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala @@ -208,7 +208,7 @@ class FlintSparkIndexBuilderSuite None, None, Some( - "spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")), + "spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")), ( "set external mode when interval above threshold and no mode specified", true, From 4dcc1eae43dcaec7c83a22e188c7c9f5195bc938 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Thu, 24 Oct 2024 10:37:46 -0700 Subject: [PATCH 2/2] resolve comments Signed-off-by: Louis Chu --- .../src/main/scala/org/opensearch/flint/spark/FlintSpark.scala | 2 +- .../scheduler/FlintSparkJobInternalSchedulingService.scala | 3 ++- .../flint/spark/scheduler/FlintSparkJobSchedulingService.scala | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 309877fdd..e5805731b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -566,7 +566,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w val indexName = index.name val indexLogEntry = index.latestLogEntry.get val internalSchedulingService = - new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor) + new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor) val externalSchedulingService = new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala index c6a8fc12b..8928357c7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf */ class FlintSparkJobInternalSchedulingService( spark: SparkSession, + flintSparkConf: FlintSparkConf, flintIndexMonitor: FlintSparkIndexMonitor) extends FlintSparkJobSchedulingService with Logging { @@ -78,7 +79,7 @@ class FlintSparkJobInternalSchedulingService( private def startRefreshingJob(index: FlintSparkIndex): Option[String] = { logInfo(s"Starting refreshing job for index ${index.name()}") val indexRefresh = FlintSparkIndexRefresh.create(index.name(), index) - val jobId = indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava)) + val jobId = indexRefresh.start(spark, flintSparkConf) // NOTE: Resolution for previous concurrency issue // This code addresses a previously identified concurrency issue with recoverIndex diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala index 6e25d8a8c..b813c7dd0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala @@ -70,7 +70,7 @@ object FlintSparkJobSchedulingService { if (isExternalSchedulerEnabled(index)) { new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf) } else { - new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor) + new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor) } }