From 8173369ae0ab78ef56d36c2b5123eb180699eeba Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 6 Jul 2022 14:43:09 -0700 Subject: [PATCH 01/17] 2.1 release note (#405) Signed-off-by: bowenlan-amzn Signed-off-by: Ronnak Saxena --- ...-index-management.release-notes-2.1.0.0.md | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 release-notes/opensearch-index-management.release-notes-2.1.0.0.md diff --git a/release-notes/opensearch-index-management.release-notes-2.1.0.0.md b/release-notes/opensearch-index-management.release-notes-2.1.0.0.md new file mode 100644 index 000000000..9858c55a0 --- /dev/null +++ b/release-notes/opensearch-index-management.release-notes-2.1.0.0.md @@ -0,0 +1,24 @@ +## Version 2.1.0.0 2022-07-06 + +Compatible with OpenSearch 2.1.0 + +### Features +* Merge snapshot management into main branch ([#390](https://github.com/opensearch-project/index-management/pull/390)) +* Adds snapshot management notification implementation ([#387](https://github.com/opensearch-project/index-management/pull/387)) +* Snapshot management default date format in snapshot name ([#392](https://github.com/opensearch-project/index-management/pull/392)) + +### Enhancements +* Bump mapping schema version ([#394](https://github.com/opensearch-project/index-management/pull/394)) +* Clean the mapping update log in SMStateMachine ([#396](https://github.com/opensearch-project/index-management/pull/396)) + +### Bug Fixes +* BWC for rollover skip, restricted index pattern ([#371](https://github.com/opensearch-project/index-management/pull/371)) + +### Infrastructure +* Uses custom plugin to publish zips to maven ([#366](https://github.com/opensearch-project/index-management/pull/366)) + +### Documentation +* Updated issue templates from .github. ([#324](https://github.com/opensearch-project/index-management/pull/324)) + +### Maintenance +* version upgrade to 2.1.0 ([#389](https://github.com/opensearch-project/index-management/pull/389)) From 35c5d2f9964afbcfcb8b2ecd9148c3e4a88041de Mon Sep 17 00:00:00 2001 From: Petar Date: Wed, 20 Jul 2022 19:32:39 +0200 Subject: [PATCH 02/17] Removed recursion from Explain Action to avoid stackoverflow in some situations (#419) Signed-off-by: Petar Dzepina Signed-off-by: Ronnak Saxena --- .../action/explain/TransportExplainAction.kt | 80 +++++-------------- 1 file changed, 21 insertions(+), 59 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index f0b084177..ee7b7a33d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -52,6 +52,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.MetadataCheck import org.opensearch.indexmanagement.indexstatemanagement.util.checkMetadata import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataID import org.opensearch.indexmanagement.opensearchapi.parseWithType +import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser @@ -339,68 +340,29 @@ class TransportExplainAction @Inject constructor( val filteredPolicies = mutableListOf() val enabledStatus = mutableMapOf() val filteredAppliedPolicies = mutableMapOf() - filter(0, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, filteredAppliedPolicies) - } - - @Suppress("LongParameterList") - private fun filter( - current: Int, - filteredIndices: MutableList, - filteredMetadata: MutableList, - filteredPolicies: MutableList, - enabledStatus: MutableMap, - filteredAppliedPolicies: MutableMap - ) { - val request = ManagedIndexRequest().indices(indexNames[current]) - client.execute( - ManagedIndexAction.INSTANCE, - request, - object : ActionListener { - override fun onResponse(response: AcknowledgedResponse) { - filteredIndices.add(indexNames[current]) - filteredMetadata.add(indexMetadatas[current]) - filteredPolicies.add(indexPolicyIDs[current]) - enabledState[indexNames[current]]?.let { enabledStatus[indexNames[current]] = it } - appliedPolicies[indexNames[current]]?.let { filteredAppliedPolicies[indexNames[current]] = it } - if (current < indexNames.count() - 1) { - // do nothing - skip the index and go to next one - filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, filteredAppliedPolicies) - } else { - sendResponse( - filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, - totalManagedIndices, filteredAppliedPolicies - ) - } - } - override fun onFailure(e: Exception) { - when (e is OpenSearchSecurityException) { - true -> { - totalManagedIndices -= 1 - if (current < indexNames.count() - 1) { - // do nothing - skip the index and go to next one - filter( - current + 1, - filteredIndices, - filteredMetadata, - filteredPolicies, - enabledStatus, - filteredAppliedPolicies - ) - } else { - sendResponse( - filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, - totalManagedIndices, filteredAppliedPolicies - ) - } - } - false -> { - actionListener.onFailure(e) - } - } + CoroutineScope(Dispatchers.IO).launch { + // filter out indicies for which user doesn't have manage index permissions + for (i in 0 until indexNames.count()) { + val request = ManagedIndexRequest().indices(indexNames[i]) + try { + client.suspendUntil { execute(ManagedIndexAction.INSTANCE, request, it) } + filteredIndices.add(indexNames[i]) + filteredMetadata.add(indexMetadatas[i]) + filteredPolicies.add(indexPolicyIDs[i]) + enabledState[indexNames[i]]?.let { enabledStatus[indexNames[i]] = it } + appliedPolicies[indexNames[i]]?.let { filteredAppliedPolicies[indexNames[i]] = it } + } catch (e: OpenSearchSecurityException) { + totalManagedIndices -= 1 + } catch (e: Exception) { + actionListener.onFailure(e) } } - ) + sendResponse( + filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, + totalManagedIndices, filteredAppliedPolicies + ) + } } @Suppress("LongParameterList") From ed0de00bd15a12c2c288c873000ca965664c3f74 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Tue, 26 Jul 2022 14:15:00 -0700 Subject: [PATCH 03/17] Adds continuous field to managed index metadata Signed-off-by: Ronnak Saxena --- build.gradle | 4 +- .../ManagedIndexRunner.kt | 74 +++++++---- .../model/ChangePolicy.kt | 20 ++- .../model/ManagedIndexConfig.kt | 10 +- .../resthandler/RestAddPolicyAction.kt | 4 +- .../resthandler/RestChangePolicyAction.kt | 4 +- .../step/shrink/AttemptMoveShardsStep.kt | 4 +- .../step/shrink/AttemptShrinkStep.kt | 4 +- .../step/transition/AttemptTransitionStep.kt | 20 ++- .../action/addpolicy/AddPolicyRequest.kt | 7 +- .../addpolicy/TransportAddPolicyAction.kt | 3 +- .../changepolicy/ChangePolicyRequest.kt | 7 +- .../TransportChangePolicyAction.kt | 2 +- .../action/explain/ExplainResponse.kt | 12 +- .../action/explain/TransportExplainAction.kt | 121 ++++++++++++++---- .../util/ManagedIndexUtils.kt | 12 +- .../util/RestHandlerUtils.kt | 4 +- .../indexstatemanagement/util/StepUtils.kt | 14 +- .../snapshotmanagement/SMRunner.kt | 2 +- .../engine/SMStateMachine.kt | 1 + .../snapshotmanagement/model/SMPolicy.kt | 3 +- .../delete/TransportDeleteTransformsAction.kt | 2 +- .../TransportExplainTransformAction.kt | 1 + .../mappings/opendistro-ism-config.json | 5 +- .../IndexManagementRestTestCase.kt | 2 +- .../IndexStateManagementIntegTestCase.kt | 6 +- .../IndexStateManagementRestTestCase.kt | 17 ++- .../indexstatemanagement/TestHelpers.kt | 6 +- .../action/ActionRetryIT.kt | 9 +- .../action/DeleteActionIT.kt | 33 +++++ .../action/ForceMergeActionIT.kt | 39 ++++++ .../action/ShrinkActionIT.kt | 55 ++++++++ .../action/SnapshotActionIT.kt | 33 +++++ .../action/TransitionActionIT.kt | 31 ++++- .../coordinator/ManagedIndexCoordinatorIT.kt | 3 +- .../resthandler/ISMTemplateRestAPIIT.kt | 6 +- .../resthandler/RestAddPolicyActionIT.kt | 11 +- .../resthandler/RestChangePolicyActionIT.kt | 23 ++++ .../resthandler/RestExplainActionIT.kt | 62 ++++++--- .../action/addpolicy/AddPolicyRequestTests.kt | 6 +- .../changepolicy/ChangePolicyRequestTests.kt | 6 +- .../action/explain/ExplainResponseTests.kt | 3 +- .../util/ManagedIndexUtilsTests.kt | 2 +- .../util/StepUtilsTests.kt | 14 +- .../cached-opendistro-ism-config.json | 5 +- 45 files changed, 558 insertions(+), 154 deletions(-) diff --git a/build.gradle b/build.gradle index f6437aabf..80399bd05 100644 --- a/build.gradle +++ b/build.gradle @@ -13,8 +13,8 @@ import java.util.function.Predicate buildscript { ext { - isSnapshot = "true" == System.getProperty("build.snapshot", "true") - opensearch_version = System.getProperty("opensearch.version", "2.1.0-SNAPSHOT") + isSnapshot = "true" == System.getProperty("build.snapshot", "false") + opensearch_version = System.getProperty("opensearch.version", "2.1.0") buildVersionQualifier = System.getProperty("build.version_qualifier", "") // 2.0.0-SNAPSHOT -> 2.0.0.0-SNAPSHOT version_tokens = opensearch_version.tokenize('-') diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index d122272e3..113fed82c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -89,6 +89,8 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.util.releaseLockForScheduledJob +import org.opensearch.indexmanagement.util.renewLockForScheduledJob import org.opensearch.jobscheduler.spi.JobExecutionContext import org.opensearch.jobscheduler.spi.LockModel import org.opensearch.jobscheduler.spi.ScheduledJobParameter @@ -219,33 +221,49 @@ object ManagedIndexRunner : } // Attempt to acquire lock - val lock: LockModel? = context.lockService.suspendUntil { acquireLock(job, context, it) } + var lock: LockModel? = context.lockService.suspendUntil { acquireLock(job, context, it) } if (lock == null) { logger.debug("Could not acquire lock [${lock?.lockId}] for ${job.index}") } else { - runManagedIndexConfig(job, context) + if (job?.continuous) { + var keepExecuting: Boolean = true + // Need to execute at least once for policy to initialize + do { + // Need to renew lock for current step execution + val renewedLock = renewLockForScheduledJob(context, lock as LockModel, errorNotificationRetryPolicy) + // Failed to renew lock + if (renewedLock == null) { + logger.error("Could not renew lock [${lock?.lockId}] for ${job.index}") + break + } else { + lock = renewedLock as LockModel + keepExecuting = runManagedIndexConfig(job, context) + } + } while ((job.continuous && keepExecuting)) // Runs until job is no longer continuous or execution should stop + } else { // If job is not continuous run once + runManagedIndexConfig(job, context) + } // Release lock - val released: Boolean = context.lockService.suspendUntil { release(lock, it) } - if (!released) { - logger.debug("Could not release lock [${lock.lockId}] for ${job.index}") + if (lock == null || !releaseLockForScheduledJob(context, lock as LockModel)) { + logger.debug("Could not release lock [${lock?.lockId}] for ${job.index}") } } } } @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth") - private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext) { + private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext): Boolean { logger.debug("Run job for index ${managedIndexConfig.index}") // doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls if (clusterIsRed()) { logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health") - return + return false } val (managedIndexMetaData, getMetadataSuccess) = client.getManagedIndexMetadata(managedIndexConfig.indexUuid) if (!getMetadataSuccess) { logger.info("Failed to retrieve managed index metadata of index [${managedIndexConfig.index}] from config index, abort this run.") - return + return false } // Check the cluster state for the index metadata @@ -267,14 +285,14 @@ object ManagedIndexRunner : // If no index types had an index with a matching name and uuid combination, return if (!someTypeMatchedUuid) { logger.warn("Failed to find IndexMetadata for ${managedIndexConfig.index}.") - return + return false } } else { val clusterStateMetadata = clusterStateIndexMetadata.getManagedIndexMetadata() val metadataCheck = checkMetadata(clusterStateMetadata, managedIndexMetaData, managedIndexConfig.indexUuid, logger) if (metadataCheck != MetadataCheck.SUCCESS) { logger.info("Skipping execution while metadata status is $metadataCheck") - return + return false } } @@ -282,13 +300,13 @@ object ManagedIndexRunner : val policy = managedIndexConfig.policy if (policy == null || managedIndexMetaData == null) { initManagedIndex(managedIndexConfig, managedIndexMetaData) - return + return true } // If the policy was completed or failed then return early and disable job so it stops scheduling work if (managedIndexMetaData.policyCompleted == true || managedIndexMetaData.isFailed) { disableManagedIndexConfig(managedIndexConfig) - return + return false } if (managedIndexMetaData.hasVersionConflict(managedIndexConfig)) { @@ -302,7 +320,7 @@ object ManagedIndexRunner : if (result.metadataSaved) { disableManagedIndexConfig(managedIndexConfig) } - return + return false } val state = policy.getStateToExecute(managedIndexMetaData) @@ -317,7 +335,7 @@ object ManagedIndexRunner : // then disable the job and return early if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) { disableManagedIndexConfig(managedIndexConfig) - return + return false } if (action?.hasTimedOut(currentActionMetaData) == true) { @@ -328,19 +346,19 @@ object ManagedIndexRunner : .copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) - return + return false } if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) { initChangePolicy(managedIndexConfig, managedIndexMetaData, action) - return + return true } val shouldBackOff = action?.shouldBackoff(currentActionMetaData, action.configRetry) if (shouldBackOff?.first == true) { // If we should back off then exit early. logger.info("Backoff for retrying. Remaining time ${shouldBackOff.second}") - return + return false } if (managedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.STARTING) { @@ -353,8 +371,10 @@ object ManagedIndexRunner : policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info ) ) - if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) - return + if (updated.metadataSaved) { + disableManagedIndexConfig(managedIndexConfig) + return false + } } } @@ -368,7 +388,7 @@ object ManagedIndexRunner : ) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) - return + return false } // If this action is not allowed and the step to be executed is the first step in the action then we will fail @@ -381,7 +401,7 @@ object ManagedIndexRunner : ) ) if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) - return + return false } // If any of State, Action, Step components come back as null then we are moving to error in ManagedIndexMetaData @@ -412,12 +432,11 @@ object ManagedIndexRunner : executedManagedIndexMetaData = executedManagedIndexMetaData.copy(info = mutableInfo.toMap()) } } - if (executedManagedIndexMetaData.isSuccessfulDelete) { GlobalScope.launch(Dispatchers.IO + CoroutineName("ManagedIndexMetaData-AddHistory")) { ismHistory.addManagedIndexMetaDataHistory(listOf(executedManagedIndexMetaData)) } - return + return false } // If a custom action deletes some off-cluster index and has deleteIndexMetadataAfterFinish set to true, @@ -426,18 +445,25 @@ object ManagedIndexRunner : if (action.isFinishedSuccessfully(executedManagedIndexMetaData)) { if (action.deleteIndexMetadataAfterFinish()) { deleteFromManagedIndex(managedIndexConfig, action.type) - return } } if (!updateManagedIndexMetaData(executedManagedIndexMetaData, updateResult).metadataSaved) { logger.error("Failed to update ManagedIndexMetaData after executing the Step : ${step.name}") + return false } if (managedIndexConfig.hasDifferentJobInterval(jobInterval)) { updateJobInterval(managedIndexConfig, jobInterval) } + // Check that transition condition evaluated to false + if (executedManagedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.CONDITION_NOT_MET) { + return false + } + // Made it to end of successful execution block + return true } + return false } private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig, managedIndexMetaData: ManagedIndexMetaData?) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt index 87fd71d6e..226dce0c7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt @@ -35,7 +35,9 @@ data class ChangePolicy( val state: String?, val include: List, val isSafe: Boolean, - val user: User? = null + val user: User? = null, + var continuous: Boolean? = null + ) : Writeable, ToXContentObject { @Throws(IOException::class) @@ -46,16 +48,19 @@ data class ChangePolicy( isSafe = sin.readBoolean(), user = if (sin.readBoolean()) { User(sin) - } else null + } else null, + continuous = if (sin.readBoolean()) { sin.readBoolean() } else null ) override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - builder - .startObject() + builder.startObject() .field(ManagedIndexConfig.POLICY_ID_FIELD, policyID) .field(StateMetaData.STATE, state) .field(IS_SAFE_FIELD, isSafe) if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user) + if (continuous != null) { + builder.field(ManagedIndexConfig.CONTINUOUS, continuous) + } return builder.endObject() } @@ -67,6 +72,7 @@ data class ChangePolicy( out.writeBoolean(isSafe) out.writeBoolean(user != null) user?.writeTo(out) + if (continuous != null) { out.writeBoolean(continuous as Boolean) } } companion object { @@ -75,6 +81,7 @@ data class ChangePolicy( const val INCLUDE_FIELD = "include" const val IS_SAFE_FIELD = "is_safe" const val USER_FIELD = "user" + const val CONTINUOUS_FIELD = "continuous" @JvmStatic @Throws(IOException::class) @@ -83,6 +90,7 @@ data class ChangePolicy( var state: String? = null var isSafe: Boolean = false var user: User? = null + var continuous: Boolean? = null val include = mutableListOf() ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) @@ -103,6 +111,7 @@ data class ChangePolicy( USER_FIELD -> { user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp) } + CONTINUOUS_FIELD -> continuous = xcp.booleanValue() else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ChangePolicy.") } } @@ -112,7 +121,8 @@ data class ChangePolicy( state, include.toList(), isSafe, - user + user, + continuous ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt index acaabc864..973843f56 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt @@ -37,7 +37,8 @@ data class ManagedIndexConfig( val policyPrimaryTerm: Long?, val policy: Policy?, val changePolicy: ChangePolicy?, - val jobJitter: Double? + val jobJitter: Double?, + val continuous: Boolean ) : ScheduledJobParameter { init { @@ -81,6 +82,7 @@ data class ManagedIndexConfig( .field(POLICY_FIELD, policy, XCONTENT_WITHOUT_TYPE) .field(CHANGE_POLICY_FIELD, changePolicy) .field(JITTER, jobJitter) + .field(CONTINUOUS, continuous) builder.endObject() return builder.endObject() } @@ -100,6 +102,7 @@ data class ManagedIndexConfig( const val POLICY_PRIMARY_TERM_FIELD = "policy_primary_term" const val CHANGE_POLICY_FIELD = "change_policy" const val JITTER = "jitter" + const val CONTINUOUS = "continuous" @Suppress("ComplexMethod", "LongMethod") @JvmStatic @@ -124,6 +127,7 @@ data class ManagedIndexConfig( var policyPrimaryTerm: Long? = SequenceNumbers.UNASSIGNED_PRIMARY_TERM var policySeqNo: Long? = SequenceNumbers.UNASSIGNED_SEQ_NO var jitter: Double? = null + var continuous: Boolean = false ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { @@ -154,6 +158,7 @@ data class ManagedIndexConfig( JITTER -> { jitter = if (xcp.currentToken() == Token.VALUE_NULL) null else xcp.doubleValue() } + CONTINUOUS -> continuous = xcp.booleanValue() else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ManagedIndexConfig.") } } @@ -183,7 +188,8 @@ data class ManagedIndexConfig( primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM ), changePolicy = changePolicy, - jobJitter = jitter + jobJitter = jitter, + continuous = continuous ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyAction.kt index da55fb2d0..181fcf20a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyAction.kt @@ -63,7 +63,9 @@ class RestAddPolicyAction : BaseRestHandler() { val policyID = requireNotNull(body.getOrDefault("policy_id", null)) { "Missing policy_id" } - val addPolicyRequest = AddPolicyRequest(indices.toList(), policyID as String, indexType) + val continuous = body?.get("continuous") as Boolean + + val addPolicyRequest = AddPolicyRequest(indices.toList(), policyID as String, indexType, continuous) return RestChannelConsumer { channel -> client.execute(AddPolicyAction.INSTANCE, addPolicyRequest, RestToXContentListener(channel)) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyAction.kt index ebca535f5..94f490156 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyAction.kt @@ -58,8 +58,8 @@ class RestChangePolicyAction : BaseRestHandler() { val xcp = request.contentParser() ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) val changePolicy = ChangePolicy.parse(xcp) - - val changePolicyRequest = ChangePolicyRequest(indices.toList(), changePolicy, indexType) + val continuous = if (changePolicy.continuous != null) changePolicy.continuous as Boolean else false + val changePolicyRequest = ChangePolicyRequest(indices.toList(), changePolicy.copy(continuous = null), indexType, continuous = continuous) return RestChannelConsumer { channel -> client.execute(ChangePolicyAction.INSTANCE, changePolicyRequest, RestToXContentListener(channel)) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt index 552a73c2e..8d2614226 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptMoveShardsStep.kt @@ -267,7 +267,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name, shardStats: Array, indexSizeInBytes: Long ): List { - val nodesStatsReq = NodesStatsRequest().addMetric(OS_METRIC) + val nodesStatsReq = NodesStatsRequest().addMetric(FS_METRIC) val nodeStatsResponse: NodesStatsResponse = stepContext.client.admin().cluster().suspendUntil { nodesStats(nodesStatsReq, it) } val nodesList = nodeStatsResponse.nodes.filter { it.node.isDataNode } // Sort in increasing order of keys, in our case this is memory remaining @@ -393,7 +393,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name, override fun isIdempotent() = true companion object { - const val OS_METRIC = "os" + const val FS_METRIC = "fs" const val ROUTING_SETTING = "index.routing.allocation.require._name" const val DEFAULT_TARGET_SUFFIX = "_shrunken" const val name = "attempt_move_shards_step" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt index 297ebae89..023aec502 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt @@ -60,8 +60,8 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru return false } val indexSizeInBytes = statsStore.sizeInBytes - // Get the remaining memory in the node - val nodesStatsReq = NodesStatsRequest().addMetric(AttemptMoveShardsStep.OS_METRIC) + // Get the remaining memory and disk space in the node. + val nodesStatsReq = NodesStatsRequest().addMetric(AttemptMoveShardsStep.FS_METRIC) val nodeStatsResponse: NodesStatsResponse = context.client.admin().cluster().suspendUntil { nodesStats(nodesStatsReq, it) } // If the node has been replaced, this will fail val node = nodeStatsResponse.nodes.firstOrNull { it.node.name == nodeName } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt index 843987b4e..7add32837 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt @@ -25,6 +25,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaDat import org.opensearch.rest.RestStatus import org.opensearch.transport.RemoteTransportException import java.time.Instant +import org.opensearch.common.unit.TimeValue class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) { @@ -57,11 +58,19 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) if (indexCreationDate == -1L) { logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison") } + val indexAgeTimeValue = if (indexCreationDate == -1L) { + logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison") + // since we cannot use for comparison, we can set it to 0 as minAge will never be <= 0 + TimeValue.timeValueMillis(0) + } else { + TimeValue.timeValueMillis(Instant.now().toEpochMilli() - indexCreationDate) + } val stepStartTime = getStepStartTime(context.metadata) var numDocs: Long? = null var indexSize: ByteSizeValue? = null val rolloverDate: Instant? = if (inCluster) indexMetadata.getOldestRolloverTime() else null + if (transitions.any { it.conditions?.rolloverAge !== null }) { // if we have a transition with rollover age condition, then we must have a rollover date // otherwise fail this transition @@ -115,7 +124,16 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name) stepStatus = StepStatus.CONDITION_NOT_MET message = getEvaluatingMessage(indexName) } - info = mapOf("message" to message) + + // store conditions in a map to add to info + val conditions = listOfNotNull( + "index creation date" to indexCreationDate, + "Number of docs" to numDocs, + "Index Size" to indexSize, + "Step Start Time" to stepStartTime, + "RolloverDate" to rolloverDate + ).toMap() + info = mapOf("message" to message, "conditions" to conditions) } catch (e: RemoteTransportException) { handleException(indexName, ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt index c9ac3bb51..3f7f354ae 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequest.kt @@ -16,14 +16,16 @@ import java.io.IOException class AddPolicyRequest( val indices: List, val policyID: String, - val indexType: String + val indexType: String, + val continuous: Boolean ) : ActionRequest() { @Throws(IOException::class) constructor(sin: StreamInput) : this( indices = sin.readStringList(), policyID = sin.readString(), - indexType = sin.readString() + indexType = sin.readString(), + continuous = sin.readBoolean() ) override fun validate(): ActionRequestValidationException? { @@ -44,6 +46,7 @@ class AddPolicyRequest( out.writeStringCollection(indices) out.writeString(policyID) out.writeString(indexType) + out.writeBoolean(continuous) } companion object { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt index 0efdd0686..57d83289c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt @@ -345,10 +345,9 @@ class TransportAddPolicyAction @Inject constructor( val bulkReq = BulkRequest().timeout(TimeValue.timeValueMillis(bulkReqTimeout)) indicesToAdd.forEach { (uuid, name) -> bulkReq.add( - managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval, policy = policy.copy(user = this.user), jobJitter) + managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval, policy = policy.copy(user = this.user), jobJitter, continuous = request.continuous) ) } - client.bulk( bulkReq, object : ActionListener { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt index fcec1bf74..8f9bd76e8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt @@ -17,14 +17,16 @@ import java.io.IOException class ChangePolicyRequest( val indices: List, val changePolicy: ChangePolicy, - val indexType: String + val indexType: String, + val continuous: Boolean ) : ActionRequest() { @Throws(IOException::class) constructor(sin: StreamInput) : this( indices = sin.readStringList(), changePolicy = ChangePolicy(sin), - indexType = sin.readString() + indexType = sin.readString(), + continuous = sin.readBoolean() ) override fun validate(): ActionRequestValidationException? { @@ -45,6 +47,7 @@ class ChangePolicyRequest( out.writeStringCollection(indices) changePolicy.writeTo(out) out.writeString(indexType) + out.writeBoolean(continuous) } companion object { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt index fed9cf828..a916628ae 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt @@ -370,7 +370,7 @@ class TransportChangePolicyAction @Inject constructor( val currentStateName = indexUuidToCurrentState[sweptConfig.uuid] val updatedChangePolicy = changePolicy .copy(isSafe = sweptConfig.policy?.isSafeToChange(currentStateName, policy, changePolicy) == true, user = this.user) - bulkUpdateManagedIndexRequest.add(updateManagedIndexRequest(sweptConfig.copy(changePolicy = updatedChangePolicy))) + bulkUpdateManagedIndexRequest.add(updateManagedIndexRequest(sweptConfig.copy(changePolicy = updatedChangePolicy), continuous = request.continuous)) mapOfItemIdToIndex[id] = Index(sweptConfig.index, sweptConfig.uuid) } client.bulk( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt index cda65f548..d33289126 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt @@ -28,14 +28,16 @@ open class ExplainResponse : ActionResponse, ToXContentObject { val totalManagedIndices: Int val enabledState: Map val policies: Map - + val continuous: Map + @Suppress("LongParameterList") constructor( indexNames: List, indexPolicyIDs: List, indexMetadatas: List, totalManagedIndices: Int, enabledState: Map, - policies: Map + policies: Map, + continuous: Map ) : super() { this.indexNames = indexNames this.indexPolicyIDs = indexPolicyIDs @@ -43,6 +45,7 @@ open class ExplainResponse : ActionResponse, ToXContentObject { this.totalManagedIndices = totalManagedIndices this.enabledState = enabledState this.policies = policies + this.continuous = continuous } @Throws(IOException::class) @@ -52,7 +55,8 @@ open class ExplainResponse : ActionResponse, ToXContentObject { indexMetadatas = sin.readList { ManagedIndexMetaData.fromStreamInput(it) }, totalManagedIndices = sin.readInt(), enabledState = sin.readMap() as Map, - policies = sin.readMap(StreamInput::readString, ::Policy) + policies = sin.readMap(StreamInput::readString, ::Policy), + continuous = sin.readMap() as Map ) @Throws(IOException::class) @@ -67,6 +71,7 @@ open class ExplainResponse : ActionResponse, ToXContentObject { { _out, key -> _out.writeString(key) }, { _out, policy -> policy.writeTo(_out) } ) + out.writeMap(continuous) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { @@ -78,6 +83,7 @@ open class ExplainResponse : ActionResponse, ToXContentObject { indexMetadatas[ind]?.toXContent(builder, ToXContent.EMPTY_PARAMS) builder.field("enabled", enabledState[name]) policies[name]?.let { builder.field(Policy.POLICY_TYPE, it, XCONTENT_WITHOUT_TYPE_AND_USER) } + builder.field("continuous", continuous[name]) builder.endObject() } builder.field(TOTAL_MANAGED_INDICES, totalManagedIndices) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index ee7b7a33d..aad4c9d33 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -83,7 +83,6 @@ class TransportExplainAction @Inject constructor( ) : HandledTransportAction( ExplainAction.NAME, transportService, actionFilters, ::ExplainRequest ) { - override fun doExecute(task: Task, request: ExplainRequest, listener: ActionListener) { ExplainHandler(client, listener, request).start() } @@ -117,6 +116,7 @@ class TransportExplainAction @Inject constructor( private val indexMetadatas = mutableListOf() private var totalManagedIndices = 0 private val appliedPolicies: MutableMap = mutableMapOf() + private val continuousField: MutableMap = mutableMapOf() @Suppress("SpreadOperator", "NestedBlockDepth") fun start() { @@ -193,6 +193,7 @@ class TransportExplainAction @Inject constructor( "policy_id" to managedIndex.policyID, "enabled" to managedIndex.enabled.toString() ) + continuousField[managedIndex.index] = managedIndex.continuous if (showPolicy) { managedIndex.policy?.let { appliedPolicies[managedIndex.index] = it } } @@ -204,7 +205,7 @@ class TransportExplainAction @Inject constructor( // edge case: if specify query param pagination size to be 0 // we still show total managed indices indexNames.clear() - sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies) + sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies, continuousField) return } else { // Clear and add the managedIndices from the response to preserve the sort order and size @@ -230,7 +231,7 @@ class TransportExplainAction @Inject constructor( return } indexNames.clear() - sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies) + sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies, continuousField) return } actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) @@ -327,7 +328,7 @@ class TransportExplainAction @Inject constructor( managedIndicesMetaDataMap.clear() if (user == null || indexNames.isEmpty()) { - sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies) + sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies, continuousField) } else { filterAndSendResponse(threadContext) } @@ -341,28 +342,95 @@ class TransportExplainAction @Inject constructor( val enabledStatus = mutableMapOf() val filteredAppliedPolicies = mutableMapOf() - CoroutineScope(Dispatchers.IO).launch { - // filter out indicies for which user doesn't have manage index permissions - for (i in 0 until indexNames.count()) { - val request = ManagedIndexRequest().indices(indexNames[i]) - try { - client.suspendUntil { execute(ManagedIndexAction.INSTANCE, request, it) } - filteredIndices.add(indexNames[i]) - filteredMetadata.add(indexMetadatas[i]) - filteredPolicies.add(indexPolicyIDs[i]) - enabledState[indexNames[i]]?.let { enabledStatus[indexNames[i]] = it } - appliedPolicies[indexNames[i]]?.let { filteredAppliedPolicies[indexNames[i]] = it } - } catch (e: OpenSearchSecurityException) { - totalManagedIndices -= 1 - } catch (e: Exception) { - actionListener.onFailure(e) + filter(0, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, filteredAppliedPolicies, continuousField) + } + + @Suppress("LongParameterList") + private fun filter( + current: Int, + filteredIndices: MutableList, + filteredMetadata: MutableList, + filteredPolicies: MutableList, + enabledStatus: MutableMap, + filteredAppliedPolicies: MutableMap, + continuousStatus: MutableMap + ) { + val request = ManagedIndexRequest().indices(indexNames[current]) + client.execute( + ManagedIndexAction.INSTANCE, + request, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + filteredIndices.add(indexNames[current]) + filteredMetadata.add(indexMetadatas[current]) + filteredPolicies.add(indexPolicyIDs[current]) + enabledState[indexNames[current]]?.let { enabledStatus[indexNames[current]] = it } + appliedPolicies[indexNames[current]]?.let { filteredAppliedPolicies[indexNames[current]] = it } + continuousField[indexNames[current]]?.let { continuousStatus[indexNames[current]] = it } + if (current < indexNames.count() - 1) { + // do nothing - skip the index and go to next one + filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, filteredAppliedPolicies, continuousStatus) + } else { + sendResponse( + filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, + totalManagedIndices, filteredAppliedPolicies, continuousStatus + ) + } + } + + override fun onFailure(e: Exception) { + when (e is OpenSearchSecurityException) { + true -> { + totalManagedIndices -= 1 + if (current < indexNames.count() - 1) { + // do nothing - skip the index and go to next one + filter( + current + 1, + filteredIndices, + filteredMetadata, + filteredPolicies, + enabledStatus, + filteredAppliedPolicies, + continuousStatus + ) + } else { + sendResponse( + filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, + totalManagedIndices, filteredAppliedPolicies, continuousStatus + ) + } + } + + false -> { + actionListener.onFailure(e) + } + } + + CoroutineScope(Dispatchers.IO).launch { + // filter out indicies for which user doesn't have manage index permissions + for (i in 0 until indexNames.count()) { + val request = ManagedIndexRequest().indices(indexNames[i]) + try { + client.suspendUntil { execute(ManagedIndexAction.INSTANCE, request, it) } + filteredIndices.add(indexNames[i]) + filteredMetadata.add(indexMetadatas[i]) + filteredPolicies.add(indexPolicyIDs[i]) + enabledState[indexNames[i]]?.let { enabledStatus[indexNames[i]] = it } + appliedPolicies[indexNames[i]]?.let { filteredAppliedPolicies[indexNames[i]] = it } + } catch (e: OpenSearchSecurityException) { + totalManagedIndices -= 1 + } catch (e: Exception) { + actionListener.onFailure(e) + } + } + sendResponse( + filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, + totalManagedIndices, filteredAppliedPolicies, continuousField + ) + } } } - sendResponse( - filteredIndices, filteredMetadata, filteredPolicies, enabledStatus, - totalManagedIndices, filteredAppliedPolicies - ) - } + ) } @Suppress("LongParameterList") @@ -372,9 +440,10 @@ class TransportExplainAction @Inject constructor( policyIDs: List, enabledStatus: Map, totalIndices: Int, - policies: Map + policies: Map, + continuous: Map, ) { - actionListener.onResponse(ExplainResponse(indices, policyIDs, metadata, totalIndices, enabledStatus, policies)) + actionListener.onResponse(ExplainResponse(indices, policyIDs, metadata, totalIndices, enabledStatus, policies, continuous)) } @Suppress("ReturnCount") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index bea3ed57c..30ecbfded 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -67,7 +67,8 @@ fun managedIndexConfigIndexRequest( policyID: String, jobInterval: Int, policy: Policy? = null, - jobJitter: Double? + jobJitter: Double?, + continuous: Boolean = false ): IndexRequest { val managedIndexConfig = ManagedIndexConfig( jobName = index, @@ -82,7 +83,8 @@ fun managedIndexConfigIndexRequest( policySeqNo = policy?.seqNo, policyPrimaryTerm = policy?.primaryTerm, changePolicy = null, - jobJitter = jobJitter + jobJitter = jobJitter, + continuous = continuous ) return IndexRequest(INDEX_MANAGEMENT_INDEX) @@ -164,11 +166,11 @@ fun deleteManagedIndexMetadataRequest(uuid: String): DeleteRequest { return DeleteRequest(INDEX_MANAGEMENT_INDEX, managedIndexMetadataID(uuid)).routing(uuid) } -fun updateManagedIndexRequest(sweptManagedIndexConfig: SweptManagedIndexConfig): UpdateRequest { +fun updateManagedIndexRequest(sweptManagedIndexConfig: SweptManagedIndexConfig, continuous: Boolean?): UpdateRequest { return UpdateRequest(INDEX_MANAGEMENT_INDEX, sweptManagedIndexConfig.uuid) .setIfPrimaryTerm(sweptManagedIndexConfig.primaryTerm) .setIfSeqNo(sweptManagedIndexConfig.seqNo) - .doc(getPartialChangePolicyBuilder(sweptManagedIndexConfig.changePolicy)) + .doc(getPartialChangePolicyBuilder(sweptManagedIndexConfig.changePolicy, continuous = continuous)) } /** @@ -377,7 +379,7 @@ fun ManagedIndexMetaData.getCompletedManagedIndexMetaData( val ManagedIndexMetaData.isSuccessfulDelete: Boolean get() = (this.actionMetaData?.name == DeleteAction.name && !this.actionMetaData!!.failed) && - (this.stepMetaData?.name == DeleteAction.name && this.stepMetaData!!.stepStatus == Step.StepStatus.COMPLETED) && + (this.stepMetaData?.name == "attempt_delete" && this.stepMetaData!!.stepStatus == Step.StepStatus.COMPLETED) && (this.policyRetryInfo?.failed != true) val ManagedIndexMetaData.isFailed: Boolean diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt index c4118321f..f315cbcba 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt @@ -103,13 +103,15 @@ data class FailedIndex(val name: String, val uuid: String, val reason: String) : * Gets the XContentBuilder for partially updating a [ManagedIndexConfig]'s ChangePolicy */ fun getPartialChangePolicyBuilder( - changePolicy: ChangePolicy? + changePolicy: ChangePolicy?, + continuous: Boolean? ): XContentBuilder { val builder = XContentFactory.jsonBuilder() .startObject() .startObject(ManagedIndexConfig.MANAGED_INDEX_TYPE) .optionalTimeField(ManagedIndexConfig.LAST_UPDATED_TIME_FIELD, Instant.now()) .field(ManagedIndexConfig.CHANGE_POLICY_FIELD, changePolicy) + if (continuous != null) { builder.field(ManagedIndexConfig.CONTINUOUS, continuous) } return builder.endObject().endObject() } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt index 8ec297d27..512129f88 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtils.kt @@ -164,15 +164,15 @@ fun getDiskSettings(clusterSettings: ClusterSettings): Settings { * if adding 2*indexSizeInBytes goes over the high watermark threshold, or if nodeStats does not contain OsStats. */ fun getNodeFreeMemoryAfterShrink(node: NodeStats, indexSizeInBytes: Long, clusterSettings: ClusterSettings): Long { - val osStats = node.os - if (osStats != null) { - val memLeftInNode = osStats.mem.free.bytes - val totalNodeMem = osStats.mem.total.bytes - val freeBytesThresholdHigh = getFreeBytesThresholdHigh(clusterSettings, totalNodeMem) + val fsStats = node.fs + if (fsStats != null) { + val diskSpaceLeftInNode = fsStats.total.free.bytes + val totalNodeDisk = fsStats.total.total.bytes + val freeBytesThresholdHigh = getFreeBytesThresholdHigh(clusterSettings, totalNodeDisk) // We require that a node has enough space to be below the high watermark disk level with an additional 2 * the index size free val requiredBytes = (2 * indexSizeInBytes) + freeBytesThresholdHigh - if (memLeftInNode > requiredBytes) { - return memLeftInNode - requiredBytes + if (diskSpaceLeftInNode > requiredBytes) { + return diskSpaceLeftInNode - requiredBytes } } return -1L diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt index 3e36ed28d..14d96c859 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt @@ -61,7 +61,7 @@ object SMRunner : this.clusterService = clusterService return this } - + @Suppress("MagicNumber") private val backoffPolicy: BackoffPolicy = BackoffPolicy.exponentialBackoff( TimeValue.timeValueMillis(1000L), 3 ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt index ae17e25ce..c2d18a65c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt @@ -30,6 +30,7 @@ import org.opensearch.threadpool.ThreadPool import java.time.Instant.now @OpenForTesting +@Suppress("MagicNumber") class SMStateMachine( val client: Client, val job: SMPolicy, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt index 60ccac14b..cec3232dd 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt @@ -36,7 +36,7 @@ import java.time.Instant import java.time.Instant.now import java.time.ZoneId import java.time.temporal.ChronoUnit - +@Suppress("MaxLineLength") data class SMPolicy( val id: String, val description: String? = null, @@ -53,7 +53,6 @@ data class SMPolicy( val notificationConfig: NotificationConfig? = null, val user: User? = null, ) : ScheduledJobParameter, Writeable { - init { require(snapshotConfig["repository"] != null && snapshotConfig["repository"] != "") { "Must provide the repository in snapshot config." } require(creation.schedule.getNextExecutionTime(now()) != null) { "Next execution time from the creation schedule is null, please provide a valid cron expression." } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt index 913229246..5da2fc554 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt @@ -108,7 +108,7 @@ class TransportDeleteTransformsAction @Inject constructor( } } - @Suppress("LongMethod") + @Suppress("LongMethod", "NestedBlockDepth") private fun bulkDelete(response: MultiGetResponse, ids: List, forceDelete: Boolean, actionListener: ActionListener) { val enabledIDs = mutableListOf() val notTransform = mutableListOf() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt index 51c4d14ae..ef3a16a5e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt @@ -90,6 +90,7 @@ class TransportExplainTransformAction @Inject constructor( client.search( searchRequest, object : ActionListener { + @Suppress("LongMethod") override fun onResponse(response: SearchResponse) { val metadataIdToTransform: MutableMap = HashMap() try { diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 67c5fc47e..cb68653bb 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 16 + "schema_version": 17 }, "dynamic": "strict", "properties": { @@ -668,6 +668,9 @@ }, "jitter": { "type": "double" + }, + "continuous": { + "type": "boolean" } } }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 82308c85e..56d1cbb64 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -27,7 +27,7 @@ import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 16 + val configSchemaVersion = 17 val historySchemaVersion = 5 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt index 540049fe1..dd6043aac 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt @@ -151,11 +151,13 @@ abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() { protected fun addPolicyToIndex( index: String, - policyID: String + policyID: String, + continuous: Boolean = false ) { val body = """ { - "policy_id": "$policyID" + "policy_id": "$policyID", + "continuous": $continuous } """.trimIndent() val response = getRestClient() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index be64799ff..2fd47d869 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -167,7 +167,8 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() replicas: String? = null, shards: String? = null, mapping: String = "", - settings: Settings? = null + settings: Settings? = null, + continuous: Boolean = false ): Pair { val waitForActiveShards = if (isMultiNode) "all" else "1" val builtSettings = Settings.builder().let { @@ -185,7 +186,7 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() val aliases = if (alias == null) "" else "\"$alias\": { \"is_write_index\": true }" createIndex(index, builtSettings, mapping, aliases) if (policyID != null) { - addPolicyToIndex(index, policyID) + addPolicyToIndex(index, policyID, continuous) } return index to policyID } @@ -244,11 +245,13 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() protected fun addPolicyToIndex( index: String, - policyID: String + policyID: String, + continuous: Boolean = false ) { val body = """ { - "policy_id": "$policyID" + "policy_id": "$policyID", + "continuous": $continuous } """.trimIndent() val response = client().makeRequest("POST", "/_opendistro/_ism/add/$index", StringEntity(body, APPLICATION_JSON)) @@ -438,8 +441,8 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() if (state != null) { string += "\"${ChangePolicy.STATE_FIELD}\":\"$state\"," } - string += "\"${ChangePolicy.INCLUDE_FIELD}\":${include.map { "{\"${StateFilter.STATE_FIELD}\":\"${it.state}\"}" }}}" - + string += "\"${ChangePolicy.INCLUDE_FIELD}\":${include.map { "{\"${StateFilter.STATE_FIELD}\":\"${it.state}\"}" }}" + string += if (continuous != null) ",\"${ChangePolicy.CONTINUOUS_FIELD}\":\"$continuous\"}" else "}" return StringEntity(string, APPLICATION_JSON) } @@ -615,7 +618,7 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() } // make sure metadata is initialised - assertTrue(metadata.transitionTo != null || metadata.stateMetaData != null || metadata.info != null || metadata.policyCompleted != null) + assertTrue("transitionTo: ${metadata.transitionTo} stateMetaData: ${metadata.stateMetaData}, metadata info: ${metadata.info}, policyCompleted: ${metadata.policyCompleted} ", metadata.transitionTo != null || metadata.stateMetaData != null || metadata.info != null || metadata.policyCompleted != null) return metadata } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index aad56c02b..41dac6208 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -302,7 +302,8 @@ fun randomManagedIndexConfig( policyID: String = OpenSearchRestTestCase.randomAlphaOfLength(10), policy: Policy? = randomPolicy(), changePolicy: ChangePolicy? = randomChangePolicy(), - jitter: Double? = 0.0 + jitter: Double? = 0.0, + continuous: Boolean = false ): ManagedIndexConfig { return ManagedIndexConfig( jobName = name, @@ -317,7 +318,8 @@ fun randomManagedIndexConfig( policyPrimaryTerm = policy?.primaryTerm, policy = policy?.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM), changePolicy = changePolicy, - jobJitter = jitter + jobJitter = jitter, + continuous = continuous ) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt index cf05dd34d..42b97b138 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt @@ -42,12 +42,10 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { // Change the start time so the job will trigger in 2 seconds. // First execution. We need to initialize the policy. updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } // Second execution is to fail the step once. updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { val managedIndexMetaData = getExplainManagedIndexMetaData(indexName) assertEquals( @@ -57,13 +55,11 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { ), managedIndexMetaData.actionMetaData ) - assertEquals(expectedInfoString, managedIndexMetaData.info.toString()) } // Third execution is to fail the step second time. updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { val managedIndexMetaData = getExplainManagedIndexMetaData(indexName) assertEquals( @@ -79,10 +75,10 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { // Fourth execution is to fail the step third time and finally fail the action. updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { val managedIndexMetaData = getExplainManagedIndexMetaData(indexName) assertEquals( + "failing this one: ", ActionMetaData( "rollover", managedIndexMetaData.actionMetaData?.startTime, 0, true, 2, managedIndexMetaData.actionMetaData?.lastRetryTime, null @@ -161,7 +157,8 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { PolicyRetryInfoMetaData.RETRY_INFO to fun(retryInfoMetaDataMap: Any?): Boolean = assertRetryInfoEquals(PolicyRetryInfoMetaData(false, 0), retryInfoMetaDataMap), ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString(), - ManagedIndexMetaData.ENABLED to true::equals + ManagedIndexMetaData.ENABLED to true::equals, + "continuous" to false::equals ) ), getExplainMap(indexName) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/DeleteActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/DeleteActionIT.kt index ff7265df9..c74bffef3 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/DeleteActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/DeleteActionIT.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.model.State +import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification import org.opensearch.indexmanagement.waitFor import java.time.Instant @@ -78,4 +79,36 @@ class DeleteActionIT : IndexStateManagementRestTestCase() { // ) // } } + // Want to test continuos execution occurs on policy + fun `test continuous index deletes immediately`() { + val indexName = "${testIndexName}_delete_continuous" + val policyID = "continuous_delete" + val actionConfig = DeleteAction(0) + val states = listOf( + State("State1", listOf(), listOf(Transition("State2", null))), + State("State2", listOf(), listOf(Transition("State3", null))), + State("State3", listOf(), listOf(Transition("DeleteState", null))), + State("DeleteState", listOf(actionConfig), listOf()) + ) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID = policy.id, continuous = true) + waitFor { assertIndexExists(indexName) } + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + // confirm index does not exist anymore + // should delete in one cycle + waitFor { assertIndexDoesNotExist(indexName) } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt index 9866f175a..773d18d94 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt @@ -139,4 +139,43 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) assertEquals("true", getIndexBlocksWriteSetting(indexName)) } + // Test Force Merge works on continuous indices + fun `test continuous force merge`() { + val indexName = "${testIndexName}_continuous_index" + val policyID = "${testIndexName}_testPolicyName_1" + + // Create a Policy with one State that only preforms a force_merge Action + val forceMergeActionConfig = ForceMergeAction(maxNumSegments = 1, index = 0) + val states = listOf(State("ForceMergeState", listOf(forceMergeActionConfig), listOf())) + + val policy = Policy( + id = policyID, + description = "$indexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + waitFor { createPolicy(policy, policyID) } + waitFor { createIndex(indexName, policyID, continuous = true) } + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created + insertSampleData(indexName, 3, 0) + + waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } + + // Will change the startTime each execution so that it triggers in 2 seconds + // 1. Initialize, 2. Set to read only, 3. Merge shards into 1 segment + updateManagedIndexConfigStartTime(managedIndexConfig) + + // Check force merge executed correctly + waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) } + // verify we reset actionproperties at end of forcemerge + waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties) } + // index should still be readonly after force merge finishes + waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt index 0bdf32ab0..fedc6c7da 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt @@ -688,4 +688,59 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { ) } } + // Test that shrink action works on continuous indices + fun `test continuous shrink`() { + val indexName = "${testIndexName}_index_1" + val policyID = "${testIndexName}_testPolicyName_1" + + val shrinkAction = ShrinkAction( + numNewShards = 1, + maxShardSize = null, + percentageOfSourceShards = null, + targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()), + aliases = listOf(Alias("test-alias1"), Alias("test-alias2").filter(QueryBuilders.termQuery("foo", "bar")).writeIndex(true)), + forceUnsafe = true, + index = 0 + ) + val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf())) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 11L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + createPolicy(policy, policyID) + createIndex(indexName, policyID, null, "0", "3", "", continuous = true) + + insertSampleData(indexName, 3) + + // Set the index as readonly to check that the setting is preserved after the shrink finishes + updateIndexSetting(indexName, IndexMetadata.SETTING_BLOCKS_WRITE, "true") + + // Will change the startTime each execution so that it triggers in 2 seconds + // First execution: Policy is initialized + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + // Starts and executes entire shrink action + updateManagedIndexConfigStartTime(managedIndexConfig) + + val targetIndexName = indexName + testIndexSuffix + + // Check that wait for shrink step executed + waitFor(Instant.ofEpochSecond(60)) { + // one primary and one replica + assertTrue(getIndexShards(targetIndexName).size == 2) + assertEquals( + WaitForShrinkStep.SUCCESS_MESSAGE, + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + assertEquals("Write block setting was not reset after successful shrink", "true", getIndexBlocksWriteSetting(indexName)) + val aliases = getAlias(targetIndexName, "") + assertTrue("Aliases were not added to shrunken index", aliases.containsKey("test-alias1") && aliases.containsKey("test-alias2")) + } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt index e1bd328d7..e84ed4a20 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/SnapshotActionIT.kt @@ -321,4 +321,37 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() { assertEquals(AttemptSnapshotStep.getBlockedMessage(denyList, repository, indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } } + // Test entire action executes on a continuous index + fun `test snapshot on continuous index`() { + val indexName = "${testIndexName}_continuous_index" + val policyID = "${testIndexName}_policy" + val repository = "repository" + val snapshot = "snapshot" + val actionConfig = SnapshotAction(repository, snapshot, 0) + val states = listOf( + State("Snapshot", listOf(actionConfig), listOf()) + ) + + waitFor { createRepository(repository) } + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID, continuous = true) + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertSnapshotExists(repository, "snapshot") } + waitFor { assertSnapshotFinishedWithSuccess(repository, "snapshot") } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionActionIT.kt index 80f153738..e34bcf1f6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransitionActionIT.kt @@ -66,7 +66,6 @@ class TransitionActionIT : IndexStateManagementRestTestCase() { // Should have evaluated to true waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondStateName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } } - fun `test rollover age transition for index with no rollover fails`() { val indexName = "${testIndexName}_rollover_age_no_rollover" val policyID = "${testIndexName}_rollover_age_no_rollover_policy" @@ -142,4 +141,34 @@ class TransitionActionIT : IndexStateManagementRestTestCase() { // Should have evaluated to true waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondStateName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } } + // Test that continuous index stops when condition is not met + fun `test continuous index stops when condition not met`() { + val indexName = "${testIndexName}_continuous_condition_check" + val policyID = "continuous_condition_check" + val states = listOf( + State("State1", listOf(), listOf(Transition("State2", null))), + // Should stop here + State("State2", listOf(), listOf(Transition("State3", Conditions(indexAge = TimeValue.timeValueDays(30L))))), + State("State3", listOf(), listOf()) + ) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID = policy.id, continuous = true) + waitFor { assertIndexExists(indexName) } + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + // confirm index stopped at State2 + waitFor { assertEquals("State2", getExplainManagedIndexMetaData(indexName).stateMetaData?.name) } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt index 605e0a1c4..38df3fc5f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt @@ -92,7 +92,8 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { policyID == null, explainResponseOpenSearchPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null, - ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null + ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null, + "continuous" to fun(enabled: Any?): Boolean = enabled == null ) ), getExplainMap(index), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt index 563c3de50..3568bf0a1 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt @@ -121,7 +121,8 @@ class ISMTemplateRestAPIIT : IndexStateManagementRestTestCase() { indexName1 to listOf( explainResponseOpendistroPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null, explainResponseOpenSearchPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null, - ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null + ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null, + "continuous" to fun(enabled: Any?): Boolean = enabled == null ) ), getExplainMap(indexName1), @@ -135,7 +136,8 @@ class ISMTemplateRestAPIIT : IndexStateManagementRestTestCase() { indexName1 to listOf( explainResponseOpendistroPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null, explainResponseOpenSearchPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null, - ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null + ManagedIndexMetaData.ENABLED to fun(enabled: Any?): Boolean = enabled == null, + "continuous" to fun(enabled: Any?): Boolean = enabled == null ) ), getExplainMap(indexName1), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyActionIT.kt index 92a657afc..a08baaffe 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestAddPolicyActionIT.kt @@ -54,7 +54,7 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { val response = client().makeRequest( POST.toString(), "${RestAddPolicyAction.ADD_POLICY_BASE_URI}/$index", - StringEntity("{ \"policy_id\": \"${policy.id}\" }", APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"${policy.id}\",\"continuous\": false }", APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) val actualMessage = response.asMap() @@ -81,7 +81,7 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { val response = client().makeRequest( POST.toString(), "${RestAddPolicyAction.ADD_POLICY_BASE_URI}/$index", - StringEntity("{ \"policy_id\": \"${policy.id}\" }", APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"${policy.id}\",\"continuous\": false }", APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) val actualMessage = response.asMap() @@ -113,7 +113,7 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { val response = client().makeRequest( POST.toString(), "${RestAddPolicyAction.ADD_POLICY_BASE_URI}/$indexOne,$indexTwo", - StringEntity("{ \"policy_id\": \"${newPolicy.id}\" }", APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"${newPolicy.id}\",\"continuous\": false }", APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) val actualMessage = response.asMap() @@ -153,7 +153,7 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { val response = client().makeRequest( POST.toString(), "${RestAddPolicyAction.ADD_POLICY_BASE_URI}/$indexPattern*", - StringEntity("{ \"policy_id\": \"${newPolicy.id}\" }", APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"${newPolicy.id}\",\"continuous\": false }", APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) val actualMessage = response.asMap() @@ -198,7 +198,7 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { val response = client().makeRequest( POST.toString(), "${RestAddPolicyAction.ADD_POLICY_BASE_URI}/.*", - StringEntity("{ \"policy_id\": \"${policy.id}\" }", APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"${policy.id}\",\"continuous\": false }", APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) val actualMessage = response.asMap() @@ -237,7 +237,6 @@ class RestAddPolicyActionIT : IndexStateManagementRestTestCase() { assertEquals(policy.id, getPolicyIDOfManagedIndex(indexFour)) } } - /** * The util UUID method doesn't work for hidden indices because strict warning check, the following method skips the strict check */ diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt index 22c2f4586..0df207780 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt @@ -623,4 +623,27 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { ) } } + + // Check if Continuous field is changed from false to true + fun `test changing value of continuous in index`() { + val indexName = "${testIndexName}_continuous_check" + val policy = createRandomPolicy() + createIndex(indexName, policyID = policy.id, continuous = false) + Thread.sleep(1_000) + var managedIndexConfig = getManagedIndexConfig(indexName) + // Check continuous field is false intially + assertEquals("Incorrect initial continuous field: ${managedIndexConfig?.continuous}", false, managedIndexConfig?.continuous) + + // Change the policy continuous flag to true + val changePolicy = ChangePolicy(policy.id, null, emptyList(), false, continuous = true) + // Make Change Policy Request + client().makeRequest( + RestRequest.Method.POST.toString(), + "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$indexName", emptyMap(), changePolicy.toHttpEntity() + ) + Thread.sleep(1_000) + managedIndexConfig = getManagedIndexConfig(indexName) + // Check continuous flag was changed to true + assertEquals("Continuous field did not change: ${managedIndexConfig?.continuous}", true, managedIndexConfig?.continuous) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt index c7a980932..f9142b0d2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt @@ -35,7 +35,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { indexName to mapOf( explainResponseOpendistroPolicyIdSetting to null, explainResponseOpenSearchPolicyIdSetting to null, - ManagedIndexMetaData.ENABLED to null + ManagedIndexMetaData.ENABLED to null, + "continuous" to null ) ) assertResponseMap(expected, getExplainMap(indexName)) @@ -55,7 +56,7 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { val indexName1 = "${testIndexName}_managed" val indexName2 = "${testIndexName}_not_managed" val policy = createRandomPolicy() - createIndex(indexName1, policy.id) + createIndex(indexName1, policy.id, continuous = false) createIndex(indexName2, null) val expected = mapOf( @@ -66,12 +67,14 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName1, "index_uuid" to getUuid(indexName1), "policy_id" to policy.id, - ManagedIndexMetaData.ENABLED to true + ManagedIndexMetaData.ENABLED to true, + "continuous" to false ), indexName2 to mapOf( explainResponseOpendistroPolicyIdSetting to null, explainResponseOpenSearchPolicyIdSetting to null, - ManagedIndexMetaData.ENABLED to null + ManagedIndexMetaData.ENABLED to null, + "continuous" to null ) ) waitFor { @@ -94,7 +97,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName1, "index_uuid" to getUuid(indexName1), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ), "total_managed_indices" to 1 ) @@ -119,7 +123,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName1, "index_uuid" to getUuid(indexName1), "policy_id" to policy.id, - ManagedIndexMetaData.ENABLED to true + ManagedIndexMetaData.ENABLED to true, + "continuous" to false ), indexName2 to mapOf( explainResponseOpendistroPolicyIdSetting to policy.id, @@ -127,12 +132,14 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName2, "index_uuid" to getUuid(indexName2), "policy_id" to policy.id, - ManagedIndexMetaData.ENABLED to true + ManagedIndexMetaData.ENABLED to true, + "continuous" to false ), indexName3 to mapOf( explainResponseOpendistroPolicyIdSetting to null, explainResponseOpenSearchPolicyIdSetting to null, - ManagedIndexMetaData.ENABLED to null + ManagedIndexMetaData.ENABLED to null, + "continuous" to null ) ) waitFor { @@ -162,7 +169,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName1, "index_uuid" to getUuid(indexName1), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ) val indexName2Map = indexName2 to mapOf( explainResponseOpendistroPolicyIdSetting to policy.id, @@ -170,7 +178,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName2, "index_uuid" to getUuid(indexName2), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ) val indexName4Map = indexName4 to mapOf( explainResponseOpendistroPolicyIdSetting to policy.id, @@ -178,7 +187,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName4, "index_uuid" to getUuid(indexName4), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ) val indexName5Map = indexName5 to mapOf( explainResponseOpendistroPolicyIdSetting to policy.id, @@ -186,7 +196,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to indexName5, "index_uuid" to getUuid(indexName5), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ) val datastreamMap = ".ds-$dataStreamName-000001" to mapOf( explainResponseOpendistroPolicyIdSetting to policy.id, @@ -194,7 +205,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index" to ".ds-$dataStreamName-000001", "index_uuid" to getUuid(".ds-$dataStreamName-000001"), "policy_id" to policy.id, - "enabled" to true + "enabled" to true, + "continuous" to false ) waitFor { @@ -279,7 +291,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { PolicyRetryInfoMetaData.RETRY_INFO to fun(retryInfoMetaDataMap: Any?): Boolean = assertRetryInfoEquals(PolicyRetryInfoMetaData(false, 0), retryInfoMetaDataMap), ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString(), - ManagedIndexMetaData.ENABLED to true::equals + ManagedIndexMetaData.ENABLED to true::equals, + "continuous" to false::equals ) ), getExplainMap(indexName) @@ -292,7 +305,7 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { val policy = createRandomPolicy() createIndex(indexName, policy.id) val newPolicy = createRandomPolicy() - val changePolicy = ChangePolicy(newPolicy.id, null, emptyList(), false) + val changePolicy = ChangePolicy(newPolicy.id, null, emptyList(), false, continuous = false) client().makeRequest( RestRequest.Method.POST.toString(), "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$indexName", emptyMap(), changePolicy.toHttpEntity() @@ -321,7 +334,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { assertRetryInfoEquals(PolicyRetryInfoMetaData(true, 0), retryInfoMetaDataMap), ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString(), ManagedIndexMetaData.INDEX_CREATION_DATE to fun(indexCreationDate: Any?): Boolean = (indexCreationDate as Long) > 1L, - ManagedIndexMetaData.ENABLED to true::equals + ManagedIndexMetaData.ENABLED to true::equals, + "continuous" to false::equals ) ), getExplainMap(indexName) @@ -343,7 +357,8 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { "index_uuid" to getUuid(indexName), "policy_id" to policy.id, ManagedIndexMetaData.ENABLED to true, - "policy" to expectedPolicy + "policy" to expectedPolicy, + "continuous" to false ), TOTAL_MANAGED_INDICES to 1, ) @@ -372,4 +387,17 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { assertEquals("Expected and actual values does not match", entry.value, actual[entry.key]) } } + // Want to see if the continuous field shows up as correct value in the explain response + fun `test continuous field shown`() { + val indexName = "${testIndexName}_continuous_check" + val policy = createRandomPolicy() + createIndex(indexName, policyID = policy.id, continuous = true) + // Need to give time to create index + Thread.sleep(1_000) + val explainMap = getExplainMap(indexName) + val indexMap = explainMap?.get(indexName) as Map + waitFor { + assertEquals("Continuous field in explain map is wrong", true, indexMap?.get("continuous")) + } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequestTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequestTests.kt index 704fef4b5..c2e68c1a0 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequestTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/AddPolicyRequestTests.kt @@ -15,7 +15,8 @@ class AddPolicyRequestTests : OpenSearchTestCase() { fun `test add policy request`() { val indices = listOf("index1", "index2") val policyID = "policyID" - val req = AddPolicyRequest(indices, policyID, DEFAULT_INDEX_TYPE) + val continuous = false + val req = AddPolicyRequest(indices, policyID, DEFAULT_INDEX_TYPE, continuous) val out = BytesStreamOutput() req.writeTo(out) @@ -28,7 +29,8 @@ class AddPolicyRequestTests : OpenSearchTestCase() { fun `test add policy request with non default index type and multiple indices fails`() { val indices = listOf("index1", "index2") val policyID = "policyID" - val req = AddPolicyRequest(indices, policyID, "non-existent-index-type") + val continuous = false + val req = AddPolicyRequest(indices, policyID, "non-existent-index-type", continuous) val actualException: String? = req.validate()?.validationErrors()?.firstOrNull() val expectedException: String = AddPolicyRequest.MULTIPLE_INDICES_CUSTOM_INDEX_TYPE_ERROR assertEquals("Add policy request should have failed validation with specific exception", actualException, expectedException) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequestTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequestTests.kt index ac49b2432..f1d79bb53 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequestTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequestTests.kt @@ -18,7 +18,8 @@ class ChangePolicyRequestTests : OpenSearchTestCase() { val indices = listOf("index1", "index2") val stateFilter = StateFilter("state1") val changePolicy = ChangePolicy("policyID", "state1", listOf(stateFilter), true) - val req = ChangePolicyRequest(indices, changePolicy, DEFAULT_INDEX_TYPE) + val continuous = false + val req = ChangePolicyRequest(indices, changePolicy, DEFAULT_INDEX_TYPE, continuous = continuous) val out = BytesStreamOutput() req.writeTo(out) @@ -32,7 +33,8 @@ class ChangePolicyRequestTests : OpenSearchTestCase() { val indices = listOf("index1", "index2") val stateFilter = StateFilter("state1") val changePolicy = ChangePolicy("policyID", "state1", listOf(stateFilter), true) - val req = ChangePolicyRequest(indices, changePolicy, "non-existent-index-type") + val continuous = false + val req = ChangePolicyRequest(indices, changePolicy, "non-existent-index-type", continuous = continuous) val actualException: String? = req.validate()?.validationErrors()?.firstOrNull() val expectedException: String = ChangePolicyRequest.MULTIPLE_INDICES_CUSTOM_INDEX_TYPE_ERROR assertEquals("Add policy request should have failed validation with specific exception", actualException, expectedException) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt index 647ffd99f..f0c6df293 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt @@ -36,7 +36,8 @@ class ExplainResponseTests : OpenSearchTestCase() { val totalManagedIndices = 1 val enabledState = mapOf("index1" to true) val appliedPolicies = mapOf("policy" to randomPolicy()) - val res = ExplainResponse(indexNames, indexPolicyIDs, indexMetadatas, totalManagedIndices, enabledState, appliedPolicies) + val continuous = mapOf("continuous" to false) + val res = ExplainResponse(indexNames, indexPolicyIDs, indexMetadatas, totalManagedIndices, enabledState, appliedPolicies, continuous) val out = BytesStreamOutput() res.writeTo(out) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt index 207ad3845..67cd1f21d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -66,7 +66,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { index = index, uuid = uuid, policyID = policyID, primaryTerm = 1, seqNo = 1, changePolicy = randomChangePolicy(policyID = policyID), policy = null ) - val updateRequest = updateManagedIndexRequest(sweptManagedIndexConfig) + val updateRequest = updateManagedIndexRequest(sweptManagedIndexConfig, continuous = false) assertNotNull("UpdateRequest not created", updateRequest) assertEquals("Incorrect ism index used in request", INDEX_MANAGEMENT_INDEX, updateRequest.index()) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtilsTests.kt index 12675a501..5419cf2c5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/StepUtilsTests.kt @@ -19,7 +19,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaD import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ShrinkActionProperties import org.opensearch.jobscheduler.spi.LockModel -import org.opensearch.monitor.os.OsStats +import org.opensearch.monitor.fs.FsInfo import org.opensearch.test.OpenSearchTestCase class StepUtilsTests : OpenSearchTestCase() { @@ -112,16 +112,16 @@ class StepUtilsTests : OpenSearchTestCase() { fun `test free memory after shrink`() { val nodeStats: NodeStats = mock() - val osStats: OsStats = mock() - Mockito.`when`(nodeStats.os).thenReturn(osStats) - val memStats: OsStats.Mem = mock() - Mockito.`when`(osStats.mem).thenReturn(memStats) + val fsInfo: FsInfo = mock() + Mockito.`when`(nodeStats.fs).thenReturn(fsInfo) + val path: FsInfo.Path = mock() + Mockito.`when`(fsInfo.total).thenReturn(path) val totalBytes = randomLongBetween(10, 100000000) val freeBytes = randomLongBetween(0, totalBytes) val indexSize = randomLongBetween(0, totalBytes / 2) val threshold = randomLongBetween(0, totalBytes / 2) - Mockito.`when`(memStats.free).thenReturn(ByteSizeValue(freeBytes)) - Mockito.`when`(memStats.total).thenReturn(ByteSizeValue(totalBytes)) + Mockito.`when`(path.free).thenReturn(ByteSizeValue(freeBytes)) + Mockito.`when`(path.total).thenReturn(ByteSizeValue(totalBytes)) val settings = Settings.builder() .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.key, ByteSizeValue(threshold).stringRep) .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.key, ByteSizeValue(threshold + 1).stringRep) diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 67c5fc47e..cb68653bb 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 16 + "schema_version": 17 }, "dynamic": "strict", "properties": { @@ -668,6 +668,9 @@ }, "jitter": { "type": "double" + }, + "continuous": { + "type": "boolean" } } }, From 79ded56ea5c438ac94e74c67ef4c88d89bc51924 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Tue, 26 Jul 2022 21:33:48 +0000 Subject: [PATCH 04/17] fixed tests Ronnak Saxena Signed-off-by: Ronnak Saxena Signed-off-by: Ronnak Saxena --- .../opensearch/indexmanagement/IndexManagementIndicesIT.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt index 22e9f9d03..ad77be6af 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt @@ -166,14 +166,14 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() { val addPolicyResponse = client().makeRequest( RestRequest.Method.POST.toString(), "${RestAddPolicyAction.LEGACY_ADD_POLICY_BASE_URI}/$indexName", - StringEntity("{ \"policy_id\": \"$policyId\" }", ContentType.APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"$policyId\", \"continuous\": false }", ContentType.APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, addPolicyResponse.restStatus()) val changePolicyResponse = client().makeRequest( RestRequest.Method.POST.toString(), "${RestAddPolicyAction.LEGACY_ADD_POLICY_BASE_URI}/$indexName", - StringEntity("{ \"policy_id\": \"$policyId\" }", ContentType.APPLICATION_JSON) + StringEntity("{ \"policy_id\": \"$policyId\", \"continuous\": false }", ContentType.APPLICATION_JSON) ) assertEquals("Unexpected RestStatus", RestStatus.OK, changePolicyResponse.restStatus()) From 644770035f7ae5137fc13da38ce7dfe2a6315074 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Wed, 27 Jul 2022 01:57:58 +0000 Subject: [PATCH 05/17] fixed change policy test Signed-off-by: Ronnak Saxena Signed-off-by: Ronnak Saxena Signed-off-by: Ronnak Saxena --- .../indexstatemanagement/model/ChangePolicy.kt | 5 +++-- .../transport/action/changepolicy/ChangePolicyRequest.kt | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt index 226dce0c7..b649084a6 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ChangePolicy.kt @@ -36,7 +36,7 @@ data class ChangePolicy( val include: List, val isSafe: Boolean, val user: User? = null, - var continuous: Boolean? = null + val continuous: Boolean? = null ) : Writeable, ToXContentObject { @@ -72,7 +72,8 @@ data class ChangePolicy( out.writeBoolean(isSafe) out.writeBoolean(user != null) user?.writeTo(out) - if (continuous != null) { out.writeBoolean(continuous as Boolean) } + out.writeBoolean(continuous != null) + if (continuous != null) out.writeBoolean(continuous) } companion object { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt index 8f9bd76e8..64f7e8f6b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/ChangePolicyRequest.kt @@ -18,7 +18,7 @@ class ChangePolicyRequest( val indices: List, val changePolicy: ChangePolicy, val indexType: String, - val continuous: Boolean + val continuous: Boolean = false ) : ActionRequest() { @Throws(IOException::class) @@ -47,7 +47,7 @@ class ChangePolicyRequest( out.writeStringCollection(indices) changePolicy.writeTo(out) out.writeString(indexType) - out.writeBoolean(continuous) + out.writeBoolean(if (continuous != null) continuous else false) } companion object { From 30c7382e41380f5fbe7ea5a4039f102bb209dc3f Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Thu, 28 Jul 2022 22:57:55 +0000 Subject: [PATCH 06/17] thorugh in some print statemtnets Signed-off-by: Ronnak Saxena --- .../indexstatemanagement/ManagedIndexRunner.kt | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 113fed82c..214c181a2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -229,6 +229,7 @@ object ManagedIndexRunner : var keepExecuting: Boolean = true // Need to execute at least once for policy to initialize do { + println("RON SAX LOOK HERE: name: ${job.jobName} continuous: ${job.continuous}, keepExecuting: $keepExecuting, id: ${job.id}") // Need to renew lock for current step execution val renewedLock = renewLockForScheduledJob(context, lock as LockModel, errorNotificationRetryPolicy) // Failed to renew lock @@ -241,6 +242,7 @@ object ManagedIndexRunner : } } while ((job.continuous && keepExecuting)) // Runs until job is no longer continuous or execution should stop } else { // If job is not continuous run once + println("RON SAX LOOK HERE: name: ${job.jobName} continuous: ${job.continuous}, id: ${job.id}") runManagedIndexConfig(job, context) } // Release lock @@ -320,6 +322,7 @@ object ManagedIndexRunner : if (result.metadataSaved) { disableManagedIndexConfig(managedIndexConfig) } + println("RON SAX return false in line 324") return false } @@ -351,6 +354,7 @@ object ManagedIndexRunner : if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) { initChangePolicy(managedIndexConfig, managedIndexMetaData, action) + println("RON SAX: returned true on 357") return true } @@ -416,9 +420,12 @@ object ManagedIndexRunner : managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user ) ) { + println("RON SAX STEP: ${step.name}") step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute().postExecute(logger) } + println("RON SAX Starting metadata: $startingManagedIndexMetaData") var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step) + println("RON SAX executed metadata: $executedManagedIndexMetaData") if (executedManagedIndexMetaData.isFailed) { try { @@ -461,6 +468,7 @@ object ManagedIndexRunner : return false } // Made it to end of successful execution block + println("RON SAX: returned true on 471") return true } return false @@ -564,7 +572,7 @@ object ManagedIndexRunner : logger.error("Failed to update ManagedIndexConfig(${managedIndexConfig.index}) job interval") } } catch (e: VersionConflictEngineException) { - logger.error("Failed to update ManagedIndexConfig(${managedIndexConfig.index}) job interval. ${e.message}") + logger.error("RON SAX FAILED HERE: Failed to update ManagedIndexConfig(${managedIndexConfig.index}) job interval. ${e.message}") } catch (e: Exception) { logger.error("Failed to update ManagedIndexConfig(${managedIndexConfig.index}) job interval", e) } @@ -783,7 +791,10 @@ object ManagedIndexRunner : * */ val updated = updateManagedIndexMetaData(updatedManagedIndexMetaData) - if (!updated.metadataSaved || policy == null) return + if (!updated.metadataSaved || policy == null) { + println("RON SAX: coundn't save metadata here") + return + } // Change the policy and user stored on the job from changePolicy, this will also set the changePolicy to null on the job savePolicyToManagedIndexConfig(managedIndexConfig, policy.copy(user = changePolicy.user)) From 5a918e1544129f87d449437b08a279cbbb9094c0 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Fri, 29 Jul 2022 18:00:36 +0000 Subject: [PATCH 07/17] Added flags in initChangePolicy if managed index config was updated so that runManagedIndex avoids continuous execution on outdated managed index config Signed-off-by: Ronnak Saxena --- .../ManagedIndexRunner.kt | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 214c181a2..5d350bbd7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -225,7 +225,7 @@ object ManagedIndexRunner : if (lock == null) { logger.debug("Could not acquire lock [${lock?.lockId}] for ${job.index}") } else { - if (job?.continuous) { + if (job.continuous) { var keepExecuting: Boolean = true // Need to execute at least once for policy to initialize do { @@ -234,10 +234,10 @@ object ManagedIndexRunner : val renewedLock = renewLockForScheduledJob(context, lock as LockModel, errorNotificationRetryPolicy) // Failed to renew lock if (renewedLock == null) { - logger.error("Could not renew lock [${lock?.lockId}] for ${job.index}") + logger.error("Could not renew lock [${lock.lockId}] for ${job.index}") break } else { - lock = renewedLock as LockModel + lock = renewedLock keepExecuting = runManagedIndexConfig(job, context) } } while ((job.continuous && keepExecuting)) // Runs until job is no longer continuous or execution should stop @@ -246,7 +246,7 @@ object ManagedIndexRunner : runManagedIndexConfig(job, context) } // Release lock - if (lock == null || !releaseLockForScheduledJob(context, lock as LockModel)) { + if (lock == null || !releaseLockForScheduledJob(context, lock)) { logger.debug("Could not release lock [${lock?.lockId}] for ${job.index}") } } @@ -353,9 +353,15 @@ object ManagedIndexRunner : } if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) { - initChangePolicy(managedIndexConfig, managedIndexMetaData, action) - println("RON SAX: returned true on 357") - return true + return if (initChangePolicy(managedIndexConfig, managedIndexMetaData, action)) { + // Don't want to continue execution on old Managed Index Config + println("RON SAX: returned false on 357") + false + } else { + print("RON SAX returned false on 361") + // Managed index config was not updated to safe to continue execution of this job + true + } } val shouldBackOff = action?.shouldBackoff(currentActionMetaData, action.configRetry) @@ -398,7 +404,7 @@ object ManagedIndexRunner : // If this action is not allowed and the step to be executed is the first step in the action then we will fail // as this action has been removed from the AllowList, but if its not the first step we will let it finish as it's already inflight if (action?.isAllowed(allowList) == false && step != null && action.isFirstStep(step.name) && action.type != TransitionsAction.name) { - val info = mapOf("message" to "Attempted to execute action=${action?.type} which is not allowed.") + val info = mapOf("message" to "Attempted to execute action=${action.type} which is not allowed.") val updated = updateManagedIndexMetaData( managedIndexMetaData.copy( policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info @@ -468,7 +474,7 @@ object ManagedIndexRunner : return false } // Made it to end of successful execution block - println("RON SAX: returned true on 471") + println("RON SAX: returned true on 478") return true } return false @@ -554,9 +560,9 @@ object ManagedIndexRunner : savedPolicy = indexResponse.status() == RestStatus.OK } } catch (e: VersionConflictEngineException) { - logger.error("Failed to save policy(${policy.id}) to ManagedIndexConfig(${managedIndexConfig.index}). ${e.message}") + logger.error("RON SAX VERSION CONFLICT Failed to save policy(${policy.id}) to ManagedIndexConfig(${managedIndexConfig.index}). ${e.message}") } catch (e: Exception) { - logger.error("Failed to save policy(${policy.id}) to ManagedIndexConfig(${managedIndexConfig.index})", e) + logger.error("RON SAX OTHER EXCEPTION Failed to save policy(${policy.id}) to ManagedIndexConfig(${managedIndexConfig.index})", e) } return savedPolicy } @@ -572,7 +578,7 @@ object ManagedIndexRunner : logger.error("Failed to update ManagedIndexConfig(${managedIndexConfig.index}) job interval") } } catch (e: VersionConflictEngineException) { - logger.error("RON SAX FAILED HERE: Failed to update ManagedIndexConfig(${managedIndexConfig.index}) job interval. ${e.message}") + logger.error("Failed to update ManagedIndexConfig(${managedIndexConfig.index}) job interval. ${e.message}") } catch (e: Exception) { logger.error("Failed to update ManagedIndexConfig(${managedIndexConfig.index}) job interval", e) } @@ -722,18 +728,19 @@ object ManagedIndexRunner : * Initializes the change policy process where we will get the policy using the change policy's policyID, update the [ManagedIndexMetaData] * to reflect the new policy, and save the new policy to the [ManagedIndexConfig] while resetting the change policy to null */ + // Returning true if Managed Index Config was updated to avoid continuous execution on outdated Managed Index Config @Suppress("ReturnCount", "ComplexMethod") private suspend fun initChangePolicy( managedIndexConfig: ManagedIndexConfig, managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action? - ) { + ): Boolean { // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null val changePolicy = managedIndexConfig.changePolicy if (changePolicy == null) { logger.debug("initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: $managedIndexConfig") - return + return false } // get the policy we'll attempt to change to @@ -778,7 +785,7 @@ object ManagedIndexRunner : // if it is unsafe to change then we set safe back to false so we don't keep doing this check every execution if (managedIndexConfig.policy?.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy) != true) { updateManagedIndexConfig(managedIndexConfig.copy(changePolicy = managedIndexConfig.changePolicy.copy(isSafe = false))) - return + return true } } } @@ -792,12 +799,12 @@ object ManagedIndexRunner : val updated = updateManagedIndexMetaData(updatedManagedIndexMetaData) if (!updated.metadataSaved || policy == null) { - println("RON SAX: coundn't save metadata here") - return + return false } // Change the policy and user stored on the job from changePolicy, this will also set the changePolicy to null on the job - savePolicyToManagedIndexConfig(managedIndexConfig, policy.copy(user = changePolicy.user)) + // Return a flag to make sure saving correct + return savePolicyToManagedIndexConfig(managedIndexConfig, policy.copy(user = changePolicy.user)) } @Suppress("TooGenericExceptionCaught") From 6674ebc1a80d7cb5413ce27c5cb5842960319768 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Fri, 29 Jul 2022 18:34:49 +0000 Subject: [PATCH 08/17] removed print statements Signed-off-by: Ronnak Saxena Signed-off-by: Ronnak Saxena --- .../indexstatemanagement/ManagedIndexRunner.kt | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 5d350bbd7..5eabb710d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -229,7 +229,6 @@ object ManagedIndexRunner : var keepExecuting: Boolean = true // Need to execute at least once for policy to initialize do { - println("RON SAX LOOK HERE: name: ${job.jobName} continuous: ${job.continuous}, keepExecuting: $keepExecuting, id: ${job.id}") // Need to renew lock for current step execution val renewedLock = renewLockForScheduledJob(context, lock as LockModel, errorNotificationRetryPolicy) // Failed to renew lock @@ -242,7 +241,6 @@ object ManagedIndexRunner : } } while ((job.continuous && keepExecuting)) // Runs until job is no longer continuous or execution should stop } else { // If job is not continuous run once - println("RON SAX LOOK HERE: name: ${job.jobName} continuous: ${job.continuous}, id: ${job.id}") runManagedIndexConfig(job, context) } // Release lock @@ -322,7 +320,6 @@ object ManagedIndexRunner : if (result.metadataSaved) { disableManagedIndexConfig(managedIndexConfig) } - println("RON SAX return false in line 324") return false } @@ -355,10 +352,8 @@ object ManagedIndexRunner : if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) { return if (initChangePolicy(managedIndexConfig, managedIndexMetaData, action)) { // Don't want to continue execution on old Managed Index Config - println("RON SAX: returned false on 357") false } else { - print("RON SAX returned false on 361") // Managed index config was not updated to safe to continue execution of this job true } @@ -426,12 +421,9 @@ object ManagedIndexRunner : managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user ) ) { - println("RON SAX STEP: ${step.name}") step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute().postExecute(logger) } - println("RON SAX Starting metadata: $startingManagedIndexMetaData") var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step) - println("RON SAX executed metadata: $executedManagedIndexMetaData") if (executedManagedIndexMetaData.isFailed) { try { @@ -474,7 +466,6 @@ object ManagedIndexRunner : return false } // Made it to end of successful execution block - println("RON SAX: returned true on 478") return true } return false @@ -560,9 +551,9 @@ object ManagedIndexRunner : savedPolicy = indexResponse.status() == RestStatus.OK } } catch (e: VersionConflictEngineException) { - logger.error("RON SAX VERSION CONFLICT Failed to save policy(${policy.id}) to ManagedIndexConfig(${managedIndexConfig.index}). ${e.message}") + logger.error("Failed to save policy(${policy.id}) to ManagedIndexConfig(${managedIndexConfig.index}). ${e.message}") } catch (e: Exception) { - logger.error("RON SAX OTHER EXCEPTION Failed to save policy(${policy.id}) to ManagedIndexConfig(${managedIndexConfig.index})", e) + logger.error("Failed to save policy(${policy.id}) to ManagedIndexConfig(${managedIndexConfig.index})", e) } return savedPolicy } From ceb07aed993c41258afcf97ccbe036348f4844aa Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Fri, 29 Jul 2022 20:02:31 +0000 Subject: [PATCH 09/17] resolved merge conflicts Signed-off-by: Ronnak Saxena --- .../transport/action/explain/TransportExplainAction.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index 74ef75c78..797a3a4d5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -427,7 +427,9 @@ class TransportExplainAction @Inject constructor( totalManagedIndices, filteredAppliedPolicies, continuousField ) } - } + } + } + ) } @Suppress("LongParameterList") From 18fc1d7c7aca89d808b4055c448b95b987162f22 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Tue, 9 Aug 2022 17:25:56 +0000 Subject: [PATCH 10/17] created request and action class --- .../executepolicy/ExecutePolicyAction.kt | 17 +++++++++ .../executepolicy/ExecutePolicyRequest.kt | 36 +++++++++++++++++++ .../TransportExecutePolicyAction.kt | 9 +++++ 3 files changed, 62 insertions(+) create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyRequest.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt new file mode 100644 index 000000000..165068333 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy + + +import org.opensearch.action.ActionType +import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse + +class ExecutePolicyAction private constructor() : ActionType(NAME, ::ISMStatusResponse) { + companion object { + val INSTANCE = ExecutePolicyAction() + const val NAME = "cluster:admin/opendistro/ism/managedindex/execute" + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyRequest.kt new file mode 100644 index 000000000..0d8fce161 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyRequest.kt @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ValidateActions +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import java.io.IOException + +class ExecutePolicyRequest( + val indices: List +) : ActionRequest() { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + indices = sin.readStringList() + ) + + override fun validate(): ActionRequestValidationException? { + var validationException: ActionRequestValidationException? = null + if (indices.isEmpty()) { + validationException = ValidateActions.addValidationError("Missing indices", validationException) + } + return validationException + } + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeStringCollection(indices) + } + + +} \ No newline at end of file diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt new file mode 100644 index 000000000..2da4fd520 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt @@ -0,0 +1,9 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy + +class TransportExecutePolicyAction { +} \ No newline at end of file From 465d2cff8fdea4e2ddeefeb88def464e76c48c61 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Tue, 9 Aug 2022 18:17:17 +0000 Subject: [PATCH 11/17] Registered the Handler --- .../indexmanagement/IndexManagementPlugin.kt | 4 +- .../resthandler/RestExecutePolicyAction.kt | 65 +++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExecutePolicyAction.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index cc36d13d4..b276eacdd 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.MetadataService import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.Policy +import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestExecutePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestAddPolicyAction import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestDeletePolicyAction @@ -329,13 +330,14 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin RestExplainTransformAction(), RestStartTransformAction(), RestStopTransformAction(), + RestExecutePolicyAction(), RestGetSMPolicyHandler(), RestStartSMPolicyHandler(), RestStopSMPolicyHandler(), RestExplainSMPolicyHandler(), RestDeleteSMPolicyHandler(), RestCreateSMPolicyHandler(), - RestUpdateSMPolicyHandler() + RestUpdateSMPolicyHandler(), ) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExecutePolicyAction.kt new file mode 100644 index 000000000..94d18ed19 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExecutePolicyAction.kt @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.resthandler + +import org.opensearch.client.node.NodeClient +import org.opensearch.common.Strings +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ISM_BASE_URI +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_ISM_BASE_URI +import org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy.ExecutePolicyAction +import org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy.ExecutePolicyRequest +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BaseRestHandler.RestChannelConsumer +import org.opensearch.rest.RestHandler.ReplacedRoute +import org.opensearch.rest.RestHandler.Route +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestRequest.Method.POST +import org.opensearch.rest.action.RestToXContentListener +import java.io.IOException + +class RestExecutePolicyAction : BaseRestHandler() { + override fun getName(): String = "execute_policy_action" + + override fun routes(): List { + return emptyList() + } + + override fun replacedRoutes(): List { + return listOf( + ReplacedRoute( + POST, "$ISM_BASE_URI/execute", + POST, "$LEGACY_ISM_BASE_URI/execute" + ), + ReplacedRoute( + POST, "$ISM_BASE_URI/execute/{index}", + POST, "$LEGACY_ISM_BASE_URI/{index}" + ) + ) + } + + @Throws(IOException::class) + @Suppress("SpreadOperator") // There is no way around dealing with java vararg without spread operator. + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val indices: Array? = Strings.splitStringByCommaToArray(request.param("index")) + + if (indices.isNullOrEmpty()) { + throw IllegalArgumentException("Missing indices") + } + + val body = if (request.hasContent()) { + XContentHelper.convertToMap(request.requiredContent(), false, request.xContentType).v2() + } else { + mapOf() + } + + val executePolicyRequest = ExecutePolicyRequest(indices.toList()) + + return RestChannelConsumer { channel -> + client.execute(ExecutePolicyAction.INSTANCE, executePolicyRequest, RestToXContentListener(channel)) + } + } +} \ No newline at end of file From 2d90755b72abcef4b4f400b470f48f0b2aa0143c Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Tue, 9 Aug 2022 21:40:04 +0000 Subject: [PATCH 12/17] need to call runJob --- .../indexmanagement/IndexManagementPlugin.kt | 3 + .../TransportExecutePolicyAction.kt | 64 ++++++++++++++++++- 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index b276eacdd..2e77d8ed6 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -57,6 +57,8 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.chan import org.opensearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.TransportChangePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.deletepolicy.DeletePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.deletepolicy.TransportDeletePolicyAction +import org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy.ExecutePolicyAction +import org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy.TransportExecutePolicyAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction import org.opensearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPoliciesAction @@ -522,6 +524,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin ActionPlugin.ActionHandler(RemovePolicyAction.INSTANCE, TransportRemovePolicyAction::class.java), ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java), ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java), + ActionPlugin.ActionHandler(ExecutePolicyAction.INSTANCE, TransportExecutePolicyAction::class.java), ActionPlugin.ActionHandler(RetryFailedManagedIndexAction.INSTANCE, TransportRetryFailedManagedIndexAction::class.java), ActionPlugin.ActionHandler(ChangePolicyAction.INSTANCE, TransportChangePolicyAction::class.java), ActionPlugin.ActionHandler(IndexPolicyAction.INSTANCE, TransportIndexPolicyAction::class.java), diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt index 2da4fd520..47b6cf2ad 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt @@ -5,5 +5,67 @@ package org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy -class TransportExecutePolicyAction { +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListener +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.authuser.User +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner +import org.opensearch.rest.RestStatus +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import java.time.Instant +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.node.NodeClient +import org.opensearch.jobscheduler.spi.utils.LockService +import org.opensearch.ExceptionsHelper + +private val log = LogManager.getLogger(TransportExecutePolicyAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + +class TransportExecutePolicyAction @Inject constructor( + transportService: TransportService, + val client: NodeClient, + private val clusterService: ClusterService, + private val runner: ManagedIndexRunner, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry, + private val settings: Settings +) : HandledTransportAction ( + ExecutePolicyAction.NAME, transportService, actionFilters, ::ExecutePolicyRequest +) { + override fun doExecute(task: Task, execPolicyRequest: ExecutePolicyRequest, actionListener: ActionListener) { + val userStr = client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + log.debug("User and roles string from thread context: $userStr") + val user: User? = User.parse(userStr) + + runner.launch { + try { + val lock = LockService(client, clusterService) +// runner.runJob(, lock) + } catch (e: Exception) { + log.error("Unexpected error trying to execute policy") + withContext(Dispatchers.IO) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + } + } + } + } } \ No newline at end of file From 99d690d7756839031f349f0fe81bb018d77f5298 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Wed, 10 Aug 2022 00:30:57 +0000 Subject: [PATCH 13/17] changed runManagedIndexConfig so executeAPI can occur --- .../ManagedIndexRunner.kt | 9 +++++---- .../executepolicy/ExecutePolicyAction.kt | 6 +++--- .../TransportExecutePolicyAction.kt | 19 ++++++++----------- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 5eabb710d..da4db7cea 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -96,6 +96,7 @@ import org.opensearch.jobscheduler.spi.LockModel import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.ScheduledJobRunner import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.rest.RestStatus import org.opensearch.script.Script import org.opensearch.script.ScriptService @@ -237,11 +238,11 @@ object ManagedIndexRunner : break } else { lock = renewedLock - keepExecuting = runManagedIndexConfig(job, context) + keepExecuting = runManagedIndexConfig(job, context.lockService) } } while ((job.continuous && keepExecuting)) // Runs until job is no longer continuous or execution should stop } else { // If job is not continuous run once - runManagedIndexConfig(job, context) + runManagedIndexConfig(job, context.lockService) } // Release lock if (lock == null || !releaseLockForScheduledJob(context, lock)) { @@ -252,7 +253,7 @@ object ManagedIndexRunner : } @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth") - private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext): Boolean { + private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, lock: LockService): Boolean { logger.debug("Run job for index ${managedIndexConfig.index}") // doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls if (clusterIsRed()) { @@ -326,7 +327,7 @@ object ManagedIndexRunner : val state = policy.getStateToExecute(managedIndexMetaData) val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider) val stepContext = StepContext( - managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings, jobContext.lockService + managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings, lock ) val step: Step? = action?.getStepToExecute(stepContext) val currentActionMetaData = action?.getUpdatedActionMetadata(managedIndexMetaData, state.name) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt index 165068333..676092c9c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt @@ -7,11 +7,11 @@ package org.opensearch.indexmanagement.indexstatemanagement.transport.action.exe import org.opensearch.action.ActionType -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse +import org.opensearch.action.support.master.AcknowledgedResponse -class ExecutePolicyAction private constructor() : ActionType(NAME, ::ISMStatusResponse) { +class ExecutePolicyAction private constructor() : ActionType(NAME, ::AcknowledgedResponse) { companion object { val INSTANCE = ExecutePolicyAction() const val NAME = "cluster:admin/opendistro/ism/managedindex/execute" } -} \ No newline at end of file +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt index 47b6cf2ad..f439c03e4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt @@ -10,32 +10,26 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager -import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener -import org.opensearch.action.get.GetRequest -import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction -import org.opensearch.action.support.WriteRequest -import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings -import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry -import org.opensearch.common.xcontent.XContentHelper -import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.ConfigConstants import org.opensearch.commons.authuser.User import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner -import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import java.time.Instant import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.node.NodeClient import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.ExceptionsHelper +import org.opensearch.index.fielddata.IndexFieldDataCache.None +import org.opensearch.jobscheduler.spi.JobDocVersion +import org.opensearch.jobscheduler.spi.JobExecutionContext +import java.time.Instant private val log = LogManager.getLogger(TransportExecutePolicyAction::class.java) private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) @@ -59,7 +53,10 @@ class TransportExecutePolicyAction @Inject constructor( runner.launch { try { val lock = LockService(client, clusterService) -// runner.runJob(, lock) + // fake context in order to pass into runJob + val newContext = JobExecutionContext(Instant.now(), JobDocVersion(0L,0L,0L), lock, + "", "") + runner.runJob(None, newContext) } catch (e: Exception) { log.error("Unexpected error trying to execute policy") withContext(Dispatchers.IO) { From 1a8161a7d71a8c85dfacbdc883aab66046921577 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Wed, 10 Aug 2022 00:32:11 +0000 Subject: [PATCH 14/17] changed runManagedIndexConfig so executeAPI can occur Signed-off-by: Ronnak Saxena --- .../action/executepolicy/TransportExecutePolicyAction.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt index f439c03e4..bcd9d20f0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt @@ -53,7 +53,7 @@ class TransportExecutePolicyAction @Inject constructor( runner.launch { try { val lock = LockService(client, clusterService) - // fake context in order to pass into runJob + // temp context in order to pass into runJob val newContext = JobExecutionContext(Instant.now(), JobDocVersion(0L,0L,0L), lock, "", "") runner.runJob(None, newContext) From 5429cd66e5e98cfd40a6260ab66b0b4d2767b84c Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Wed, 10 Aug 2022 01:22:01 +0000 Subject: [PATCH 15/17] trying to get managed index metadata for run job call --- .../TransportExecutePolicyAction.kt | 64 ++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt index bcd9d20f0..51f2b7366 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt @@ -26,7 +26,15 @@ import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.node.NodeClient import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.ExceptionsHelper +import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.cluster.state.ClusterStateResponse +import org.opensearch.action.get.MultiGetRequest +import org.opensearch.action.get.MultiGetResponse +import org.opensearch.action.support.IndicesOptions +import org.opensearch.index.IndexNotFoundException import org.opensearch.index.fielddata.IndexFieldDataCache.None +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse import org.opensearch.jobscheduler.spi.JobDocVersion import org.opensearch.jobscheduler.spi.JobExecutionContext import java.time.Instant @@ -41,14 +49,16 @@ class TransportExecutePolicyAction @Inject constructor( private val runner: ManagedIndexRunner, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry, - private val settings: Settings + val request: ExecutePolicyRequest ) : HandledTransportAction ( ExecutePolicyAction.NAME, transportService, actionFilters, ::ExecutePolicyRequest ) { + override fun doExecute(task: Task, execPolicyRequest: ExecutePolicyRequest, actionListener: ActionListener) { val userStr = client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) log.debug("User and roles string from thread context: $userStr") val user: User? = User.parse(userStr) + val indices = mutableSetOf() runner.launch { try { @@ -64,5 +74,57 @@ class TransportExecutePolicyAction @Inject constructor( } } } + fun getExistingManagedIndices() { + + val multiGetReq = MultiGetRequest() + + client.multiGet( + multiGetReq, + object : ActionListener { + override fun onResponse(response: MultiGetResponse) { + + response.forEach { + // get managed index configs + } + + fun onFailure(t: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) + } + } + } + ) + } + fun getClusterState() { + val strictExpandOptions = IndicesOptions.strictExpand() + + val clusterStateRequest = ClusterStateRequest() + .clear() + .indices(*request.indices.toTypedArray()) + .metadata(true) + .local(false) + .indicesOptions(strictExpandOptions) + + client.threadPool().threadContext.stashContext().use { + client.admin() + .cluster() + .state( + clusterStateRequest, + object : ActionListener { + override fun onResponse(response: ClusterStateResponse) { + val indexMetadatas = response.state.metadata.indices + indexMetadatas.forEach { + indices.add(it.value.indexUUID) + } + + getExistingManagedIndices() + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) + } + } + ) + } + } } } \ No newline at end of file From 51a7376952a1873833e204bb1839a8fe6f234780 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Thu, 11 Aug 2022 17:21:17 +0000 Subject: [PATCH 16/17] added comments Signed-off-by: Ronnak Saxena --- .../action/executepolicy/TransportExecutePolicyAction.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt index 51f2b7366..d8fd2502a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt @@ -66,7 +66,8 @@ class TransportExecutePolicyAction @Inject constructor( // temp context in order to pass into runJob val newContext = JobExecutionContext(Instant.now(), JobDocVersion(0L,0L,0L), lock, "", "") - runner.runJob(None, newContext) + // Need to get managed index metadata to pass into runJob + // runner.runJob(None, newContext) } catch (e: Exception) { log.error("Unexpected error trying to execute policy") withContext(Dispatchers.IO) { From 58b0e896c4049042f7140976a44b75d966ffefc1 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Thu, 11 Aug 2022 17:34:00 +0000 Subject: [PATCH 17/17] fixed ktlint errors --- .../resthandler/RestExecutePolicyAction.kt | 18 +++++----- .../executepolicy/ExecutePolicyAction.kt | 1 - .../executepolicy/ExecutePolicyRequest.kt | 4 +-- .../TransportExecutePolicyAction.kt | 34 ++++++++----------- 4 files changed, 24 insertions(+), 33 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExecutePolicyAction.kt index 94d18ed19..d2e29bd6d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExecutePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExecutePolicyAction.kt @@ -30,14 +30,14 @@ class RestExecutePolicyAction : BaseRestHandler() { override fun replacedRoutes(): List { return listOf( - ReplacedRoute( - POST, "$ISM_BASE_URI/execute", - POST, "$LEGACY_ISM_BASE_URI/execute" - ), - ReplacedRoute( - POST, "$ISM_BASE_URI/execute/{index}", - POST, "$LEGACY_ISM_BASE_URI/{index}" - ) + ReplacedRoute( + POST, "$ISM_BASE_URI/execute", + POST, "$LEGACY_ISM_BASE_URI/execute" + ), + ReplacedRoute( + POST, "$ISM_BASE_URI/execute/{index}", + POST, "$LEGACY_ISM_BASE_URI/{index}" + ) ) } @@ -62,4 +62,4 @@ class RestExecutePolicyAction : BaseRestHandler() { client.execute(ExecutePolicyAction.INSTANCE, executePolicyRequest, RestToXContentListener(channel)) } } -} \ No newline at end of file +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt index 676092c9c..1b584f55d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyAction.kt @@ -5,7 +5,6 @@ package org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy - import org.opensearch.action.ActionType import org.opensearch.action.support.master.AcknowledgedResponse diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyRequest.kt index 0d8fce161..d6ac75824 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/ExecutePolicyRequest.kt @@ -31,6 +31,4 @@ class ExecutePolicyRequest( override fun writeTo(out: StreamOutput) { out.writeStringCollection(indices) } - - -} \ No newline at end of file +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt index d8fd2502a..52f004ef4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/executepolicy/TransportExecutePolicyAction.kt @@ -15,7 +15,6 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject -import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.commons.ConfigConstants import org.opensearch.commons.authuser.User @@ -31,10 +30,6 @@ import org.opensearch.action.admin.cluster.state.ClusterStateResponse import org.opensearch.action.get.MultiGetRequest import org.opensearch.action.get.MultiGetResponse import org.opensearch.action.support.IndicesOptions -import org.opensearch.index.IndexNotFoundException -import org.opensearch.index.fielddata.IndexFieldDataCache.None -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings -import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse import org.opensearch.jobscheduler.spi.JobDocVersion import org.opensearch.jobscheduler.spi.JobExecutionContext import java.time.Instant @@ -43,13 +38,13 @@ private val log = LogManager.getLogger(TransportExecutePolicyAction::class.java) private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) class TransportExecutePolicyAction @Inject constructor( - transportService: TransportService, - val client: NodeClient, - private val clusterService: ClusterService, - private val runner: ManagedIndexRunner, - actionFilters: ActionFilters, - val xContentRegistry: NamedXContentRegistry, - val request: ExecutePolicyRequest + transportService: TransportService, + val client: NodeClient, + private val clusterService: ClusterService, + private val runner: ManagedIndexRunner, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry, + val request: ExecutePolicyRequest ) : HandledTransportAction ( ExecutePolicyAction.NAME, transportService, actionFilters, ::ExecutePolicyRequest ) { @@ -63,9 +58,8 @@ class TransportExecutePolicyAction @Inject constructor( runner.launch { try { val lock = LockService(client, clusterService) - // temp context in order to pass into runJob - val newContext = JobExecutionContext(Instant.now(), JobDocVersion(0L,0L,0L), lock, - "", "") + // Temp context in order to pass into runJob + val newContext = JobExecutionContext(Instant.now(), JobDocVersion(0L, 0L, 0L), lock, "", "") // Need to get managed index metadata to pass into runJob // runner.runJob(None, newContext) } catch (e: Exception) { @@ -80,17 +74,17 @@ class TransportExecutePolicyAction @Inject constructor( val multiGetReq = MultiGetRequest() client.multiGet( - multiGetReq, + multiGetReq, object : ActionListener { override fun onResponse(response: MultiGetResponse) { response.forEach { // get managed index configs } + } - fun onFailure(t: Exception) { - actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) - } + override fun onFailure(e: java.lang.Exception?) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) } } ) @@ -128,4 +122,4 @@ class TransportExecutePolicyAction @Inject constructor( } } } -} \ No newline at end of file +}