diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt index 1a4b2f971..e508510f6 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt @@ -6,6 +6,7 @@ package org.opensearch.indexmanagement.spi.indexstatemanagement import org.apache.logging.log4j.Logger +import org.opensearch.client.Client import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable @@ -38,6 +39,23 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) { abstract fun isIdempotent(): Boolean
+    /**
+     * Tells the caller whether a failure of this step should be treated as transient,
+     * i.e. whether the step may carry on instead of the managed index being failed.
+     *
+     * The default implementation never reports a transient failure; steps that can detect one override it.
+     *
+     * @param client client passed through from the caller
+     * @param stepContext context of the step being evaluated
+     * @param managedIndexMetaData metadata of the managed index being evaluated
+     * @return `true` if the failure is transient; this default always returns `false`
+     */
+    open suspend fun isTransientFailure(
+        client: Client,
+        stepContext: StepContext,
+        managedIndexMetaData: ManagedIndexMetaData
+    ): Boolean = false
+
 final fun getStepStartTime(metadata: ManagedIndexMetaData): Instant { return when { metadata.stepMetaData == null -> Instant.now() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index dcabaec0d..c424b2edb 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -15,7 +15,6 @@ import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.cluster.state.ClusterStateResponse -import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse
@@ -48,7 +47,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ErrorNotificati import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata -import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverAlias import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST_NONE import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ISM_ENABLED @@ -221,67 +219,6 @@ object ManagedIndexRunner : this.extensionStatusChecker = extensionStatusChecker return this } - // Detects if a nonidempotent step was a transient failure or not - @Suppress("NestedBlockDepth") - private suspend fun isTransientFailure(stepContext: StepContext): Boolean { - var isTransientFailure = false - val stepName = stepContext.metadata.stepMetaData?.name - when (stepName) { - "attempt_rollover" -> { - val stepStartTime = stepContext.metadata.stepMetaData?.startTime - // Retrieve the alias name - val indexName = stepContext.metadata.index - val metadata = stepContext.clusterService.state().metadata() - val indexAbstraction = metadata.indicesLookup[indexName] - val isDataStreamIndex = indexAbstraction?.parentDataStream != null - - val aliasName = when { - isDataStreamIndex -> indexAbstraction?.parentDataStream?.name - else -> metadata.index(indexName).getRolloverAlias() - } - if (aliasName == null) { - logger.error("Index $indexName has no alias attached to it. 
Not a Transient Failure in step $stepName") - isTransientFailure = false - } else { - try { - val response = client.suspendUntil { admin().indices().getAliases(GetAliasesRequest(aliasName), it) } - // Parse through all indices under that alias - val aliasIndices = response.aliases - aliasIndices.forEach { (index, _) -> - val indexMetaData = getIndexMetadata(index) - if (indexMetaData == null) { - // Node was dropped or disk full - isTransientFailure = true - } else if (stepStartTime!! < indexMetaData.creationDate) { - // Transient failure detected: Index was created after step started, therefore it finished a rollover - logger.debug("Have to rerun attempt rollover step due to transient failure ${stepContext.metadata.index}") - isTransientFailure = true - } - } - } catch (e: Exception) { - logger.error("Failed to request indices under alias $aliasName", e) - } - // Did not find any indices created after the rollover policy step started, therefore is not a Transient failure - } - } - "attempt_snapshot" -> { - // TODO implement logic for detecting transient failure in attempt snapshot step - } - "attempt_notification" -> { - // TODO implement logic for detecting transient failure in attempt notification step - } - "attempt_shrink_step" -> { - // TODO implement logic for detecting transient failure in attempt shrink step - } - "attempt_call_force_merge" -> { - // TODO implement logic for detecting transient failure in attempt call force merge step - } - else -> { - logger.debug("Detected unfamiliar nonIdempotent step: $stepName") - } - } - return isTransientFailure - } override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) { if (job !is ManagedIndexConfig) { @@ -422,7 +359,7 @@ object ManagedIndexRunner : val isIdempotent = step?.isIdempotent() logger.info("Previous execution failed to update step status, isIdempotent=$isIdempotent") // If this step was not a transient failure fail the index - if (isIdempotent != true && 
!isTransientFailure(stepContext)) { + if (isIdempotent != true && step != null && !step.isTransientFailure(client, stepContext, managedIndexMetaData)) { val info = mapOf("message" to "Previous action was not able to update IndexMetaData.") val updated = updateManagedIndexMetaData( managedIndexMetaData.copy( @@ -734,7 +671,7 @@ object ManagedIndexRunner : * update metadata in config index, and save metadata in history after update * this can be called 2 times in one job run, so need to save seqNo & primeTerm */ - private suspend fun updateManagedIndexMetaData( + suspend fun updateManagedIndexMetaData( managedIndexMetaData: ManagedIndexMetaData, lastUpdateResult: UpdateMetadataResult? = null, create: Boolean = false diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt index 640d2c662..314729c9a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt @@ -14,12 +14,14 @@ import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse +import org.opensearch.client.Client import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeAction import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import 
org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData import org.opensearch.rest.RestStatus import org.opensearch.transport.RemoteTransportException @@ -104,6 +106,15 @@ class AttemptCallForceMergeStep(private val action: ForceMergeAction) : Step(nam } override fun isIdempotent() = false
+    // TODO implement logic for detecting transient failure in attempt call force merge step
+    /**
+     * Always returns `false`: transient-failure detection is not implemented for this step yet.
+     */
+    override suspend fun isTransientFailure(
+        client: Client,
+        stepContext: StepContext,
+        managedIndexMetaData: ManagedIndexMetaData
+    ): Boolean = false
 companion object { const val name = "attempt_call_force_merge" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt index 22953bc61..cf2ba37e3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt @@ -6,12 +6,14 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.notification import org.apache.logging.log4j.LogManager +import org.opensearch.client.Client import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction import org.opensearch.indexmanagement.indexstatemanagement.util.publishLegacyNotification import org.opensearch.indexmanagement.indexstatemanagement.util.sendNotification import org.opensearch.indexmanagement.opensearchapi.convertToMap import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import
org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData import org.opensearch.script.Script import org.opensearch.script.ScriptService @@ -66,6 +68,15 @@ class AttemptNotificationStep(private val action: NotificationAction) : Step(nam } override fun isIdempotent(): Boolean = false
+    // TODO implement logic for detecting transient failure in attempt notification step
+    /**
+     * Always returns `false`: transient-failure detection is not implemented for this step yet.
+     */
+    override suspend fun isTransientFailure(
+        client: Client,
+        stepContext: StepContext,
+        managedIndexMetaData: ManagedIndexMetaData
+    ): Boolean = false
 companion object { const val name = "attempt_notification" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index 6657e672a..95c0d0908 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -7,12 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.rollover import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper +import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.cluster.state.ClusterStateResponse import org.opensearch.action.admin.indices.rollover.RolloverRequest import org.opensearch.action.admin.indices.rollover.RolloverResponse import org.opensearch.action.admin.indices.stats.IndicesStatsRequest import org.opensearch.action.admin.indices.stats.IndicesStatsResponse +import org.opensearch.client.Client import org.opensearch.common.unit.ByteSizeValue import org.opensearch.common.unit.TimeValue +import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import
org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverAlias import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverSkip @@ -270,7 +274,55 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { } override fun isIdempotent(): Boolean = false
+    /**
+     * Checks whether the managed index already carries rollover info for its rollover target,
+     * in which case the failure is reported as transient.
+     *
+     * The rollover target (parent data stream name, or else the index's rollover alias) is resolved from
+     * the cluster state held by [stepContext]; the rollover infos are read from a cluster state requested
+     * through [client]. Only when the rollover is confirmed is the step metadata persisted as COMPLETED.
+     *
+     * @param client client used to request the index metadata from the cluster
+     * @param stepContext supplies the index name and the cluster service
+     * @param managedIndexMetaData metadata that is copied and saved as completed once the rollover is confirmed
+     * @return `true` if the index was rolled over; `false` if not, if no target exists, or if the lookup throws
+     */
+    override suspend fun isTransientFailure(client: Client, stepContext: StepContext, managedIndexMetaData: ManagedIndexMetaData): Boolean {
+        val indexName = stepContext.metadata.index
+        val metadata = stepContext.clusterService.state().metadata()
+        // Prefer the parent data stream name; otherwise use the rollover alias (null-safe if the index is missing)
+        val aliasName = metadata.indicesLookup[indexName]?.parentDataStream?.name
+            ?: metadata.index(indexName)?.getRolloverAlias()
+        if (aliasName == null) {
+            logger.error("Index $indexName has no alias attached to it. Not a Transient Failure in step attemptRolloverStep")
+            return false
+        }
+
+        var rolledOver = false
+        try {
+            // Only this index's metadata is read, so do not request the entire cluster state
+            val request = ClusterStateRequest().clear().indices(indexName).metadata(true)
+            val response: ClusterStateResponse = client.suspendUntil { client.admin().cluster().state(request, it) }
+            // If the index was rolled over, this is a transient failure
+            rolledOver = response.state.metadata.index(indexName)?.rolloverInfos?.containsKey(aliasName) == true
+            if (rolledOver) {
+                // Mark the step completed only after the rollover is confirmed; an unconfirmed rollover must stay failed
+                val result = ManagedIndexRunner.updateManagedIndexMetaData(
+                    managedIndexMetaData.copy(
+                        stepMetaData = managedIndexMetaData.stepMetaData?.copy(stepStatus = StepStatus.COMPLETED),
+                        info = mapOf("message" to getAlreadyRolledOverMessage(indexName, aliasName))
+                    )
+                )
+                if (!result.metadataSaved) {
+                    logger.error("Not able to update managed index meta data for index ${managedIndexMetaData.index}")
+                }
+            }
+        } catch (e: Exception) {
+            logger.error("Failed to request index ${stepContext.metadata.index} cluster data when checking for transient failure", e)
+        }
+        return rolledOver
+    }
 @Suppress("TooManyFunctions") companion object { const val name = "attempt_rollover" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt index 87dba6361..c657f7db0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/AttemptShrinkStep.kt @@ -12,6 +12,7 @@ import org.opensearch.action.admin.indices.shrink.ResizeResponse import org.opensearch.action.admin.indices.stats.IndicesStatsRequest import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.settings.Settings import
org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction @@ -135,6 +136,15 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru } override fun isIdempotent() = false
+    // TODO implement logic for detecting transient failure in attempt shrink step
+    /**
+     * Always returns `false`: transient-failure detection is not implemented for this step yet.
+     */
+    override suspend fun isTransientFailure(
+        client: Client,
+        stepContext: StepContext,
+        managedIndexMetaData: ManagedIndexMetaData
+    ): Boolean = false
 companion object { const val name = "attempt_shrink_step" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt index f4ffef794..8447bd58f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse +import org.opensearch.client.Client import org.opensearch.common.regex.Regex import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotAction import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST @@ -17,6 +18,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import
org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData import org.opensearch.rest.RestStatus import org.opensearch.script.Script @@ -155,6 +157,15 @@ class AttemptSnapshotStep(private val action: SnapshotAction) : Step(name) { } override fun isIdempotent(): Boolean = false
+    // TODO implement logic for detecting transient failure in attempt snapshot step
+    /**
+     * Always returns `false`: transient-failure detection is not implemented for this step yet.
+     */
+    override suspend fun isTransientFailure(
+        client: Client,
+        stepContext: StepContext,
+        managedIndexMetaData: ManagedIndexMetaData
+    ): Boolean = false
 companion object { val validTopContextFields = setOf("index", "indexUuid") diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt index a5612940e..05ba0806d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt @@ -689,7 +689,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { val stepStatus = getExplainManagedIndexMetaData(firstIndex).stepMetaData?.stepStatus - assertEquals("Index did not rollover.", Step.StepStatus.COMPLETED, stepStatus) + assertEquals("rollover step did not continue executing after detecting the transient failure.", Step.StepStatus.COMPLETED, stepStatus) } } }