Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Oct 24, 2024
1 parent 6cf9a45 commit 4dcc1ea
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
val internalSchedulingService =
new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor)
new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor)
val externalSchedulingService =
new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
*/
class FlintSparkJobInternalSchedulingService(
spark: SparkSession,
flintSparkConf: FlintSparkConf,
flintIndexMonitor: FlintSparkIndexMonitor)
extends FlintSparkJobSchedulingService
with Logging {
Expand Down Expand Up @@ -78,7 +79,7 @@ class FlintSparkJobInternalSchedulingService(
private def startRefreshingJob(index: FlintSparkIndex): Option[String] = {
logInfo(s"Starting refreshing job for index ${index.name()}")
val indexRefresh = FlintSparkIndexRefresh.create(index.name(), index)
val jobId = indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava))
val jobId = indexRefresh.start(spark, flintSparkConf)

// NOTE: Resolution for previous concurrency issue
// This code addresses a previously identified concurrency issue with recoverIndex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object FlintSparkJobSchedulingService {
if (isExternalSchedulerEnabled(index)) {
new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf)
} else {
new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor)
new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor)
}
}

Expand Down

0 comments on commit 4dcc1ea

Please sign in to comment.