diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 0986e9a13..4efe436d2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -82,8 +82,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.remo import org.opensearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.TransportRemovePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.RetryFailedManagedIndexAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.TransportRetryFailedManagedIndexAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE import org.opensearch.indexmanagement.indexstatemanagement.validation.ActionValidation import org.opensearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction @@ -565,7 +563,6 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin override fun getActions(): List> { return listOf( - ActionPlugin.ActionHandler(UpdateManagedIndexMetaDataAction.INSTANCE, TransportUpdateManagedIndexMetaDataAction::class.java), ActionPlugin.ActionHandler(RemovePolicyAction.INSTANCE, TransportRemovePolicyAction::class.java), ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java), ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java), diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt index fb6fa43c8..2035294d4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt @@ -16,7 +16,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata -class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : IndexMetadataService { +class DefaultIndexMetadataService(private val customUUIDSetting: String? = null) : IndexMetadataService { /** * Returns the default index metadata needed for ISM @@ -39,7 +39,7 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index response.state.metadata.indices.forEach { // TODO waiting to add document count until it is definitely needed - val uuid = getCustomIndexUUID(it.value) + val uuid = getIndexUUID(it.value) val indexMetadata = ISMIndexMetadata(uuid, it.value.creationDate, -1) indexNameToMetadata[it.key] = indexMetadata } @@ -48,11 +48,11 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index } /* - * If an extension wants Index Management to determine cluster state indices UUID based on a custom index setting if - * present of cluster state, the extension will override this customUUID setting. This allows an index to migrate off - * cluster and back while using this persistent uuid. + * This method supports extension to specify an index setting to decide the original UUID + * Its priority is higher than the one from cluster metadata + * This can happen when index moved out of cluster and re-attach back, it will get a new UUID in cluster metadata */ - fun getCustomIndexUUID(indexMetadata: IndexMetadata): String { + fun getIndexUUID(indexMetadata: IndexMetadata): String { return if (customUUIDSetting != null) { indexMetadata.settings.get(customUUIDSetting, indexMetadata.indexUUID) } else { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index 3583122e2..720d13751 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -309,7 +309,7 @@ class ManagedIndexCoordinator( // If there is a custom index uuid associated with the index, we do not auto manage it // This is because cold index uses custom uuid, and we do not auto manage cold-to-warm index val indexMetadata = clusterState.metadata.index(indexName) - val wasOffCluster = defaultIndexMetadataService.getCustomIndexUUID(indexMetadata) != indexMetadata.indexUUID + val wasOffCluster = defaultIndexMetadataService.getIndexUUID(indexMetadata) != indexMetadata.indexUUID val ismIndexMetadata = ismIndicesMetadata[indexName] // We try to find lookup name instead of using index name as datastream indices need the alias to match policy val lookupName = findIndexLookupName(indexName, clusterState) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 4ab3f48d0..8afc03734 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -56,15 +56,13 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ACTION_VALIDATION_ENABLED import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE -import org.opensearch.indexmanagement.indexstatemanagement.util.MetadataCheck -import org.opensearch.indexmanagement.indexstatemanagement.util.checkMetadata import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest import org.opensearch.indexmanagement.indexstatemanagement.util.getCompletedManagedIndexMetaData import org.opensearch.indexmanagement.indexstatemanagement.util.getStartingManagedIndexMetaData import org.opensearch.indexmanagement.indexstatemanagement.util.hasDifferentJobInterval import org.opensearch.indexmanagement.indexstatemanagement.util.hasTimedOut -import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict +import org.opensearch.indexmanagement.indexstatemanagement.util.hasDifferentPolicyVersion import org.opensearch.indexmanagement.indexstatemanagement.util.isAllowed import org.opensearch.indexmanagement.indexstatemanagement.util.isFailed import org.opensearch.indexmanagement.indexstatemanagement.util.isSafeToChange @@ -264,12 +262,11 @@ object ManagedIndexRunner : // Check the cluster state for the index metadata val clusterStateIndexMetadata = getIndexMetadata(managedIndexConfig.index) val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService - val clusterStateIndexUUID = clusterStateIndexMetadata?.let { defaultIndexMetadataService.getCustomIndexUUID(it) } - // If the index metadata is null, the index is not in the cluster state. If the index metadata is not null, but - // the cluster state index uuid differs from the one in the managed index config then the config is referring - // to a different index which does not exist in the cluster. We need to check all the extensions to confirm an index exists + val clusterStateIndexUUID = clusterStateIndexMetadata?.let { defaultIndexMetadataService.getIndexUUID(it) } + // If the index metadata is null, the index is not in the cluster state. + // If the index metadata is not null, and the index uuid differs from the one in the managed index config + // These mean this managed index could be a different index type and should use extensions to check if (clusterStateIndexMetadata == null || clusterStateIndexUUID != managedIndexConfig.indexUuid) { - // If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE } val multiTypeIndexNameToMetaData = indexMetadataProvider.getMultiTypeISMIndexMetadata(nonDefaultIndexTypes, listOf(managedIndexConfig.index)) @@ -281,29 +278,20 @@ object ManagedIndexRunner : logger.warn("Failed to find IndexMetadata for ${managedIndexConfig.index}.") return } - } else { - val clusterStateMetadata = clusterStateIndexMetadata.getManagedIndexMetadata() - val metadataCheck = checkMetadata(clusterStateMetadata, managedIndexMetaData, managedIndexConfig.indexUuid, logger) - if (metadataCheck != MetadataCheck.SUCCESS) { - logger.info("Skipping execution while metadata status is $metadataCheck") - return - } } - // If policy or managedIndexMetaData is null then initialize - val policy = managedIndexConfig.policy - if (policy == null || managedIndexMetaData == null) { - initManagedIndex(managedIndexConfig, managedIndexMetaData) + if (managedIndexMetaData == null) { + initManagedIndex(managedIndexConfig) return } - // If the policy was completed or failed then return early and disable job so it stops scheduling work + // If the policy was completed or failed then return early and disable job, so it stops scheduling work if (managedIndexMetaData.policyCompleted == true || managedIndexMetaData.isFailed) { disableManagedIndexConfig(managedIndexConfig) return } - if (managedIndexMetaData.hasVersionConflict(managedIndexConfig)) { + if (managedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig)) { val info = mapOf("message" to "There is a version conflict between your previous execution and your managed index") val result = updateManagedIndexMetaData( managedIndexMetaData.copy( @@ -317,6 +305,7 @@ object ManagedIndexRunner : return } + val policy = managedIndexConfig.policy val state = policy.getStateToExecute(managedIndexMetaData) val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider) val stepContext = StepContext( @@ -327,6 +316,7 @@ object ManagedIndexRunner : // If Index State Management is disabled and the current step is not null and safe to disable on // then disable the job and return early + // TODO are there any step not safe to disable on? if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) { disableManagedIndexConfig(managedIndexConfig) return @@ -384,7 +374,7 @@ object ManagedIndexRunner : } // If this action is not allowed and the step to be executed is the first step in the action then we will fail - // as this action has been removed from the AllowList, but if its not the first step we will let it finish as it's already inflight + // as this action has been removed from the AllowList, but if it's not the first step we will let it finish as it's already inflight if (action?.isAllowed(allowList) == false && step != null && action.isFirstStep(step.name) && action.type != TransitionsAction.name) { val info = mapOf("message" to "Attempted to execute action=${action.type} which is not allowed.") val updated = updateManagedIndexMetaData( @@ -396,8 +386,8 @@ object ManagedIndexRunner : return } - // If any of State, Action, Step components come back as null then we are moving to error in ManagedIndexMetaData val startingManagedIndexMetaData = managedIndexMetaData.getStartingManagedIndexMetaData(state, action, step) + // If any of State, Action, Step components come back as null, then we are moving to error in ManagedIndexMetaData val updateResult = updateManagedIndexMetaData(startingManagedIndexMetaData) @Suppress("ComplexCondition", "MaxLineLength") @@ -411,7 +401,7 @@ object ManagedIndexRunner : actionValidation.validate(action.type, stepContext.metadata.index) } if (validationResult.validationStatus == Validate.ValidationStatus.RE_VALIDATING) { - logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.\", action.type, state.name, step.name") + logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.", action.type, state.name, step.name) publishErrorNotification(policy, managedIndexMetaData) return } @@ -473,32 +463,14 @@ object ManagedIndexRunner : } } - private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig, managedIndexMetaData: ManagedIndexMetaData?) { - var policy: Policy? = managedIndexConfig.policy - val policyID = managedIndexConfig.changePolicy?.policyID ?: managedIndexConfig.policyID - // If policy does not currently exist, we need to save the policy on the ManagedIndexConfig for the first time - // or if a change policy exists then we will also execute the change as we are still in initialization phase - if (policy == null || managedIndexConfig.changePolicy != null) { - // Get the policy by the name unless a ChangePolicy exists then allow the change to happen during initialization - policy = getPolicy(policyID) - // Attempt to save the policy - if (policy != null) { - val saved = savePolicyToManagedIndexConfig(managedIndexConfig, policy) - // If we failed to save the policy, don't initialize ManagedIndexMetaData - if (!saved) return - } - // If we failed to get the policy then we will update the ManagedIndexMetaData with error info - } - + private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig) { + val policy: Policy = managedIndexConfig.policy // at this point we either successfully saved the policy or we failed to get the policy - val updatedManagedIndexMetaData = if (policy == null) { - getFailedInitializedManagedIndexMetaData(managedIndexMetaData, managedIndexConfig, policyID) - } else { + val updatedManagedIndexMetaData = // Initializing ManagedIndexMetaData for the first time - getInitializedManagedIndexMetaData(managedIndexMetaData, managedIndexConfig, policy) - } + getInitializedManagedIndexMetaData(managedIndexConfig, policy) - updateManagedIndexMetaData(updatedManagedIndexMetaData, create = managedIndexMetaData == null) + updateManagedIndexMetaData(updatedManagedIndexMetaData, create = true) } @Suppress("ReturnCount", "BlockingMethodInNonBlockingContext") @@ -577,95 +549,32 @@ object ManagedIndexRunner : } } - private suspend fun getFailedInitializedManagedIndexMetaData( - managedIndexMetaData: ManagedIndexMetaData?, + @Suppress("ComplexMethod") + private suspend fun getInitializedManagedIndexMetaData( managedIndexConfig: ManagedIndexConfig, - policyID: String + policy: Policy ): ManagedIndexMetaData { - // we either haven't initialized any metadata yet or we have already initialized metadata but still have no policy - return managedIndexMetaData?.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf("message" to "Fail to load policy: $policyID") - ) ?: ManagedIndexMetaData( + val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState + val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) + + return ManagedIndexMetaData( index = managedIndexConfig.index, indexUuid = managedIndexConfig.indexUuid, - policyID = policyID, - policySeqNo = null, - policyPrimaryTerm = null, + policyID = policy.id, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, policyCompleted = false, rolledOver = false, indexCreationDate = getIndexCreationDate(managedIndexConfig), transitionTo = null, - stateMetaData = null, + stateMetaData = stateMetaData, actionMetaData = null, stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf("message" to "Fail to load policy: $policyID") + policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), + info = mapOf("message" to "Successfully initialized policy: ${policy.id}") ) } - @Suppress("ComplexMethod") - private suspend fun getInitializedManagedIndexMetaData( - managedIndexMetaData: ManagedIndexMetaData?, - managedIndexConfig: ManagedIndexConfig, - policy: Policy - ): ManagedIndexMetaData { - val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState - val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) - - return when { - managedIndexMetaData == null -> ManagedIndexMetaData( - index = managedIndexConfig.index, - indexUuid = managedIndexConfig.indexUuid, - policyID = policy.id, - policySeqNo = policy.seqNo, - policyPrimaryTerm = policy.primaryTerm, - policyCompleted = false, - rolledOver = false, - indexCreationDate = getIndexCreationDate(managedIndexConfig), - transitionTo = null, - stateMetaData = stateMetaData, - actionMetaData = null, - stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) - managedIndexMetaData.policySeqNo == null || managedIndexMetaData.policyPrimaryTerm == null -> - // If there is seqNo and PrimaryTerm it is first time populating Policy. - managedIndexMetaData.copy( - policyID = policy.id, - policySeqNo = policy.seqNo, - policyPrimaryTerm = policy.primaryTerm, - stateMetaData = stateMetaData, - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) - // this is an edge case where a user deletes the job config or index and we already have a policySeqNo/primaryTerm - // in the metadata, in this case we just want to say we successfully initialized the policy again but we will not - // modify the state, action, etc. so it can resume where it left off - managedIndexMetaData.policySeqNo == policy.seqNo && - managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm && - managedIndexMetaData.policyID == policy.id -> - // If existing PolicySeqNo and PolicyPrimaryTerm is equal to cached Policy then no issue. - managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) - else -> - // else this means we either tried to load a policy with a different id, seqno, or primaryterm from what is - // in the metadata and we cannot guarantee it will work with the current state in managedIndexMetaData - managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf( - "message" to "Fail to load policy: ${policy.id} with " + - "seqNo ${policy.seqNo} and primaryTerm ${policy.primaryTerm} as it" + - " does not match what's in the metadata [policyID=${managedIndexMetaData.policyID}," + - " policySeqNo=${managedIndexMetaData.policySeqNo}, policyPrimaryTerm=${managedIndexMetaData.policyPrimaryTerm}]" - ) - ) - } - } - /** * update metadata in config index, and save metadata in history after update * this can be called 2 times in one job run, so need to save seqNo & primeTerm @@ -726,11 +635,13 @@ object ManagedIndexRunner : managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action? ) { - - // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null + // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non-null val changePolicy = managedIndexConfig.changePolicy if (changePolicy == null) { - logger.debug("initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: $managedIndexConfig") + logger.debug( + "initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: {}", + managedIndexConfig + ) return } @@ -746,12 +657,11 @@ object ManagedIndexRunner : } else { // if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are // in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase - val newTransitionMetaData = ActionMetaData( - TransitionsAction.name, Instant.now().toEpochMilli(), -1, - false, 0, 0, null - ) val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) { - newTransitionMetaData + ActionMetaData( + TransitionsAction.name, Instant.now().toEpochMilli(), -1, + false, 0, 0, null + ) } else { managedIndexMetaData.actionMetaData } @@ -770,11 +680,11 @@ object ManagedIndexRunner : // check if the safe flag was set by the Change Policy REST API, if it was then do a second validation // before allowing a change to happen if (changePolicy.isSafe) { - // if policy is null then we are only updating error information in metadata so its fine to continue + // if policy is null then we are only updating error information in metadata, so it's fine to continue if (policy != null) { // current policy being null should never happen as we have a check at the top of runner - // if it is unsafe to change then we set safe back to false so we don't keep doing this check every execution - if (managedIndexConfig.policy?.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy) != true) { + // if it is unsafe to change then we set safe back to false, so we don't keep doing this check every execution + if (!managedIndexConfig.policy.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy)) { updateManagedIndexConfig(managedIndexConfig.copy(changePolicy = managedIndexConfig.changePolicy.copy(isSafe = false))) return } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt index 7e8d36268..d92af725e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt @@ -35,7 +35,7 @@ data class ManagedIndexConfig( val policyID: String, val policySeqNo: Long?, val policyPrimaryTerm: Long?, - val policy: Policy?, + val policy: Policy, val changePolicy: ChangePolicy?, val jobJitter: Double? ) : ScheduledJobParameter { @@ -177,11 +177,13 @@ data class ManagedIndexConfig( policyID = requireNotNull(policyID) { "ManagedIndexConfig policy id is null" }, policySeqNo = policySeqNo, policyPrimaryTerm = policyPrimaryTerm, - policy = policy?.copy( - id = policyID, - seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO, - primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM - ), + policy = requireNotNull( + policy?.copy( + id = policyID, + seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ) + ) { "ManagedIndexConfig policy is null" }, changePolicy = changePolicy, jobJitter = jitter ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt index cb7e4c6ad..ceba19187 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt @@ -61,22 +61,13 @@ fun IndexMetadata.getRolloverSkip(): Boolean { return this.settings.getAsBoolean(ManagedIndexSettings.ROLLOVER_SKIP.key, false) } -fun IndexMetadata.getManagedIndexMetadata(): ManagedIndexMetaData? { - val existingMetaDataMap = this.getCustomData(ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE) - - if (existingMetaDataMap != null) { - return ManagedIndexMetaData.fromMap(existingMetaDataMap) - } - return null -} - fun getUuidsForClosedIndices(state: ClusterState, defaultIndexMetadataService: DefaultIndexMetadataService): MutableList { val indexMetadatas = state.metadata.indices val closeList = mutableListOf() indexMetadatas.forEach { // it.key is index name if (it.value.state == IndexMetadata.State.CLOSE) { - closeList.add(defaultIndexMetadataService.getCustomIndexUUID(it.value)) + closeList.add(defaultIndexMetadataService.getIndexUUID(it.value)) } } return closeList diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt index da4e81d77..dd58847c0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt @@ -34,8 +34,8 @@ import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.commons.ConfigConstants import org.opensearch.commons.authuser.User -import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.index.Index +import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.indexmanagement.indexstatemanagement.DefaultIndexMetadataService import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider @@ -49,7 +49,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.mana import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE import org.opensearch.indexmanagement.indexstatemanagement.util.FailedIndex import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest -import org.opensearch.indexmanagement.indexstatemanagement.util.removeClusterStateMetadatas import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse import org.opensearch.indexmanagement.opensearchapi.suspendUntil @@ -61,6 +60,7 @@ import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.indexmanagement.util.SecurityUtils.Companion.validateUserConfiguration import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import java.time.Duration @@ -126,7 +126,7 @@ class TransportAddPolicyAction @Inject constructor( } @Suppress("SpreadOperator") - fun getClusterState() { + private fun getClusterState() { startTime = Instant.now() CoroutineScope(Dispatchers.IO).launch { val indexNameToMetadata: MutableMap = HashMap() @@ -193,10 +193,6 @@ class TransportAddPolicyAction @Inject constructor( clusterStateRequest, object : ActionListener { override fun onResponse(response: ClusterStateResponse) { - CoroutineScope(Dispatchers.IO).launch { - removeClusterStateMetadatas(client, log, indicesToAdd.map { Index(it.value, it.key) }) - } - val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService getUuidsForClosedIndices(response.state, defaultIndexMetadataService).forEach { failedIndices.add(FailedIndex(indicesToAdd[it] as String, it, "This index is closed")) @@ -346,6 +342,9 @@ class TransportAddPolicyAction @Inject constructor( } } actionListener.onResponse(ISMStatusResponse(indicesToAdd.size, failedIndices)) + + // best effort to clean up ISM metadata + removeMetadatas(indicesToAdd.map { Index(it.value, it.key) }) } override fun onFailure(t: Exception) { @@ -368,6 +367,23 @@ class TransportAddPolicyAction @Inject constructor( private fun onFailure(t: Exception) { actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) } + + fun removeMetadatas(indices: List) { + val request = indices.map { deleteManagedIndexMetadataRequest(it.uuid) } + val bulkReq = BulkRequest().add(request) + client.bulk( + bulkReq, + object : ActionListener { + override fun onResponse(response: BulkResponse) { + log.info("Successfully cleaned metadata for remove policy indices: {}", indices) + } + + override fun onFailure(e: Exception) { + log.error("Failed to clean metadata for remove policy indices.", e) + } + } + ) + } } companion object { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt index 7dca57343..1e9d09a0c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt @@ -41,7 +41,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexCon import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.buildMgetMetadataRequest -import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.mgetResponseToMap import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse @@ -239,7 +238,7 @@ class TransportChangePolicyAction @Inject constructor( val clusterState = response.state val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService clusterState.metadata.indices.forEach { - val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value) + val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value) indexUuidToIndexMetadata[indexUUID] = it.value } // ISMIndexMetadata from the default index metadata service uses lenient expand, we want to use strict expand, filter @@ -268,9 +267,6 @@ class TransportChangePolicyAction @Inject constructor( val includedStates = changePolicy.include.map { it.state }.toSet() indicesToUpdate.forEach { (indexUuid, indexName) -> - // indexMetaData and clusterStateMetadata will be null for non-default index types - val indexMetaData = indexUuidToIndexMetadata[indexUuid] - val clusterStateMetadata = indexMetaData?.getManagedIndexMetadata() val mgetFailure = metadataMap[indexUuid]?.second val managedIndexMetadata: ManagedIndexMetaData? = metadataMap[managedIndexMetadataID(indexUuid)]?.first @@ -296,25 +292,16 @@ class TransportChangePolicyAction @Inject constructor( RestChangePolicyAction.INDEX_IN_TRANSITION ) ) - // else if there is no ManagedIndexMetaData yet then the managed index has not initialized and we can change the policy safely + // else if there is no ManagedIndexMetaData yet then the managed index has not initialized, and we can change the policy safely managedIndexMetadata == null -> { - if (clusterStateMetadata != null) { - failedIndices.add( - FailedIndex( - indexName, indexUuid, - "Cannot change policy until metadata has finished migrating" - ) - ) - } else { - managedIndicesToUpdate.add(indexName to indexUuid) - } + managedIndicesToUpdate.add(indexName to indexUuid) } // else if the includedStates is empty (i.e. not being used) then we will always try to update the managed index includedStates.isEmpty() -> managedIndicesToUpdate.add(indexName to indexUuid) // else only update the managed index if its currently in one of the included states includedStates.contains(managedIndexMetadata.stateMetaData?.name) -> managedIndicesToUpdate.add(indexName to indexUuid) - // else the managed index did not match any of the included state filters and we will not update it + // else the managed index did not match any of the included state filters, and we will not update it else -> log.debug("Skipping $indexName as it does not match any of the include state filters") } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index 9e67742b3..00217f1af 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -23,7 +23,6 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.node.NodeClient -import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject @@ -44,14 +43,11 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexCon import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.common.model.rest.SearchParams import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner.actionValidation -import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexRequest import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE import org.opensearch.indexmanagement.indexstatemanagement.util.MANAGED_INDEX_INDEX_UUID_FIELD import org.opensearch.indexmanagement.indexstatemanagement.util.MANAGED_INDEX_NAME_KEYWORD_FIELD -import org.opensearch.indexmanagement.indexstatemanagement.util.MetadataCheck -import org.opensearch.indexmanagement.indexstatemanagement.util.checkMetadata import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataID import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.indexmanagement.opensearchapi.suspendUntil @@ -202,10 +198,10 @@ class TransportExplainAction @Inject constructor( "enabled" to managedIndex.enabled.toString() ) if (showPolicy) { - managedIndex.policy?.let { appliedPolicies[managedIndex.index] = it } + managedIndex.policy.let { appliedPolicies[managedIndex.index] = it } } if (validateAction) { - managedIndex.policy?.let { policiesforValidation[managedIndex.index] = it } + managedIndex.policy.let { policiesforValidation[managedIndex.index] = it } } } @@ -271,8 +267,7 @@ class TransportExplainAction @Inject constructor( clusterStateRequest, object : ActionListener { override fun onResponse(response: ClusterStateResponse) { - val clusterStateIndexMetadatas = response.state.metadata.indices - getMetadataMap(clusterStateIndexMetadatas, threadContext) + getMetadataMap(threadContext) } override fun onFailure(t: Exception) { @@ -281,11 +276,11 @@ class TransportExplainAction @Inject constructor( } ) } else { - getMetadataMap(null, threadContext) + getMetadataMap(threadContext) } } - private fun getMetadataMap(clusterStateIndexMetadatas: Map?, threadContext: ThreadContext.StoredContext) { + private fun getMetadataMap(threadContext: ThreadContext.StoredContext) { val mgetMetadataReq = MultiGetRequest() indexNamesToUUIDs.values.forEach { uuid -> mgetMetadataReq.add(MultiGetRequest.Item(INDEX_MANAGEMENT_INDEX, managedIndexMetadataID(uuid)).routing(uuid)) @@ -296,7 +291,7 @@ class TransportExplainAction @Inject constructor( override fun onResponse(response: MultiGetResponse) { val metadataMap: Map = response.responses.associate { it.id to getMetadata(it.response)?.toMap() } - buildResponse(indexNamesToUUIDs, metadataMap, clusterStateIndexMetadatas, threadContext) + buildResponse(indexNamesToUUIDs, metadataMap, threadContext) } override fun onFailure(t: Exception) { @@ -310,7 +305,6 @@ class TransportExplainAction @Inject constructor( private fun buildResponse( indices: Map, metadataMap: Map, - clusterStateIndexMetadatas: Map?, threadContext: ThreadContext.StoredContext ) { // cluster state response will not resist the sort order @@ -329,22 +323,13 @@ class TransportExplainAction @Inject constructor( if (metadataMapFromManagedIndex.isNotEmpty()) { managedIndexMetadata = ManagedIndexMetaData.fromMap(metadataMapFromManagedIndex) } - - // clusterStateIndexMetadatas will not be null only for the default index type - if (clusterStateIndexMetadatas != null) { - val currentIndexUuid = indices[indexName] - val clusterStateMetadata = clusterStateIndexMetadatas[indexName]?.getManagedIndexMetadata() - val metadataCheck = checkMetadata(clusterStateMetadata, configIndexMetadataMap, currentIndexUuid, log) - val info = metadataStatusToInfo[metadataCheck] - info?.let { managedIndexMetadata = clusterStateMetadata?.copy(info = it) } - } } if (validateAction) { var validationResult = actionValidation.validate("nothing", indexName) val policy = policiesforValidation[indexName] if (policy != null && managedIndexMetadata != null) { - val state = policy.getStateToExecute(managedIndexMetadata!!) - val action = state?.getActionToExecute(managedIndexMetadata!!, indexMetadataProvider) + val state = policy.getStateToExecute(managedIndexMetadata) + val action = state?.getActionToExecute(managedIndexMetadata, indexMetadataProvider) var actionName = action?.type if (actionName == null) { actionName = "nothing" @@ -445,13 +430,4 @@ class TransportExplainAction @Inject constructor( } } } - - companion object { - const val METADATA_MOVING_WARNING = "Managed index's metadata is pending migration." - const val METADATA_CORRUPT_WARNING = "Managed index's metadata is corrupt, please use remove policy API to clean it." - val metadataStatusToInfo = mapOf( - MetadataCheck.PENDING to mapOf("message" to METADATA_MOVING_WARNING), - MetadataCheck.CORRUPT to mapOf("message" to METADATA_CORRUPT_WARNING) - ) - } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt index 2c74e4b1a..b9db84887 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt @@ -48,7 +48,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TY import org.opensearch.indexmanagement.indexstatemanagement.util.FailedIndex import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest -import org.opensearch.indexmanagement.indexstatemanagement.util.removeClusterStateMetadatas import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata import org.opensearch.indexmanagement.util.IndexManagementException import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser @@ -352,8 +351,6 @@ class TransportRemovePolicyAction @Inject constructor( // clean metadata for indicesToRemove val indicesToRemoveMetadata = indicesToRemove.map { Index(it.value, it.key) } - // best effort - CoroutineScope(Dispatchers.IO).launch { removeClusterStateMetadatas(client, log, indicesToRemoveMetadata) } removeMetadatas(indicesToRemoveMetadata) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt index fd21528eb..4d85b1c02 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt @@ -38,7 +38,6 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANA import org.opensearch.indexmanagement.indexstatemanagement.DefaultIndexMetadataService import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.buildMgetMetadataRequest -import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.mgetResponseToMap import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexAction @@ -168,7 +167,7 @@ class TransportRetryFailedManagedIndexAction @Inject constructor( override fun onResponse(response: ClusterStateResponse) { val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService response.state.metadata.indices.forEach { - val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value) + val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value) indexUuidToIndexMetadata[indexUUID] = it.value } processResponse() @@ -222,9 +221,6 @@ class TransportRetryFailedManagedIndexAction @Inject constructor( private fun onMgetMetadataResponse(mgetResponse: MultiGetResponse) { val metadataMap = mgetResponseToMap(mgetResponse) indicesToRetry.forEach { (indexUuid, indexName) -> - // indexMetaData and clusterStateMetadata will be null for non-default index types - val indexMetaData = indexUuidToIndexMetadata[indexUuid] - val clusterStateMetadata = indexMetaData?.getManagedIndexMetadata() val mgetFailure = metadataMap[managedIndexMetadataID(indexUuid)]?.second val managedIndexMetadata: ManagedIndexMetaData? = metadataMap[managedIndexMetadataID(indexUuid)]?.first when { @@ -233,11 +229,7 @@ class TransportRetryFailedManagedIndexAction @Inject constructor( mgetFailure != null -> failedIndices.add(FailedIndex(indexName, indexUuid, "Failed to get managed index metadata, $mgetFailure")) managedIndexMetadata == null -> { - if (clusterStateMetadata != null) { - failedIndices.add(FailedIndex(indexName, indexUuid, "Cannot retry until metadata has finished migrating")) - } else { - failedIndices.add(FailedIndex(indexName, indexUuid, "This index has no metadata information")) - } + failedIndices.add(FailedIndex(indexName, indexUuid, "This index has no metadata information")) } !managedIndexMetadata.isFailed -> failedIndices.add(FailedIndex(indexName, indexUuid, "This index is not in failed state.")) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt deleted file mode 100644 index 8d9f4f731..000000000 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/TransportUpdateManagedIndexMetaDataAction.kt +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata - -import org.apache.logging.log4j.LogManager -import org.opensearch.core.action.ActionListener -import org.opensearch.action.support.ActionFilters -import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction -import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.cluster.ClusterState -import org.opensearch.cluster.ClusterStateTaskConfig -import org.opensearch.cluster.ClusterStateTaskExecutor -import org.opensearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult -import org.opensearch.cluster.ClusterStateTaskListener -import org.opensearch.cluster.block.ClusterBlockException -import org.opensearch.cluster.block.ClusterBlockLevel -import org.opensearch.cluster.metadata.IndexMetadata -import org.opensearch.cluster.metadata.IndexNameExpressionResolver -import org.opensearch.cluster.metadata.Metadata -import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.Priority -import org.opensearch.common.inject.Inject -import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.common.io.stream.Writeable -import org.opensearch.core.index.Index -import org.opensearch.indexmanagement.IndexManagementPlugin -import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData -import org.opensearch.threadpool.ThreadPool -import org.opensearch.transport.TransportService - -class TransportUpdateManagedIndexMetaDataAction @Inject constructor( - threadPool: ThreadPool, - clusterService: ClusterService, - transportService: TransportService, - actionFilters: ActionFilters, - val indexMetadataProvider: IndexMetadataProvider, - indexNameExpressionResolver: IndexNameExpressionResolver -) : TransportClusterManagerNodeAction( - UpdateManagedIndexMetaDataAction.INSTANCE.name(), - transportService, - clusterService, - threadPool, - actionFilters, - Writeable.Reader { UpdateManagedIndexMetaDataRequest(it) }, - indexNameExpressionResolver -) { - - private val log = LogManager.getLogger(javaClass) - private val executor = ManagedIndexMetaDataExecutor() - - override fun checkBlock(request: UpdateManagedIndexMetaDataRequest, state: ClusterState): ClusterBlockException? { - // https://github.com/elastic/elasticsearch/commit/ae14b4e6f96b554ca8f4aaf4039b468f52df0123 - // This commit will help us to give each individual index name and the error that is cause it. For now it will be a generic error message. - val indicesToAddTo = request.indicesToAddManagedIndexMetaDataTo.map { it.first }.toTypedArray() - val indicesToRemoveFrom = request.indicesToRemoveManagedIndexMetaDataFrom.map { it }.toTypedArray() - val indices = checkExtensionsOverrideBlock(indicesToAddTo + indicesToRemoveFrom, state) - - return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indices) - } - - /* - * Index Management extensions may provide an index setting, which, if set to true, overrides the cluster metadata write block - */ - private fun checkExtensionsOverrideBlock(indices: Array, state: ClusterState): Array { - val indexBlockOverrideSettings = indexMetadataProvider.getIndexMetadataWriteOverrideSettings() - val indicesToBlock = indices.toMutableList() - indexBlockOverrideSettings.forEach { indexBlockOverrideSetting -> - indicesToBlock.removeIf { state.metadata.getIndexSafe(it).settings.getAsBoolean(indexBlockOverrideSetting, false) } - } - return indicesToBlock - .map { it.name } - .toTypedArray() - } - - override fun clusterManagerOperation( - request: UpdateManagedIndexMetaDataRequest, - state: ClusterState, - listener: ActionListener - ) { - clusterService.submitStateUpdateTask( - IndexManagementPlugin.OLD_PLUGIN_NAME, - ManagedIndexMetaDataTask(request.indicesToAddManagedIndexMetaDataTo, request.indicesToRemoveManagedIndexMetaDataFrom), - ClusterStateTaskConfig.build(Priority.NORMAL), - executor, - object : ClusterStateTaskListener { - override fun onFailure(source: String, e: Exception) = listener.onFailure(e) - - override fun clusterStateProcessed(source: String, oldState: ClusterState, newState: ClusterState) = - listener.onResponse(AcknowledgedResponse(true)) - } - ) - } - - override fun read(si: StreamInput): AcknowledgedResponse { - return AcknowledgedResponse(si) - } - - override fun executor(): String { - return ThreadPool.Names.SAME - } - - inner class ManagedIndexMetaDataExecutor : ClusterStateTaskExecutor { - - override fun execute(currentState: ClusterState, tasks: List): ClusterTasksResult { - val newClusterState = getUpdatedClusterState(currentState, tasks) - return ClusterTasksResult.builder().successes(tasks).build(newClusterState) - } - } - - fun getUpdatedClusterState(currentState: ClusterState, tasks: List): ClusterState { - // If there are no indices to make changes to, return early. - // Also doing this because when creating a metaDataBuilder and making no changes to it, for some - // reason the task does not complete, leading to indefinite suspension. - if (tasks.all { it.indicesToAddManagedIndexMetaDataTo.isEmpty() && it.indicesToRemoveManagedIndexMetaDataFrom.isEmpty() } - ) { - return currentState - } - log.trace("Start of building new cluster state") - val metaDataBuilder = Metadata.builder(currentState.metadata) - for (task in tasks) { - for (pair in task.indicesToAddManagedIndexMetaDataTo) { - if (currentState.metadata.hasIndex(pair.first.name)) { - metaDataBuilder.put( - IndexMetadata.builder(currentState.metadata.index(pair.first)) - .putCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE, pair.second.toMap()) - ) - } else { - log.debug("No IndexMetadata found for [${pair.first.name}] when updating ManagedIndexMetaData") - } - } - - for (index in task.indicesToRemoveManagedIndexMetaDataFrom) { - if (currentState.metadata.hasIndex(index.name)) { - val indexMetaDataBuilder = IndexMetadata.builder(currentState.metadata.index(index)) - indexMetaDataBuilder.removeCustom(ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE) - - metaDataBuilder.put(indexMetaDataBuilder) - } else { - log.debug("No IndexMetadata found for [${index.name}] when removing ManagedIndexMetaData") - } - } - } - log.trace("End of building new cluster state") - - return ClusterState.builder(currentState).metadata(metaDataBuilder).build() - } - - companion object { - data class ManagedIndexMetaDataTask( - val indicesToAddManagedIndexMetaDataTo: List>, - val indicesToRemoveManagedIndexMetaDataFrom: List - ) - } -} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataAction.kt deleted file mode 100644 index 7078f2d59..000000000 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataAction.kt +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata - -import org.opensearch.action.ActionType -import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.core.common.io.stream.Writeable - -class UpdateManagedIndexMetaDataAction : ActionType(NAME, reader) { - - companion object { - const val NAME = "cluster:admin/ism/update/managedindexmetadata" - val INSTANCE = UpdateManagedIndexMetaDataAction() - - val reader = Writeable.Reader { AcknowledgedResponse(it) } - } - - override fun getResponseReader(): Writeable.Reader = reader -} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataRequest.kt deleted file mode 100644 index 5d5d31b69..000000000 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/updateindexmetadata/UpdateManagedIndexMetaDataRequest.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata - -import org.opensearch.action.ActionRequestValidationException -import org.opensearch.action.ValidateActions.addValidationError -import org.opensearch.action.support.master.AcknowledgedRequest -import org.opensearch.core.common.io.stream.StreamInput -import org.opensearch.core.common.io.stream.StreamOutput -import org.opensearch.core.index.Index -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData - -class UpdateManagedIndexMetaDataRequest : AcknowledgedRequest { - - var indicesToAddManagedIndexMetaDataTo: List> - private set - - var indicesToRemoveManagedIndexMetaDataFrom: List - private set - - constructor(si: StreamInput) : super(si) { - indicesToAddManagedIndexMetaDataTo = si.readList { - val index = Index(it) - val managedIndexMetaData = ManagedIndexMetaData.fromStreamInput(it) - Pair(index, managedIndexMetaData) - } - - indicesToRemoveManagedIndexMetaDataFrom = si.readList { Index(it) } - } - - constructor( - indicesToAddManagedIndexMetaDataTo: List> = listOf(), - indicesToRemoveManagedIndexMetaDataFrom: List = listOf() - ) { - this.indicesToAddManagedIndexMetaDataTo = indicesToAddManagedIndexMetaDataTo - this.indicesToRemoveManagedIndexMetaDataFrom = indicesToRemoveManagedIndexMetaDataFrom - } - - override fun validate(): ActionRequestValidationException? { - var validationException: ActionRequestValidationException? = null - - if (this.indicesToAddManagedIndexMetaDataTo.isEmpty() && this.indicesToRemoveManagedIndexMetaDataFrom.isEmpty()) { - validationException = addValidationError( - "At least one non-empty List must be given for UpdateManagedIndexMetaDataRequest", - validationException - ) - } - - return validationException - } - - override fun writeTo(streamOutput: StreamOutput) { - super.writeTo(streamOutput) - - streamOutput.writeCollection(indicesToAddManagedIndexMetaDataTo) { so, pair -> - pair.first.writeTo(so) - pair.second.writeTo(so) - } - - streamOutput.writeCollection(indicesToRemoveManagedIndexMetaDataFrom) { so, index -> - index.writeTo(so) - } - } -} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index 309ce9931..a9bd2fe39 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -7,11 +7,8 @@ @file:JvmName("ManagedIndexUtils") package org.opensearch.indexmanagement.indexstatemanagement.util -// import inet.ipaddr.IPAddressString import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext -// import org.apache.logging.log4j.LogManager -import org.apache.logging.log4j.Logger import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse @@ -19,7 +16,6 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.support.WriteRequest import org.opensearch.action.update.UpdateRequest -// import org.opensearch.alerting.destination.message.BaseMessage import org.opensearch.client.Client import org.opensearch.cluster.routing.Preference import org.opensearch.core.common.unit.ByteSizeValue @@ -67,7 +63,7 @@ fun managedIndexConfigIndexRequest( uuid: String, policyID: String, jobInterval: Int, - policy: Policy? = null, + policy: Policy, jobJitter: Double? ): IndexRequest { val managedIndexConfig = ManagedIndexConfig( @@ -80,8 +76,8 @@ fun managedIndexConfigIndexRequest( jobEnabledTime = Instant.now(), policyID = policyID, policy = policy, - policySeqNo = policy?.seqNo, - policyPrimaryTerm = policy?.primaryTerm, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, changePolicy = null, jobJitter = jobJitter ) @@ -417,25 +413,24 @@ fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMeta return true } - // we need this in so that we can change policy before the first transition happens so policy doesnt get completed + if (managedIndexMetaData.actionMetaData?.name != TransitionsAction.name) { + return false + } + + // we need this in so that we can change policy before the first transition happens so policy doesn't get completed // before we have a chance to change policy if (actionToExecute?.type == TransitionsAction.name) { return true } - if (managedIndexMetaData.actionMetaData?.name != TransitionsAction.name) { - return false - } - return true } -fun ManagedIndexMetaData.hasVersionConflict(managedIndexConfig: ManagedIndexConfig): Boolean = +fun ManagedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig: ManagedIndexConfig): Boolean = this.policySeqNo != managedIndexConfig.policySeqNo || this.policyPrimaryTerm != managedIndexConfig.policyPrimaryTerm fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { - val schedule = this.schedule - when (schedule) { + when (val schedule = this.schedule) { is IntervalSchedule -> { return schedule.interval != jobInterval } @@ -455,7 +450,7 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { * @param stateName the name of the state the [ManagedIndexConfig] is currently in * @param newPolicy the new (actual data model) policy we will eventually try to change to * @param changePolicy the change policy to change to - * @return if its safe to change + * @return if it's safe to change */ @Suppress("ReturnCount") fun Policy.isSafeToChange(stateName: String?, newPolicy: Policy, changePolicy: ChangePolicy): Boolean { @@ -486,71 +481,6 @@ fun Policy.isSafeToChange(stateName: String?, newPolicy: Policy, changePolicy: C */ fun Action.isAllowed(allowList: List): Boolean = allowList.contains(this.type) -/** - * Check if cluster state metadata has been moved to config index - * - * log warning if remaining cluster state metadata has newer last_updated_time - */ -@Suppress("ReturnCount", "ComplexCondition", "ComplexMethod") -fun checkMetadata( - clusterStateMetadata: ManagedIndexMetaData?, - configIndexMetadata: Any?, - currentIndexUuid: String?, - logger: Logger -): MetadataCheck { - // indexUuid saved in ISM metadata may be outdated - // if an index restored from snapshot - val indexUuid1 = clusterStateMetadata?.indexUuid - val indexUuid2 = when (configIndexMetadata) { - is ManagedIndexMetaData -> configIndexMetadata.indexUuid - is Map<*, *> -> configIndexMetadata["index_uuid"] - else -> null - } as String? - if ((indexUuid1 != null && indexUuid1 != currentIndexUuid) || - (indexUuid2 != null && indexUuid2 != currentIndexUuid) - ) { - return MetadataCheck.CORRUPT - } - - if (clusterStateMetadata != null) { - if (configIndexMetadata == null) return MetadataCheck.PENDING - - // compare last updated time between 2 metadatas - val t1 = clusterStateMetadata.stepMetaData?.startTime - val t2 = when (configIndexMetadata) { - is ManagedIndexMetaData -> configIndexMetadata.stepMetaData?.startTime - is Map<*, *> -> { - @Suppress("UNCHECKED_CAST") - val stepMetadata = configIndexMetadata["step"] as Map? - stepMetadata?.get("start_time") - } - else -> null - } as Long? - if (t1 != null && t2 != null && t1 > t2) { - logger.warn("Cluster state metadata get updates after moved for [${clusterStateMetadata.index}]") - } - } - return MetadataCheck.SUCCESS -} - -enum class MetadataCheck { - PENDING, CORRUPT, SUCCESS -} - -// private val baseMessageLogger = LogManager.getLogger(BaseMessage::class.java) -// -// fun BaseMessage.isHostInDenylist(networks: List): Boolean { -// val ipStr = IPAddressString(this.uri.host) -// for (network in networks) { -// val netStr = IPAddressString(network) -// if (netStr.contains(ipStr)) { -// baseMessageLogger.error("Host: {} resolves to: {} which is in denylist: {}.", uri.host, InetAddress.getByName(uri.host), netStr) -// return true -// } -// } -// return false -// } - @Suppress("BlockingMethodInNonBlockingContext") suspend fun getManagedIndexConfig(indexUuid: String, client: Client): ManagedIndexConfig? { val request = GetRequest().routing(indexUuid).index(INDEX_MANAGEMENT_INDEX).id(indexUuid) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt index 5fe439716..4442ccc82 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt @@ -6,11 +6,8 @@ @file:Suppress("TopLevelPropertyNaming", "MatchingDeclarationName") package org.opensearch.indexmanagement.indexstatemanagement.util -import org.apache.logging.log4j.Logger import org.opensearch.OpenSearchParseException import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest -import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.client.Client import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable @@ -20,15 +17,10 @@ import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.ToXContentFragment import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentFactory -import org.opensearch.core.index.Index import org.opensearch.indexmanagement.indexstatemanagement.model.ChangePolicy import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataRequest import org.opensearch.indexmanagement.opensearchapi.optionalTimeField -import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.rest.RestRequest -import java.lang.Exception import java.time.Instant const val WITH_TYPE = "with_type" @@ -121,20 +113,6 @@ fun getPartialChangePolicyBuilder( return builder.endObject().endObject() } -/** - * Removes the managed index metadata from the cluster state for the the provided indices. - */ -suspend fun removeClusterStateMetadatas(client: Client, logger: Logger, indices: List) { - val request = UpdateManagedIndexMetaDataRequest(indicesToRemoveManagedIndexMetaDataFrom = indices) - - try { - val response: AcknowledgedResponse = client.suspendUntil { execute(UpdateManagedIndexMetaDataAction.INSTANCE, request, it) } - logger.debug("Cleaned cluster state metadata for $indices, ${response.isAcknowledged}") - } catch (e: Exception) { - logger.error("Failed to clean cluster state metadata for $indices") - } -} - const val MASTER_TIMEOUT_DEPRECATED_MESSAGE = "Parameter [master_timeout] is deprecated and will be removed in 3.0. " + "To support inclusive language, please use [cluster_manager_timeout] instead." diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt index f800b54cd..8446be029 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt @@ -24,6 +24,7 @@ object RollupFieldValueExpressionResolver { private lateinit var scriptService: ScriptService private lateinit var clusterService: ClusterService lateinit var indexAliasUtils: IndexAliasUtils + fun resolve(rollup: Rollup, fieldValue: String): String { val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf()) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index 7bb838f0a..cbf3b0976 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -319,8 +319,7 @@ fun randomManagedIndexConfig( schedule: Schedule = IntervalSchedule(Instant.ofEpochMilli(Instant.now().toEpochMilli()), 5, ChronoUnit.MINUTES), lastUpdatedTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, - policyID: String = OpenSearchRestTestCase.randomAlphaOfLength(10), - policy: Policy? = randomPolicy(), + policy: Policy = randomPolicy(), changePolicy: ChangePolicy? = randomChangePolicy(), jitter: Double? = 0.0 ): ManagedIndexConfig { @@ -332,10 +331,10 @@ fun randomManagedIndexConfig( jobSchedule = schedule, jobLastUpdatedTime = lastUpdatedTime, jobEnabledTime = enabledTime, - policyID = policy?.id ?: policyID, - policySeqNo = policy?.seqNo, - policyPrimaryTerm = policy?.primaryTerm, - policy = policy?.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM), + policyID = policy.id, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, + policy = policy.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM), changePolicy = changePolicy, jobJitter = jitter ) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt index 17f96cda6..aaf302e7c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt @@ -220,7 +220,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // Will use the unique generated description to ensure they are the same policies, the cached policy does not have // id, seqNo, primaryTerm on the policy itself so cannot directly compare // TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds - assertEquals("Initialized policy is not the change policy", newPolicy.description, updatedManagedIndexConfig.policy?.description) + assertEquals( + "Initialized policy is not the change policy", newPolicy.description, + updatedManagedIndexConfig.policy.description + ) } fun `test changing policy on a valid index and log pattern`() { @@ -529,7 +532,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // Will use the unique generated description to ensure they are the same policies, the cached policy does not have // id, seqNo, primaryTerm on the policy itself so cannot directly compare // TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds - assertEquals("Initialized policy is not the change policy", newPolicy.description, config.policy?.description) + assertEquals( + "Initialized policy is not the change policy", newPolicy.description, + config.policy.description + ) config } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt index 3dd7b20ec..d6dfed16a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.randomChangePolicy import org.opensearch.indexmanagement.indexstatemanagement.randomClusterStateManagedIndexConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy import org.opensearch.indexmanagement.indexstatemanagement.randomSweptManagedIndexConfig import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.test.OpenSearchTestCase @@ -34,7 +35,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { val index = randomAlphaOfLength(10) val uuid = randomAlphaOfLength(10) val policyID = randomAlphaOfLength(10) - val createRequest = managedIndexConfigIndexRequest(index, uuid, policyID, 5, jobJitter = 0.0) + val createRequest = managedIndexConfigIndexRequest(index, uuid, policyID, 5, randomPolicy(), jobJitter = 0.0) assertNotNull("IndexRequest not created", createRequest) assertEquals("Incorrect ism index used in request", INDEX_MANAGEMENT_INDEX, createRequest.index())