diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 309877fdd..e5805731b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -566,7 +566,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w val indexName = index.name val indexLogEntry = index.latestLogEntry.get val internalSchedulingService = - new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor) + new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor) val externalSchedulingService = new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala index c6a8fc12b..8928357c7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf */ class FlintSparkJobInternalSchedulingService( spark: SparkSession, + flintSparkConf: FlintSparkConf, flintIndexMonitor: FlintSparkIndexMonitor) extends FlintSparkJobSchedulingService with Logging { @@ -78,7 +79,7 @@ class FlintSparkJobInternalSchedulingService( private def startRefreshingJob(index: FlintSparkIndex): Option[String] = { logInfo(s"Starting refreshing job for index ${index.name()}") val indexRefresh = FlintSparkIndexRefresh.create(index.name(), index) - val jobId = indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava)) + val jobId = indexRefresh.start(spark, flintSparkConf) // NOTE: Resolution for previous concurrency issue // This code addresses a previously identified concurrency issue with recoverIndex diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala index 6e25d8a8c..b813c7dd0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala @@ -70,7 +70,7 @@ object FlintSparkJobSchedulingService { if (isExternalSchedulerEnabled(index)) { new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf) } else { - new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor) + new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor) } }