Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Oct 11, 2023
1 parent e4a86e4 commit 691afb2
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ object ManagedIndexRunner :

// If Index State Management is disabled and the current step is not null and safe to disable on
// then disable the job and return early
// TODO are there any step not safe to disable on?
if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) {
disableManagedIndexConfig(managedIndexConfig)
return
Expand All @@ -329,15 +328,12 @@ object ManagedIndexRunner :
managedIndexMetaData
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
if (updated.metadataSaved)
disableManagedIndexConfig(managedIndexConfig)

Check warning on line 332 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt#L332

Added line #L332 was not covered by tests
return
}

logger.info("change policy $managedIndexConfig")
logger.info("change policy $managedIndexMetaData")
logger.info("change policy $action")
if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) {
logger.info("Change policy for index ${managedIndexConfig.index}")
if (managedIndexConfig.shouldChangePolicy(action)) {
initChangePolicy(managedIndexConfig, managedIndexMetaData, action)
return
}
Expand Down Expand Up @@ -469,9 +465,9 @@ object ManagedIndexRunner :

private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig) {
var policy: Policy = managedIndexConfig.policy
lateinit var metadata: ManagedIndexMetaData
var metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy, policy.id)

metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy)
// User may change policy before first metadata initialization
if (managedIndexConfig.changePolicy != null) {
val policyID = managedIndexConfig.changePolicy.policyID
val newPolicy = getPolicy(policyID)

Check warning on line 473 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt#L472-L473

Added lines #L472 - L473 were not covered by tests
Expand All @@ -482,26 +478,8 @@ object ManagedIndexRunner :
logger.error("Failed to save policy to ManagedIndexConfig(${managedIndexConfig.index})")
return

Check warning on line 479 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt#L478-L479

Added lines #L478 - L479 were not covered by tests
}
metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy)
} else {
// no policy found for change policy TODO can we check this in change policy API
metadata = ManagedIndexMetaData(
index = managedIndexConfig.index,
indexUuid = managedIndexConfig.indexUuid,
policyID = policyID,
policySeqNo = null,
policyPrimaryTerm = null,
policyCompleted = false,
rolledOver = false,
indexCreationDate = getIndexCreationDate(managedIndexConfig),
transitionTo = null,
stateMetaData = null,
actionMetaData = null,
stepMetaData = null,
policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0),
info = mapOf("message" to "Fail to load policy: $policyID")
)
}
metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy, policyID)

Check warning on line 482 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt#L482

Added line #L482 was not covered by tests
}

updateManagedIndexMetaData(metadata, create = true)
Expand Down Expand Up @@ -586,27 +564,48 @@ object ManagedIndexRunner :
@Suppress("ComplexMethod")
private suspend fun getInitializedManagedIndexMetaData(
managedIndexConfig: ManagedIndexConfig,
policy: Policy
policy: Policy?,
policyID: String,
): ManagedIndexMetaData {
val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState
val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli())

return ManagedIndexMetaData(
index = managedIndexConfig.index,
indexUuid = managedIndexConfig.indexUuid,
policyID = policy.id,
policySeqNo = policy.seqNo,
policyPrimaryTerm = policy.primaryTerm,
policyCompleted = false,
rolledOver = false,
indexCreationDate = getIndexCreationDate(managedIndexConfig),
transitionTo = null,
stateMetaData = stateMetaData,
actionMetaData = null,
stepMetaData = null,
policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0),
info = mapOf("message" to "Successfully initialized policy: ${policy.id}")
)
val indexCreationDate = getIndexCreationDate(managedIndexConfig)
if (policy == null) {
// We check policy existence in change policy API, but it maybe deleted after that
return ManagedIndexMetaData(
index = managedIndexConfig.index,
indexUuid = managedIndexConfig.indexUuid,
policyID = policyID,
policySeqNo = null,
policyPrimaryTerm = null,
policyCompleted = false,
rolledOver = false,
indexCreationDate = indexCreationDate,
transitionTo = null,
stateMetaData = null,
actionMetaData = null,
stepMetaData = null,
policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0),
info = mapOf("message" to "Fail to load policy: $policyID")

Check warning on line 587 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt#L573-L587

Added lines #L573 - L587 were not covered by tests
)
} else {
val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState
val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli())
return ManagedIndexMetaData(
index = managedIndexConfig.index,
indexUuid = managedIndexConfig.indexUuid,
policyID = policy.id,
policySeqNo = policy.seqNo,
policyPrimaryTerm = policy.primaryTerm,
policyCompleted = false,
rolledOver = false,
indexCreationDate = indexCreationDate,
transitionTo = null,
stateMetaData = stateMetaData,
actionMetaData = null,
stepMetaData = null,
policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0),
info = mapOf("message" to "Successfully initialized policy: ${policy.id}")
)
}
}

