Skip to content

Commit

Permalink
pre check for rollover action (opensearch-project#88)
Browse files Browse the repository at this point in the history
* pre check for rollover action

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn authored Jul 1, 2021
1 parent 69ecb96 commit d415a03
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
with:
repository: 'opensearch-project/alerting'
path: alerting
ref: 'main'
ref: '1.0'
- name: Build alerting
working-directory: ./alerting
run: ./gradlew :alerting-notification:publishToMavenLocal -Dopensearch.version=1.0.0 -Dbuild.snapshot=false
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
with:
repository: 'opensearch-project/alerting'
path: alerting
ref: 'main'
ref: '1.0'
- name: Build alerting
working-directory: ./alerting
run: ./gradlew :alerting-notification:publishToMavenLocal -Dopensearch.version=1.0.0 -Dbuild.snapshot=false
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7'
compile "org.jetbrains:annotations:13.0"
compile "org.opensearch:notification:1.0.0.0-rc1"
compile "org.opensearch:notification:1.0.0.0"
compile "org.opensearch:common-utils:1.0.0.0"
compile "com.github.seancfoley:ipaddress:5.3.3"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS,
ManagedIndexSettings.POLICY_ID,
ManagedIndexSettings.ROLLOVER_ALIAS,
ManagedIndexSettings.ROLLOVER_SKIP,
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.JOB_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ fun IndexMetadata.getRolloverAlias(): String? {
return this.settings.get(ManagedIndexSettings.ROLLOVER_ALIAS.key)
}

fun IndexMetadata.getRolloverSkip(): Boolean {
return this.settings.getAsBoolean(ManagedIndexSettings.ROLLOVER_SKIP.key, false)
}

fun IndexMetadata.getManagedIndexMetadata(): ManagedIndexMetaData? {
val existingMetaDataMap = this.getCustomData(ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ class ManagedIndexSettings {
Setting.Property.Dynamic
)

val ROLLOVER_SKIP: Setting<Boolean> = Setting.boolSetting(
"index.plugins.index_state_management.rollover_skip",
false,
Setting.Property.IndexScope,
Setting.Property.Dynamic
)

val JOB_INTERVAL: Setting<Int> = Setting.intSetting(
"plugins.index_state_management.job_interval",
LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMet
import org.opensearch.indexmanagement.indexstatemanagement.model.action.RolloverActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverAlias
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverSkip
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.indexstatemanagement.util.evaluateConditions
import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString
Expand All @@ -64,6 +65,13 @@ class AttemptRolloverStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute(): AttemptRolloverStep {
val skipRollover = clusterService.state().metadata.index(indexName).getRolloverSkip()
if (skipRollover) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSkipRolloverMessage(indexName))
return this
}

// If we have already rolled over this index then fail as we only allow an index to be rolled over once
if (managedIndexMetaData.rolledOver == true) {
logger.warn("$indexName was already rolled over, cannot execute rollover step")
Expand All @@ -76,6 +84,12 @@ class AttemptRolloverStep(
// If the rolloverTarget is null, we would've already updated the failed info from getRolloverTargetOrUpdateInfo and can return early
rolloverTarget ?: return this

if (!isDataStream && !preCheckIndexAlias(rolloverTarget)) {
stepStatus = StepStatus.FAILED
info = mapOf("message" to getFailedPreCheckMessage(indexName))
return this
}

val statsResponse = getIndexStatsOrUpdateInfo()
// If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early
statsResponse ?: return this
Expand Down Expand Up @@ -170,6 +184,36 @@ class AttemptRolloverStep(
}
}

/**
* pre-condition check on managed-index's alias before rollover
*
* This will block
* when managed index dont have alias
* when managed index has alias but not the write index,
* and this alias contains more than one index
* User can use skip rollover setting to bypass this
*
* @param alias user defined ISM rollover alias
*/
private fun preCheckIndexAlias(alias: String): Boolean {
val metadata = clusterService.state().metadata
val indexAlias = metadata.index(indexName)?.aliases?.get(alias)
logger.debug("Index $indexName has aliases $indexAlias")
if (indexAlias == null) {
return false
}
val isWriteIndex = indexAlias.writeIndex() // this could be null
if (isWriteIndex != true) {
val aliasIndices = metadata.indicesLookup[alias]?.indices?.map { it.index }
logger.debug("Alias $alias contains indices $aliasIndices")
if (aliasIndices != null && aliasIndices.size > 1) {
return false
}
}

return true
}

private fun getRolloverTargetOrUpdateInfo(): Pair<String?, Boolean> {
val metadata = clusterService.state().metadata()
val indexAbstraction = metadata.indicesLookup[indexName]
Expand Down Expand Up @@ -246,5 +290,7 @@ class AttemptRolloverStep(
fun getSuccessMessage(index: String) = "Successfully rolled over index [index=$index]"
fun getSuccessDataStreamRolloverMessage(dataStream: String, index: String) =
"Successfully rolled over data stream [data_stream=$dataStream index=$index]"
fun getFailedPreCheckMessage(index: String) = "Missing alias or not the write index when rollover [index=$index]"
fun getSkipRolloverMessage(index: String) = "Skipped rollover action for [index=$index]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,29 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
return index to policyID
}

protected fun changeAlias(
index: String,
alias: String,
action: String = "remove",
isWriteIndex: Boolean = false
) {
val isWriteIndexField = if (isWriteIndex) "\",\"is_write_index\": \"$isWriteIndex" else ""
val body = """
{
"actions": [
{
"$action": {
"index": "$index",
"alias": "$alias$isWriteIndexField"
}
}
]
}
""".trimIndent()
val response = client().makeRequest("POST", "_aliases", StringEntity(body, APPLICATION_JSON))
assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus())
}

/** Refresh all indices in the cluster */
protected fun refresh() {
val request = Request("POST", "/_refresh")
Expand Down Expand Up @@ -248,6 +271,30 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

protected fun updateIndexSetting(
index: String,
key: String,
value: String
) {
val body = """
{
"$key" : "$value"
}
""".trimIndent()
val res = client().makeRequest(
"PUT", "$index/_settings", emptyMap(),
StringEntity(body, APPLICATION_JSON)
)
assertEquals("Update index setting failed", RestStatus.OK, res.restStatus())
}

protected fun getIndexSetting(index: String) {
val res = client().makeRequest(
"GET", "$index/_settings", emptyMap()
)
assertEquals("Update index setting failed", RestStatus.OK, res.restStatus())
}

protected fun getManagedIndexConfig(index: String): ManagedIndexConfig? {
val request = """
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
import org.opensearch.indexmanagement.indexstatemanagement.model.action.RolloverActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.waitFor
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale
Expand Down Expand Up @@ -284,6 +288,67 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}

fun `test rollover pre check`() {
// index-1 alias x
// index-2 alias x is_write_index
// manage index-1, expect it fail to rollover
val index1 = "index-1"
val index2 = "index-2"
val alias1 = "x"
val policyID = "${testIndexName}_precheck"
val actionConfig = RolloverActionConfig(null, 3, TimeValue.timeValueDays(2), 0)
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(index1, policyID)
changeAlias(index1, alias1, "add")
updateIndexSetting(index1, ManagedIndexSettings.ROLLOVER_ALIAS.key, alias1)
createIndex(index2, policyID)
changeAlias(index2, alias1, "add", true)
updateIndexSetting(index2, ManagedIndexSettings.ROLLOVER_ALIAS.key, alias1)

val managedIndexConfig = getExistingManagedIndexConfig(index1)

// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index1).policyID) }

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val info = getExplainManagedIndexMetaData(index1).info as Map<String, Any?>
assertEquals(
"Index rollover not stopped by pre-check.",
AttemptRolloverStep.getFailedPreCheckMessage(index1), info["message"]
)
}

updateIndexSetting(index1, ManagedIndexSettings.ROLLOVER_SKIP.key, "true")

val response = client().makeRequest(
RestRequest.Method.POST.toString(),
"${RestRetryFailedManagedIndexAction.RETRY_BASE_URI}/$index1"
)
assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus())

updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val info = getExplainManagedIndexMetaData(index1).info as Map<String, Any?>
assertEquals(
"Index rollover not skip.",
AttemptRolloverStep.getSkipRolloverMessage(index1), info["message"]
)
}
}

fun `test data stream rollover no condition`() {
val dataStreamName = "${testIndexName}_data_stream"
val policyID = "${testIndexName}_rollover_policy"
Expand Down

0 comments on commit d415a03

Please sign in to comment.