diff --git a/build.gradle b/build.gradle index 63779a569..6c64a01be 100644 --- a/build.gradle +++ b/build.gradle @@ -13,6 +13,7 @@ import java.util.function.Predicate buildscript { ext { + isSnapshot = "true" == System.getProperty("build.snapshot", "true") opensearch_version = System.getProperty("opensearch.version", "2.2.0-SNAPSHOT") buildVersionQualifier = System.getProperty("build.version_qualifier", "") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 116a5b055..a516fbfd5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.MetadataService import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.Policy +import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestExecutePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestAddPolicyAction import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestDeletePolicyAction @@ -56,6 +57,8 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.chan import org.opensearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.TransportChangePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.deletepolicy.DeletePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.deletepolicy.TransportDeletePolicyAction +import org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy.ExecutePolicyAction +import org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy.TransportExecutePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPoliciesAction @@ -330,13 +333,14 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin RestExplainTransformAction(), RestStartTransformAction(), RestStopTransformAction(), + RestExecutePolicyAction(), RestGetSMPolicyHandler(), RestStartSMPolicyHandler(), RestStopSMPolicyHandler(), RestExplainSMPolicyHandler(), RestDeleteSMPolicyHandler(), RestCreateSMPolicyHandler(), - RestUpdateSMPolicyHandler() + RestUpdateSMPolicyHandler(), ) } @@ -522,6 +526,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin ActionPlugin.ActionHandler(RemovePolicyAction.INSTANCE, TransportRemovePolicyAction::class.java), ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java), ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java), + ActionPlugin.ActionHandler(ExecutePolicyAction.INSTANCE, TransportExecutePolicyAction::class.java), ActionPlugin.ActionHandler(RetryFailedManagedIndexAction.INSTANCE, TransportRetryFailedManagedIndexAction::class.java), ActionPlugin.ActionHandler(ChangePolicyAction.INSTANCE, TransportChangePolicyAction::class.java), ActionPlugin.ActionHandler(IndexPolicyAction.INSTANCE, TransportIndexPolicyAction::class.java), diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index d122272e3..da4db7cea 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -89,11 +89,14 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.util.releaseLockForScheduledJob +import org.opensearch.indexmanagement.util.renewLockForScheduledJob import org.opensearch.jobscheduler.spi.JobExecutionContext import org.opensearch.jobscheduler.spi.LockModel import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.ScheduledJobRunner import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.rest.RestStatus import org.opensearch.script.Script import org.opensearch.script.ScriptService @@ -219,33 +222,49 @@ object ManagedIndexRunner : } // Attempt to acquire lock - val lock: LockModel? = context.lockService.suspendUntil { acquireLock(job, context, it) } + var lock: LockModel? = context.lockService.suspendUntil { acquireLock(job, context, it) } if (lock == null) { logger.debug("Could not acquire lock [${lock?.lockId}] for ${job.index}") } else { - runManagedIndexConfig(job, context) + if (job.continuous) { + var keepExecuting: Boolean = true + // Need to execute at least once for policy to initialize + do { + // Need to renew lock for current step execution + val renewedLock = renewLockForScheduledJob(context, lock as LockModel, errorNotificationRetryPolicy) + // Failed to renew lock + if (renewedLock == null) { + logger.error("Could not renew lock [${lock.lockId}] for ${job.index}") + break + } else { + lock = renewedLock + keepExecuting = runManagedIndexConfig(job, context.lockService) + } + } while ((job.continuous && keepExecuting)) // Runs until job is no longer continuous or execution should stop + } else { // If job is not continuous run once + runManagedIndexConfig(job, context.lockService) + } // Release lock - val released: Boolean = context.lockService.suspendUntil { release(lock, it) } - if (!released) { - logger.debug("Could not release lock [${lock.lockId}] for ${job.index}") + if (lock == null || !releaseLockForScheduledJob(context, lock)) { + logger.debug("Could not release lock [${lock?.lockId}] for ${job.index}") } } } } @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth") - private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext) { + private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, lock: LockService): Boolean { logger.debug("Run job for index ${managedIndexConfig.index}") // doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls if (clusterIsRed()) { logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health") - return + return false } val (managedIndexMetaData, getMetadataSuccess) = client.getManagedIndexMetadata(managedIndexConfig.indexUuid) if (!getMetadataSuccess) { logger.info("Failed to retrieve managed index metadata of index [${managedIndexConfig.index}] from config index, abort this run.") - return + return false } // Check the cluster state for the index metadata @@ -267,14 +286,14 @@ object ManagedIndexRunner : // If no index types had an index with a matching name and uuid combination, return if (!someTypeMatchedUuid) { logger.warn("Failed to find IndexMetadata for ${managedIndexConfig.index}.") - return + return false } } else { val clusterStateMetadata = clusterStateIndexMetadata.getManagedIndexMetadata() val metadataCheck = checkMetadata(clusterStateMetadata, managedIndexMetaData, managedIndexConfig.indexUuid, logger) if (metadataCheck != MetadataCheck.SUCCESS) { logger.info("Skipping execution while metadata status is $metadataCheck") - return + return false } } @@ -282,13 +301,13 @@ object ManagedIndexRunner : val policy = managedIndexConfig.policy if (policy == null || managedIndexMetaData == null) { initManagedIndex(managedIndexConfig, managedIndexMetaData) - return + return true } // If the policy was completed or failed then return early and disable job so it stops scheduling work if (managedIndexMetaData.policyCompleted == true || managedIndexMetaData.isFailed) { disableManagedIndexConfig(managedIndexConfig) - return + return false } if (managedIndexMetaData.hasVersionConflict(managedIndexConfig)) { @@ -302,13 +321,13 @@ object ManagedIndexRunner : if (result.metadataSaved) { disableManagedIndexConfig(managedIndexConfig) } - return + return false } val state = policy.getStateToExecute(managedIndexMetaData) val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider) val stepContext = StepContext( - managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings, jobContext.lockService + managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings, lock ) val step: Step? = action?.getStepToExecute(stepContext) val currentActionMetaData = action?.getUpdatedActionMetadata(managedIndexMetaData, state.name) @@ -317,7 +336,7 @@ object ManagedIndexRunner : // then disable the job and return early if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) { disableManagedIndexConfig(managedIndexConfig) - return + return false } if (action?.hasTimedOut(currentActionMetaData) == true) { @@ -328,19 +347,24 @@ object ManagedIndexRunner : .copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) - return + return false } if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) { - initChangePolicy(managedIndexConfig, managedIndexMetaData, action) - return + return if (initChangePolicy(managedIndexConfig, managedIndexMetaData, action)) { + // Don't want to continue execution on old Managed Index Config + false + } else { + // Managed index config was not updated to safe to continue execution of this job + true + } } val shouldBackOff = action?.shouldBackoff(currentActionMetaData, action.configRetry) if (shouldBackOff?.first == true) { // If we should back off then exit early. logger.info("Backoff for retrying. Remaining time ${shouldBackOff.second}") - return + return false } if (managedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.STARTING) { @@ -353,8 +377,10 @@ object ManagedIndexRunner : policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info ) ) - if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) - return + if (updated.metadataSaved) { + disableManagedIndexConfig(managedIndexConfig) + return false + } } } @@ -368,20 +394,20 @@ object ManagedIndexRunner : ) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) - return + return false } // If this action is not allowed and the step to be executed is the first step in the action then we will fail // as this action has been removed from the AllowList, but if its not the first step we will let it finish as it's already inflight if (action?.isAllowed(allowList) == false && step != null && action.isFirstStep(step.name) && action.type != TransitionsAction.name) { - val info = mapOf("message" to "Attempted to execute action=${action?.type} which is not allowed.") + val info = mapOf("message" to "Attempted to execute action=${action.type} which is not allowed.") val updated = updateManagedIndexMetaData( managedIndexMetaData.copy( policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info ) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) - return + return false } // If any of State, Action, Step components come back as null then we are moving to error in ManagedIndexMetaData @@ -412,12 +438,11 @@ object ManagedIndexRunner : executedManagedIndexMetaData = executedManagedIndexMetaData.copy(info = mutableInfo.toMap()) } } - if (executedManagedIndexMetaData.isSuccessfulDelete) { GlobalScope.launch(Dispatchers.IO + CoroutineName("ManagedIndexMetaData-AddHistory")) { ismHistory.addManagedIndexMetaDataHistory(listOf(executedManagedIndexMetaData)) } - return + return false } // If a custom action deletes some off-cluster index and has deleteIndexMetadataAfterFinish set to true, @@ -426,18 +451,25 @@ object ManagedIndexRunner : if (action.isFinishedSuccessfully(executedManagedIndexMetaData)) { if (action.deleteIndexMetadataAfterFinish()) { deleteFromManagedIndex(managedIndexConfig, action.type) - return } } if (!updateManagedIndexMetaData(executedManagedIndexMetaData, updateResult).metadataSaved) { logger.error("Failed to update ManagedIndexMetaData after executing the Step : ${step.name}") + return false } if (managedIndexConfig.hasDifferentJobInterval(jobInterval)) { updateJobInterval(managedIndexConfig, jobInterval) } + // Check that transition condition evaluated to false + if (executedManagedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.CONDITION_NOT_MET) { + return false + } + // Made it to end of successful execution block + return true } + return false } private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig, managedIndexMetaData: ManagedIndexMetaData?) { @@ -688,18 +720,19 @@ object ManagedIndexRunner : * Initializes the change policy process where we will get the policy using the change policy's policyID, update the [ManagedIndexMetaData] * to reflect the new policy, and save the new policy to the [ManagedIndexConfig] while resetting the change policy to null */ + // Returning true if Managed Index Config was updated to avoid continuous execution on outdated Managed Index Config @Suppress("ReturnCount", "ComplexMethod") private suspend fun initChangePolicy( managedIndexConfig: ManagedIndexConfig, managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action? - ) { + ): Boolean { // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null val changePolicy = managedIndexConfig.changePolicy if (changePolicy == null) { logger.debug("initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: $managedIndexConfig") - return + return false } // get the policy we'll attempt to change to @@ -744,7 +777,7 @@ object ManagedIndexRunner : // if it is unsafe to change then we set safe back to false so we don't keep doing this check every execution if (managedIndexConfig.policy?.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy) != true) { updateManagedIndexConfig(managedIndexConfig.copy(changePolicy = managedIndexConfig.changePolicy.copy(isSafe = false))) - return + return true } } } @@ -757,10 +790,13 @@ object ManagedIndexRunner : * */ val updated = updateManagedIndexMetaData(updatedManagedIndexMetaData) - if (!updated.metadataSaved || policy == null) return + if (!updated.metadataSaved || policy == null) { + return false + } // Change the policy and user stored on the job from changePolicy, this will also set the changePolicy to null on the job - savePolicyToManagedIndexConfig(managedIndexConfig, policy.copy(user = changePolicy.user)) + // Return a flag to make sure saving correct + return savePolicyToManagedIndexConfig(managedIndexConfig, policy.copy(user = changePolicy.user)) } @Suppress("TooGenericExceptionCaught") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt index 87fd71d6e..b649084a6 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt @@ -35,7 +35,9 @@ data class ChangePolicy( val state: String?, val include: List, val isSafe: Boolean, - val user: User? = null + val user: User? = null, + val continuous: Boolean? = null + ) : Writeable, ToXContentObject { @Throws(IOException::class) @@ -46,16 +48,19 @@ data class ChangePolicy( isSafe = sin.readBoolean(), user = if (sin.readBoolean()) { User(sin) - } else null + } else null, + continuous = if (sin.readBoolean()) { sin.readBoolean() } else null ) override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - builder - .startObject() + builder.startObject() .field(ManagedIndexConfig.POLICY_ID_FIELD, policyID) .field(StateMetaData.STATE, state) .field(IS_SAFE_FIELD, isSafe) if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user) + if (continuous != null) { + builder.field(ManagedIndexConfig.CONTINUOUS, continuous) + } return builder.endObject() } @@ -67,6 +72,8 @@ data class ChangePolicy( out.writeBoolean(isSafe) out.writeBoolean(user != null) user?.writeTo(out) + out.writeBoolean(continuous != null) + if (continuous != null) out.writeBoolean(continuous) } companion object { @@ -75,6 +82,7 @@ data class ChangePolicy( const val INCLUDE_FIELD = "include" const val IS_SAFE_FIELD = "is_safe" const val USER_FIELD = "user" + const val CONTINUOUS_FIELD = "continuous" @JvmStatic @Throws(IOException::class) @@ -83,6 +91,7 @@ data class ChangePolicy( var state: String? = null var isSafe: Boolean = false var user: User? = null + var continuous: Boolean? = null val include = mutableListOf() ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) @@ -103,6 +112,7 @@ data class ChangePolicy( USER_FIELD -> { user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp) } + CONTINUOUS_FIELD -> continuous = xcp.booleanValue() else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ChangePolicy.") } } @@ -112,7 +122,8 @@ data class ChangePolicy( state, include.toList(), isSafe, - user + user, + continuous ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt index acaabc864..973843f56 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt @@ -37,7 +37,8 @@ data class ManagedIndexConfig( val policyPrimaryTerm: Long?, val policy: Policy?, val changePolicy: ChangePolicy?, - val jobJitter: Double? + val jobJitter: Double?, + val continuous: Boolean ) : ScheduledJobParameter { init { @@ -81,6 +82,7 @@ data class ManagedIndexConfig( .field(POLICY_FIELD, policy, XCONTENT_WITHOUT_TYPE) .field(CHANGE_POLICY_FIELD, changePolicy) .field(JITTER, jobJitter) + .field(CONTINUOUS, continuous) builder.endObject() return builder.endObject() } @@ -100,6 +102,7 @@ data class ManagedIndexConfig( const val POLICY_PRIMARY_TERM_FIELD = "policy_primary_term" const val CHANGE_POLICY_FIELD = "change_policy" const val JITTER = "jitter" + const val CONTINUOUS = "continuous" @Suppress("ComplexMethod", "LongMethod") @JvmStatic @@ -124,6 +127,7 @@ data class ManagedIndexConfig( var policyPrimaryTerm: Long? = SequenceNumbers.UNASSIGNED_PRIMARY_TERM var policySeqNo: Long? = SequenceNumbers.UNASSIGNED_SEQ_NO var jitter: Double? = null + var continuous: Boolean = false ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { @@ -154,6 +158,7 @@ data class ManagedIndexConfig( JITTER -> { jitter = if (xcp.currentToken() == Token.VALUE_NULL) null else xcp.doubleValue() } + CONTINUOUS -> continuous = xcp.booleanValue() else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ManagedIndexConfig.") } } @@ -183,7 +188,8 @@ data class ManagedIndexConfig( primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM ), changePolicy = changePolicy, - jobJitter = jitter + jobJitter = jitter, + continuous = continuous ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyAction.kt index da55fb2d0..181fcf20a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyAction.kt @@ -63,7 +63,9 @@ class RestAddPolicyAction : BaseRestHandler() { val policyID = requireNotNull(body.getOrDefault("policy_id", null)) { "Missing policy_id" } - val addPolicyRequest = AddPolicyRequest(indices.toList(), policyID as String, indexType) + val continuous = body?.get("continuous") as Boolean + + val addPolicyRequest = AddPolicyRequest(indices.toList(), policyID as String, indexType, continuous) return RestChannelConsumer { channel -> client.execute(AddPolicyAction.INSTANCE, addPolicyRequest, RestToXContentListener(channel)) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyAction.kt index ebca535f5..94f490156 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyAction.kt @@ -58,8 +58,8 @@ class RestChangePolicyAction : BaseRestHandler() { val xcp = request.contentParser() ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) val changePolicy = ChangePolicy.parse(xcp) - - val changePolicyRequest = ChangePolicyRequest(indices.toList(), changePolicy, indexType) + val continuous = if (changePolicy.continuous != null) changePolicy.continuous as Boolean else false + val changePolicyRequest = ChangePolicyRequest(indices.toList(), changePolicy.copy(continuous = null), indexType, continuous = continuous) return RestChannelConsumer { channel -> client.execute(ChangePolicyAction.INSTANCE, changePolicyRequest, RestToXContentListener(channel)) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExecutePolicyAction.kt new file mode 100644 index 000000000..d2e29bd6d --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExecutePolicyAction.kt @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.resthandler + +import org.opensearch.client.node.NodeClient +import org.opensearch.common.Strings +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ISM_BASE_URI +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_ISM_BASE_URI +import org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy.ExecutePolicyAction +import org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy.ExecutePolicyRequest +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BaseRestHandler.RestChannelConsumer +import org.opensearch.rest.RestHandler.ReplacedRoute +import org.opensearch.rest.RestHandler.Route +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestRequest.Method.POST +import org.opensearch.rest.action.RestToXContentListener +import java.io.IOException + +class RestExecutePolicyAction : BaseRestHandler() { + override fun getName(): String = "execute_policy_action" + + override fun routes(): List { + return emptyList() + } + + override fun replacedRoutes(): List { + return listOf( + ReplacedRoute( + POST, "$ISM_BASE_URI/execute", + POST, "$LEGACY_ISM_BASE_URI/execute" + ), + ReplacedRoute( + POST, "$ISM_BASE_URI/execute/{index}", + POST, "$LEGACY_ISM_BASE_URI/{index}" + ) + ) + } + + @Throws(IOException::class) + @Suppress("SpreadOperator") // There is no way around dealing with java vararg without spread operator. + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val indices: Array? = Strings.splitStringByCommaToArray(request.param("index")) + + if (indices.isNullOrEmpty()) { + throw IllegalArgumentException("Missing indices") + } + + val body = if (request.hasContent()) { + XContentHelper.convertToMap(request.requiredContent(), false, request.xContentType).v2() + } else { + mapOf() + } + + val executePolicyRequest = ExecutePolicyRequest(indices.toList()) + + return RestChannelConsumer { channel -> + client.execute(ExecutePolicyAction.INSTANCE, executePolicyRequest, RestToXContentListener(channel)) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt index 843987b4e..7add32837 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt @@ -25,6 +25,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaDat import org.opensearch.rest.RestStatus import org.opensearch.transport.RemoteTransportException import java.time.Instant +import org.opensearch.common.unit.TimeValue class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) { @@ -57,11 +58,19 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) if (indexCreationDate == -1L) { logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison") } + val indexAgeTimeValue = if (indexCreationDate == -1L) { + logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison") + // since we cannot use for comparison, we can set it to 0 as minAge will never be <= 0 + TimeValue.timeValueMillis(0) + } else { + TimeValue.timeValueMillis(Instant.now().toEpochMilli() - indexCreationDate) + } val stepStartTime = getStepStartTime(context.metadata) var numDocs: Long? = null var indexSize: ByteSizeValue? = null val rolloverDate: Instant? = if (inCluster) indexMetadata.getOldestRolloverTime() else null + if (transitions.any { it.conditions?.rolloverAge !== null }) { // if we have a transition with rollover age condition, then we must have a rollover date // otherwise fail this transition @@ -115,7 +124,16 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) stepStatus = StepStatus.CONDITION_NOT_MET message = getEvaluatingMessage(indexName) } - info = mapOf("message" to message) + + // store conditions in a map to add to info + val conditions = listOfNotNull( + "index creation date" to indexCreationDate, + "Number of docs" to numDocs, + "Index Size" to indexSize, + "Step Start Time" to stepStartTime, + "RolloverDate" to rolloverDate + ).toMap() + info = mapOf("message" to message, "conditions" to conditions) } catch (e: RemoteTransportException) { handleException(indexName, ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt index c9ac3bb51..3f7f354ae 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt @@ -16,14 +16,16 @@ import java.io.IOException class AddPolicyRequest( val indices: List, val policyID: String, - val indexType: String + val indexType: String, + val continuous: Boolean ) : ActionRequest() { @Throws(IOException::class) constructor(sin: StreamInput) : this( indices = sin.readStringList(), policyID = sin.readString(), - indexType = sin.readString() + indexType = sin.readString(), + continuous = sin.readBoolean() ) override fun validate(): ActionRequestValidationException? { @@ -44,6 +46,7 @@ class AddPolicyRequest( out.writeStringCollection(indices) out.writeString(policyID) out.writeString(indexType) + out.writeBoolean(continuous) } companion object { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt index 0efdd0686..57d83289c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt @@ -345,10 +345,9 @@ class TransportAddPolicyAction @Inject constructor( val bulkReq = BulkRequest().timeout(TimeValue.timeValueMillis(bulkReqTimeout)) indicesToAdd.forEach { (uuid, name) -> bulkReq.add( - managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval, policy = policy.copy(user = this.user), jobJitter) + managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval, policy = policy.copy(user = this.user), jobJitter, continuous = request.continuous) ) } - client.bulk( bulkReq, object : ActionListener { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt index fcec1bf74..64f7e8f6b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt @@ -17,14 +17,16 @@ import java.io.IOException class ChangePolicyRequest( val indices: List, val changePolicy: ChangePolicy, - val indexType: String + val indexType: String, + val continuous: Boolean = false ) : ActionRequest() { @Throws(IOException::class) constructor(sin: StreamInput) : this( indices = sin.readStringList(), changePolicy = ChangePolicy(sin), - indexType = sin.readString() + indexType = sin.readString(), + continuous = sin.readBoolean() ) override fun validate(): ActionRequestValidationException? { @@ -45,6 +47,7 @@ class ChangePolicyRequest( out.writeStringCollection(indices) changePolicy.writeTo(out) out.writeString(indexType) + out.writeBoolean(if (continuous != null) continuous else false) } companion object { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt index fed9cf828..a916628ae 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt @@ -370,7 +370,7 @@ class TransportChangePolicyAction @Inject constructor( val currentStateName = indexUuidToCurrentState[sweptConfig.uuid] val updatedChangePolicy = changePolicy .copy(isSafe = sweptConfig.policy?.isSafeToChange(currentStateName, policy, changePolicy) == true, user = this.user) - bulkUpdateManagedIndexRequest.add(updateManagedIndexRequest(sweptConfig.copy(changePolicy = updatedChangePolicy))) + bulkUpdateManagedIndexRequest.add(updateManagedIndexRequest(sweptConfig.copy(changePolicy = updatedChangePolicy), continuous = request.continuous)) mapOfItemIdToIndex[id] = Index(sweptConfig.index, sweptConfig.uuid) } client.bulk( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt new file mode 100644 index 000000000..1b584f55d --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy + +import org.opensearch.action.ActionType +import org.opensearch.action.support.master.AcknowledgedResponse + +class ExecutePolicyAction private constructor() : ActionType(NAME, ::AcknowledgedResponse) { + companion object { + val INSTANCE = ExecutePolicyAction() + const val NAME = "cluster:admin/opendistro/ism/managedindex/execute" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyRequest.kt new file mode 100644 index 000000000..d6ac75824 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyRequest.kt @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ValidateActions +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import java.io.IOException + +class ExecutePolicyRequest( + val indices: List +) : ActionRequest() { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + indices = sin.readStringList() + ) + + override fun validate(): ActionRequestValidationException? { + var validationException: ActionRequestValidationException? = null + if (indices.isEmpty()) { + validationException = ValidateActions.addValidationError("Missing indices", validationException) + } + return validationException + } + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeStringCollection(indices) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt new file mode 100644 index 000000000..52f004ef4 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt @@ -0,0 +1,125 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import org.apache.logging.log4j.LogManager +import org.opensearch.action.ActionListener +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.authuser.User +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.node.NodeClient +import org.opensearch.jobscheduler.spi.utils.LockService +import org.opensearch.ExceptionsHelper +import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.cluster.state.ClusterStateResponse +import org.opensearch.action.get.MultiGetRequest +import org.opensearch.action.get.MultiGetResponse +import org.opensearch.action.support.IndicesOptions +import org.opensearch.jobscheduler.spi.JobDocVersion +import org.opensearch.jobscheduler.spi.JobExecutionContext +import java.time.Instant + +private val log = LogManager.getLogger(TransportExecutePolicyAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + +class TransportExecutePolicyAction @Inject constructor( + transportService: TransportService, + val client: NodeClient, + private val clusterService: ClusterService, + private val runner: ManagedIndexRunner, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry, + val request: ExecutePolicyRequest +) : HandledTransportAction ( + ExecutePolicyAction.NAME, transportService, actionFilters, ::ExecutePolicyRequest +) { + + override fun doExecute(task: Task, execPolicyRequest: ExecutePolicyRequest, actionListener: ActionListener) { + val userStr = client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + log.debug("User and roles string from thread context: $userStr") + val user: User? = User.parse(userStr) + val indices = mutableSetOf() + + runner.launch { + try { + val lock = LockService(client, clusterService) + // Temp context in order to pass into runJob + val newContext = JobExecutionContext(Instant.now(), JobDocVersion(0L, 0L, 0L), lock, "", "") + // Need to get managed index metadata to pass into runJob + // runner.runJob(None, newContext) + } catch (e: Exception) { + log.error("Unexpected error trying to execute policy") + withContext(Dispatchers.IO) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + } + } + } + fun getExistingManagedIndices() { + + val multiGetReq = MultiGetRequest() + + client.multiGet( + multiGetReq, + object : ActionListener { + override fun onResponse(response: MultiGetResponse) { + + response.forEach { + // get managed index configs + } + } + + override fun onFailure(e: java.lang.Exception?) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + } + } + ) + } + fun getClusterState() { + val strictExpandOptions = IndicesOptions.strictExpand() + + val clusterStateRequest = ClusterStateRequest() + .clear() + .indices(*request.indices.toTypedArray()) + .metadata(true) + .local(false) + .indicesOptions(strictExpandOptions) + + client.threadPool().threadContext.stashContext().use { + client.admin() + .cluster() + .state( + clusterStateRequest, + object : ActionListener { + override fun onResponse(response: ClusterStateResponse) { + val indexMetadatas = response.state.metadata.indices + indexMetadatas.forEach { + indices.add(it.value.indexUUID) + } + + getExistingManagedIndices() + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) + } + } + ) + } + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt index cda65f548..d33289126 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt @@ -28,14 +28,16 @@ open class ExplainResponse : ActionResponse, ToXContentObject { val totalManagedIndices: Int val enabledState: Map val policies: Map - + val continuous: Map + @Suppress("LongParameterList") constructor( indexNames: List, indexPolicyIDs: List, indexMetadatas: List, totalManagedIndices: Int, enabledState: Map, - policies: Map + policies: Map, + continuous: Map ) : super() { this.indexNames = indexNames this.indexPolicyIDs = indexPolicyIDs @@ -43,6 +45,7 @@ open class ExplainResponse : ActionResponse, ToXContentObject { this.totalManagedIndices = totalManagedIndices this.enabledState = enabledState this.policies = policies + this.continuous = continuous } @Throws(IOException::class) @@ -52,7 +55,8 @@ open class ExplainResponse : ActionResponse, ToXContentObject { indexMetadatas = sin.readList { ManagedIndexMetaData.fromStreamInput(it) }, totalManagedIndices = sin.readInt(), enabledState = sin.readMap() as Map, - policies = sin.readMap(StreamInput::readString, ::Policy) + policies = sin.readMap(StreamInput::readString, ::Policy), + continuous = sin.readMap() as Map ) @Throws(IOException::class) @@ -67,6 +71,7 @@ open class ExplainResponse : ActionResponse, ToXContentObject { { _out, key -> _out.writeString(key) }, { _out, policy -> policy.writeTo(_out) } ) + out.writeMap(continuous) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { @@ -78,6 +83,7 @@ open class ExplainResponse : ActionResponse, ToXContentObject { indexMetadatas[ind]?.toXContent(builder, ToXContent.EMPTY_PARAMS) builder.field("enabled", enabledState[name]) policies[name]?.let { builder.field(Policy.POLICY_TYPE, it, XCONTENT_WITHOUT_TYPE_AND_USER) } + builder.field("continuous", continuous[name]) builder.endObject() } builder.field(TOTAL_MANAGED_INDICES, totalManagedIndices) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index ee7b7a33d..797a3a4d5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -83,7 +83,6 @@ class TransportExplainAction @Inject constructor( ) : HandledTransportAction( ExplainAction.NAME, transportService, actionFilters, ::ExplainRequest ) { - override fun doExecute(task: Task, request: ExplainRequest, listener: ActionListener) { ExplainHandler(client, listener, request).start() } @@ -117,6 +116,7 @@ class TransportExplainAction @Inject constructor( private val indexMetadatas = mutableListOf() private var totalManagedIndices = 0 private val appliedPolicies: MutableMap = mutableMapOf() + private val continuousField: MutableMap = mutableMapOf() @Suppress("SpreadOperator", "NestedBlockDepth") fun start() { @@ -193,6 +193,7 @@ class TransportExplainAction @Inject constructor( "policy_id" to managedIndex.policyID, "enabled" to managedIndex.enabled.toString() ) + continuousField[managedIndex.index] = managedIndex.continuous if (showPolicy) { managedIndex.policy?.let { appliedPolicies[managedIndex.index] = it } } @@ -204,7 +205,7 @@ class TransportExplainAction @Inject constructor( // edge case: if specify query param pagination size to be 0 // we still show total managed indices indexNames.clear() - sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies) + sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies, continuousField) return } else { // Clear and add the managedIndices from the response to preserve the sort order and size @@ -230,7 +231,7 @@ class TransportExplainAction @Inject constructor( return } indexNames.clear() - sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies) + sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies, continuousField) return } actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) @@ -327,7 +328,7 @@ class TransportExplainAction @Inject constructor( managedIndicesMetaDataMap.clear() if (user == null || indexNames.isEmpty()) { - sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies) + sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies, continuousField) } else { filterAndSendResponse(threadContext) } @@ -340,29 +341,95 @@ class TransportExplainAction @Inject constructor( val filteredPolicies = mutableListOf() val enabledStatus = mutableMapOf() val filteredAppliedPolicies = mutableMapOf() + filter(0, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, filteredAppliedPolicies, continuousField) + } - CoroutineScope(Dispatchers.IO).launch { - // filter out indicies for which user doesn't have manage index permissions - for (i in 0 until indexNames.count()) { - val request = ManagedIndexRequest().indices(indexNames[i]) - try { - client.suspendUntil { execute(ManagedIndexAction.INSTANCE, request, it) } - filteredIndices.add(indexNames[i]) - filteredMetadata.add(indexMetadatas[i]) - filteredPolicies.add(indexPolicyIDs[i]) - enabledState[indexNames[i]]?.let { enabledStatus[indexNames[i]] = it } - appliedPolicies[indexNames[i]]?.let { filteredAppliedPolicies[indexNames[i]] = it } - } catch (e: OpenSearchSecurityException) { - totalManagedIndices -= 1 - } catch (e: Exception) { - actionListener.onFailure(e) + @Suppress("LongParameterList") + private fun filter( + current: Int, + filteredIndices: MutableList, + filteredMetadata: MutableList, + filteredPolicies: MutableList, + enabledStatus: MutableMap, + filteredAppliedPolicies: MutableMap, + continuousStatus: MutableMap + ) { + val request = ManagedIndexRequest().indices(indexNames[current]) + client.execute( + ManagedIndexAction.INSTANCE, + request, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + filteredIndices.add(indexNames[current]) + filteredMetadata.add(indexMetadatas[current]) + filteredPolicies.add(indexPolicyIDs[current]) + enabledState[indexNames[current]]?.let { enabledStatus[indexNames[current]] = it } + appliedPolicies[indexNames[current]]?.let { filteredAppliedPolicies[indexNames[current]] = it } + continuousField[indexNames[current]]?.let { continuousStatus[indexNames[current]] = it } + if (current < indexNames.count() - 1) { + // do nothing - skip the index and go to next one + filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, filteredAppliedPolicies, continuousStatus) + } else { + sendResponse( + filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, + totalManagedIndices, filteredAppliedPolicies, continuousStatus + ) + } + } + + override fun onFailure(e: Exception) { + when (e is OpenSearchSecurityException) { + true -> { + totalManagedIndices -= 1 + if (current < indexNames.count() - 1) { + // do nothing - skip the index and go to next one + filter( + current + 1, + filteredIndices, + filteredMetadata, + filteredPolicies, + enabledStatus, + filteredAppliedPolicies, + continuousStatus + ) + } else { + sendResponse( + filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, + totalManagedIndices, filteredAppliedPolicies, continuousStatus + ) + } + } + + false -> { + actionListener.onFailure(e) + } + } + + CoroutineScope(Dispatchers.IO).launch { + // filter out indicies for which user doesn't have manage index permissions + for (i in 0 until indexNames.count()) { + val request = ManagedIndexRequest().indices(indexNames[i]) + try { + client.suspendUntil { execute(ManagedIndexAction.INSTANCE, request, it) } + filteredIndices.add(indexNames[i]) + filteredMetadata.add(indexMetadatas[i]) + filteredPolicies.add(indexPolicyIDs[i]) + enabledState[indexNames[i]]?.let { enabledStatus[indexNames[i]] = it } + appliedPolicies[indexNames[i]]?.let { filteredAppliedPolicies[indexNames[i]] = it } + } catch (e: OpenSearchSecurityException) { + totalManagedIndices -= 1 + } catch (e: Exception) { + actionListener.onFailure(e) + } + } + sendResponse( + filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, + totalManagedIndices, filteredAppliedPolicies, continuousField + ) + } } } - sendResponse( - filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, - totalManagedIndices, filteredAppliedPolicies - ) - } + ) } @Suppress("LongParameterList") @@ -372,9 +439,10 @@ class TransportExplainAction @Inject constructor( policyIDs: List, enabledStatus: Map, totalIndices: Int, - policies: Map + policies: Map, + continuous: Map, ) { - actionListener.onResponse(ExplainResponse(indices, policyIDs, metadata, totalIndices, enabledStatus, policies)) + actionListener.onResponse(ExplainResponse(indices, policyIDs, metadata, totalIndices, enabledStatus, policies, continuous)) } @Suppress("ReturnCount") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index bea3ed57c..30ecbfded 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -67,7 +67,8 @@ fun managedIndexConfigIndexRequest( policyID: String, jobInterval: Int, policy: Policy? = null, - jobJitter: Double? + jobJitter: Double?, + continuous: Boolean = false ): IndexRequest { val managedIndexConfig = ManagedIndexConfig( jobName = index, @@ -82,7 +83,8 @@ fun managedIndexConfigIndexRequest( policySeqNo = policy?.seqNo, policyPrimaryTerm = policy?.primaryTerm, changePolicy = null, - jobJitter = jobJitter + jobJitter = jobJitter, + continuous = continuous ) return IndexRequest(INDEX_MANAGEMENT_INDEX) @@ -164,11 +166,11 @@ fun deleteManagedIndexMetadataRequest(uuid: String): DeleteRequest { return DeleteRequest(INDEX_MANAGEMENT_INDEX, managedIndexMetadataID(uuid)).routing(uuid) } -fun updateManagedIndexRequest(sweptManagedIndexConfig: SweptManagedIndexConfig): UpdateRequest { +fun updateManagedIndexRequest(sweptManagedIndexConfig: SweptManagedIndexConfig, continuous: Boolean?): UpdateRequest { return UpdateRequest(INDEX_MANAGEMENT_INDEX, sweptManagedIndexConfig.uuid) .setIfPrimaryTerm(sweptManagedIndexConfig.primaryTerm) .setIfSeqNo(sweptManagedIndexConfig.seqNo) - .doc(getPartialChangePolicyBuilder(sweptManagedIndexConfig.changePolicy)) + .doc(getPartialChangePolicyBuilder(sweptManagedIndexConfig.changePolicy, continuous = continuous)) } /** @@ -377,7 +379,7 @@ fun ManagedIndexMetaData.getCompletedManagedIndexMetaData( val ManagedIndexMetaData.isSuccessfulDelete: Boolean get() = (this.actionMetaData?.name == DeleteAction.name && !this.actionMetaData!!.failed) && - (this.stepMetaData?.name == DeleteAction.name && this.stepMetaData!!.stepStatus == Step.StepStatus.COMPLETED) && + (this.stepMetaData?.name == "attempt_delete" && this.stepMetaData!!.stepStatus == Step.StepStatus.COMPLETED) && (this.policyRetryInfo?.failed != true) val ManagedIndexMetaData.isFailed: Boolean diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt index c4118321f..f315cbcba 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt @@ -103,13 +103,15 @@ data class FailedIndex(val name: String, val uuid: String, val reason: String) : * Gets the XContentBuilder for partially updating a [ManagedIndexConfig]'s ChangePolicy */ fun getPartialChangePolicyBuilder( - changePolicy: ChangePolicy? + changePolicy: ChangePolicy?, + continuous: Boolean? ): XContentBuilder { val builder = XContentFactory.jsonBuilder() .startObject() .startObject(ManagedIndexConfig.MANAGED_INDEX_TYPE) .optionalTimeField(ManagedIndexConfig.LAST_UPDATED_TIME_FIELD, Instant.now()) .field(ManagedIndexConfig.CHANGE_POLICY_FIELD, changePolicy) + if (continuous != null) { builder.field(ManagedIndexConfig.CONTINUOUS, continuous) } return builder.endObject().endObject() } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt index 3e36ed28d..14d96c859 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt @@ -61,7 +61,7 @@ object SMRunner : this.clusterService = clusterService return this } - + @Suppress("MagicNumber") private val backoffPolicy: BackoffPolicy = BackoffPolicy.exponentialBackoff( TimeValue.timeValueMillis(1000L), 3 ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt index ae17e25ce..c2d18a65c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt @@ -30,6 +30,7 @@ import org.opensearch.threadpool.ThreadPool import java.time.Instant.now @OpenForTesting +@Suppress("MagicNumber") class SMStateMachine( val client: Client, val job: SMPolicy, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt index 60ccac14b..cec3232dd 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt @@ -36,7 +36,7 @@ import java.time.Instant import java.time.Instant.now import java.time.ZoneId import java.time.temporal.ChronoUnit - +@Suppress("MaxLineLength") data class SMPolicy( val id: String, val description: String? = null, @@ -53,7 +53,6 @@ data class SMPolicy( val notificationConfig: NotificationConfig? = null, val user: User? = null, ) : ScheduledJobParameter, Writeable { - init { require(snapshotConfig["repository"] != null && snapshotConfig["repository"] != "") { "Must provide the repository in snapshot config." } require(creation.schedule.getNextExecutionTime(now()) != null) { "Next execution time from the creation schedule is null, please provide a valid cron expression." } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt index 913229246..5da2fc554 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt @@ -108,7 +108,7 @@ class TransportDeleteTransformsAction @Inject constructor( } } - @Suppress("LongMethod") + @Suppress("LongMethod", "NestedBlockDepth") private fun bulkDelete(response: MultiGetResponse, ids: List, forceDelete: Boolean, actionListener: ActionListener) { val enabledIDs = mutableListOf() val notTransform = mutableListOf() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt index 51c4d14ae..ef3a16a5e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt @@ -90,6 +90,7 @@ class TransportExplainTransformAction @Inject constructor( client.search( searchRequest, object : ActionListener { + @Suppress("LongMethod") override fun onResponse(response: SearchResponse) { val metadataIdToTransform: MutableMap = HashMap() try { diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 67c5fc47e..cb68653bb 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 16 + "schema_version": 17 }, "dynamic": "strict", "properties": { @@ -668,6 +668,9 @@ }, "jitter": { "type": "double" + }, + "continuous": { + "type": "boolean" } } }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt index 22e9f9d03..ad77be6af 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt @@ -166,14 +166,14 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() { val addPolicyResponse = client().makeRequest( RestRequest.Method.POST.toString(), "${RestAddPolicyAction.LEGACY_ADD_POLICY_BASE_URI}/$indexName", - StringEntity("{ \"policy_id\": \"$policyId\" }", ContentType.APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"$policyId\", \"continuous\": false }", ContentType.APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, addPolicyResponse.restStatus()) val changePolicyResponse = client().makeRequest( RestRequest.Method.POST.toString(), "${RestAddPolicyAction.LEGACY_ADD_POLICY_BASE_URI}/$indexName", - StringEntity("{ \"policy_id\": \"$policyId\" }", ContentType.APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"$policyId\", \"continuous\": false }", ContentType.APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, changePolicyResponse.restStatus()) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index d22d6876d..db7ce6a7b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -27,7 +27,7 @@ import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 16 + val configSchemaVersion = 17 val historySchemaVersion = 5 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt index 540049fe1..dd6043aac 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt @@ -151,11 +151,13 @@ abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() { protected fun addPolicyToIndex( index: String, - policyID: String + policyID: String, + continuous: Boolean = false ) { val body = """ { - "policy_id": "$policyID" + "policy_id": "$policyID", + "continuous": $continuous } """.trimIndent() val response = getRestClient() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index be64799ff..2fd47d869 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -167,7 +167,8 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() replicas: String? = null, shards: String? = null, mapping: String = "", - settings: Settings? = null + settings: Settings? = null, + continuous: Boolean = false ): Pair { val waitForActiveShards = if (isMultiNode) "all" else "1" val builtSettings = Settings.builder().let { @@ -185,7 +186,7 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() val aliases = if (alias == null) "" else "\"$alias\": { \"is_write_index\": true }" createIndex(index, builtSettings, mapping, aliases) if (policyID != null) { - addPolicyToIndex(index, policyID) + addPolicyToIndex(index, policyID, continuous) } return index to policyID } @@ -244,11 +245,13 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() protected fun addPolicyToIndex( index: String, - policyID: String + policyID: String, + continuous: Boolean = false ) { val body = """ { - "policy_id": "$policyID" + "policy_id": "$policyID", + "continuous": $continuous } """.trimIndent() val response = client().makeRequest("POST", "/_opendistro/_ism/add/$index", StringEntity(body, APPLICATION_JSON)) @@ -438,8 +441,8 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() if (state != null) { string += "\"${ChangePolicy.STATE_FIELD}\":\"$state\"," } - string += "\"${ChangePolicy.INCLUDE_FIELD}\":${include.map { "{\"${StateFilter.STATE_FIELD}\":\"${it.state}\"}" }}}" - + string += "\"${ChangePolicy.INCLUDE_FIELD}\":${include.map { "{\"${StateFilter.STATE_FIELD}\":\"${it.state}\"}" }}" + string += if (continuous != null) ",\"${ChangePolicy.CONTINUOUS_FIELD}\":\"$continuous\"}" else "}" return StringEntity(string, APPLICATION_JSON) } @@ -615,7 +618,7 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() } // make sure metadata is initialised - assertTrue(metadata.transitionTo != null || metadata.stateMetaData != null || metadata.info != null || metadata.policyCompleted != null) + assertTrue("transitionTo: ${metadata.transitionTo} stateMetaData: ${metadata.stateMetaData}, metadata info: ${metadata.info}, policyCompleted: ${metadata.policyCompleted} ", metadata.transitionTo != null || metadata.stateMetaData != null || metadata.info != null || metadata.policyCompleted != null) return metadata } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index aad56c02b..41dac6208 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -302,7 +302,8 @@ fun randomManagedIndexConfig( policyID: String = OpenSearchRestTestCase.randomAlphaOfLength(10), policy: Policy? = randomPolicy(), changePolicy: ChangePolicy? = randomChangePolicy(), - jitter: Double? = 0.0 + jitter: Double? = 0.0, + continuous: Boolean = false ): ManagedIndexConfig { return ManagedIndexConfig( jobName = name, @@ -317,7 +318,8 @@ fun randomManagedIndexConfig( policyPrimaryTerm = policy?.primaryTerm, policy = policy?.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM), changePolicy = changePolicy, - jobJitter = jitter + jobJitter = jitter, + continuous = continuous ) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt index cf05dd34d..42b97b138 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt @@ -42,12 +42,10 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { // Change the start time so the job will trigger in 2 seconds. // First execution. We need to initialize the policy. updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } // Second execution is to fail the step once. updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { val managedIndexMetaData = getExplainManagedIndexMetaData(indexName) assertEquals( @@ -57,13 +55,11 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { ), managedIndexMetaData.actionMetaData ) - assertEquals(expectedInfoString, managedIndexMetaData.info.toString()) } // Third execution is to fail the step second time. updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { val managedIndexMetaData = getExplainManagedIndexMetaData(indexName) assertEquals( @@ -79,10 +75,10 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { // Fourth execution is to fail the step third time and finally fail the action. updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { val managedIndexMetaData = getExplainManagedIndexMetaData(indexName) assertEquals( + "failing this one: ", ActionMetaData( "rollover", managedIndexMetaData.actionMetaData?.startTime, 0, true, 2, managedIndexMetaData.actionMetaData?.lastRetryTime, null @@ -161,7 +157,8 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { PolicyRetryInfoMetaData.RETRY_INFO to fun(retryInfoMetaDataMap: Any?): Boolean = assertRetryInfoEquals(PolicyRetryInfoMetaData(false, 0), retryInfoMetaDataMap), ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString(), - ManagedIndexMetaData.ENABLED to true::equals + ManagedIndexMetaData.ENABLED to true::equals, + "continuous" to false::equals ) ), getExplainMap(indexName) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/DeleteActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/DeleteActionIT.kt index ff7265df9..c74bffef3 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/DeleteActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/DeleteActionIT.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.model.State +import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification import org.opensearch.indexmanagement.waitFor import java.time.Instant @@ -78,4 +79,36 @@ class DeleteActionIT : IndexStateManagementRestTestCase() { // ) // } } + // Want to test continuos execution occurs on policy + fun `test continuous index deletes immediately`() { + val indexName = "${testIndexName}_delete_continuous" + val policyID = "continuous_delete" + val actionConfig = DeleteAction(0) + val states = listOf( + State("State1", listOf(), listOf(Transition("State2", null))), + State("State2", listOf(), listOf(Transition("State3", null))), + State("State3", listOf(), listOf(Transition("DeleteState", null))), + State("DeleteState", listOf(actionConfig), listOf()) + ) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID = policy.id, continuous = true) + waitFor { assertIndexExists(indexName) } + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + // confirm index does not exist anymore + // should delete in one cycle + waitFor { assertIndexDoesNotExist(indexName) } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt index 9866f175a..773d18d94 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt @@ -139,4 +139,43 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) assertEquals("true", getIndexBlocksWriteSetting(indexName)) } + // Test Force Merge works on continuous indices + fun `test continuous force merge`() { + val indexName = "${testIndexName}_continuous_index" + val policyID = "${testIndexName}_testPolicyName_1" + + // Create a Policy with one State that only preforms a force_merge Action + val forceMergeActionConfig = ForceMergeAction(maxNumSegments = 1, index = 0) + val states = listOf(State("ForceMergeState", listOf(forceMergeActionConfig), listOf())) + + val policy = Policy( + id = policyID, + description = "$indexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + waitFor { createPolicy(policy, policyID) } + waitFor { createIndex(indexName, policyID, continuous = true) } + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created + insertSampleData(indexName, 3, 0) + + waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } + + // Will change the startTime each execution so that it triggers in 2 seconds + // 1. Initialize, 2. Set to read only, 3. Merge shards into 1 segment + updateManagedIndexConfigStartTime(managedIndexConfig) + + // Check force merge executed correctly + waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) } + // verify we reset actionproperties at end of forcemerge + waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties) } + // index should still be readonly after force merge finishes + waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt index 0bdf32ab0..fedc6c7da 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt @@ -688,4 +688,59 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { ) } } + // Test that shrink action works on continuous indices + fun `test continuous shrink`() { + val indexName = "${testIndexName}_index_1" + val policyID = "${testIndexName}_testPolicyName_1" + + val shrinkAction = ShrinkAction( + numNewShards = 1, + maxShardSize = null, + percentageOfSourceShards = null, + targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()), + aliases = listOf(Alias("test-alias1"), Alias("test-alias2").filter(QueryBuilders.termQuery("foo", "bar")).writeIndex(true)), + forceUnsafe = true, + index = 0 + ) + val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf())) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 11L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + createPolicy(policy, policyID) + createIndex(indexName, policyID, null, "0", "3", "", continuous = true) + + insertSampleData(indexName, 3) + + // Set the index as readonly to check that the setting is preserved after the shrink finishes + updateIndexSetting(indexName, IndexMetadata.SETTING_BLOCKS_WRITE, "true") + + // Will change the startTime each execution so that it triggers in 2 seconds + // First execution: Policy is initialized + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + // Starts and executes entire shrink action + updateManagedIndexConfigStartTime(managedIndexConfig) + + val targetIndexName = indexName + testIndexSuffix + + // Check that wait for shrink step executed + waitFor(Instant.ofEpochSecond(60)) { + // one primary and one replica + assertTrue(getIndexShards(targetIndexName).size == 2) + assertEquals( + WaitForShrinkStep.SUCCESS_MESSAGE, + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + assertEquals("Write block setting was not reset after successful shrink", "true", getIndexBlocksWriteSetting(indexName)) + val aliases = getAlias(targetIndexName, "") + assertTrue("Aliases were not added to shrunken index", aliases.containsKey("test-alias1") && aliases.containsKey("test-alias2")) + } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt index e1bd328d7..e84ed4a20 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt @@ -321,4 +321,37 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() { assertEquals(AttemptSnapshotStep.getBlockedMessage(denyList, repository, indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } } + // Test entire action executes on a continuous index + fun `test snapshot on continuous index`() { + val indexName = "${testIndexName}_continuous_index" + val policyID = "${testIndexName}_policy" + val repository = "repository" + val snapshot = "snapshot" + val actionConfig = SnapshotAction(repository, snapshot, 0) + val states = listOf( + State("Snapshot", listOf(actionConfig), listOf()) + ) + + waitFor { createRepository(repository) } + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID, continuous = true) + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertSnapshotExists(repository, "snapshot") } + waitFor { assertSnapshotFinishedWithSuccess(repository, "snapshot") } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionActionIT.kt index 80f153738..e34bcf1f6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionActionIT.kt @@ -66,7 +66,6 @@ class TransitionActionIT : IndexStateManagementRestTestCase() { // Should have evaluated to true waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondStateName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } } - fun `test rollover age transition for index with no rollover fails`() { val indexName = "${testIndexName}_rollover_age_no_rollover" val policyID = "${testIndexName}_rollover_age_no_rollover_policy" @@ -142,4 +141,34 @@ class TransitionActionIT : IndexStateManagementRestTestCase() { // Should have evaluated to true waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondStateName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } } + // Test that continuous index stops when condition is not met + fun `test continuous index stops when condition not met`() { + val indexName = "${testIndexName}_continuous_condition_check" + val policyID = "continuous_condition_check" + val states = listOf( + State("State1", listOf(), listOf(Transition("State2", null))), + // Should stop here + State("State2", listOf(), listOf(Transition("State3", Conditions(indexAge = TimeValue.timeValueDays(30L))))), + State("State3", listOf(), listOf()) + ) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID = policy.id, continuous = true) + waitFor { assertIndexExists(indexName) } + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + // confirm index stopped at State2 + waitFor { assertEquals("State2", getExplainManagedIndexMetaData(indexName).stateMetaData?.name) } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt index 605e0a1c4..38df3fc5f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt @@ -92,7 +92,8 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { policyID == null, explainResponseOpenSearchPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null, - ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null + ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null, + "continuous" to fun(enabled: Any?): Boolean = enabled == null ) ), getExplainMap(index), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt index 563c3de50..3568bf0a1 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt @@ -121,7 +121,8 @@ class ISMTemplateRestAPIIT : IndexStateManagementRestTestCase() { indexName1 to listOf( explainResponseOpendistroPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null, explainResponseOpenSearchPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null, - ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null + ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null, + "continuous" to fun(enabled: Any?): Boolean = enabled == null ) ), getExplainMap(indexName1), @@ -135,7 +136,8 @@ class ISMTemplateRestAPIIT : IndexStateManagementRestTestCase() { indexName1 to listOf( explainResponseOpendistroPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null, explainResponseOpenSearchPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null, - ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null + ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null, + "continuous" to fun(enabled: Any?): Boolean = enabled == null ) ), getExplainMap(indexName1), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyActionIT.kt index 92a657afc..a08baaffe 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyActionIT.kt @@ -54,7 +54,7 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { val response = client().makeRequest( POST.toString(), "${RestAddPolicyAction.ADD_POLICY_BASE_URI}/$index", - StringEntity("{ \"policy_id\": \"${policy.id}\" }", APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"${policy.id}\",\"continuous\": false }", APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) val actualMessage = response.asMap() @@ -81,7 +81,7 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { val response = client().makeRequest( POST.toString(), "${RestAddPolicyAction.ADD_POLICY_BASE_URI}/$index", - StringEntity("{ \"policy_id\": \"${policy.id}\" }", APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"${policy.id}\",\"continuous\": false }", APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) val actualMessage = response.asMap() @@ -113,7 +113,7 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { val response = client().makeRequest( POST.toString(), "${RestAddPolicyAction.ADD_POLICY_BASE_URI}/$indexOne,$indexTwo", - StringEntity("{ \"policy_id\": \"${newPolicy.id}\" }", APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"${newPolicy.id}\",\"continuous\": false }", APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) val actualMessage = response.asMap() @@ -153,7 +153,7 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { val response = client().makeRequest( POST.toString(), "${RestAddPolicyAction.ADD_POLICY_BASE_URI}/$indexPattern*", - StringEntity("{ \"policy_id\": \"${newPolicy.id}\" }", APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"${newPolicy.id}\",\"continuous\": false }", APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) val actualMessage = response.asMap() @@ -198,7 +198,7 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { val response = client().makeRequest( POST.toString(), "${RestAddPolicyAction.ADD_POLICY_BASE_URI}/.*", - StringEntity("{ \"policy_id\": \"${policy.id}\" }", APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"${policy.id}\",\"continuous\": false }", APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) val actualMessage = response.asMap() @@ -237,7 +237,6 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { assertEquals(policy.id, getPolicyIDOfManagedIndex(indexFour)) } } - /** * The util UUID method doesn't work for hidden indices because strict warning check, the following method skips the strict check */ diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt index 22c2f4586..0df207780 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt @@ -623,4 +623,27 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { ) } } + + // Check if Continuous field is changed from false to true + fun `test changing value of continuous in index`() { + val indexName = "${testIndexName}_continuous_check" + val policy = createRandomPolicy() + createIndex(indexName, policyID = policy.id, continuous = false) + Thread.sleep(1_000) + var managedIndexConfig = getManagedIndexConfig(indexName) + // Check continuous field is false intially + assertEquals("Incorrect initial continuous field: ${managedIndexConfig?.continuous}", false, managedIndexConfig?.continuous) + + // Change the policy continuous flag to true + val changePolicy = ChangePolicy(policy.id, null, emptyList(), false, continuous = true) + // Make Change Policy Request + client().makeRequest( + RestRequest.Method.POST.toString(), + "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$indexName", emptyMap(), changePolicy.toHttpEntity() + ) + Thread.sleep(1_000) + managedIndexConfig = getManagedIndexConfig(indexName) + // Check continuous flag was changed to true + assertEquals("Continuous field did not change: ${managedIndexConfig?.continuous}", true, managedIndexConfig?.continuous) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt index c7a980932..f9142b0d2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt @@ -35,7 +35,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { indexName to mapOf( explainResponseOpendistroPolicyIdSetting to null, explainResponseOpenSearchPolicyIdSetting to null, - ManagedIndexMetaData.ENABLED to null + ManagedIndexMetaData.ENABLED to null, + "continuous" to null ) ) assertResponseMap(expected, getExplainMap(indexName)) @@ -55,7 +56,7 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { val indexName1 = "${testIndexName}_managed" val indexName2 = "${testIndexName}_not_managed" val policy = createRandomPolicy() - createIndex(indexName1, policy.id) + createIndex(indexName1, policy.id, continuous = false) createIndex(indexName2, null) val expected = mapOf( @@ -66,12 +67,14 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName1, "index_uuid" to getUuid(indexName1), "policy_id" to policy.id, - ManagedIndexMetaData.ENABLED to true + ManagedIndexMetaData.ENABLED to true, + "continuous" to false ), indexName2 to mapOf( explainResponseOpendistroPolicyIdSetting to null, explainResponseOpenSearchPolicyIdSetting to null, - ManagedIndexMetaData.ENABLED to null + ManagedIndexMetaData.ENABLED to null, + "continuous" to null ) ) waitFor { @@ -94,7 +97,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName1, "index_uuid" to getUuid(indexName1), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ), "total_managed_indices" to 1 ) @@ -119,7 +123,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName1, "index_uuid" to getUuid(indexName1), "policy_id" to policy.id, - ManagedIndexMetaData.ENABLED to true + ManagedIndexMetaData.ENABLED to true, + "continuous" to false ), indexName2 to mapOf( explainResponseOpendistroPolicyIdSetting to policy.id, @@ -127,12 +132,14 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName2, "index_uuid" to getUuid(indexName2), "policy_id" to policy.id, - ManagedIndexMetaData.ENABLED to true + ManagedIndexMetaData.ENABLED to true, + "continuous" to false ), indexName3 to mapOf( explainResponseOpendistroPolicyIdSetting to null, explainResponseOpenSearchPolicyIdSetting to null, - ManagedIndexMetaData.ENABLED to null + ManagedIndexMetaData.ENABLED to null, + "continuous" to null ) ) waitFor { @@ -162,7 +169,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName1, "index_uuid" to getUuid(indexName1), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ) val indexName2Map = indexName2 to mapOf( explainResponseOpendistroPolicyIdSetting to policy.id, @@ -170,7 +178,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName2, "index_uuid" to getUuid(indexName2), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ) val indexName4Map = indexName4 to mapOf( explainResponseOpendistroPolicyIdSetting to policy.id, @@ -178,7 +187,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName4, "index_uuid" to getUuid(indexName4), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ) val indexName5Map = indexName5 to mapOf( explainResponseOpendistroPolicyIdSetting to policy.id, @@ -186,7 +196,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName5, "index_uuid" to getUuid(indexName5), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ) val datastreamMap = ".ds-$dataStreamName-000001" to mapOf( explainResponseOpendistroPolicyIdSetting to policy.id, @@ -194,7 +205,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to ".ds-$dataStreamName-000001", "index_uuid" to getUuid(".ds-$dataStreamName-000001"), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ) waitFor { @@ -279,7 +291,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { PolicyRetryInfoMetaData.RETRY_INFO to fun(retryInfoMetaDataMap: Any?): Boolean = assertRetryInfoEquals(PolicyRetryInfoMetaData(false, 0), retryInfoMetaDataMap), ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString(), - ManagedIndexMetaData.ENABLED to true::equals + ManagedIndexMetaData.ENABLED to true::equals, + "continuous" to false::equals ) ), getExplainMap(indexName) @@ -292,7 +305,7 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { val policy = createRandomPolicy() createIndex(indexName, policy.id) val newPolicy = createRandomPolicy() - val changePolicy = ChangePolicy(newPolicy.id, null, emptyList(), false) + val changePolicy = ChangePolicy(newPolicy.id, null, emptyList(), false, continuous = false) client().makeRequest( RestRequest.Method.POST.toString(), "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$indexName", emptyMap(), changePolicy.toHttpEntity() @@ -321,7 +334,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { assertRetryInfoEquals(PolicyRetryInfoMetaData(true, 0), retryInfoMetaDataMap), ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString(), ManagedIndexMetaData.INDEX_CREATION_DATE to fun(indexCreationDate: Any?): Boolean = (indexCreationDate as Long) > 1L, - ManagedIndexMetaData.ENABLED to true::equals + ManagedIndexMetaData.ENABLED to true::equals, + "continuous" to false::equals ) ), getExplainMap(indexName) @@ -343,7 +357,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index_uuid" to getUuid(indexName), "policy_id" to policy.id, ManagedIndexMetaData.ENABLED to true, - "policy" to expectedPolicy + "policy" to expectedPolicy, + "continuous" to false ), TOTAL_MANAGED_INDICES to 1, ) @@ -372,4 +387,17 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { assertEquals("Expected and actual values does not match", entry.value, actual[entry.key]) } } + // Want to see if the continuous field shows up as correct value in the explain response + fun `test continuous field shown`() { + val indexName = "${testIndexName}_continuous_check" + val policy = createRandomPolicy() + createIndex(indexName, policyID = policy.id, continuous = true) + // Need to give time to create index + Thread.sleep(1_000) + val explainMap = getExplainMap(indexName) + val indexMap = explainMap?.get(indexName) as Map + waitFor { + assertEquals("Continuous field in explain map is wrong", true, indexMap?.get("continuous")) + } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequestTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequestTests.kt index 704fef4b5..c2e68c1a0 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequestTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequestTests.kt @@ -15,7 +15,8 @@ class AddPolicyRequestTests : OpenSearchTestCase() { fun `test add policy request`() { val indices = listOf("index1", "index2") val policyID = "policyID" - val req = AddPolicyRequest(indices, policyID, DEFAULT_INDEX_TYPE) + val continuous = false + val req = AddPolicyRequest(indices, policyID, DEFAULT_INDEX_TYPE, continuous) val out = BytesStreamOutput() req.writeTo(out) @@ -28,7 +29,8 @@ class AddPolicyRequestTests : OpenSearchTestCase() { fun `test add policy request with non default index type and multiple indices fails`() { val indices = listOf("index1", "index2") val policyID = "policyID" - val req = AddPolicyRequest(indices, policyID, "non-existent-index-type") + val continuous = false + val req = AddPolicyRequest(indices, policyID, "non-existent-index-type", continuous) val actualException: String? = req.validate()?.validationErrors()?.firstOrNull() val expectedException: String = AddPolicyRequest.MULTIPLE_INDICES_CUSTOM_INDEX_TYPE_ERROR assertEquals("Add policy request should have failed validation with specific exception", actualException, expectedException) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequestTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequestTests.kt index ac49b2432..f1d79bb53 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequestTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequestTests.kt @@ -18,7 +18,8 @@ class ChangePolicyRequestTests : OpenSearchTestCase() { val indices = listOf("index1", "index2") val stateFilter = StateFilter("state1") val changePolicy = ChangePolicy("policyID", "state1", listOf(stateFilter), true) - val req = ChangePolicyRequest(indices, changePolicy, DEFAULT_INDEX_TYPE) + val continuous = false + val req = ChangePolicyRequest(indices, changePolicy, DEFAULT_INDEX_TYPE, continuous = continuous) val out = BytesStreamOutput() req.writeTo(out) @@ -32,7 +33,8 @@ class ChangePolicyRequestTests : OpenSearchTestCase() { val indices = listOf("index1", "index2") val stateFilter = StateFilter("state1") val changePolicy = ChangePolicy("policyID", "state1", listOf(stateFilter), true) - val req = ChangePolicyRequest(indices, changePolicy, "non-existent-index-type") + val continuous = false + val req = ChangePolicyRequest(indices, changePolicy, "non-existent-index-type", continuous = continuous) val actualException: String? = req.validate()?.validationErrors()?.firstOrNull() val expectedException: String = ChangePolicyRequest.MULTIPLE_INDICES_CUSTOM_INDEX_TYPE_ERROR assertEquals("Add policy request should have failed validation with specific exception", actualException, expectedException) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt index 647ffd99f..f0c6df293 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt @@ -36,7 +36,8 @@ class ExplainResponseTests : OpenSearchTestCase() { val totalManagedIndices = 1 val enabledState = mapOf("index1" to true) val appliedPolicies = mapOf("policy" to randomPolicy()) - val res = ExplainResponse(indexNames, indexPolicyIDs, indexMetadatas, totalManagedIndices, enabledState, appliedPolicies) + val continuous = mapOf("continuous" to false) + val res = ExplainResponse(indexNames, indexPolicyIDs, indexMetadatas, totalManagedIndices, enabledState, appliedPolicies, continuous) val out = BytesStreamOutput() res.writeTo(out) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt index 207ad3845..67cd1f21d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -66,7 +66,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { index = index, uuid = uuid, policyID = policyID, primaryTerm = 1, seqNo = 1, changePolicy = randomChangePolicy(policyID = policyID), policy = null ) - val updateRequest = updateManagedIndexRequest(sweptManagedIndexConfig) + val updateRequest = updateManagedIndexRequest(sweptManagedIndexConfig, continuous = false) assertNotNull("UpdateRequest not created", updateRequest) assertEquals("Incorrect ism index used in request", INDEX_MANAGEMENT_INDEX, updateRequest.index()) diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 67c5fc47e..cb68653bb 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 16 + "schema_version": 17 }, "dynamic": "strict", "properties": { @@ -668,6 +668,9 @@ }, "jitter": { "type": "double" + }, + "continuous": { + "type": "boolean" } } },