From 5d8ae395d71bb459e52cdd06dacad679377ba004 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Tue, 8 Oct 2024 07:44:26 -0700 Subject: [PATCH] Resolve comments Signed-off-by: Louis Chu --- .../scala/org/opensearch/flint/spark/FlintSpark.scala | 11 ++++++----- .../FlintSparkJobExternalSchedulingService.scala | 10 +++++----- .../FlintSparkJobInternalSchedulingService.scala | 10 +++++----- .../scheduler/FlintSparkJobSchedulingService.scala | 10 ++++++---- 4 files changed, 22 insertions(+), 19 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 50b970b0f..469e0dd55 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -347,7 +347,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .transientLog(latest => latest.copy(state = RECOVERING, createTime = System.currentTimeMillis())) .finalLog(latest => { - latest.copy(state = jobSchedulingService.finalStateForUpdate) + latest.copy(state = jobSchedulingService.stateTransitions.finalStateForUpdate) }) .commit(_ => { flintIndexMetadataService.updateIndexMetadata(indexName, updatedIndex.metadata()) @@ -536,10 +536,11 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w tx .initialLog(latest => // Index in external scheduler mode should be in active or refreshing state - Set(jobSchedulingService.initialStateForUnschedule).contains( + Set(jobSchedulingService.stateTransitions.initialStateForUnschedule).contains( latest.state) && latest.entryVersion == indexLogEntry.entryVersion) .transientLog(latest => latest.copy(state = UPDATING)) - .finalLog(latest => latest.copy(state = jobSchedulingService.finalStateForUnschedule)) + .finalLog(latest => + latest.copy(state = jobSchedulingService.stateTransitions.finalStateForUnschedule)) .commit(_ => { flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata) logInfo("Update index options complete") @@ -561,11 +562,11 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w flintIndexMonitor) tx .initialLog(latest => - latest.state == jobSchedulingService.initialStateForUpdate && latest.entryVersion == indexLogEntry.entryVersion) + latest.state == jobSchedulingService.stateTransitions.initialStateForUpdate && latest.entryVersion == indexLogEntry.entryVersion) .transientLog(latest => latest.copy(state = UPDATING, createTime = System.currentTimeMillis())) .finalLog(latest => { - latest.copy(state = jobSchedulingService.finalStateForUpdate) + latest.copy(state = jobSchedulingService.stateTransitions.finalStateForUpdate) }) .commit(_ => { flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala index 79f6393b5..8aac60aee 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala @@ -36,11 +36,11 @@ class FlintSparkJobExternalSchedulingService( extends FlintSparkJobSchedulingService with Logging { - initialStateForUpdate = IndexState.ACTIVE - finalStateForUpdate = IndexState.ACTIVE - - initialStateForUnschedule = IndexState.ACTIVE - finalStateForUnschedule = IndexState.ACTIVE + override val stateTransitions: StateTransitions = StateTransitions( + initialStateForUpdate = IndexState.ACTIVE, + finalStateForUpdate = IndexState.ACTIVE, + initialStateForUnschedule = IndexState.ACTIVE, + finalStateForUnschedule = IndexState.ACTIVE) override def handleJob( index: FlintSparkIndex, diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala index aa1425bb1..d22eff2c9 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala @@ -34,11 +34,11 @@ class FlintSparkJobInternalSchedulingService( extends FlintSparkJobSchedulingService with Logging { - initialStateForUpdate = IndexState.ACTIVE - finalStateForUpdate = IndexState.REFRESHING - - initialStateForUnschedule = IndexState.REFRESHING - finalStateForUnschedule = IndexState.ACTIVE + override val stateTransitions: StateTransitions = StateTransitions( + initialStateForUpdate = IndexState.ACTIVE, + finalStateForUpdate = IndexState.REFRESHING, + initialStateForUnschedule = IndexState.REFRESHING, + finalStateForUnschedule = IndexState.ACTIVE) /** * Handles job-related actions for a given Flint Spark index. diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala index 4b1537173..6e25d8a8c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala @@ -18,11 +18,13 @@ import org.apache.spark.sql.flint.config.FlintSparkConf */ trait FlintSparkJobSchedulingService { - var initialStateForUpdate: IndexState = _ - var finalStateForUpdate: IndexState = _ + case class StateTransitions( + initialStateForUpdate: IndexState, + finalStateForUpdate: IndexState, + initialStateForUnschedule: IndexState, + finalStateForUnschedule: IndexState) - var initialStateForUnschedule: IndexState = _ - var finalStateForUnschedule: IndexState = _ + val stateTransitions: StateTransitions /** * Handles a job action for a given Flint Spark index.