From 1923003b8786bea21e1991a2351040c20c0c0c62 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 16 Aug 2023 17:59:03 -0700 Subject: [PATCH] Support copy alias in rollover (#892) * Support copy alias in rollover Signed-off-by: bowenlan-amzn * Add more tests Signed-off-by: bowenlan-amzn * Add bwc test Signed-off-by: bowenlan-amzn * Enhance tests Signed-off-by: bowenlan-amzn * Improve not acknowledge call scenario Signed-off-by: bowenlan-amzn --------- Signed-off-by: bowenlan-amzn --- build.gradle | 76 +++--- .../model/ManagedIndexMetaData.kt | 48 ++-- .../action/RolloverAction.kt | 4 + .../action/RolloverActionParser.kt | 7 +- .../step/rollover/AttemptRolloverStep.kt | 115 +++++++++- .../util/ManagedIndexUtils.kt | 1 + .../mappings/opendistro-ism-config.json | 8 +- .../mappings/opendistro-ism-history.json | 5 +- .../IndexManagementRestTestCase.kt | 22 +- .../bwc/ISMBackwardsCompatibilityIT.kt | 217 ++++++++++++++++++ .../IndexStateManagementRestTestCase.kt | 27 ++- .../action/RolloverActionIT.kt | 124 +++++++++- .../step/AttemptRolloverStepTests.kt | 194 ++++++++++++++++ .../validation/ValidateRolloverIT.kt | 8 +- .../validation/ValidateRolloverTests.kt | 2 +- .../cached-opendistro-ism-config.json | 8 +- .../cached-opendistro-ism-history.json | 5 +- 17 files changed, 770 insertions(+), 101 deletions(-) create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/bwc/ISMBackwardsCompatibilityIT.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt diff --git a/build.gradle b/build.gradle index 986382a4e..d34d45ab9 100644 --- a/build.gradle +++ b/build.gradle @@ -613,32 +613,44 @@ task integTestRemote(type: RestIntegTestTask) { // === Set up BWC tests === -String bwcMinVersion = "1.13.2.0" -String bwcJobSchedulerVersion = "1.13.0.0" +String bwcMinVersion = "2.6.0.0" String bwcBundleVersion = "1.3.2.0" Boolean bwcBundleTest = (project.findProperty('customDistributionDownloadType') != null && project.properties['customDistributionDownloadType'] == "bundle"); String bwcVersion = bwcBundleTest ? bwcBundleVersion : bwcMinVersion String bwcCurrentVersion = opensearch_version.replace("-SNAPSHOT", "") String baseName = "indexmanagementBwcCluster" -String bwcFilePath = "src/test/resources/bwc/" -String bwc_js_resource_location = bwcFilePath + "job-scheduler/" + bwcJobSchedulerVersion -String bwc_im_resource_location = bwcFilePath + "indexmanagement/" + bwcVersion - -// Downloads the bwc job scheduler version -String bwc_js_download_url = "https://github.com/opendistro-for-elasticsearch/job-scheduler/releases/download/v" + - bwcJobSchedulerVersion + "/job-scheduler-artifacts.zip" - -// Downloads the bwc index management version -String bwc_im_download_url = "https://github.com/opendistro-for-elasticsearch/index-management/releases/download/v" + - bwcMinVersion + "/index-management-artifacts.zip" -getPluginResource(bwc_im_resource_location, bwc_im_download_url) +configurations { + bwcZip +} +dependencies { + bwcZip "org.opensearch.plugin:opensearch-job-scheduler:${bwcMinVersion}-SNAPSHOT@zip" + bwcZip "org.opensearch.plugin:opensearch-index-management:${bwcMinVersion}-SNAPSHOT@zip" +} +ext.resolvebwcZipFile = { pluginId -> + return new Callable() { + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return configurations.bwcZip.resolvedConfiguration.resolvedArtifacts + .find { ResolvedArtifact f -> + f.name.startsWith(pluginId) + } + .file + } + } + } + } +} +Integer bwcNumNodes = 3 2.times {i -> testClusters { "${baseName}$i"{ testDistribution = "ARCHIVE" - numberOfNodes = 3 + numberOfNodes = bwcNumNodes if (bwcBundleTest) { versions = [ "1.3.2", bwcCurrentVersion @@ -686,31 +698,10 @@ getPluginResource(bwc_im_resource_location, bwc_im_download_url) } } else { versions = [ - "7.10.2", opensearch_version + "2.6.0-SNAPSHOT", opensearch_version ] - plugin(provider(new Callable(){ - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return getPluginResource(bwc_js_resource_location, bwc_js_download_url) - } - } - } - })) - - plugin(provider(new Callable(){ - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return getPluginResource(bwc_im_resource_location, bwc_im_download_url) - } - } - } - })) + plugin(provider(resolvebwcZipFile("opensearch-job-scheduler"))) + plugin(provider(resolvebwcZipFile("opensearch-index-management"))) } setting 'path.repo', @@ -748,6 +739,7 @@ task prepareBwcTests { nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}$i".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}$i".getName()}") systemProperty 'tests.security.manager', 'false' + systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}" } } @@ -772,6 +764,7 @@ task "${baseName}#oneThirdsUpgradeCluster"(type: StandaloneRestIntegTestTask) { nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") systemProperty 'tests.security.manager', 'false' + systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}" } // Upgrade the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded. @@ -792,6 +785,7 @@ task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTas nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") systemProperty 'tests.security.manager', 'false' + systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}" } // Upgrade the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded. @@ -812,6 +806,7 @@ task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask) nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") systemProperty 'tests.security.manager', 'false' + systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}" } // Upgrade all the nodes of the old cluster to new OpenSearch version with upgraded plugin version @@ -830,6 +825,7 @@ task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) { nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}1".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}1".getName()}") systemProperty 'tests.security.manager', 'false' + systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}" } // A bwc test suite which runs all the bwc tasks combined @@ -837,7 +833,7 @@ task bwcTestSuite(type: StandaloneRestIntegTestTask) { exclude '**/*Test*' exclude '**/*IT*' // TODO refactor bwc test #677 - // dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask") + dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask") dependsOn tasks.named("${baseName}#fullRestartClusterTask") } diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt index a388ac570..ca4124b6b 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ManagedIndexMetaData.kt @@ -38,7 +38,8 @@ data class ManagedIndexMetaData( val info: Map?, val id: String = NO_ID, val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, - val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + val rolledOverIndexName: String? = null, ) : Writeable, ToXContentFragment { @Suppress("ComplexMethod") @@ -51,6 +52,7 @@ data class ManagedIndexMetaData( if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString() if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString() if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString() + if (rolledOverIndexName != null) resultMap[ROLLED_OVER_INDEX_NAME] = rolledOverIndexName if (indexCreationDate != null) resultMap[INDEX_CREATION_DATE] = indexCreationDate.toString() if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString() @@ -76,6 +78,7 @@ data class ManagedIndexMetaData( .field(POLICY_PRIMARY_TERM, policyPrimaryTerm) .field(POLICY_COMPLETED, policyCompleted) .field(ROLLED_OVER, rolledOver) + .field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName) .field(INDEX_CREATION_DATE, indexCreationDate) .field(TRANSITION_TO, transitionTo) .addObject(StateMetaData.STATE, stateMetaData, params, true) @@ -110,6 +113,7 @@ data class ManagedIndexMetaData( // Only show rolled_over if we have rolled over or we are in the rollover action if (rolledOver == true || (actionMetaData != null && actionMetaData.name == "rollover")) { builder.field(ROLLED_OVER, rolledOver) + if (rolledOverIndexName != null) builder.field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName) } if (indexCreationDate != null) builder.field(INDEX_CREATION_DATE, indexCreationDate) @@ -142,6 +146,7 @@ data class ManagedIndexMetaData( streamOutput.writeOptionalLong(policyPrimaryTerm) streamOutput.writeOptionalBoolean(policyCompleted) streamOutput.writeOptionalBoolean(rolledOver) + streamOutput.writeOptionalString(rolledOverIndexName) streamOutput.writeOptionalLong(indexCreationDate) streamOutput.writeOptionalString(transitionTo) @@ -172,6 +177,7 @@ data class ManagedIndexMetaData( const val POLICY_PRIMARY_TERM = "policy_primary_term" const val POLICY_COMPLETED = "policy_completed" const val ROLLED_OVER = "rolled_over" + const val ROLLED_OVER_INDEX_NAME = "rolled_over_index_name" const val INDEX_CREATION_DATE = "index_creation_date" const val TRANSITION_TO = "transition_to" const val INFO = "info" @@ -185,6 +191,7 @@ data class ManagedIndexMetaData( val policyPrimaryTerm: Long? = si.readOptionalLong() val policyCompleted: Boolean? = si.readOptionalBoolean() val rolledOver: Boolean? = si.readOptionalBoolean() + val rolledOverIndexName: String? = si.readOptionalString() val indexCreationDate: Long? = si.readOptionalLong() val transitionTo: String? = si.readOptionalString() @@ -207,6 +214,7 @@ data class ManagedIndexMetaData( policyPrimaryTerm = policyPrimaryTerm, policyCompleted = policyCompleted, rolledOver = rolledOver, + rolledOverIndexName = rolledOverIndexName, indexCreationDate = indexCreationDate, transitionTo = transitionTo, stateMetaData = state, @@ -234,6 +242,7 @@ data class ManagedIndexMetaData( var policyPrimaryTerm: Long? = null var policyCompleted: Boolean? = null var rolledOver: Boolean? = null + var rolledOverIndexName: String? = null var indexCreationDate: Long? = null var transitionTo: String? = null @@ -256,6 +265,7 @@ data class ManagedIndexMetaData( POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue() ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue() + ROLLED_OVER_INDEX_NAME -> rolledOverIndexName = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text() INDEX_CREATION_DATE -> indexCreationDate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue() TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text() StateMetaData.STATE -> { @@ -277,23 +287,24 @@ data class ManagedIndexMetaData( } return ManagedIndexMetaData( - requireNotNull(index) { "$INDEX is null" }, - requireNotNull(indexUuid) { "$INDEX_UUID is null" }, - requireNotNull(policyID) { "$POLICY_ID is null" }, - policySeqNo, - policyPrimaryTerm, - policyCompleted, - rolledOver, - indexCreationDate, - transitionTo, - state, - action, - step, - retryInfo, - info, - id, - seqNo, - primaryTerm + index = requireNotNull(index) { "$INDEX is null" }, + indexUuid = requireNotNull(indexUuid) { "$INDEX_UUID is null" }, + policyID = requireNotNull(policyID) { "$POLICY_ID is null" }, + policySeqNo = policySeqNo, + policyPrimaryTerm = policyPrimaryTerm, + policyCompleted = policyCompleted, + rolledOver = rolledOver, + rolledOverIndexName = rolledOverIndexName, + indexCreationDate = indexCreationDate, + transitionTo = transitionTo, + stateMetaData = state, + actionMetaData = action, + stepMetaData = step, + policyRetryInfo = retryInfo, + info = info, + id = id, + seqNo = seqNo, + primaryTerm = primaryTerm, ) } @@ -323,6 +334,7 @@ data class ManagedIndexMetaData( policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(), policyCompleted = map[POLICY_COMPLETED]?.toBoolean(), rolledOver = map[ROLLED_OVER]?.toBoolean(), + rolledOverIndexName = map[ROLLED_OVER_INDEX_NAME], indexCreationDate = map[INDEX_CREATION_DATE]?.toLong(), transitionTo = map[TRANSITION_TO], stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map), diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt index 48f892032..259afae4d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt @@ -20,6 +20,7 @@ class RolloverAction( val minDocs: Long?, val minAge: TimeValue?, val minPrimaryShardSize: ByteSizeValue?, + val copyAlias: Boolean = false, index: Int ) : Action(name, index) { @@ -47,6 +48,7 @@ class RolloverAction( if (minDocs != null) builder.field(MIN_DOC_COUNT_FIELD, minDocs) if (minAge != null) builder.field(MIN_INDEX_AGE_FIELD, minAge.stringRep) if (minPrimaryShardSize != null) builder.field(MIN_PRIMARY_SHARD_SIZE_FIELD, minPrimaryShardSize.stringRep) + builder.field(COPY_ALIAS_FIELD, copyAlias) builder.endObject() } @@ -55,6 +57,7 @@ class RolloverAction( out.writeOptionalLong(minDocs) out.writeOptionalTimeValue(minAge) out.writeOptionalWriteable(minPrimaryShardSize) + out.writeBoolean(copyAlias) out.writeInt(actionIndex) } @@ -64,5 +67,6 @@ class RolloverAction( const val MIN_DOC_COUNT_FIELD = "min_doc_count" const val MIN_INDEX_AGE_FIELD = "min_index_age" const val MIN_PRIMARY_SHARD_SIZE_FIELD = "min_primary_shard_size" + const val COPY_ALIAS_FIELD = "copy_alias" } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt index 7671754ad..5f9749f2a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt @@ -19,9 +19,10 @@ class RolloverActionParser : ActionParser() { val minDocs = sin.readOptionalLong() val minAge = sin.readOptionalTimeValue() val minPrimaryShardSize = sin.readOptionalWriteable(::ByteSizeValue) + val copyAlias = sin.readBoolean() val index = sin.readInt() - return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index) + return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index) } override fun fromXContent(xcp: XContentParser, index: Int): Action { @@ -29,6 +30,7 @@ class RolloverActionParser : ActionParser() { var minDocs: Long? = null var minAge: TimeValue? = null var minPrimaryShardSize: ByteSizeValue? = null + var copyAlias = false ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -44,11 +46,12 @@ class RolloverActionParser : ActionParser() { RolloverAction .MIN_PRIMARY_SHARD_SIZE_FIELD ) + RolloverAction.COPY_ALIAS_FIELD -> copyAlias = xcp.booleanValue() else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in RolloverAction.") } } - return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index) + return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index) } override fun getActionType(): String { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index 6657e672a..6769ac4f7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -7,12 +7,18 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.rollover import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper +import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest +import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions import org.opensearch.action.admin.indices.rollover.RolloverRequest import org.opensearch.action.admin.indices.rollover.RolloverResponse import org.opensearch.action.admin.indices.stats.IndicesStatsRequest import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.common.unit.ByteSizeValue +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService import org.opensearch.common.unit.TimeValue +import org.opensearch.index.IndexNotFoundException import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverAlias import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverSkip @@ -33,6 +39,7 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { private val logger = LogManager.getLogger(javaClass) private var stepStatus = StepStatus.STARTING private var info: Map? = null + private var newIndex: String? = null // this variable holds the new index name if rollover is successful in this run @Suppress("ComplexMethod", "LongMethod") override suspend fun execute(): Step { @@ -53,6 +60,9 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { if (clusterService.state().metadata.index(indexName).rolloverInfos.containsKey(rolloverTarget)) { stepStatus = StepStatus.COMPLETED info = mapOf("message" to getAlreadyRolledOverMessage(indexName, rolloverTarget)) + + // If already rolled over, alias may not get copied over yet + copyAlias(clusterService, indexName, context.client, rolloverTarget, context.metadata) return this } @@ -114,6 +124,7 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { " numDocs=$numDocs, indexSize=${indexSize.bytes}, primaryShardSize=${largestPrimaryShardSize.bytes}]" ) executeRollover(context, rolloverTarget, isDataStream, conditions) + copyAlias(clusterService, indexName, context.client, rolloverTarget, context.metadata) } else { stepStatus = StepStatus.CONDITION_NOT_MET info = mapOf("message" to getPendingMessage(indexName), "conditions" to conditions) @@ -158,14 +169,14 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { val indexName = context.metadata.index val metadata = context.clusterService.state().metadata val indexAlias = metadata.index(indexName)?.aliases?.get(alias) - logger.debug("Index $indexName has aliases $indexAlias") + logger.debug("Index {} has aliases {}", indexName, indexAlias) if (indexAlias == null) { return false } val isWriteIndex = indexAlias.writeIndex() // this could be null if (isWriteIndex != true) { val aliasIndices = metadata.indicesLookup[alias]?.indices?.map { it.index } - logger.debug("Alias $alias contains indices $aliasIndices") + logger.debug("Alias {} contains indices {}", alias, aliasIndices) if (aliasIndices != null && aliasIndices.size > 1) { return false } @@ -225,6 +236,9 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { else -> getSuccessMessage(indexName) } + // Save newIndex later to metadata to be reused in case of failures + newIndex = response.newIndex + stepStatus = StepStatus.COMPLETED info = listOfNotNull( "message" to message, @@ -251,21 +265,104 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { } } + /** + * This method should be called ASAP after rollover succeed + * + * Rollover currently only copy the alias being rolled over on to new index + * This method copy any remaining aliases to new index + * + * TODO This method can be deprecated once this issue finished + * https://github.com/opensearch-project/index-management/issues/849 finished + */ + @Suppress("ComplexMethod") + private suspend fun copyAlias( + clusterService: ClusterService, + indexName: String, + client: Client, + rolloverTarget: String, + metadata: ManagedIndexMetaData + ) { + if (!action.copyAlias) return + + // Try to preserve the rollover conditions + val conditions = info?.get("conditions") ?: context?.metadata?.info?.get("conditions") + + val rolledOverIndexName = newIndex ?: metadata.rolledOverIndexName + if (rolledOverIndexName == null) { + // Only in rare case when the program shut down unexpectedly before rolledOverIndexName is set or metadata corrupted + // ISM cannot auto recover from this case, so the status is COMPLETED + logger.error("$indexName rolled over but cannot find the rolledOverIndexName to copy aliases to") + stepStatus = StepStatus.COMPLETED + info = listOfNotNull( + "message" to getCopyAliasRolledOverIndexNotFoundMessage(indexName), + if (conditions != null) "conditions" to conditions else null + ).toMap() + return + } + + val aliasActions = mutableListOf() + val aliases = clusterService.state().metadata().index(indexName).aliases + for (alias in aliases) { + val aliasName = alias.key + // Skip the alias that has been rolled over on, since it's already copied + if (aliasName == rolloverTarget) continue + + val aliasMetadata = alias.value + val aliasAction = AliasActions(AliasActions.Type.ADD).index(rolledOverIndexName) + .alias(aliasMetadata.alias) + .filter(aliasMetadata.filter?.toString()) + .searchRouting(aliasMetadata.searchRouting) + .indexRouting(aliasMetadata.indexRouting) + .isHidden(aliasMetadata.isHidden) + aliasActions.add(aliasAction) + } + val aliasReq = IndicesAliasesRequest() + aliasActions.forEach { aliasReq.addAliasAction(it) } + + try { + val aliasRes: AcknowledgedResponse = client.admin().indices().suspendUntil { aliases(aliasReq, it) } + if (aliasRes.isAcknowledged) { + stepStatus = StepStatus.COMPLETED + info = listOfNotNull( + "message" to getSuccessCopyAliasMessage(indexName, rolledOverIndexName), + if (conditions != null) "conditions" to conditions else null + ).toMap() + } else { + stepStatus = StepStatus.FAILED + info = listOfNotNull( + "message" to getCopyAliasNotAckMessage(indexName, rolledOverIndexName), + if (conditions != null) "conditions" to conditions else null + ).toMap() + } + } catch (e: IndexNotFoundException) { + logger.error("Index not found while copying alias from $indexName to $rolledOverIndexName", e) + stepStatus = StepStatus.FAILED + info = listOfNotNull( + "message" to getCopyAliasIndexNotFoundMessage(rolledOverIndexName), + if (conditions != null) "conditions" to conditions else null + ).toMap() + } catch (e: Exception) { + handleException(indexName, e, getFailedCopyAliasMessage(indexName, rolledOverIndexName), conditions) + } + } + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { return currentMetadata.copy( stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), rolledOver = if (currentMetadata.rolledOver == true) true else stepStatus == StepStatus.COMPLETED, + rolledOverIndexName = if (currentMetadata.rolledOverIndexName != null) currentMetadata.rolledOverIndexName else newIndex, transitionTo = null, info = info ) } - private fun handleException(indexName: String, e: Exception, message: String = getFailedMessage(indexName)) { + private fun handleException(indexName: String, e: Exception, message: String = getFailedMessage(indexName), conditions: Any? = null) { logger.error(message, e) stepStatus = StepStatus.FAILED - val mutableInfo = mutableMapOf("message" to message) + val mutableInfo: MutableMap = mutableMapOf("message" to message) val errorMessage = e.message if (errorMessage != null) mutableInfo["cause"] = errorMessage + if (conditions != null) mutableInfo["conditions"] = conditions info = mutableInfo.toMap() } @@ -288,5 +385,15 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { fun getSkipRolloverMessage(index: String) = "Skipped rollover action for [index=$index]" fun getAlreadyRolledOverMessage(index: String, alias: String) = "This index has already been rolled over using this alias, treating as a success [index=$index, alias=$alias]" + fun getSuccessCopyAliasMessage(index: String, newIndex: String) = + "Successfully rolled over and copied alias from [index=$index] to [index=$newIndex]" + fun getFailedCopyAliasMessage(index: String, newIndex: String) = + "Successfully rolled over but failed to copied alias from [index=$index] to [index=$newIndex]" + fun getCopyAliasNotAckMessage(index: String, newIndex: String) = + "Successfully rolled over but copy alias from [index=$index] to [index=$newIndex] is not acknowledged" + fun getCopyAliasIndexNotFoundMessage(newIndex: String?) = + "Successfully rolled over but new index [index=$newIndex] not found during copy alias" + fun getCopyAliasRolledOverIndexNotFoundMessage(index: String?) = + "Successfully rolled over [index=$index] but ISM cannot find rolled over index from metadata to copy aliases to, please manually copy" } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index 1e031193d..250026e49 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -367,6 +367,7 @@ fun ManagedIndexMetaData.getCompletedManagedIndexMetaData( return this.copy( policyCompleted = updatedStepMetaData.policyCompleted, rolledOver = updatedStepMetaData.rolledOver, + rolledOverIndexName = updatedStepMetaData.rolledOverIndexName, actionMetaData = updatedActionMetaData, stepMetaData = updatedStepMetaData.stepMetaData, transitionTo = updatedStepMetaData.transitionTo, diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index e82a1937a..588e886b4 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 18 + "schema_version": 19 }, "dynamic": "strict", "properties": { @@ -229,6 +229,9 @@ }, "min_primary_shard_size": { "type": "keyword" + }, + "copy_alias": { + "type": "boolean" } } }, @@ -708,6 +711,9 @@ "rolled_over": { "type": "boolean" }, + "rolled_over_index_name": { + "type": "keyword" + }, "index_creation_date": { "type": "date", "format": "strict_date_time||epoch_millis" diff --git a/src/main/resources/mappings/opendistro-ism-history.json b/src/main/resources/mappings/opendistro-ism-history.json index ca5a8d8de..0e7db6d40 100644 --- a/src/main/resources/mappings/opendistro-ism-history.json +++ b/src/main/resources/mappings/opendistro-ism-history.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 5 + "schema_version": 6 }, "dynamic": "strict", "properties": { @@ -37,6 +37,9 @@ "rolled_over": { "type": "boolean" }, + "rolled_over_index_name": { + "type": "keyword" + }, "index_creation_date": { "type": "date", "format": "strict_date_time||epoch_millis" diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 9d312a193..950791f71 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -7,22 +7,23 @@ package org.opensearch.indexmanagement import org.apache.http.entity.ContentType import org.apache.http.entity.StringEntity +import org.apache.logging.log4j.LogManager import org.junit.AfterClass import org.junit.Before import org.junit.rules.DisableOnDebug import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction import org.opensearch.client.Request +import org.opensearch.client.RequestOptions import org.opensearch.client.Response +import org.opensearch.client.ResponseException import org.opensearch.client.RestClient -import org.opensearch.client.RequestOptions import org.opensearch.client.WarningsHandler -import org.opensearch.client.ResponseException import org.opensearch.common.Strings import org.opensearch.common.io.PathUtils import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.XContentType import org.opensearch.core.xcontent.DeprecationHandler import org.opensearch.core.xcontent.NamedXContentRegistry -import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN import org.opensearch.rest.RestStatus import java.io.IOException @@ -32,13 +33,11 @@ import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL -import kotlin.collections.ArrayList -import kotlin.collections.HashSet abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 18 - val historySchemaVersion = 5 + val configSchemaVersion = 19 + val historySchemaVersion = 6 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as // they do not go through the pending task queue. Ideally this should probably be written in a way to wait for the @@ -169,14 +168,19 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { override fun preserveIndicesUponCompletion(): Boolean = true companion object { - @JvmStatic val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 + val isBWCTest = System.getProperty("tests.plugin_bwc_version", "0") != "0" protected val defaultKeepIndexSet = setOf(".opendistro_security") /** * We override preserveIndicesUponCompletion to true and use this function to clean up indices * Meant to be used in @After or @AfterClass of your feature test suite */ - fun wipeAllIndices(client: RestClient = adminClient(), keepIndex: kotlin.collections.Set = defaultKeepIndexSet) { + fun wipeAllIndices(client: RestClient = adminClient(), keepIndex: Set = defaultKeepIndexSet, skip: Boolean = false) { + val logger = LogManager.getLogger(IndexManagementRestTestCase::class.java) + if (skip) { + logger.info("Skipping wipeAllIndices...") + return + } try { client.performRequest(Request("DELETE", "_data_stream/*")) } catch (e: ResponseException) { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/bwc/ISMBackwardsCompatibilityIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/bwc/ISMBackwardsCompatibilityIT.kt new file mode 100644 index 000000000..d2e5fbedc --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/bwc/ISMBackwardsCompatibilityIT.kt @@ -0,0 +1,217 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.bwc + +import org.junit.Assert +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.IndexManagementIndices.Companion.HISTORY_WRITE_INDEX_ALIAS +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase +import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction +import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep +import org.opensearch.indexmanagement.waitFor +import java.util.Locale + +class ISMBackwardsCompatibilityIT : IndexStateManagementRestTestCase() { + + private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) + + private enum class ClusterType { + OLD, + MIXED, + UPGRADED; + + companion object { + fun parse(value: String): ClusterType { + return when (value) { + "old_cluster" -> OLD + "mixed_cluster" -> MIXED + "upgraded_cluster" -> UPGRADED + else -> throw AssertionError("Unknown cluster type: $value") + } + } + } + } + + private fun getPluginUri(): String { + return when (CLUSTER_TYPE) { + ClusterType.OLD -> "_nodes/$CLUSTER_NAME-0/plugins" + ClusterType.MIXED -> { + when (System.getProperty("tests.rest.bwcsuite_round")) { + "second" -> "_nodes/$CLUSTER_NAME-1/plugins" + "third" -> "_nodes/$CLUSTER_NAME-2/plugins" + else -> "_nodes/$CLUSTER_NAME-0/plugins" + } + } + ClusterType.UPGRADED -> "_nodes/plugins" + } + } + + companion object { + private val CLUSTER_TYPE = ClusterType.parse(System.getProperty("tests.rest.bwcsuite")) + private val CLUSTER_NAME = System.getProperty("tests.clustername") + } + + override fun preserveIndicesUponCompletion(): Boolean = true + + override fun preserveReposUponCompletion(): Boolean = true + + override fun preserveTemplatesUponCompletion(): Boolean = true + + override fun restClientSettings(): Settings { + return Settings.builder() + .put(super.restClientSettings()) + // increase the timeout here to 90 seconds to handle long waits for a green + // cluster health. the waits for green need to be longer than a minute to + // account for delayed shards + .put(CLIENT_SOCKET_TIMEOUT, "90s") + .build() + } + + @Throws(Exception::class) + @Suppress("UNCHECKED_CAST") + fun `test rollover policy backwards compatibility`() { + val indexNameBase = "${testIndexName}_index" + val index1 = "$indexNameBase-1" + val newIndex1 = "$indexNameBase-000002" + val aliasName1 = "${testIndexName}_alias" + + val index2 = "$indexNameBase-2-1" + val newIndex2 = "$indexNameBase-2-000002" + val aliasName2 = "${testIndexName}_alias2" + + val policyID = "${testIndexName}_testPolicyName_doc_1" + + val uri = getPluginUri() + val responseMap = getAsMap(uri)["nodes"] as Map> + for (response in responseMap.values) { + val plugins = response["plugins"] as List> + val pluginNames = plugins.map { plugin -> plugin ["name"] }.toSet() + when (CLUSTER_TYPE) { + ClusterType.OLD -> { + assertTrue(pluginNames.contains("opendistro-index-management") || pluginNames.contains("opensearch-index-management")) + + createRolloverPolicy(policyID) + + createIndex(index1, policyID, aliasName1) + createIndex(index2, policyID, aliasName2) + + // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy + updateManagedIndexConfigStartTime(getExistingManagedIndexConfig(index1)) + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index1).policyID) } + + // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy + updateManagedIndexConfigStartTime(getExistingManagedIndexConfig(index2)) + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index2).policyID) } + + verifyPendingRollover(index1) + verifyPendingRollover(index2) + } + ClusterType.MIXED -> { + assertTrue(pluginNames.contains("opensearch-index-management")) + + verifyPendingRollover(index1) + verifyPendingRollover(index2) + } + ClusterType.UPGRADED -> { + assertTrue(pluginNames.contains("opensearch-index-management")) + + verifyPendingRollover(index1) + insertSampleData(index = index1, docCount = 5, delay = 0) + verifySuccessfulRollover(index1, newIndex1) + + verifyIndexSchemaVersion(INDEX_MANAGEMENT_INDEX, configSchemaVersion) + verifyIndexSchemaVersion(HISTORY_WRITE_INDEX_ALIAS, historySchemaVersion) + + insertSampleData(index = index2, docCount = 5, delay = 0) + verifySuccessfulRollover(index2, newIndex2) + + deleteIndex("$indexNameBase*") + } + } + break + } + } + + private fun createRolloverPolicy(policyID: String) { + val policy = """ + { + "policy": { + "policy_id": "$policyID", + "description": "description", + "default_state": "RolloverAction", + "states": [ + { + "name": "RolloverAction", + "actions": [ + { + "rollover": { + "min_doc_count": 3, + "min_index_age": "2d" + } + } + ], + "transitions": [ + + ] + } + ] + } + } + """.trimIndent() + createPolicyJson(policy, policyID) + } + + @Suppress("UNCHECKED_CAST") + private fun verifyPendingRollover(index: String) { + val managedIndexConfig = getExistingManagedIndexConfig(index) + // Need to speed up to second execution where it will trigger the first execution of the action + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + val info = getExplainManagedIndexMetaData(index).info as Map + assertEquals( + "Index rollover before it met the condition.", + AttemptRolloverStep.getPendingMessage(index), info["message"] + ) + val conditions = info["conditions"] as Map + assertEquals( + "Did not have exclusively min age and min doc count conditions", + setOf(RolloverAction.MIN_INDEX_AGE_FIELD, RolloverAction.MIN_DOC_COUNT_FIELD), conditions.keys + ) + val minAge = conditions[RolloverAction.MIN_INDEX_AGE_FIELD] as Map + val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min age condition", "2d", minAge["condition"]) + assertTrue("Did not have min age current", minAge["current"] is String) + assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 0, minDocCount["current"]) + } + } + + @Suppress("UNCHECKED_CAST") + private fun verifySuccessfulRollover(index: String, newIndex: String) { + val managedIndexConfig = getExistingManagedIndexConfig(index) + // Need to speed up to second execution where it will trigger the first execution of the action + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + val metadata = getExplainManagedIndexMetaData(index) + val info = metadata.info as Map + assertEquals("Index did not rollover", AttemptRolloverStep.getSuccessMessage(index), info["message"]) + val conditions = info["conditions"] as Map + assertEquals( + "Did not have exclusively min age and min doc count conditions", + setOf(RolloverAction.MIN_INDEX_AGE_FIELD, RolloverAction.MIN_DOC_COUNT_FIELD), conditions.keys + ) + val minAge = conditions[RolloverAction.MIN_INDEX_AGE_FIELD] as Map + val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min age condition", "2d", minAge["condition"]) + assertTrue("Did not have min age current", minAge["current"] is String) + assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 5, minDocCount["current"]) + assertEquals("Did not have rolled over index name", metadata.rolledOverIndexName, newIndex) + } + Assert.assertTrue("New rollover index does not exist.", indexExists(newIndex)) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index ff6283f2a..a213bd3a5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -24,9 +24,9 @@ import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.core.xcontent.DeprecationHandler import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser.Token -import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent.jsonXContent import org.opensearch.index.seqno.SequenceNumbers @@ -67,6 +67,7 @@ import org.opensearch.indexmanagement.waitFor import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.rest.RestRequest import org.opensearch.rest.RestStatus +import org.opensearch.indexmanagement.rollup.randomTermQuery import org.opensearch.test.OpenSearchTestCase import java.io.IOException import java.time.Duration @@ -77,7 +78,7 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() @After fun clearIndicesAfterEachTest() { - wipeAllIndices() + wipeAllIndices(skip = isBWCTest) } val explainResponseOpendistroPolicyIdSetting = "index.opendistro.index_state_management.policy_id" @@ -89,13 +90,11 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() updateIndexStateManagementJitterSetting(0.0) } - @Before - protected fun disableValidationService() { + protected open fun disableValidationService() { updateValidationServiceSetting(false) } - @Before - protected fun enableValidationService() { + protected open fun enableValidationService() { updateValidationServiceSetting(true) } @@ -238,16 +237,28 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() index: String, alias: String, action: String = "remove", - isWriteIndex: Boolean = false + isWriteIndex: Boolean = false, + routing: Int? = null, + searchRouting: Int = randomInt(), + indexRouting: Int = randomInt(), + filter: String = randomTermQuery().toString(), + isHidden: Boolean = randomBoolean() ) { val isWriteIndexField = if (isWriteIndex) "\",\"is_write_index\": \"$isWriteIndex" else "" + val params = if (action == "add" && routing != null) """ + ,"routing": $routing, + "search_routing": $searchRouting, + "index_routing": $indexRouting, + "filter": $filter, + "is_hidden": $isHidden + """.trimIndent() else "" val body = """ { "actions": [ { "$action": { "index": "$index", - "alias": "$alias$isWriteIndexField" + "alias": "$alias$isWriteIndexField"$params } } ] diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt index b0eb1f86d..dcb2c28b2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt @@ -36,13 +36,12 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - @Suppress("UNCHECKED_CAST") fun `test rollover no condition`() { val aliasName = "${testIndexName}_alias" val indexNameBase = "${testIndexName}_index" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = RolloverAction(null, null, null, null, 0) + val actionConfig = RolloverAction(null, null, null, null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -96,7 +95,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { ) val policyID = "${testIndexName}_bwc" - val actionConfig = RolloverAction(null, null, null, null, 0) + val actionConfig = RolloverAction(null, null, null, null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -133,7 +132,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val indexNameBase = "${testIndexName}_index_byte" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_byte_1" - val actionConfig = RolloverAction(ByteSizeValue(10, ByteSizeUnit.BYTES), 1000000, null, null, 0) + val actionConfig = RolloverAction(ByteSizeValue(10, ByteSizeUnit.BYTES), 1000000, null, null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -205,7 +204,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val indexNameBase = "${testIndexName}_index_primary_shard" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_primary_shard_1" - val actionConfig = RolloverAction(null, null, null, ByteSizeValue(100, ByteSizeUnit.KB), 0) + val actionConfig = RolloverAction(null, null, null, ByteSizeValue(100, ByteSizeUnit.KB), false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -315,7 +314,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val indexNameBase = "${testIndexName}_index_doc" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_doc_1" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -389,7 +388,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val index2 = "index-2" val alias1 = "x" val policyID = "${testIndexName}_precheck" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) actionConfig.configRetry = ActionRetry(0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( @@ -448,7 +447,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val policyID = "${testIndexName}_rollover_policy" // Create the rollover policy - val rolloverAction = RolloverAction(null, null, null, null, 0) + val rolloverAction = RolloverAction(null, null, null, null, false, 0) val states = listOf(State(name = "default", actions = listOf(rolloverAction), transitions = listOf())) val policy = Policy( id = policyID, @@ -496,6 +495,9 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { managedIndexConfig = getExistingManagedIndexConfig(secondIndexName) updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(secondIndexName).policyID) } + + val metadata = getExplainManagedIndexMetaData(firstIndexName) + assertEquals(metadata.rolledOverIndexName, secondIndexName) } @Suppress("UNCHECKED_CAST") @@ -504,7 +506,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val policyID = "${testIndexName}_rollover_policy_multi" // Create the rollover policy - val rolloverAction = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val rolloverAction = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) val states = listOf(State(name = "default", actions = listOf(rolloverAction), transitions = listOf())) val policy = Policy( id = policyID, @@ -589,15 +591,17 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val secondIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 2L) Assert.assertTrue("New rollover index does not exist.", indexExists(secondIndexName)) + + val metadata = getExplainManagedIndexMetaData(firstIndexName) + assertEquals(metadata.rolledOverIndexName, secondIndexName) } - @Suppress("UNCHECKED_CAST") fun `test rollover from outside ISM doesn't fail ISM job`() { val aliasName = "${testIndexName}_alias" val indexNameBase = "${testIndexName}_index" val firstIndex = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = RolloverAction(null, null, null, null, 0) + val actionConfig = RolloverAction(null, null, null, null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -632,4 +636,102 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { } assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002")) } + + @Suppress("UNCHECKED_CAST") + fun `test rollover with copy alias`() { + val aliasName = "${testIndexName}_doc_alias" + val indexNameBase = "${testIndexName}_index_doc" + val firstIndex = "$indexNameBase-1" + val policyID = "${testIndexName}_testPolicyName_doc_1" + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, true, 0) + val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + createPolicy(policy, policyID) + // create index defaults + createIndex(firstIndex, policyID, aliasName) + + // Add a bunch of aliases + changeAlias( + firstIndex, "test_alias1", "add", routing = 0, searchRouting = 1, indexRouting = 2, + filter = """ + { "term": { "user.id": "kimchy" } } + """.trimIndent(), + isHidden = false + ) + changeAlias(firstIndex, "test_alias2", "add") + changeAlias(firstIndex, "test_alias3", "add") + + val managedIndexConfig = getExistingManagedIndexConfig(firstIndex) + + // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndex).policyID) } + + // Need to speed up to second execution where it will trigger the first execution of the action + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals( + "Index rollover before it met the condition.", + AttemptRolloverStep.getPendingMessage(firstIndex), info["message"] + ) + val conditions = info["conditions"] as Map + assertEquals( + "Did not have exclusively min age and min doc count conditions", + setOf(RolloverAction.MIN_INDEX_AGE_FIELD, RolloverAction.MIN_DOC_COUNT_FIELD), conditions.keys + ) + val minAge = conditions[RolloverAction.MIN_INDEX_AGE_FIELD] as Map + val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min age condition", "2d", minAge["condition"]) + assertTrue("Did not have min age current", minAge["current"] is String) + assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 0, minDocCount["current"]) + } + + insertSampleData(index = firstIndex, docCount = 5, delay = 0) + + // Need to speed up to second execution where it will trigger the first execution of the action + updateManagedIndexConfigStartTime(managedIndexConfig) + val newIndex = "$indexNameBase-000002" + waitFor { + val metadata = getExplainManagedIndexMetaData(firstIndex) + val info = metadata.info as Map + assertEquals("Index did not rollover", AttemptRolloverStep.getSuccessCopyAliasMessage(firstIndex, newIndex), info["message"]) + val conditions = info["conditions"] as Map + assertEquals( + "Did not have exclusively min age and min doc count conditions", + setOf(RolloverAction.MIN_INDEX_AGE_FIELD, RolloverAction.MIN_DOC_COUNT_FIELD), conditions.keys + ) + val minAge = conditions[RolloverAction.MIN_INDEX_AGE_FIELD] as Map + val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map + assertEquals("Did not have min age condition", "2d", minAge["condition"]) + assertTrue("Did not have min age current", minAge["current"] is String) + assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) + assertEquals("Did not have min doc count current", 5, minDocCount["current"]) + assertEquals("Did not have rolled over index name", metadata.rolledOverIndexName, newIndex) + } + Assert.assertTrue("New rollover index does not exist.", indexExists(newIndex)) + + // Check if new index has the aliases + val alias = getAlias(newIndex, "") + Assert.assertEquals(alias.containsKey("test_alias1"), true) + alias["test_alias1"]!!.let { + it as Map + assertEquals(it["is_hidden"], false) + assertEquals(it["search_routing"], "1") + assertEquals(it["index_routing"], "2") + assertEquals(it["filter"]!!.toString(), "{term={user.id=kimchy}}") + } + Assert.assertEquals(alias.containsKey("test_alias2"), true) + Assert.assertEquals(alias.containsKey("test_alias3"), true) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt new file mode 100644 index 000000000..845d7d429 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptRolloverStepTests.kt @@ -0,0 +1,194 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step + +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.rollover.RolloverResponse +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.AdminClient +import org.opensearch.client.Client +import org.opensearch.client.IndicesAdminClient +import org.opensearch.cluster.ClusterState +import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction +import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.jobscheduler.spi.utils.LockService +import org.opensearch.script.ScriptService +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.cluster.metadata.Metadata +import org.opensearch.index.IndexNotFoundException +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings + +class AttemptRolloverStepTests : OpenSearchTestCase() { + + private val clusterService: ClusterService = mock() + private val scriptService: ScriptService = mock() + private val settings: Settings = Settings.EMPTY + private val lockService: LockService = LockService(mock(), clusterService) + private val oldIndexName = "old_index" + private val newIndexName = "new_index" + val alias = "alias" + + @Before + fun setup() { + // mock rollover target + val clusterState: ClusterState = mock() + val metadata: Metadata = mock() + val indexMetadata: IndexMetadata = mock() + val settings = Settings.builder() + .put(ManagedIndexSettings.ROLLOVER_ALIAS.key, alias) + .put(ManagedIndexSettings.ROLLOVER_SKIP.key, false) + .build() + whenever(clusterService.state()).thenReturn(clusterState) + whenever(clusterState.metadata()).thenReturn(metadata) + whenever(clusterState.metadata).thenReturn(metadata) + whenever(metadata.index(oldIndexName)).thenReturn(indexMetadata) + whenever(metadata.indicesLookup).thenReturn(sortedMapOf()) + whenever(indexMetadata.settings).thenReturn(settings) + + // mock rolloverInfos + whenever(metadata.index(oldIndexName).rolloverInfos).thenReturn(mapOf(alias to mock())) + } + + fun `test copy alias in rollover step is not acknowledged`() { + val rolloverResponse = RolloverResponse(oldIndexName, newIndexName, mapOf(), false, true, true, true) + val aliasResponse = AcknowledgedResponse(false) + // val exception = Exception("test exception") + val client = getClient(getAdminClient(getIndicesAdminClient(rolloverResponse, aliasResponse, null, null))) + + runBlocking { + val rolloverAction = RolloverAction(null, null, null, null, true, 0) + val managedIndexMetaData = ManagedIndexMetaData( + oldIndexName, "indexUuid", "policy_id", + null, null, null, + null, null, null, + null, null, null, + null, null, rolledOverIndexName = newIndexName + ) + val attemptRolloverStep = AttemptRolloverStep(rolloverAction) + val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) + attemptRolloverStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("message info is not matched", AttemptRolloverStep.getCopyAliasNotAckMessage(oldIndexName, newIndexName), updatedManagedIndexMetaData.info?.get("message")) + } + } + + fun `test copy alias in rollover step failed`() { + val rolloverResponse = RolloverResponse(oldIndexName, newIndexName, mapOf(), false, true, true, true) + // val aliasResponse = AcknowledgedResponse(true) + val exception = Exception("test exception") + val client = getClient(getAdminClient(getIndicesAdminClient(rolloverResponse, null, null, exception))) + + runBlocking { + val rolloverAction = RolloverAction(null, null, null, null, true, 0) + val managedIndexMetaData = ManagedIndexMetaData( + oldIndexName, "indexUuid", "policy_id", + null, null, null, + null, null, null, + null, null, null, + null, null, rolledOverIndexName = newIndexName + ) + val attemptRolloverStep = AttemptRolloverStep(rolloverAction) + val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) + attemptRolloverStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("message info is not matched", AttemptRolloverStep.getFailedCopyAliasMessage(oldIndexName, newIndexName), updatedManagedIndexMetaData.info?.get("message")) + } + } + + fun `test copy alias in rollover step failed with index not found exception`() { + val rolloverResponse = RolloverResponse(oldIndexName, newIndexName, mapOf(), false, true, true, true) + // val aliasResponse = AcknowledgedResponse(true) + val exception = IndexNotFoundException("test exception") + val client = getClient(getAdminClient(getIndicesAdminClient(rolloverResponse, null, null, exception))) + + runBlocking { + val rolloverAction = RolloverAction(null, null, null, null, true, 0) + val managedIndexMetaData = ManagedIndexMetaData( + oldIndexName, "indexUuid", "policy_id", + null, null, null, + null, null, null, + null, null, null, + null, null, rolledOverIndexName = newIndexName + ) + val attemptRolloverStep = AttemptRolloverStep(rolloverAction) + val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) + attemptRolloverStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("message info is not matched", AttemptRolloverStep.getCopyAliasIndexNotFoundMessage(newIndexName), updatedManagedIndexMetaData.info?.get("message")) + } + } + + fun `test copy alias in rollover step but no rollodOverIndexName`() { + val rolloverResponse = RolloverResponse(oldIndexName, newIndexName, mapOf(), false, true, true, true) + val aliasResponse = AcknowledgedResponse(true) + // val exception = IndexNotFoundException("test exception") + val client = getClient(getAdminClient(getIndicesAdminClient(rolloverResponse, aliasResponse, null, null))) + + runBlocking { + val rolloverAction = RolloverAction(null, null, null, null, true, 0) + val managedIndexMetaData = ManagedIndexMetaData( + oldIndexName, "indexUuid", "policy_id", + null, null, null, + null, null, null, + null, null, null, + null, null, rolledOverIndexName = null + ) + val attemptRolloverStep = AttemptRolloverStep(rolloverAction) + val context = StepContext(managedIndexMetaData, clusterService, client, null, null, scriptService, settings, lockService) + attemptRolloverStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = attemptRolloverStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("message info is not matched", AttemptRolloverStep.getCopyAliasRolledOverIndexNotFoundMessage(oldIndexName), updatedManagedIndexMetaData.info?.get("message")) + } + } + + private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(indicesAdminClient: IndicesAdminClient): AdminClient = mock { on { indices() } doReturn indicesAdminClient } + private fun getIndicesAdminClient( + rolloverResponse: RolloverResponse?, + aliasResponse: AcknowledgedResponse?, + rolloverException: Exception?, + aliasException: Exception?, + ): IndicesAdminClient { + assertTrue( + "Must provide one and only one response or exception", + (rolloverResponse != null).xor(rolloverException != null) + ) + assertTrue( + "Must provide one and only one response or exception", + (aliasResponse != null).xor(aliasException != null) + ) + return mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (rolloverResponse != null) listener.onResponse(rolloverResponse) + else listener.onFailure(rolloverException) + }.whenever(this.mock).rolloverIndex(any(), any()) + + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(1) + if (aliasResponse != null) listener.onResponse(aliasResponse) + else listener.onFailure(aliasException) + }.whenever(this.mock).aliases(any(), any()) + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt index f1b20c14f..4127de728 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt @@ -32,7 +32,7 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val index1 = "index-1" val alias1 = "x" val policyID = "${testIndexName}_precheck" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) actionConfig.configRetry = ActionRetry(0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( @@ -84,7 +84,7 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val indexNameBase = "${testIndexName}_index" val index1 = "$indexNameBase-1" val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = RolloverAction(null, null, null, null, 0) + val actionConfig = RolloverAction(null, null, null, null, false, 0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( id = policyID, @@ -131,7 +131,7 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val index1 = "index-1" val index2 = "index-2" val policyID = "${testIndexName}_precheck" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) actionConfig.configRetry = ActionRetry(0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( @@ -175,7 +175,7 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val index2 = "index-2" val alias1 = "x" val policyID = "${testIndexName}_precheck" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) actionConfig.configRetry = ActionRetry(0) val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) val policy = Policy( diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt index 805d3b045..367a38e05 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt @@ -40,7 +40,7 @@ class ValidateRolloverTests : OpenSearchTestCase() { ("rollover", 1, 0, false, 0, null, null), null, null, null ) - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, 0) + val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) private val client: Client = mock() private val lockService: LockService = LockService(mock(), clusterService) private val validate = ValidateRollover(settings, clusterService, jvmService) diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index e82a1937a..588e886b4 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 18 + "schema_version": 19 }, "dynamic": "strict", "properties": { @@ -229,6 +229,9 @@ }, "min_primary_shard_size": { "type": "keyword" + }, + "copy_alias": { + "type": "boolean" } } }, @@ -708,6 +711,9 @@ "rolled_over": { "type": "boolean" }, + "rolled_over_index_name": { + "type": "keyword" + }, "index_creation_date": { "type": "date", "format": "strict_date_time||epoch_millis" diff --git a/src/test/resources/mappings/cached-opendistro-ism-history.json b/src/test/resources/mappings/cached-opendistro-ism-history.json index ca5a8d8de..0e7db6d40 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-history.json +++ b/src/test/resources/mappings/cached-opendistro-ism-history.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 5 + "schema_version": 6 }, "dynamic": "strict", "properties": { @@ -37,6 +37,9 @@ "rolled_over": { "type": "boolean" }, + "rolled_over_index_name": { + "type": "keyword" + }, "index_creation_date": { "type": "date", "format": "strict_date_time||epoch_millis"