Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Oct 8, 2024
1 parent 4a104cb commit 5d8ae39
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
.transientLog(latest =>
latest.copy(state = RECOVERING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
latest.copy(state = jobSchedulingService.finalStateForUpdate)
latest.copy(state = jobSchedulingService.stateTransitions.finalStateForUpdate)
})
.commit(_ => {
flintIndexMetadataService.updateIndexMetadata(indexName, updatedIndex.metadata())
Expand Down Expand Up @@ -536,10 +536,11 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
tx
.initialLog(latest =>
// Index in external scheduler mode should be in active or refreshing state
Set(jobSchedulingService.initialStateForUnschedule).contains(
Set(jobSchedulingService.stateTransitions.initialStateForUnschedule).contains(
latest.state) && latest.entryVersion == indexLogEntry.entryVersion)
.transientLog(latest => latest.copy(state = UPDATING))
.finalLog(latest => latest.copy(state = jobSchedulingService.finalStateForUnschedule))
.finalLog(latest =>
latest.copy(state = jobSchedulingService.stateTransitions.finalStateForUnschedule))
.commit(_ => {
flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata)
logInfo("Update index options complete")
Expand All @@ -561,11 +562,11 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
flintIndexMonitor)
tx
.initialLog(latest =>
latest.state == jobSchedulingService.initialStateForUpdate && latest.entryVersion == indexLogEntry.entryVersion)
latest.state == jobSchedulingService.stateTransitions.initialStateForUpdate && latest.entryVersion == indexLogEntry.entryVersion)
.transientLog(latest =>
latest.copy(state = UPDATING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
latest.copy(state = jobSchedulingService.finalStateForUpdate)
latest.copy(state = jobSchedulingService.stateTransitions.finalStateForUpdate)
})
.commit(_ => {
flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ class FlintSparkJobExternalSchedulingService(
extends FlintSparkJobSchedulingService
with Logging {

initialStateForUpdate = IndexState.ACTIVE
finalStateForUpdate = IndexState.ACTIVE

initialStateForUnschedule = IndexState.ACTIVE
finalStateForUnschedule = IndexState.ACTIVE
override val stateTransitions: StateTransitions = StateTransitions(
initialStateForUpdate = IndexState.ACTIVE,
finalStateForUpdate = IndexState.ACTIVE,
initialStateForUnschedule = IndexState.ACTIVE,
finalStateForUnschedule = IndexState.ACTIVE)

override def handleJob(
index: FlintSparkIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ class FlintSparkJobInternalSchedulingService(
extends FlintSparkJobSchedulingService
with Logging {

initialStateForUpdate = IndexState.ACTIVE
finalStateForUpdate = IndexState.REFRESHING

initialStateForUnschedule = IndexState.REFRESHING
finalStateForUnschedule = IndexState.ACTIVE
override val stateTransitions: StateTransitions = StateTransitions(
initialStateForUpdate = IndexState.ACTIVE,
finalStateForUpdate = IndexState.REFRESHING,
initialStateForUnschedule = IndexState.REFRESHING,
finalStateForUnschedule = IndexState.ACTIVE)

/**
* Handles job-related actions for a given Flint Spark index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
*/
trait FlintSparkJobSchedulingService {

var initialStateForUpdate: IndexState = _
var finalStateForUpdate: IndexState = _
case class StateTransitions(
initialStateForUpdate: IndexState,
finalStateForUpdate: IndexState,
initialStateForUnschedule: IndexState,
finalStateForUnschedule: IndexState)

var initialStateForUnschedule: IndexState = _
var finalStateForUnschedule: IndexState = _
val stateTransitions: StateTransitions

/**
* Handles a job action for a given Flint Spark index.
Expand Down

0 comments on commit 5d8ae39

Please sign in to comment.