Skip to content

Commit

Permalink
Revised code based on comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ronnak Saxena <[email protected]>
  • Loading branch information
ronnaksaxena committed Jul 19, 2023
1 parent 1db64da commit 04b5210
Showing 1 changed file with 25 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ object ManagedIndexRunner :
return this
}
// Detects if a nonidempotent step was a transient failure or not
@Suppress("ReturnCount")
suspend fun isTransientFailure(stepContext: StepContext): Boolean {
@Suppress("NestedBlockDepth")
private suspend fun isTransientFailure(stepContext: StepContext): Boolean {
var isTransientFailure = false
val stepName = stepContext.metadata.stepMetaData?.name
when (stepName) {
"attempt_rollover" -> {
Expand All @@ -239,23 +240,29 @@ object ManagedIndexRunner :
else -> metadata.index(indexName).getRolloverAlias()
}
if (aliasName == null) {
logger.debug("Index has no alias attached to it. Not a Transient Failure")
return false
}
val response = client.suspendUntil { admin().indices().getAliases(GetAliasesRequest(aliasName), it) }
// Parse through all indices under that alias
val aliasIndices = response.aliases
aliasIndices.forEach { (index, _) ->
val indexMetaData = getIndexMetadata(index)
val indexCreationDate = indexMetaData?.creationDate
// Transient failure detected: Index was created after step started, therefore it finished a rollover
if (stepStartTime!! < indexCreationDate!!) {
logger.debug("Have to rerun attempt rollover step due to transient failure ${stepContext.metadata.index}")
return true
logger.error("Index $indexName has no alias attached to it. Not a Transient Failure in step $stepName")
isTransientFailure = false
} else {
try {
val response = client.suspendUntil { admin().indices().getAliases(GetAliasesRequest(aliasName), it) }
// Parse through all indices under that alias
val aliasIndices = response.aliases
aliasIndices.forEach { (index, _) ->
val indexMetaData = getIndexMetadata(index)
if (indexMetaData == null) {
// Node was dropped or disk full
isTransientFailure = true
} else if (stepStartTime!! < indexMetaData.creationDate) {
// Transient failure detected: Index was created after step started, therefore it finished a rollover
logger.debug("Have to rerun attempt rollover step due to transient failure ${stepContext.metadata.index}")
isTransientFailure = true
}
}
} catch (e: Exception) {
logger.error("Failed to request indices under alias $aliasName", e)
}
// Did not find any indices created after the rollover policy step started, therefore is not a Transient failure
}
// Did not find any indices created after the rollover policy step started, therefore is not a Transient failure
return false
}
"attempt_snapshot" -> {
// TODO implement logic for detecting transient failure in attempt snapshot step
Expand All @@ -273,7 +280,7 @@ object ManagedIndexRunner :
logger.debug("Detected unfamiliar nonIdempotent step: $stepName")
}
}
return true
return isTransientFailure
}

override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
Expand Down

0 comments on commit 04b5210

Please sign in to comment.