diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index d30e5bdfc..f80959661 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -316,7 +316,6 @@ object ManagedIndexRunner : // If Index State Management is disabled and the current step is not null and safe to disable on // then disable the job and return early - // TODO are there any step not safe to disable on? if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) { disableManagedIndexConfig(managedIndexConfig) return @@ -329,15 +328,12 @@ object ManagedIndexRunner : managedIndexMetaData .copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info) ) - if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) + if (updated.metadataSaved) + disableManagedIndexConfig(managedIndexConfig) return } - logger.info("change policy $managedIndexConfig") - logger.info("change policy $managedIndexMetaData") - logger.info("change policy $action") - if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) { - logger.info("Change policy for index ${managedIndexConfig.index}") + if (managedIndexConfig.shouldChangePolicy(action)) { initChangePolicy(managedIndexConfig, managedIndexMetaData, action) return } @@ -469,9 +465,9 @@ object ManagedIndexRunner : private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig) { var policy: Policy = managedIndexConfig.policy - lateinit var metadata: ManagedIndexMetaData + var metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy, policy.id) - metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy) + // User may change policy before first metadata initialization if 
(managedIndexConfig.changePolicy != null) { val policyID = managedIndexConfig.changePolicy.policyID val newPolicy = getPolicy(policyID) @@ -482,26 +478,8 @@ object ManagedIndexRunner : logger.error("Failed to save policy to ManagedIndexConfig(${managedIndexConfig.index})") return } - metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy) - } else { - // no policy found for change policy TODO can we check this in change policy API - metadata = ManagedIndexMetaData( - index = managedIndexConfig.index, - indexUuid = managedIndexConfig.indexUuid, - policyID = policyID, - policySeqNo = null, - policyPrimaryTerm = null, - policyCompleted = false, - rolledOver = false, - indexCreationDate = getIndexCreationDate(managedIndexConfig), - transitionTo = null, - stateMetaData = null, - actionMetaData = null, - stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf("message" to "Fail to load policy: $policyID") - ) } + metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy, policyID) } updateManagedIndexMetaData(metadata, create = true) @@ -586,27 +564,48 @@ object ManagedIndexRunner : @Suppress("ComplexMethod") private suspend fun getInitializedManagedIndexMetaData( managedIndexConfig: ManagedIndexConfig, - policy: Policy + policy: Policy?, + policyID: String, ): ManagedIndexMetaData { - val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState - val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) - - return ManagedIndexMetaData( - index = managedIndexConfig.index, - indexUuid = managedIndexConfig.indexUuid, - policyID = policy.id, - policySeqNo = policy.seqNo, - policyPrimaryTerm = policy.primaryTerm, - policyCompleted = false, - rolledOver = false, - indexCreationDate = getIndexCreationDate(managedIndexConfig), - transitionTo = null, - stateMetaData = stateMetaData, - actionMetaData = null, - stepMetaData = null, - policyRetryInfo = 
PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) + val indexCreationDate = getIndexCreationDate(managedIndexConfig) + if (policy == null) { + // We check policy existence in change policy API, but it may be deleted after that + return ManagedIndexMetaData( + index = managedIndexConfig.index, + indexUuid = managedIndexConfig.indexUuid, + policyID = policyID, + policySeqNo = null, + policyPrimaryTerm = null, + policyCompleted = false, + rolledOver = false, + indexCreationDate = indexCreationDate, + transitionTo = null, + stateMetaData = null, + actionMetaData = null, + stepMetaData = null, + policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), + info = mapOf("message" to "Fail to load policy: $policyID") + ) + } else { + val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState + val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) + return ManagedIndexMetaData( + index = managedIndexConfig.index, + indexUuid = managedIndexConfig.indexUuid, + policyID = policy.id, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, + policyCompleted = false, + rolledOver = false, + indexCreationDate = indexCreationDate, + transitionTo = null, + stateMetaData = stateMetaData, + actionMetaData = null, + stepMetaData = null, + policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), + info = mapOf("message" to "Successfully initialized policy: ${policy.id}") + ) + } } /** @@ -669,8 +668,8 @@ object ManagedIndexRunner : managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action? 
) { - // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non-null val changePolicy = managedIndexConfig.changePolicy + // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non-null if (changePolicy == null) { logger.debug( "initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: {}", @@ -679,9 +678,7 @@ object ManagedIndexRunner : return } - // get the policy we'll attempt to change to val policy = getPolicy(changePolicy.policyID) - // update the ManagedIndexMetaData with new information val updatedManagedIndexMetaData = if (policy == null) { managedIndexMetaData.copy( @@ -689,8 +686,9 @@ object ManagedIndexRunner : policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0) ) } else { - // if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are - // in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase + // 1. entering transition action in this run + // 2. 
has been in transition action + // Refresh the transition action metadata, meaning we start the transition for change policy val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) { ActionMetaData( TransitionsAction.name, Instant.now().toEpochMilli(), -1, @@ -699,6 +697,7 @@ object ManagedIndexRunner : } else { managedIndexMetaData.actionMetaData } + managedIndexMetaData.copy( info = mapOf("message" to "Attempting to change policy to ${policy.id}"), transitionTo = changePolicy.state, @@ -711,12 +710,13 @@ object ManagedIndexRunner : ) } - // check if the safe flag was set by the Change Policy REST API, if it was then do a second validation - // before allowing a change to happen + /** + * The freshness of isSafe may change between runs, and we use it to decide whether to enter this method + * in [shouldChangePolicy]. So here we check the safeness again + */ if (changePolicy.isSafe) { // if policy is null then we are only updating error information in metadata, so it's fine to continue if (policy != null) { - // current policy being null should never happen as we have a check at the top of runner // if it is unsafe to change then we set safe back to false, so we don't keep doing this check every execution if (!managedIndexConfig.policy.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy)) { updateManagedIndexConfig(managedIndexConfig.copy(changePolicy = managedIndexConfig.changePolicy.copy(isSafe = false))) @@ -725,17 +725,17 @@ object ManagedIndexRunner : } } - /* - * Try to update the ManagedIndexMetaData in cluster state, we need to do this first before updating the - * ManagedIndexConfig because if this fails we can fail early and still retry this whole process on the next - * execution whereas if we do the update to ManagedIndexConfig first we lose the ChangePolicy on the job and - * could fail to update the ManagedIndexMetaData which would put us in a bad state - * */ + /** + * Try to update the ManagedIndexMetaData, 
we need to do this first before updating the + * ManagedIndexConfig because if this fails we can fail early and still retry this whole process on the next + * execution whereas if we do the update to ManagedIndexConfig first we lose the ChangePolicy on the job and + * could fail to update the ManagedIndexMetaData which would put us in a bad state + */ val updated = updateManagedIndexMetaData(updatedManagedIndexMetaData) if (!updated.metadataSaved || policy == null) return - // Change the policy and user stored on the job from changePolicy, this will also set the changePolicy to null on the job + // Change the policy and user stored on the job, this will also set the changePolicy to null on the job savePolicyToManagedIndexConfig(managedIndexConfig, policy.copy(user = changePolicy.user)) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index cda41cf40..ebb2d7ee5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -380,18 +380,16 @@ val ManagedIndexMetaData.isPolicyCompleted: Boolean get() = this.policyCompleted == true /** - * We will change the policy if a change policy exists and if we are currently in - * a Transitions action (which means we're safely at the end of a state). If a - * transitionTo exists on the [ManagedIndexMetaData] it should still be fine to - * change policy as we have not actually transitioned yet. If the next action is Transition - * or if the rest API determined it was "safe", meaning the new policy has the same structure + * We will change the policy if a change policy exists and if we are currently in a Transitions action + * which means we're safely at the end of a state. 
+ * + * If the next action is Transition or if the rest API determined it was "safe", meaning the new policy has the same structure * of the current state, it should be safe to immediately change (even in the middle of the state). * - * @param managedIndexMetaData current [ManagedIndexMetaData] * @return {@code true} if we should change policy, {@code false} if not */ @Suppress("ReturnCount") -fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action?): Boolean { +fun ManagedIndexConfig.shouldChangePolicy(actionToExecute: Action?): Boolean { if (this.changePolicy == null) { return false } @@ -400,17 +398,7 @@ fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMeta return true } - // we need this in so that we can change policy before the first transition happens so policy doesn't get completed - // before we have a chance to change policy - if (actionToExecute?.type == TransitionsAction.name) { - return true - } - - // TODO actionToExecute is correlate to the actionMetadata? - // actionToExecute is found out by checking the metadata, it can be current unfinished one or the next - // actionMetadata has already been updated, it can be current unfinished one or the next - // In change policy context, we only accept unfinished transition or the new transition - return managedIndexMetaData.actionMetaData?.name == TransitionsAction.name + return actionToExecute?.type == TransitionsAction.name } fun ManagedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig: ManagedIndexConfig): Boolean = @@ -426,13 +414,13 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { } /** - * A policy is safe to change to a new policy when each policy has the current state - * the [ManagedIndexConfig] is in and that state has the same actions in the same order. 
+ * A policy is safe to change to a new policy when + * both policies have the current state the [ManagedIndexConfig] is in and that state has the same actions in the same order. * This allows simple things like configuration updates to happen which won't break the execution/contract * between [ManagedIndexMetaData] and [ManagedIndexConfig] as the metadata only knows about the current state. - * We never consider a policy safe to immediately change if the ChangePolicy contains a state to transition to - * as this could transition a user into a different state from the middle of the current state which we do not - * want to allow. + * + * If the ChangePolicy contains a state to transition to, we don't consider it safe to change here + * as this may transition a user into a different state from the middle of the current state. * * @param stateName the name of the state the [ManagedIndexConfig] is currently in * @param newPolicy the new (actual data model) policy we will eventually try to change to @@ -441,20 +429,19 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { */ @Suppress("ReturnCount") fun Policy.isSafeToChange(stateName: String?, newPolicy: Policy, changePolicy: ChangePolicy): Boolean { - // if stateName is null it means we either have not initialized the job (no metadata to pull stateName from) + // if stateName is null it means we either have not initialized the job // or we failed to load the initial policy, both cases its safe to change the policy if (stateName == null) return true if (changePolicy.state != null) return false + val currentState = this.states.find { it.name == stateName } val newState = newPolicy.states.find { it.name == stateName } if (currentState == null || newState == null) { return false } - if (currentState.actions.size != newState.actions.size) { return false } - currentState.actions.forEachIndexed { index, action -> val newStateAction = newState.actions[index] if (action.type != newStateAction.type) 
return@isSafeToChange false