diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt index a3b53a895..78aa41f3d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt @@ -6,6 +6,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.transform import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData @@ -69,7 +70,7 @@ class WaitForTransformCompletionStep : Step(name) { logger.info("Received the status for jobs [${response.getIdsToExplain().keys}]") return response } catch (e: RemoteTransportException) { - processFailure(transformJobId, indexName, e) + processFailure(transformJobId, indexName, ExceptionsHelper.unwrapCause(e) as Exception) } catch (e: Exception) { processFailure(transformJobId, indexName, e) } @@ -108,17 +109,8 @@ class WaitForTransformCompletionStep : Step(name) { } override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { - val currentActionMetadata = currentMetadata.actionMetaData - val currentActionProperties = currentActionMetadata?.actionProperties - val currentTransformActionProperties = currentActionProperties?.transformActionProperties return currentMetadata.copy( - actionMetaData = currentActionMetadata?.copy( - actionProperties = currentActionProperties?.copy( - transformActionProperties = currentTransformActionProperties?.copy( - transformId = currentTransformActionProperties.transformId - ) - ) - ), + actionMetaData = currentMetadata.actionMetaData, stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), transitionTo = null, info = info diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 97eb3fc13..80813c19a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -28,6 +28,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.MediaType import org.opensearch.indexmanagement.rollup.model.Rollup +import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import java.io.IOException import java.nio.file.Files @@ -229,6 +230,35 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) } + protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) { + // Before updating start time of a job always make sure there are no unassigned shards that could cause the config + // index to move to a new node and negate this forced start + if (isMultiNode) { + waitFor { + try { + client().makeRequest("GET", "_cluster/allocation/explain") + fail("Expected 400 Bad Request when there are no unassigned shards to explain") + } catch (e: ResponseException) { + assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) + } + } + } + val intervalSchedule = (update.jobSchedule as IntervalSchedule) + val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() + val startTimeMillis = desiredStartTimeMillis ?: (Instant.now().toEpochMilli() - millis) + val waitForActiveShards = if (isMultiNode) "all" else "1" + val response = client().makeRequest( + "POST", "${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}/_update/${update.id}?wait_for_active_shards=$waitForActiveShards", + StringEntity( + "{\"doc\":{\"transform\":{\"schedule\":{\"interval\":{\"start_time\":" + + "\"$startTimeMillis\"}}}}}", + ContentType.APPLICATION_JSON + ) + ) + + assertEquals("Request failed", RestStatus.OK, response.restStatus()) + } + override fun preserveIndicesUponCompletion(): Boolean = true companion object { val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 diff --git a/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt index 87cbb21c0..f77a9be21 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt @@ -109,9 +109,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() { private object TransformRestTestCaseExt : TransformRestTestCase() { - fun updateTransformStartTimeExt(update: Transform, desiredStartTimeMillis: Long? = null) = - super.updateTransformStartTime(update, desiredStartTimeMillis) - fun createTransformExt( transform: Transform, transformId: String = randomAlphaOfLength(10), @@ -310,9 +307,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() { executeRequest(request, expectedStatus, userClient) } - protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) = - TransformRestTestCaseExt.updateTransformStartTimeExt(update, desiredStartTimeMillis) - protected fun createTransform( transform: Transform, transformId: String = randomAlphaOfLength(10), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt index 1b0de903d..a054d94bc 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt @@ -133,6 +133,7 @@ class TransformActionIT : IndexStateManagementRestTestCase() { .addAggregator(avgAggregation()) .addAggregator(valueCountAggregation()) ) + val transform = ismTransform.toTransform(indexName) val policy = preparePolicyContainingTransform(indexName, ismTransform, policyId, retry = 1) createPolicy(policy, policyId) createIndex(indexName, policyId, mapping = SOURCE_INDEX_MAPPING) @@ -140,11 +141,10 @@ class TransformActionIT : IndexStateManagementRestTestCase() { assertIndexTransformFailedInAttemptCreateTransformStep(indexName, policyId, ismTransform) // verify the wait for transform completion step will be retried and failed again. - Thread.sleep(60000) updateManagedIndexConfigStartTime(getExistingManagedIndexConfig(indexName)) waitFor { assertEquals( - AttemptCreateTransformJobStep.getFailedMessage(ismTransform.toTransform(indexName).id, indexName), + AttemptCreateTransformJobStep.getFailedMessage(transform.id, indexName), getExplainManagedIndexMetaData(indexName).info?.get("message") ) } @@ -255,7 +255,8 @@ class TransformActionIT : IndexStateManagementRestTestCase() { } private fun assertIndexTransformSucceeded(indexName: String, policyId: String, ismTransform: ISMTransform) { - val transformId = ismTransform.toTransform(indexName).id + val transform = ismTransform.toTransform(indexName) + val transformId = transform.id val managedIndexConfig = getExistingManagedIndexConfig(indexName) // Change the start time so that the policy will be initialized. @@ -271,7 +272,7 @@ class TransformActionIT : IndexStateManagementRestTestCase() { ) } - Thread.sleep(60000) + updateTransformStartTime(transform) // Change the start time so that the transform action will be attempted. updateManagedIndexConfigStartTime(managedIndexConfig) @@ -291,7 +292,8 @@ class TransformActionIT : IndexStateManagementRestTestCase() { } private fun assertIndexTransformSucceededTwice(indexName: String, policyId: String, ismTransform: ISMTransform) { - val transformId = ismTransform.toTransform(indexName).id + val transform = ismTransform.toTransform(indexName) + val transformId = transform.id val managedIndexConfig = getExistingManagedIndexConfig(indexName) // Change the start time so that the policy will be initialized. @@ -306,7 +308,7 @@ class TransformActionIT : IndexStateManagementRestTestCase() { getExplainManagedIndexMetaData(indexName).info?.get("message") ) } - Thread.sleep(60000) + updateTransformStartTime(transform) // Change the start time so that the transform action will be attempted. updateManagedIndexConfigStartTime(managedIndexConfig) @@ -341,7 +343,7 @@ class TransformActionIT : IndexStateManagementRestTestCase() { getExplainManagedIndexMetaData(indexName).info?.get("message") ) } - Thread.sleep(60000) + updateTransformStartTime(transform) // Change the start time so that the second transform action will be attempted. updateManagedIndexConfigStartTime(managedIndexConfig) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt index 4d01e49b2..aaba661c3 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt @@ -12,7 +12,6 @@ import org.apache.hc.core5.http.io.entity.StringEntity import org.apache.hc.core5.http.message.BasicHeader import org.junit.AfterClass import org.opensearch.client.Response -import org.opensearch.client.ResponseException import org.opensearch.client.RestClient import org.opensearch.common.settings.Settings import org.opensearch.core.xcontent.NamedXContentRegistry @@ -30,12 +29,8 @@ import org.opensearch.indexmanagement.transform.model.TransformMetadata import org.opensearch.indexmanagement.util._ID import org.opensearch.indexmanagement.util._PRIMARY_TERM import org.opensearch.indexmanagement.util._SEQ_NO -import org.opensearch.indexmanagement.waitFor -import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.core.rest.RestStatus import org.opensearch.search.SearchModule -import java.time.Duration -import java.time.Instant abstract class TransformRestTestCase : IndexManagementRestTestCase() { @@ -221,35 +216,6 @@ abstract class TransformRestTestCase : IndexManagementRestTestCase() { return continuousStats["documents_behind"] as Map } - protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) { - // Before updating start time of a job always make sure there are no unassigned shards that could cause the config - // index to move to a new node and negate this forced start - if (isMultiNode) { - waitFor { - try { - client().makeRequest("GET", "_cluster/allocation/explain") - fail("Expected 400 Bad Request when there are no unassigned shards to explain") - } catch (e: ResponseException) { - assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) - } - } - } - val intervalSchedule = (update.jobSchedule as IntervalSchedule) - val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() - val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis - val waitForActiveShards = if (isMultiNode) "all" else "1" - val response = client().makeRequest( - "POST", "$INDEX_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards", - StringEntity( - "{\"doc\":{\"transform\":{\"schedule\":{\"interval\":{\"start_time\":" + - "\"$startTimeMillis\"}}}}}", - ContentType.APPLICATION_JSON - ) - ) - - assertEquals("Request failed", RestStatus.OK, response.restStatus()) - } - protected fun Transform.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), ContentType.APPLICATION_JSON) override fun xContentRegistry(): NamedXContentRegistry {