/**
Expand Down Expand Up @@ -669,8 +668,8 @@ object ManagedIndexRunner :
managedIndexMetaData: ManagedIndexMetaData,
actionToExecute: Action?
) {
// should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non-null
val changePolicy = managedIndexConfig.changePolicy
// should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non-null
if (changePolicy == null) {
logger.debug(
"initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: {}",
Expand All @@ -679,18 +678,17 @@ object ManagedIndexRunner :
return
}

// get the policy we'll attempt to change to
val policy = getPolicy(changePolicy.policyID)

// update the ManagedIndexMetaData with new information
val updatedManagedIndexMetaData = if (policy == null) {
managedIndexMetaData.copy(
info = mapOf("message" to "Failed to load change policy: ${changePolicy.policyID}"),
policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0)
)
} else {
// if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are
// in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase
// 1. entering transition action in this run
// 2. has been in transition action
// Refresh the transition action metadata, meaning we start the transition for change policy
val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) {
ActionMetaData(
TransitionsAction.name, Instant.now().toEpochMilli(), -1,
Expand All @@ -699,6 +697,7 @@ object ManagedIndexRunner :
} else {
managedIndexMetaData.actionMetaData
}

managedIndexMetaData.copy(
info = mapOf("message" to "Attempting to change policy to ${policy.id}"),
transitionTo = changePolicy.state,
Expand All @@ -711,12 +710,13 @@ object ManagedIndexRunner :
)
}

// check if the safe flag was set by the Change Policy REST API, if it was then do a second validation
// before allowing a change to happen
/**
* The freshness of isSafe may change between runs, and we use it to decide whether to enter this method
* n [shouldChangePolicy]. So here we check the safeness again
*/
if (changePolicy.isSafe) {
// if policy is null then we are only updating error information in metadata, so it's fine to continue
if (policy != null) {
// current policy being null should never happen as we have a check at the top of runner
// if it is unsafe to change then we set safe back to false, so we don't keep doing this check every execution
if (!managedIndexConfig.policy.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy)) {
updateManagedIndexConfig(managedIndexConfig.copy(changePolicy = managedIndexConfig.changePolicy.copy(isSafe = false)))
Expand All @@ -725,17 +725,17 @@ object ManagedIndexRunner :
}
}

/*
* Try to update the ManagedIndexMetaData in cluster state, we need to do this first before updating the
* ManagedIndexConfig because if this fails we can fail early and still retry this whole process on the next
* execution whereas if we do the update to ManagedIndexConfig first we lose the ChangePolicy on the job and
* could fail to update the ManagedIndexMetaData which would put us in a bad state
* */
/**
* Try to update the ManagedIndexMetaData, we need to do this first before updating the
* ManagedIndexConfig because if this fails we can fail early and still retry this whole process on the next
* execution whereas if we do the update to ManagedIndexConfig first we lose the ChangePolicy on the job and
* could fail to update the ManagedIndexMetaData which would put us in a bad state
*/
val updated = updateManagedIndexMetaData(updatedManagedIndexMetaData)

if (!updated.metadataSaved || policy == null) return

