Skip to content

Commit

Permalink
Merge branch 'main' into feature/ism_transform
Browse files Browse the repository at this point in the history
  • Loading branch information
tanqiuliu authored Oct 6, 2023
2 parents a082860 + f543d93 commit 857abe5
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 425 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
info = mutableInfo.toMap()
}

override fun isIdempotent(): Boolean = false
override fun isIdempotent(): Boolean = true

@Suppress("TooManyFunctions")
companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() {
val states = listOf(
State("Allocate", listOf(actionConfig), listOf())
)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
Expand Down Expand Up @@ -66,12 +65,10 @@ class AllocationActionIT : IndexStateManagementRestTestCase() {
createIndex(indexName, null, null, "0")

availableNodes.remove(getIndexShardNodes(indexName)[0])

val actionConfig = AllocationAction(require = mapOf("_name" to availableNodes.first()), exclude = emptyMap(), include = emptyMap(), index = 0)
val states = listOf(
State("Allocate", listOf(actionConfig), listOf())
)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
Expand Down Expand Up @@ -100,9 +97,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() {
assertEquals(actionConfig.require["_name"], settings["index.routing.allocation.require._name"])
}

// Third execution: Waits for allocation to complete, which will happen in this execution since index is small
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor {
assertEquals(availableNodes.first(), getIndexShardNodes(indexName)[0])
}
Expand All @@ -117,12 +111,10 @@ class AllocationActionIT : IndexStateManagementRestTestCase() {
createIndex(indexName, null, null, "0")

val excludedNode = getIndexShardNodes(indexName)[0].toString()

val actionConfig = AllocationAction(require = emptyMap(), exclude = mapOf("_name" to excludedNode), include = emptyMap(), index = 0)
val states = listOf(
State("Allocate", listOf(actionConfig), listOf())
)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
Expand Down Expand Up @@ -151,9 +143,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() {
assertEquals(actionConfig.exclude["_name"], settings["index.routing.allocation.exclude._name"])
}

// Third execution: Waits for allocation to complete, which will happen in this execution since index is small
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor {
assertNotEquals(excludedNode, getIndexShardNodes(indexName)[0])
}
Expand All @@ -169,7 +158,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() {
createIndex(indexName, null, null, "0")

availableNodes.remove(getIndexShardNodes(indexName)[0])

val actionConfig = AllocationAction(require = emptyMap(), exclude = emptyMap(), include = mapOf("_name" to availableNodes.first()), index = 0)
val states = listOf(
State("Allocate", listOf(actionConfig), listOf())
Expand Down Expand Up @@ -203,9 +191,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() {
assertEquals(actionConfig.include["_name"], settings["index.routing.allocation.include._name"])
}

// Third execution: Waits for allocation to complete, which will happen in this execution since index is small
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor {
assertEquals(availableNodes.first(), getIndexShardNodes(indexName)[0])
}
Expand All @@ -218,7 +203,6 @@ class AllocationActionIT : IndexStateManagementRestTestCase() {
val states = listOf(
State("Allocate", listOf(actionConfig), listOf())
)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
import org.opensearch.indexmanagement.waitFor
import org.opensearch.rest.RestRequest
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.test.OpenSearchTestCase
import java.time.Instant
import java.time.temporal.ChronoUnit
Expand Down Expand Up @@ -733,4 +734,65 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
Assert.assertEquals(alias.containsKey("test_alias2"), true)
Assert.assertEquals(alias.containsKey("test_alias3"), true)
}

fun `test rollover detects transient failure and continues executing`() {
val aliasName = "${testIndexName}_alias"
val indexNameBase = "${testIndexName}_index"
val firstIndex = "$indexNameBase-1"
val policyID = "${testIndexName}_testPolicyName_1"
val actionConfig = RolloverAction(null, 1, null, null, false, 0)
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)
createIndex(firstIndex, policyID, aliasName)

val managedIndexConfig = getExistingManagedIndexConfig(firstIndex)

// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndex).policyID) }

// Insert data to trigger rollover
insertSampleData(index = firstIndex, docCount = 5, delay = 0)
// Need to speed up to second execution where it will trigger the attempt rollover step
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index did not rollover.", AttemptRolloverStep.getSuccessMessage(firstIndex), info["message"])
}
// Manually produce transaction failure
val response = client().makeRequest(
"POST", "$INDEX_MANAGEMENT_INDEX/_update/${managedIndexConfig.id}%23metadata",
StringEntity(
"{\n" +
" \"script\": {\n" +
" \"source\": \"ctx._source.managed_index_metadata.step.step_status = params.step_status\",\n" +
" \"lang\": \"painless\",\n" +
" \"params\": {\n" +
" \"step_status\": \"starting\"\n" +
" }\n" +
" }\n" +
"}",
ContentType.APPLICATION_JSON
)
)
assertEquals("Request failed", RestStatus.OK, response.restStatus())

// Execute again to see the transaction failure
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val metadata = getExplainManagedIndexMetaData(firstIndex)
assertEquals("Executing the wrong step", "attempt_rollover", metadata.stepMetaData?.name)
assertEquals("rollover step did not continue executing after detecting the transient failure.", Step.StepStatus.COMPLETED, metadata.stepMetaData?.stepStatus)
}
}
}
Loading

0 comments on commit 857abe5

Please sign in to comment.