diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index 663e1bee2..4c7356e07 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -366,7 +366,7 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { info = mutableInfo.toMap() } - override fun isIdempotent(): Boolean = false + override fun isIdempotent(): Boolean = true @Suppress("TooManyFunctions") companion object { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/AllocationActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/AllocationActionIT.kt index 82b8daddf..63b00200a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/AllocationActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/AllocationActionIT.kt @@ -26,7 +26,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() { val states = listOf( State("Allocate", listOf(actionConfig), listOf()) ) - val policy = Policy( id = policyID, description = "$testIndexName description", @@ -66,12 +65,10 @@ class AllocationActionIT : IndexStateManagementRestTestCase() { createIndex(indexName, null, null, "0") availableNodes.remove(getIndexShardNodes(indexName)[0]) - val actionConfig = AllocationAction(require = mapOf("_name" to availableNodes.first()), exclude = emptyMap(), include = emptyMap(), index = 0) val states = listOf( State("Allocate", listOf(actionConfig), listOf()) ) - val policy = Policy( id = policyID, description = "$testIndexName description", @@ -100,9 +97,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() { assertEquals(actionConfig.require["_name"], settings["index.routing.allocation.require._name"]) } - // Third execution: Waits for allocation to complete, which will happen in this execution since index is small - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(availableNodes.first(), getIndexShardNodes(indexName)[0]) } @@ -117,12 +111,10 @@ class AllocationActionIT : IndexStateManagementRestTestCase() { createIndex(indexName, null, null, "0") val excludedNode = getIndexShardNodes(indexName)[0].toString() - val actionConfig = AllocationAction(require = emptyMap(), exclude = mapOf("_name" to excludedNode), include = emptyMap(), index = 0) val states = listOf( State("Allocate", listOf(actionConfig), listOf()) ) - val policy = Policy( id = policyID, description = "$testIndexName description", @@ -151,9 +143,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() { assertEquals(actionConfig.exclude["_name"], settings["index.routing.allocation.exclude._name"]) } - // Third execution: Waits for allocation to complete, which will happen in this execution since index is small - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertNotEquals(excludedNode, getIndexShardNodes(indexName)[0]) } @@ -169,7 +158,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() { createIndex(indexName, null, null, "0") availableNodes.remove(getIndexShardNodes(indexName)[0]) - val actionConfig = AllocationAction(require = emptyMap(), exclude = emptyMap(), include = mapOf("_name" to availableNodes.first()), index = 0) val states = listOf( State("Allocate", listOf(actionConfig), listOf()) @@ -203,9 +191,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() { assertEquals(actionConfig.include["_name"], settings["index.routing.allocation.include._name"]) } - // Third execution: Waits for allocation to complete, which will happen in this execution since index is small - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(availableNodes.first(), getIndexShardNodes(indexName)[0]) } @@ -218,7 +203,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() { val states = listOf( State("Allocate", listOf(actionConfig), listOf()) ) - val policy = Policy( id = policyID, description = "$testIndexName description", diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt index ca47942e8..b627d2898 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt @@ -27,6 +27,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry import org.opensearch.indexmanagement.waitFor import org.opensearch.rest.RestRequest import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.test.OpenSearchTestCase import java.time.Instant import java.time.temporal.ChronoUnit @@ -733,4 +734,65 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { Assert.assertEquals(alias.containsKey("test_alias2"), true) Assert.assertEquals(alias.containsKey("test_alias3"), true) } + + fun `test rollover detects transient failure and continues executing`() { + val aliasName = "${testIndexName}_alias" + val indexNameBase = "${testIndexName}_index" + val firstIndex = "$indexNameBase-1" + val policyID = "${testIndexName}_testPolicyName_1" + val actionConfig = RolloverAction(null, 1, null, null, false, 0) + val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + createPolicy(policy, policyID) + createIndex(firstIndex, policyID, aliasName) + + val managedIndexConfig = getExistingManagedIndexConfig(firstIndex) + + // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndex).policyID) } + + // Insert data to trigger rollover + insertSampleData(index = firstIndex, docCount = 5, delay = 0) + // Need to speed up to second execution where it will trigger the attempt rollover step + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index did not rollover.", AttemptRolloverStep.getSuccessMessage(firstIndex), info["message"]) + } + // Manually produce transaction failure + val response = client().makeRequest( + "POST", "$INDEX_MANAGEMENT_INDEX/_update/${managedIndexConfig.id}%23metadata", + StringEntity( + "{\n" + + " \"script\": {\n" + + " \"source\": \"ctx._source.managed_index_metadata.step.step_status = params.step_status\",\n" + + " \"lang\": \"painless\",\n" + + " \"params\": {\n" + + " \"step_status\": \"starting\"\n" + + " }\n" + + " }\n" + + "}", + ContentType.APPLICATION_JSON + ) + ) + assertEquals("Request failed", RestStatus.OK, response.restStatus()) + + // Execute again to see the transaction failure + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + val metadata = getExplainManagedIndexMetaData(firstIndex) + assertEquals("Executing the wrong step", "attempt_rollover", metadata.stepMetaData?.name) + assertEquals("rollover step did not continue executing after detecting the transient failure.", Step.StepStatus.COMPLETED, metadata.stepMetaData?.stepStatus) + } + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt index 6f4737761..5f2e54cbc 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ShrinkActionIT.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.io.entity.StringEntity import org.apache.logging.log4j.LogManager +import org.junit.Assume import org.junit.Before import org.opensearch.action.admin.indices.alias.Alias import org.opensearch.cluster.metadata.IndexMetadata @@ -36,6 +37,7 @@ import java.time.Instant import java.time.temporal.ChronoUnit class ShrinkActionIT : IndexStateManagementRestTestCase() { + @Suppress("UnusedPrivateMember") @Before private fun disableJobIndexShardRelocation() { @@ -63,22 +65,12 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { private val testIndexName = javaClass.simpleName.lowercase() private val testIndexSuffix = "_shrink_test" - fun `test basic workflow number of shards`() { - val logger = LogManager.getLogger(::ShrinkActionIT) + + fun `test basic workflow`() { val indexName = "${testIndexName}_index_1" val policyID = "${testIndexName}_testPolicyName_1" - - val shrinkAction = ShrinkAction( - numNewShards = 1, - maxShardSize = null, - percentageOfSourceShards = null, - targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()), - aliases = listOf(Alias("test-alias1"), Alias("test-alias2").filter(QueryBuilders.termQuery("foo", "bar")).writeIndex(true)), - forceUnsafe = true, - index = 0 - ) + val shrinkAction = randomShrinkAction() val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf())) - val policy = Policy( id = policyID, description = "$testIndexName description", @@ -92,28 +84,83 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { createPolicy(policy, policyID) createIndex(indexName, policyID, null, "0", "3", "") + assertShrinkActionRun(indexName, policyID) + } + + private fun randomShrinkAction(): ShrinkAction { + val newShards = 1 + val newMaxShardSize = ByteSizeValue.parseBytesSizeValue("1GB", "test") + val newPercentageOfSourceShards = 0.5 + val choice = randomInt(2) + val aliases = listOf(Alias("test-alias1"), Alias("test-alias2").filter(QueryBuilders.termQuery("foo", "bar")).writeIndex(true)) + val targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()) + return when (choice) { + 0 -> ShrinkAction( + numNewShards = newShards, + maxShardSize = null, + percentageOfSourceShards = null, + targetIndexTemplate = targetIndexTemplate, + aliases = aliases, + forceUnsafe = true, + index = 0 + ) + + 1 -> ShrinkAction( + numNewShards = null, + maxShardSize = newMaxShardSize, + percentageOfSourceShards = null, + targetIndexTemplate = targetIndexTemplate, + aliases = aliases, + forceUnsafe = true, + index = 0 + ) + + 2 -> ShrinkAction( + numNewShards = null, + maxShardSize = null, + percentageOfSourceShards = newPercentageOfSourceShards, + targetIndexTemplate = targetIndexTemplate, + aliases = aliases, + forceUnsafe = true, + index = 0 + ) + + else -> { + error("Invalid choice") + } + } + } + + private fun assertShrinkActionRun(indexName: String, policyID: String, excludeNode: String? = null) { insertSampleData(indexName, 3) // Set the index as readonly to check that the setting is preserved after the shrink finishes updateIndexSetting(indexName, IndexMetadata.SETTING_BLOCKS_WRITE, "true") + logger.info("index settings: \n ${getFlatSettings(indexName)}") - // Will change the startTime each execution so that it triggers in 2 seconds // First execution: Policy is initialized val managedIndexConfig = getExistingManagedIndexConfig(indexName) - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + waitFor(Instant.ofEpochSecond(60)) { + assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) + } + logger.info("before attempt move shards") // Starts AttemptMoveShardsStep updateManagedIndexConfigStartTime(managedIndexConfig) - val targetIndexName = indexName + testIndexSuffix waitFor(Instant.ofEpochSecond(60)) { - assertEquals(targetIndexName, getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName) + assertEquals( + targetIndexName, + getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName + ) assertEquals("true", getIndexBlocksWriteSetting(indexName)) - assertNotNull("Couldn't find node to shrink onto.", getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName) + val nodeName = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName + assertNotNull("Couldn't find node to shrink onto.", nodeName) + excludeNode ?: assertNotEquals(nodeName, excludeNode) val settings = getFlatSettings(indexName) - val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName + val nodeToShrink = + getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName assertTrue(settings.containsKey("index.routing.allocation.require._name")) assertEquals(nodeToShrink, settings["index.routing.allocation.require._name"]) assertEquals( @@ -121,7 +168,9 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { getExplainManagedIndexMetaData(indexName).info?.get("message") ) } - val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName + val nodeToShrink = + getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName + // starts WaitForMoveShardsStep updateManagedIndexConfigStartTime(managedIndexConfig) waitFor(Instant.ofEpochSecond(60)) { @@ -130,6 +179,7 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { getExplainManagedIndexMetaData(indexName).info?.get("message") ) } + // Wait for move should finish before this. Starts AttemptShrinkStep updateManagedIndexConfigStartTime(managedIndexConfig) val instant: Instant = Instant.ofEpochSecond(50) @@ -157,23 +207,13 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { } } - @Suppress("UNCHECKED_CAST") - fun `test basic workflow max shard size`() { - val logger = LogManager.getLogger(::ShrinkActionIT) - val indexName = "${testIndexName}_index_2" - val policyID = "${testIndexName}_testPolicyName_2" - val testMaxShardSize: ByteSizeValue = ByteSizeValue.parseBytesSizeValue("1GB", "test") - val shrinkAction = ShrinkAction( - numNewShards = null, - maxShardSize = testMaxShardSize, - percentageOfSourceShards = null, - targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()), - aliases = listOf(Alias("max-shard-alias")), - forceUnsafe = true, - index = 0 - ) + fun `test allocation block picks correct node with exclude node`() { + Assume.assumeTrue(isMultiNode) + val nodes = getNodes() + val indexName = "${testIndexName}_index_4" + val policyID = "${testIndexName}_testPolicyName_4" + val shrinkAction = randomShrinkAction() val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf())) - val policy = Policy( id = policyID, description = "$testIndexName description", @@ -187,259 +227,14 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { createPolicy(policy, policyID) createIndex(indexName, policyID, null, "0", "3", "") - insertSampleData(indexName, 3) - - // Will change the startTime each execution so that it triggers in 2 seconds - // First execution: Policy is initialized - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - logger.info("before attempt move shards") - // Starts AttemptMoveShardsStep - updateManagedIndexConfigStartTime(managedIndexConfig) - - val targetIndexName = indexName + testIndexSuffix - waitFor(Instant.ofEpochSecond(60)) { - assertEquals(targetIndexName, getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName) - assertEquals("true", getIndexBlocksWriteSetting(indexName)) - assertNotNull("Couldn't find node to shrink onto.", getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName) - val settings = getFlatSettings(indexName) - val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName - assertTrue(settings.containsKey("index.routing.allocation.require._name")) - assertEquals(nodeToShrink, settings["index.routing.allocation.require._name"]) - assertEquals( - AttemptMoveShardsStep.getSuccessMessage(nodeToShrink), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName - // starts WaitForMoveShardsStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { - assertEquals( - WaitForMoveShardsStep.getSuccessMessage(nodeToShrink), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - // Wait for move should finish before this. Starts AttemptShrinkStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(50)) { - assertTrue("Target index is not created", indexExists(targetIndexName)) - assertEquals( - AttemptShrinkStep.getSuccessMessage(targetIndexName), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - - // starts WaitForShrinkStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { - // one primary and one replica - assertTrue(getIndexShards(targetIndexName).size == 2) - assertEquals( - WaitForShrinkStep.SUCCESS_MESSAGE, - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - val indexSettings = getIndexSettings(indexName) as Map>> - val writeBlock = indexSettings[indexName]!!["settings"]!![IndexMetadata.SETTING_BLOCKS_WRITE] as String? - assertNull("Write block setting was not reset after successful shrink", writeBlock) - val aliases = getAlias(targetIndexName, "") - assertTrue("Alias was not added to shrunken index", aliases.containsKey("max-shard-alias")) - } - } - - @Suppress("UNCHECKED_CAST") - fun `test basic workflow percentage to decrease to`() { - val indexName = "${testIndexName}_index_3" - val policyID = "${testIndexName}_testPolicyName_3" - val shrinkAction = ShrinkAction( - numNewShards = null, - maxShardSize = null, - percentageOfSourceShards = 0.5, - targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()), - aliases = null, - forceUnsafe = true, - index = 0 + val excludedNode = nodes.iterator().next() + logger.info("Excluded node: $excludedNode") + updateIndexSettings( + indexName, + Settings.builder().put("index.routing.allocation.exclude._name", excludedNode) ) - val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf())) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 11L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName, policyID, null, "0", "3", "") - - insertSampleData(indexName, 3) - - // Will change the startTime each execution so that it triggers in 2 seconds - // First execution: Policy is initialized - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - // Starts AttemptMoveShardsStep - updateManagedIndexConfigStartTime(managedIndexConfig) - - val targetIndexName = indexName + testIndexSuffix - waitFor(Instant.ofEpochSecond(60)) { - assertEquals(targetIndexName, getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName) - assertEquals("true", getIndexBlocksWriteSetting(indexName)) - assertNotNull("Couldn't find node to shrink onto.", getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName) - val settings = getFlatSettings(indexName) - val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName - assertTrue(settings.containsKey("index.routing.allocation.require._name")) - assertEquals(nodeToShrink, settings["index.routing.allocation.require._name"]) - assertEquals( - AttemptMoveShardsStep.getSuccessMessage(nodeToShrink), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - - val nodeToShrink = getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName - - // starts WaitForMoveShardsStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { - assertEquals( - WaitForMoveShardsStep.getSuccessMessage(nodeToShrink), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - // Wait for move should finish before this. Starts AttemptShrinkStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(50)) { - assertTrue("Target index is not created", indexExists(targetIndexName)) - assertEquals( - AttemptShrinkStep.getSuccessMessage(targetIndexName), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - // starts WaitForShrinkStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { - // one primary and one replica - assertTrue(getIndexShards(targetIndexName).size == 2) - assertEquals( - WaitForShrinkStep.SUCCESS_MESSAGE, - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - val indexSettings = getIndexSettings(indexName) as Map>> - val writeBlock = indexSettings[indexName]!!["settings"]!![IndexMetadata.SETTING_BLOCKS_WRITE] as String? - assertNull("Write block setting was not reset after successful shrink", writeBlock) - } - } - - @Suppress("UNCHECKED_CAST") - fun `test allocation block picks correct node`() { - val logger = LogManager.getLogger(::ShrinkActionIT) - val nodes = getNodes() - if (nodes.size > 1) { - val indexName = "${testIndexName}_index_4" - val policyID = "${testIndexName}_testPolicyName_4" - val shrinkAction = ShrinkAction( - numNewShards = null, - maxShardSize = null, - percentageOfSourceShards = 0.5, - targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()), - aliases = null, - forceUnsafe = true, - index = 0 - ) - val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf())) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 11L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - createPolicy(policy, policyID) - createIndex(indexName, policyID, null, "0", "3", "") - val excludedNode = nodes.iterator().next() - logger.info("Excluded node: $excludedNode") - updateIndexSettings( - indexName, - Settings.builder().put("index.routing.allocation.exclude._name", excludedNode) - ) - insertSampleData(indexName, 3) - // Will change the startTime each execution so that it triggers in 2 seconds - // First execution: Policy is initialized - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - logger.info("index settings: \n ${getFlatSettings(indexName)}") - - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - // Starts AttemptMoveShardsStep - updateManagedIndexConfigStartTime(managedIndexConfig) - val targetIndexName = indexName + testIndexSuffix - waitFor(Instant.ofEpochSecond(60)) { - assertEquals( - targetIndexName, - getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName - ) - assertEquals("true", getIndexBlocksWriteSetting(indexName)) - val nodeName = - getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName - assertNotNull("Couldn't find node to shrink onto.", nodeName) - assertNotEquals(nodeName, excludedNode) - val settings = getFlatSettings(indexName) - val nodeToShrink = - getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName - assertTrue(settings.containsKey("index.routing.allocation.require._name")) - assertEquals(nodeToShrink, settings["index.routing.allocation.require._name"]) - assertEquals( - AttemptMoveShardsStep.getSuccessMessage(nodeToShrink), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - - val nodeToShrink = - getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName - - // starts WaitForMoveShardsStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { - assertEquals( - WaitForMoveShardsStep.getSuccessMessage(nodeToShrink), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - // Wait for move should finish before this. Starts AttemptShrinkStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(50)) { - assertTrue("Target index is not created", indexExists(targetIndexName)) - assertEquals( - AttemptShrinkStep.getSuccessMessage(targetIndexName), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - - // starts WaitForShrinkStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { - // one primary and one replica - assertTrue(getIndexShards(targetIndexName).size == 2) - assertEquals( - WaitForShrinkStep.SUCCESS_MESSAGE, - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - val indexSettings = getIndexSettings(indexName) as Map>> - val writeBlock = indexSettings[indexName]!!["settings"]!![IndexMetadata.SETTING_BLOCKS_WRITE] as String? - assertNull("Write block setting was not reset after successful shrink", writeBlock) - } - } + assertShrinkActionRun(indexName, policyID, excludedNode) } fun `test no-op with single source index primary shard`() { @@ -500,99 +295,26 @@ class ShrinkActionIT : IndexStateManagementRestTestCase() { } } - @Suppress("UNCHECKED_CAST") fun `test shrink with replicas`() { - val logger = LogManager.getLogger(::ShrinkActionIT) - val nodes = getNodes() - if (nodes.size > 1) { - val indexName = "${testIndexName}_with_replicas" - val policyID = "${testIndexName}_with_replicas" - val shrinkAction = ShrinkAction( - numNewShards = null, - maxShardSize = null, - percentageOfSourceShards = 0.5, - targetIndexTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}$testIndexSuffix", mapOf()), - aliases = null, - forceUnsafe = false, - index = 0 - ) - val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf())) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 11L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - createPolicy(policy, policyID) - createIndex(indexName, policyID, null, "1", "3", "") - insertSampleData(indexName, 3) - // Will change the startTime each execution so that it triggers in 2 seconds - // First execution: Policy is initialized - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - logger.info("index settings: \n ${getFlatSettings(indexName)}") - - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - // Starts AttemptMoveShardsStep - updateManagedIndexConfigStartTime(managedIndexConfig) - val targetIndexName = indexName + testIndexSuffix - waitFor(Instant.ofEpochSecond(60)) { - assertEquals( - targetIndexName, - getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.targetIndexName - ) - assertEquals("true", getIndexBlocksWriteSetting(indexName)) - val nodeName = - getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName - assertNotNull("Couldn't find node to shrink onto.", nodeName) - val settings = getFlatSettings(indexName) - assertTrue(settings.containsKey("index.routing.allocation.require._name")) - assertEquals(nodeName, settings["index.routing.allocation.require._name"]) - assertEquals( - AttemptMoveShardsStep.getSuccessMessage(nodeName), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } + Assume.assumeTrue(isMultiNode) - val nodeToShrink = - getExplainManagedIndexMetaData(indexName).actionMetaData!!.actionProperties!!.shrinkActionProperties!!.nodeName - - // starts WaitForMoveShardsStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { - assertEquals( - WaitForMoveShardsStep.getSuccessMessage(nodeToShrink), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } - // Wait for move should finish before this. Starts AttemptShrinkStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(50)) { - assertTrue("Target index is not created", indexExists(targetIndexName)) - assertEquals( - AttemptShrinkStep.getSuccessMessage(targetIndexName), - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - } + val indexName = "${testIndexName}_with_replicas" + val policyID = "${testIndexName}_with_replicas" + val shrinkAction = randomShrinkAction() + val states = listOf(State("ShrinkState", listOf(shrinkAction), listOf())) + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 11L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID, null, "1", "3", "") - // starts WaitForShrinkStep - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor(Instant.ofEpochSecond(60)) { - // one primary and one replica - assertTrue(getIndexShards(targetIndexName).size == 2) - assertEquals( - WaitForShrinkStep.SUCCESS_MESSAGE, - getExplainManagedIndexMetaData(indexName).info?.get("message") - ) - val indexSettings = getIndexSettings(indexName) as Map>> - val writeBlock = indexSettings[indexName]!!["settings"]!![IndexMetadata.SETTING_BLOCKS_WRITE] as String? - assertNull("Write block setting was not reset after successful shrink", writeBlock) - } - } + assertShrinkActionRun(indexName, policyID) } fun `test retries from first step`() { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt index 7c871781c..67bab487c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt @@ -6,16 +6,14 @@ package org.opensearch.indexmanagement.rollup.resthandler import org.opensearch.client.ResponseException -import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI import org.opensearch.indexmanagement.makeRequest -import org.opensearch.indexmanagement.rollup.RollupRestTestCase import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.indexstatemanagement.wait import org.opensearch.test.junit.annotations.TestLogging @TestLogging(value = "level:DEBUG", reason = "Debugging tests") -@Suppress("UNCHECKED_CAST") -class RestDeleteRollupActionIT : RollupRestTestCase() { +class RestDeleteRollupActionIT : RollupRestAPITestCase() { @Throws(Exception::class) fun `test deleting a rollup`() { @@ -30,19 +28,21 @@ class RestDeleteRollupActionIT : RollupRestTestCase() { @Throws(Exception::class) fun `test deleting a rollup that doesn't exist in existing config index`() { - try { - createRandomRollup() - client().makeRequest("DELETE", "$ROLLUP_JOBS_BASE_URI/foobarbaz") - fail("expected 404 ResponseException") - } catch (e: ResponseException) { - assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + createRandomRollup() + wait { + try { + client().makeRequest("DELETE", "$ROLLUP_JOBS_BASE_URI/foobarbaz") + fail("expected 404 ResponseException") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } } } @Throws(Exception::class) fun `test deleting a rollup that doesn't exist and config index doesnt exist`() { try { - deleteIndex(INDEX_MANAGEMENT_INDEX) + wipeAllIndices() client().makeRequest("DELETE", "$ROLLUP_JOBS_BASE_URI/foobarbaz") fail("expected 404 ResponseException") } catch (e: ResponseException) { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestExplainRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestExplainRollupActionIT.kt index 0f7fd50e9..8674b0445 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestExplainRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestExplainRollupActionIT.kt @@ -8,7 +8,6 @@ package org.opensearch.indexmanagement.rollup.resthandler import org.opensearch.client.ResponseException import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI import org.opensearch.indexmanagement.makeRequest -import org.opensearch.indexmanagement.rollup.RollupRestTestCase import org.opensearch.indexmanagement.rollup.model.RollupMetadata import org.opensearch.indexmanagement.rollup.randomRollup import org.opensearch.indexmanagement.waitFor @@ -20,7 +19,7 @@ import java.time.temporal.ChronoUnit @TestLogging(value = "level:DEBUG", reason = "Debugging tests") @Suppress("UNCHECKED_CAST") -class RestExplainRollupActionIT : RollupRestTestCase() { +class RestExplainRollupActionIT : RollupRestAPITestCase() { @Throws(Exception::class) fun `test explain rollup`() { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestGetRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestGetRollupActionIT.kt index 0f2897972..e02b7eafb 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestGetRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestGetRollupActionIT.kt @@ -8,7 +8,6 @@ package org.opensearch.indexmanagement.rollup.resthandler import org.opensearch.client.ResponseException import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI import org.opensearch.indexmanagement.makeRequest -import org.opensearch.indexmanagement.rollup.RollupRestTestCase import org.opensearch.indexmanagement.rollup.action.get.GetRollupsRequest.Companion.DEFAULT_SIZE import org.opensearch.indexmanagement.rollup.randomRollup import org.opensearch.core.rest.RestStatus @@ -17,7 +16,7 @@ import java.util.Locale @TestLogging(value = "level:DEBUG", reason = "Debugging tests") @Suppress("UNCHECKED_CAST") -class RestGetRollupActionIT : RollupRestTestCase() { +class RestGetRollupActionIT : RollupRestAPITestCase() { private val testName = javaClass.simpleName.lowercase(Locale.ROOT) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt index 49a44b49b..4ae34eab6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt @@ -14,7 +14,6 @@ import org.opensearch.indexmanagement.common.model.dimension.Dimension import org.opensearch.indexmanagement.common.model.dimension.Histogram import org.opensearch.indexmanagement.common.model.dimension.Terms import org.opensearch.indexmanagement.makeRequest -import org.opensearch.indexmanagement.rollup.RollupRestTestCase import org.opensearch.indexmanagement.rollup.model.RollupMetrics import org.opensearch.indexmanagement.rollup.model.metric.Average import org.opensearch.indexmanagement.rollup.model.metric.Max @@ -35,7 +34,7 @@ import java.util.Locale @TestLogging(value = "level:DEBUG", reason = "Debugging tests") @Suppress("UNCHECKED_CAST") -class RestIndexRollupActionIT : RollupRestTestCase() { +class RestIndexRollupActionIT : RollupRestAPITestCase() { private val testName = javaClass.simpleName.lowercase(Locale.ROOT) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt index e2654b105..d2442995d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt @@ -14,7 +14,6 @@ import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS import org.opensearch.indexmanagement.makeRequest -import org.opensearch.indexmanagement.rollup.RollupRestTestCase import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata import org.opensearch.indexmanagement.rollup.randomRollup @@ -25,7 +24,7 @@ import java.time.Instant import java.time.temporal.ChronoUnit import java.util.Locale -class RestStartRollupActionIT : RollupRestTestCase() { +class RestStartRollupActionIT : RollupRestAPITestCase() { private val testName = javaClass.simpleName.lowercase(Locale.ROOT) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt index 592889e49..4c6fa79a2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt @@ -5,7 +5,6 @@ package org.opensearch.indexmanagement.rollup.resthandler -import org.junit.After import org.opensearch.client.ResponseException import org.opensearch.common.settings.Settings import org.opensearch.indexmanagement.IndexManagementIndices @@ -17,7 +16,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.randomInstant -import org.opensearch.indexmanagement.rollup.RollupRestTestCase import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata import org.opensearch.indexmanagement.rollup.randomRollup @@ -28,18 +26,10 @@ import java.time.Instant import java.time.temporal.ChronoUnit import java.util.Locale -class RestStopRollupActionIT : RollupRestTestCase() { +class RestStopRollupActionIT : RollupRestAPITestCase() { private val testName = javaClass.simpleName.lowercase(Locale.ROOT) - @After - fun clearIndicesAfterEachTest() { - // Flaky could happen if config index not deleted - // metadata creation could cause the mapping to be auto set to - // a wrong type, namely, [rollup_metadata.continuous.next_window_end_time] to long - wipeAllIndices() - } - @Throws(Exception::class) fun `test stopping a started rollup`() { val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null), rollupId = "$testName-1") diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RollupRestAPITestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RollupRestAPITestCase.kt new file mode 100644 index 000000000..26d6f6e78 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RollupRestAPITestCase.kt @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.rollup.resthandler + +import org.junit.After +import org.opensearch.indexmanagement.rollup.RollupRestTestCase + +abstract class RollupRestAPITestCase : RollupRestTestCase() { + @After + fun clearIndicesAfterEachTest() { + // For API tests, flaky could happen if config index not deleted + // metadata creation could cause the mapping to be auto set to + // a wrong type, namely, [rollup_metadata.continuous.next_window_end_time] to long + wipeAllIndices() + } +}