// Change the policy and user stored on the job from changePolicy, this will also set the changePolicy to null on the job
// Change the policy and user stored on the job, this will also set the changePolicy to null on the job
savePolicyToManagedIndexConfig(managedIndexConfig, policy.copy(user = changePolicy.user))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,18 +380,16 @@ val ManagedIndexMetaData.isPolicyCompleted: Boolean
get() = this.policyCompleted == true

/**
* We will change the policy if a change policy exists and if we are currently in
* a Transitions action (which means we're safely at the end of a state). If a
* transitionTo exists on the [ManagedIndexMetaData] it should still be fine to
* change policy as we have not actually transitioned yet. If the next action is Transition
* or if the rest API determined it was "safe", meaning the new policy has the same structure
* We will change the policy if a change policy exists and if we are currently in a Transitions action
* which means we're safely at the end of a state.
*
* If the next action is Transition or if the rest API determined it was "safe", meaning the new policy has the same structure
* of the current state, it should be safe to immediately change (even in the middle of the state).
*
* @param managedIndexMetaData current [ManagedIndexMetaData]
* @return {@code true} if we should change policy, {@code false} if not
*/
@Suppress("ReturnCount")
fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action?): Boolean {
fun ManagedIndexConfig.shouldChangePolicy(actionToExecute: Action?): Boolean {
if (this.changePolicy == null) {
return false
}
Expand All @@ -400,17 +398,7 @@ fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMeta
return true
}

// we need this in so that we can change policy before the first transition happens so policy doesn't get completed
// before we have a chance to change policy
if (actionToExecute?.type == TransitionsAction.name) {
return true
}

// TODO actionToExecute is correlate to the actionMetadata?
// actionToExecute is found out by checking the metadata, it can be current unfinished one or the next
// actionMetadata has already been updated, it can be current unfinished one or the next
// In change policy context, we only accept unfinished transition or the new transition
return managedIndexMetaData.actionMetaData?.name == TransitionsAction.name
return actionToExecute?.type == TransitionsAction.name
}

fun ManagedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig: ManagedIndexConfig): Boolean =
Expand All @@ -426,13 +414,13 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean {
}

/**
* A policy is safe to change to a new policy when each policy has the current state
* the [ManagedIndexConfig] is in and that state has the same actions in the same order.
* A policy is safe to change to a new policy when
* both policies have the current state the [ManagedIndexConfig] is in and that state has the same actions in the same order.
* This allows simple things like configuration updates to happen which won't break the execution/contract
* between [ManagedIndexMetaData] and [ManagedIndexConfig] as the metadata only knows about the current state.
* We never consider a policy safe to immediately change if the ChangePolicy contains a state to transition to
* as this could transition a user into a different state from the middle of the current state which we do not
* want to allow.
*
* If the ChangePolicy contains a state to transition to, we don't consider it's safe to change here
* as this may transition a user into a different state from the middle of the current state.
*
* @param stateName the name of the state the [ManagedIndexConfig] is currently in
* @param newPolicy the new (actual data model) policy we will eventually try to change to
Expand All @@ -441,20 +429,19 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean {
*/
@Suppress("ReturnCount")
fun Policy.isSafeToChange(stateName: String?, newPolicy: Policy, changePolicy: ChangePolicy): Boolean {
// if stateName is null it means we either have not initialized the job (no metadata to pull stateName from)
// if stateName is null it means we either have not initialized the job
// or we failed to load the initial policy, both cases its safe to change the policy
if (stateName == null) return true
if (changePolicy.state != null) return false

val currentState = this.states.find { it.name == stateName }
val newState = newPolicy.states.find { it.name == stateName }
if (currentState == null || newState == null) {
return false
}

if (currentState.actions.size != newState.actions.size) {
return false
}

currentState.actions.forEachIndexed { index, action ->
val newStateAction = newState.actions[index]
if (action.type != newStateAction.type) return@isSafeToChange false
Expand Down

0 comments on commit 691afb2

Please sign in to comment.