Skip to content

Commit

Permalink
Revised PR according to comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ronnak Saxena <[email protected]>
  • Loading branch information
ronnaksaxena committed Jul 20, 2023
1 parent 04b5210 commit 3090683
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.indexmanagement.spi.indexstatemanagement

import org.apache.logging.log4j.Logger
import org.opensearch.client.Client
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
Expand Down Expand Up @@ -38,6 +39,10 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {

abstract fun isIdempotent(): Boolean

/**
 * Hook that lets a step report whether its previous, unconfirmed execution should be
 * treated as a transient failure.
 *
 * This base implementation always answers `false`; the function is `open` so that
 * individual steps can override it.
 *
 * @param client client made available to overriding implementations
 * @param stepContext context of the step being evaluated
 * @param managedIndexMetaData managed index metadata made available to overriding implementations
 * @return `false` in this default implementation
 */
open suspend fun isTransientFailure(client: Client, stepContext: StepContext, managedIndexMetaData: ManagedIndexMetaData): Boolean = false

final fun getStepStartTime(metadata: ManagedIndexMetaData): Instant {
return when {
metadata.stepMetaData == null -> Instant.now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
Expand Down Expand Up @@ -48,7 +47,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ErrorNotificati
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverAlias
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST_NONE
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ISM_ENABLED
Expand Down Expand Up @@ -221,67 +219,6 @@ object ManagedIndexRunner :
this.extensionStatusChecker = extensionStatusChecker
return this
}
/**
 * Detects whether the failure of a non-idempotent step was a transient failure.
 *
 * Only the `attempt_rollover` step is currently inspected: the rollover target (parent data
 * stream name, or the index's rollover alias) is resolved from the local cluster state, every
 * index returned for that alias is looked up, and the failure is considered transient when an
 * index's metadata cannot be fetched or an index was created after the step started.
 * Every other step name yields `false`.
 *
 * @param stepContext context whose metadata identifies the step and the managed index
 * @return `true` when the failure is considered transient, `false` otherwise (including when
 * the alias lookup itself fails)
 */
@Suppress("NestedBlockDepth")
private suspend fun isTransientFailure(stepContext: StepContext): Boolean {
    var isTransientFailure = false
    val stepName = stepContext.metadata.stepMetaData?.name
    when (stepName) {
        "attempt_rollover" -> {
            val stepStartTime = stepContext.metadata.stepMetaData?.startTime
            // Retrieve the alias name
            val indexName = stepContext.metadata.index
            val metadata = stepContext.clusterService.state().metadata()
            val indexAbstraction = metadata.indicesLookup[indexName]
            val isDataStreamIndex = indexAbstraction?.parentDataStream != null

            val aliasName = when {
                isDataStreamIndex -> indexAbstraction?.parentDataStream?.name
                // index() may return null when the index is absent from the cluster state;
                // treat that like "no alias" instead of throwing outside the try block.
                else -> metadata.index(indexName)?.getRolloverAlias()
            }
            if (aliasName == null) {
                // The flag keeps its initial value (false): not a transient failure.
                logger.error("Index $indexName has no alias attached to it. Not a Transient Failure in step $stepName")
            } else {
                try {
                    val response = client.suspendUntil { admin().indices().getAliases(GetAliasesRequest(aliasName), it) }
                    // Parse through all indices under that alias
                    val aliasIndices = response.aliases
                    aliasIndices.forEach { (index, _) ->
                        val indexMetaData = getIndexMetadata(index)
                        if (indexMetaData == null) {
                            // Node was dropped or disk full
                            isTransientFailure = true
                        } else if (stepStartTime != null && stepStartTime < indexMetaData.creationDate) {
                            // Transient failure detected: Index was created after step started, therefore it finished a rollover
                            logger.debug("Have to rerun attempt rollover step due to transient failure ${stepContext.metadata.index}")
                            isTransientFailure = true
                        }
                    }
                } catch (e: Exception) {
                    logger.error("Failed to request indices under alias $aliasName", e)
                }
                // Did not find any indices created after the rollover policy step started, therefore is not a Transient failure
            }
        }
        "attempt_snapshot" -> {
            // TODO implement logic for detecting transient failure in attempt snapshot step
        }
        "attempt_notification" -> {
            // TODO implement logic for detecting transient failure in attempt notification step
        }
        "attempt_shrink_step" -> {
            // TODO implement logic for detecting transient failure in attempt shrink step
        }
        "attempt_call_force_merge" -> {
            // TODO implement logic for detecting transient failure in attempt call force merge step
        }
        else -> {
            logger.debug("Detected unfamiliar nonIdempotent step: $stepName")
        }
    }
    return isTransientFailure
}

override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
if (job !is ManagedIndexConfig) {
Expand Down Expand Up @@ -422,7 +359,7 @@ object ManagedIndexRunner :
val isIdempotent = step?.isIdempotent()
logger.info("Previous execution failed to update step status, isIdempotent=$isIdempotent")
// If this step was not a transient failure fail the index
if (isIdempotent != true && !isTransientFailure(stepContext)) {
if (isIdempotent != true && step != null && !step.isTransientFailure(client, stepContext, managedIndexMetaData)) {
val info = mapOf("message" to "Previous action was not able to update IndexMetaData.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
Expand Down Expand Up @@ -734,7 +671,7 @@ object ManagedIndexRunner :
* update metadata in config index, and save metadata in history after update
* this can be called 2 times in one job run, so need to save seqNo & primaryTerm
*/
private suspend fun updateManagedIndexMetaData(
suspend fun updateManagedIndexMetaData(
managedIndexMetaData: ManagedIndexMetaData,
lastUpdateResult: UpdateMetadataResult? = null,
create: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse
import org.opensearch.client.Client
import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeAction
import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.rest.RestStatus
import org.opensearch.transport.RemoteTransportException
Expand Down Expand Up @@ -104,6 +106,10 @@ class AttemptCallForceMergeStep(private val action: ForceMergeAction) : Step(nam
}

/** Reports this step as not idempotent (always `false`). */
override fun isIdempotent(): Boolean = false
/**
 * Transient-failure check for the force merge call step.
 *
 * No detection logic exists yet, so every failure is reported as non-transient.
 *
 * @return always `false`
 */
// TODO implement logic for detecting transient failure in attempt call force merge step
override suspend fun isTransientFailure(client: Client, stepContext: StepContext, managedIndexMetaData: ManagedIndexMetaData): Boolean = false

companion object {
const val name = "attempt_call_force_merge"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
package org.opensearch.indexmanagement.indexstatemanagement.step.notification

import org.apache.logging.log4j.LogManager
import org.opensearch.client.Client
import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction
import org.opensearch.indexmanagement.indexstatemanagement.util.publishLegacyNotification
import org.opensearch.indexmanagement.indexstatemanagement.util.sendNotification
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
Expand Down Expand Up @@ -66,6 +68,10 @@ class AttemptNotificationStep(private val action: NotificationAction) : Step(nam
}

/** Reports this step as not idempotent (always `false`). */
override fun isIdempotent(): Boolean = false
/**
 * Transient-failure check for the notification step.
 *
 * No detection logic exists yet, so every failure is reported as non-transient.
 *
 * @return always `false`
 */
// TODO implement logic for detecting transient failure in attempt notification step
override suspend fun isTransientFailure(client: Client, stepContext: StepContext, managedIndexMetaData: ManagedIndexMetaData): Boolean = false

companion object {
const val name = "attempt_notification"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.rollover

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.client.Client
import org.opensearch.common.unit.ByteSizeValue
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverAlias
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverSkip
Expand Down Expand Up @@ -270,7 +274,44 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
}

/** Reports this step as not idempotent (always `false`). */
override fun isIdempotent(): Boolean = false
/**
 * Decides whether a previous, unconfirmed run of the rollover step was a transient failure,
 * i.e. whether the rollover itself already happened.
 *
 * The rollover target (parent data stream name, or the index's rollover alias) is resolved from
 * the local cluster state. The index metadata is then fetched through [client] and the failure is
 * considered transient when the index's rollover infos contain that target. Only in that case is
 * the managed index metadata updated to mark the step as completed.
 *
 * @param client client used to fetch the index metadata from the cluster state
 * @param stepContext context whose metadata names the managed index
 * @param managedIndexMetaData metadata that is copied and saved when a rollover is detected
 * @return `true` when the index was already rolled over; `false` when it was not, when no
 * rollover target exists, or when the cluster state request fails
 */
@Suppress("NestedBlockDepth")
override suspend fun isTransientFailure(client: Client, stepContext: StepContext, managedIndexMetaData: ManagedIndexMetaData): Boolean {
    var isTransientFailure = false
    // Retrieve the alias name
    val indexName = stepContext.metadata.index
    val metadata = stepContext.clusterService.state().metadata()
    val indexAbstraction = metadata.indicesLookup[indexName]
    val isDataStreamIndex = indexAbstraction?.parentDataStream != null

    val aliasName = when {
        isDataStreamIndex -> indexAbstraction?.parentDataStream?.name
        // index() may return null when the index is absent from the cluster state;
        // treat that like "no alias" instead of throwing outside the try block.
        else -> metadata.index(indexName)?.getRolloverAlias()
    }
    if (aliasName == null) {
        // The flag keeps its initial value (false): not a transient failure.
        logger.error("Index $indexName has no alias attached to it. Not a Transient Failure in step attemptRolloverStep")
    } else {
        try {
            // Only this index's metadata is read below, so request nothing else.
            val clusterStateRequest = ClusterStateRequest().clear().indices(indexName).metadata(true)
            val response: ClusterStateResponse = client.suspendUntil {
                client.admin().cluster().state(clusterStateRequest, it)
            }
            // If the index was rolled over, this is a transient failure
            isTransientFailure = response.state.metadata.index(indexName)?.rolloverInfos?.containsKey(aliasName) == true
            // Mark the step as completed only when the rollover really happened; otherwise the
            // metadata would claim "already rolled over" for an index that was not.
            if (isTransientFailure) {
                val result = ManagedIndexRunner.updateManagedIndexMetaData(
                    managedIndexMetaData.copy(
                        stepMetaData = managedIndexMetaData.stepMetaData?.copy(stepStatus = StepStatus.COMPLETED),
                        info = mapOf("message" to getAlreadyRolledOverMessage(indexName, aliasName))
                    )
                )
                if (!result.metadataSaved) {
                    logger.error("Not able to update managed index meta data for index ${managedIndexMetaData.index}")
                }
            }
        } catch (e: Exception) {
            logger.error("Failed to request index ${stepContext.metadata.index} cluster data when checking for transient failure", e)
        }
    }
    return isTransientFailure
}
@Suppress("TooManyFunctions")
companion object {
const val name = "attempt_rollover"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.action.admin.indices.shrink.ResizeResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
Expand Down Expand Up @@ -135,6 +136,10 @@ class AttemptShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
}

/** Reports this step as not idempotent (always `false`). */
override fun isIdempotent(): Boolean = false
/**
 * Transient-failure check for the shrink step.
 *
 * No detection logic exists yet, so every failure is reported as non-transient.
 *
 * @return always `false`
 */
// TODO implement logic for detecting transient failure in attempt shrink step
override suspend fun isTransientFailure(client: Client, stepContext: StepContext, managedIndexMetaData: ManagedIndexMetaData): Boolean = false

companion object {
const val name = "attempt_shrink_step"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse
import org.opensearch.client.Client
import org.opensearch.common.regex.Regex
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotAction
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
Expand All @@ -17,6 +18,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
Expand Down Expand Up @@ -155,6 +157,10 @@ class AttemptSnapshotStep(private val action: SnapshotAction) : Step(name) {
}

/** Reports this step as not idempotent (always `false`). */
override fun isIdempotent(): Boolean = false
/**
 * Transient-failure check for the snapshot step.
 *
 * No detection logic exists yet, so every failure is reported as non-transient.
 *
 * @return always `false`
 */
// TODO implement logic for detecting transient failure in attempt snapshot step
override suspend fun isTransientFailure(client: Client, stepContext: StepContext, managedIndexMetaData: ManagedIndexMetaData): Boolean = false

companion object {
val validTopContextFields = setOf("index", "indexUuid")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val stepStatus = getExplainManagedIndexMetaData(firstIndex).stepMetaData?.stepStatus
assertEquals("Index did not rollover.", Step.StepStatus.COMPLETED, stepStatus)
assertEquals("rollover step did not continue executing after detecting the transient failure.", Step.StepStatus.COMPLETED, stepStatus)
}
}
}

0 comments on commit 3090683

Please sign in to comment.