From ddf4a7ba67466316541935be52bbf71d06064a1b Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 21 Nov 2022 10:58:00 -0800 Subject: [PATCH] Backport from main to 2.x (#614) * Fix all the compile warnings and detekt issues (#603) * Fix all the compile warnings and detekt issues Signed-off-by: bowenlan-amzn * Fix time capture is 0 Signed-off-by: bowenlan-amzn Signed-off-by: bowenlan-amzn * Unify test clean logic (#609) * Unify wipe indices logic after tests Signed-off-by: bowenlan-amzn * Enhance wipeAllIndices function Signed-off-by: bowenlan-amzn * Customize cleanup for multi node test Signed-off-by: bowenlan-amzn Signed-off-by: bowenlan-amzn Signed-off-by: bowenlan-amzn --- .github/workflows/bwc-test-workflow.yml | 32 +++++ .../workflows/create-documentation-issue.yml | 1 + .github/workflows/dco.yml | 18 --- .github/workflows/links.yml | 2 +- .../workflows/multi-node-test-workflow.yml | 25 +--- .github/workflows/test-and-build-workflow.yml | 2 - build.gradle | 15 ++- detekt.yml | 5 +- .../IndexStateManagementHistory.kt | 12 +- .../ManagedIndexCoordinator.kt | 31 ++--- .../ManagedIndexRunner.kt | 11 +- .../indexstatemanagement/MetadataService.kt | 6 +- .../model/ManagedIndexConfig.kt | 2 +- .../coordinator/SweptManagedIndexConfig.kt | 8 +- .../step/close/AttemptCloseStep.kt | 2 +- .../step/rollup/AttemptCreateRollupJobStep.kt | 3 +- .../TransportChangePolicyAction.kt | 2 +- .../action/explain/ExplainResponse.kt | 8 +- .../action/explain/TransportExplainAction.kt | 2 +- .../util/ManagedIndexUtils.kt | 1 + .../util/RestHandlerUtils.kt | 3 +- .../validation/ValidateDelete.kt | 2 +- .../rollup/RollupMapperService.kt | 4 +- .../index/TransportIndexRollupAction.kt | 1 - .../indexmanagement/rollup/model/Rollup.kt | 2 +- .../rollup/model/RollupMetadata.kt | 3 +- .../rollup/model/metric/Average.kt | 2 +- .../rollup/model/metric/Max.kt | 2 +- .../rollup/model/metric/Min.kt | 2 +- .../rollup/model/metric/Sum.kt | 2 +- .../rollup/model/metric/ValueCount.kt | 2 +- .../snapshotmanagement/SMUtils.kt | 5 +- .../engine/SMStateMachine.kt | 7 +- .../engine/states/SMState.kt | 2 +- .../snapshotmanagement/engine/states/State.kt | 2 +- .../engine/states/creation/CreatingState.kt | 3 +- .../engine/states/deletion/DeletingState.kt | 6 +- .../transform/TransformProcessedBucketLog.kt | 18 ++- .../transform/TransformSearchService.kt | 2 +- .../delete/TransportDeleteTransformsAction.kt | 2 +- .../get/TransportGetTransformsAction.kt | 1 + .../index/TransportIndexTransformAction.kt | 1 - .../transform/model/Transform.kt | 8 +- .../opensearchapi/OpenSearchExtensions.kt | 1 + .../indexmanagement/util/JobSchedulerUtils.kt | 2 +- .../indexmanagement/util/ScheduledJobUtils.kt | 1 + .../IndexManagementIndicesIT.kt | 3 +- .../IndexManagementRestTestCase.kt | 123 +++++++++++++++++- .../IndexStateManagementSecurityBehaviorIT.kt | 4 - .../indexmanagement/ODFERestTestCase.kt | 102 --------------- .../RollupSecurityBehaviorIT.kt | 4 - .../indexmanagement/SecurityBehaviorIT.kt | 4 - .../TransformSecurityBehaviorIT.kt | 4 - ...IndexManagementBackwardsCompatibilityIT.kt | 2 - .../IndexStateManagementIntegTestCase.kt | 11 +- .../IndexStateManagementRestTestCase.kt | 10 +- .../MetadataRegressionIT.kt | 4 +- .../action/ActionTimeoutIT.kt | 17 ++- .../action/RolloverActionIT.kt | 19 +-- .../migration/MigrationServicesIT.kt | 1 + .../util/ManagedIndexUtilsTests.kt | 3 +- .../validation/ValidateDeleteIT.kt | 4 +- .../validation/ValidateForceMergeIT.kt | 4 +- .../validation/ValidateOpenIT.kt | 2 +- .../validation/ValidateReadOnlyIT.kt | 2 +- .../validation/ValidateReadWriteIT.kt | 2 +- .../validation/ValidateReplicaCountIT.kt | 2 +- .../validation/ValidateRolloverIT.kt | 16 +-- .../RefreshSearchAnalyzerActionIT.kt | 7 +- .../RestRefreshSearchAnalyzerActionIT.kt | 8 ++ .../rollup/RollupMetadataServiceTests.kt | 6 +- .../rollup/RollupRestTestCase.kt | 12 +- .../indexmanagement/rollup/TestHelpers.kt | 2 +- .../rollup/model/ISMRollupTests.kt | 1 + .../rollup/model/WriteableTests.kt | 12 ++ .../rollup/runner/RollupRunnerIT.kt | 30 +++-- .../SnapshotManagementRestTestCase.kt | 6 + .../RestIndexSnapshotManagementIT.kt | 1 + .../indexmanagement/transform/TestHelpers.kt | 2 +- .../transform/TransformRestTestCase.kt | 5 +- .../transform/TransformRunnerIT.kt | 21 +-- .../transform/model/WriteableTests.kt | 8 ++ 82 files changed, 419 insertions(+), 319 deletions(-) create mode 100644 .github/workflows/bwc-test-workflow.yml delete mode 100644 .github/workflows/dco.yml diff --git a/.github/workflows/bwc-test-workflow.yml b/.github/workflows/bwc-test-workflow.yml new file mode 100644 index 000000000..692880cdd --- /dev/null +++ b/.github/workflows/bwc-test-workflow.yml @@ -0,0 +1,32 @@ +name: Backward compatibility test workflow +on: + pull_request: + branches: + - "*" + push: + branches: + - "*" + +jobs: + test: + # This job runs on Linux + runs-on: ubuntu-latest + steps: + # This step uses the setup-java Github action: https://github.com/actions/setup-java + - name: Set Up JDK + uses: actions/setup-java@v1 + with: + java-version: 17 + # index-management + - name: Checkout Branch + uses: actions/checkout@v2 + - name: Run IM Backwards Compatibility Tests + run: | + echo "Running backwards compatibility tests..." + ./gradlew bwcTestSuite + - name: Upload failed logs + uses: actions/upload-artifact@v2 + if: failure() + with: + name: logs + path: build/testclusters/indexmanagementBwcCluster*/logs/* diff --git a/.github/workflows/create-documentation-issue.yml b/.github/workflows/create-documentation-issue.yml index 23bc47e6f..b20b78117 100644 --- a/.github/workflows/create-documentation-issue.yml +++ b/.github/workflows/create-documentation-issue.yml @@ -2,6 +2,7 @@ name: Create Documentation Issue on: pull_request: types: + - closed - labeled env: PR_NUMBER: ${{ github.event.number }} diff --git a/.github/workflows/dco.yml b/.github/workflows/dco.yml deleted file mode 100644 index 53ed5304c..000000000 --- a/.github/workflows/dco.yml +++ /dev/null @@ -1,18 +0,0 @@ -name: Developer Certificate of Origin Check - -on: [pull_request] - -jobs: - check: - runs-on: ubuntu-latest - - steps: - - name: Get PR Commits - id: 'get-pr-commits' - uses: tim-actions/get-pr-commits@v1.1.0 - with: - token: ${{ secrets.GITHUB_TOKEN }} - - name: DCO Check - uses: tim-actions/dco@v1.1.0 - with: - commits: ${{ steps.get-pr-commits.outputs.commits }} \ No newline at end of file diff --git a/.github/workflows/links.yml b/.github/workflows/links.yml index cbcf25750..af6c25f3a 100644 --- a/.github/workflows/links.yml +++ b/.github/workflows/links.yml @@ -6,7 +6,7 @@ on: branches: [ main ] jobs: - linkchecker: + check: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/multi-node-test-workflow.yml b/.github/workflows/multi-node-test-workflow.yml index 8f634003a..aaa37dc98 100644 --- a/.github/workflows/multi-node-test-workflow.yml +++ b/.github/workflows/multi-node-test-workflow.yml @@ -9,17 +9,15 @@ on: - "*" jobs: - build: - # Job name - name: Build Index Management + test: # This job runs on Linux runs-on: ubuntu-latest steps: # This step uses the setup-java Github action: https://github.com/actions/setup-java - - name: Set Up JDK 11 + - name: Set Up JDK uses: actions/setup-java@v1 with: - java-version: 11 + java-version: 17 # index-management - name: Checkout Branch uses: actions/checkout@v2 @@ -31,20 +29,3 @@ jobs: with: name: logs path: build/testclusters/integTest-*/logs/* - bwc: - name: Run Index Management Backwards Compatibility Tests - # This job runs on Linux - runs-on: ubuntu-latest - steps: - # This step uses the setup-java Github action: https://github.com/actions/setup-java - - name: Set Up JDK 11 - uses: actions/setup-java@v1 - with: - java-version: 11 - # index-management - - name: Checkout Branch - uses: actions/checkout@v2 - - name: Run IM Backwards Compatibility Tests - run: | - echo "Running backwards compatibility tests..." - ./gradlew bwcTestSuite diff --git a/.github/workflows/test-and-build-workflow.yml b/.github/workflows/test-and-build-workflow.yml index b3dd0ded9..d324148e8 100644 --- a/.github/workflows/test-and-build-workflow.yml +++ b/.github/workflows/test-and-build-workflow.yml @@ -9,8 +9,6 @@ on: jobs: build: - # Job name - name: Build Index Management env: BUILD_ARGS: ${{ matrix.os_build_args }} WORKING_DIR: ${{ matrix.working_directory }}. diff --git a/build.gradle b/build.gradle index 07db1a7fa..9872edc51 100644 --- a/build.gradle +++ b/build.gradle @@ -137,7 +137,6 @@ task ktlint(type: JavaExec, group: "verification") { classpath = configurations.ktlint args "src/**/*.kt", "spi/src/main/**/*.kt" } - check.dependsOn ktlint task ktlintFormat(type: JavaExec, group: "formatting") { @@ -145,12 +144,17 @@ task ktlintFormat(type: JavaExec, group: "formatting") { main = "com.pinterest.ktlint.Main" classpath = configurations.ktlint args "-F", "src/**/*.kt", "spi/src/main/**/*.kt" + // https://github.com/pinterest/ktlint/issues/1391 + jvmArgs "--add-opens=java.base/java.lang=ALL-UNNAMED" } detekt { config = files("detekt.yml") buildUponDefaultConfig = true } +// When writing detekt Gradle first finds the extension with this name, +// but with a string it should look for a task with that name instead +check.dependsOn "detekt" configurations.testImplementation { exclude module: "securemock" @@ -656,7 +660,14 @@ run { } } -compileKotlin { kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] } +compileKotlin { + kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] + kotlinOptions.allWarningsAsErrors = true +} + +compileTestKotlin { + kotlinOptions.allWarningsAsErrors = true +} apply from: 'build-tools/pkgbuild.gradle' diff --git a/detekt.yml b/detekt.yml index 47b9d163c..77c5afe01 100644 --- a/detekt.yml +++ b/detekt.yml @@ -1,6 +1,5 @@ -# TODO: Remove this before initial release, only for developmental purposes build: - maxIssues: 20 + maxIssues: 0 exceptions: TooGenericExceptionCaught: @@ -14,6 +13,8 @@ style: MaxLineLength: maxLineLength: 150 excludes: ['**/test/**'] + FunctionOnlyReturningConstant: + active: false complexity: LargeClass: diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt index 763603ecc..2b846f9e3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt @@ -19,7 +19,7 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.Client -import org.opensearch.cluster.LocalNodeMasterListener +import org.opensearch.cluster.LocalNodeClusterManagerListener import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.ToXContent @@ -47,7 +47,7 @@ class IndexStateManagementHistory( private val threadPool: ThreadPool, private val clusterService: ClusterService, private val indexManagementIndices: IndexManagementIndices -) : LocalNodeMasterListener { +) : LocalNodeClusterManagerListener { private val logger = LogManager.getLogger(javaClass) private var scheduledRollover: Scheduler.Cancellable? = null @@ -61,7 +61,7 @@ class IndexStateManagementHistory( @Volatile private var historyNumberOfReplicas = ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS.get(settings) init { - clusterService.addLocalNodeMasterListener(this) + clusterService.addLocalNodeClusterManagerListener(this) clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_ENABLED) { historyEnabled = it } @@ -82,7 +82,7 @@ class IndexStateManagementHistory( } } - override fun onMaster() { + override fun onClusterManager() { try { // try to rollover immediately as we might be restarting the cluster if (historyEnabled) rolloverHistoryIndex() @@ -97,12 +97,12 @@ class IndexStateManagementHistory( } } - override fun offMaster() { + override fun offClusterManager() { scheduledRollover?.cancel() } private fun rescheduleRollover() { - if (clusterService.state().nodes.isLocalNodeElectedMaster) { + if (clusterService.state().nodes.isLocalNodeElectedClusterManager) { scheduledRollover?.cancel() scheduledRollover = threadPool.scheduleWithFixedDelay( { rolloverAndDeleteHistoryIndex() }, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index 26d1ac590..20b621f99 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -160,8 +160,10 @@ class ManagedIndexCoordinator( } clusterService.clusterSettings.addSettingsUpdateConsumer(METADATA_SERVICE_STATUS) { metadataServiceEnabled = it == 0 - if (!metadataServiceEnabled) scheduledMoveMetadata?.cancel() - else initMoveMetadata() + if (!metadataServiceEnabled) { + logger.info("Canceling metadata moving job because of cluster setting update.") + scheduledMoveMetadata?.cancel() + } else initMoveMetadata() } clusterService.clusterSettings.addSettingsUpdateConsumer(TEMPLATE_MIGRATION_CONTROL) { templateMigrationEnabled = it >= 0L @@ -202,8 +204,8 @@ class ManagedIndexCoordinator( // Instead of using a LocalNodeMasterListener to track cluster manager changes, this service will // track them here to avoid conditions where cluster manager listener events run after other // listeners that depend on what happened in the cluster manager listener - if (this.isClusterManager != event.localNodeMaster()) { - this.isClusterManager = event.localNodeMaster() + if (this.isClusterManager != event.localNodeClusterManager()) { + this.isClusterManager = event.localNodeClusterManager() if (this.isClusterManager) { onClusterManager() } else { @@ -215,7 +217,7 @@ class ManagedIndexCoordinator( if (event.isNewCluster) return - if (!event.localNodeMaster()) return + if (!event.localNodeClusterManager()) return if (!event.metadataChanged()) return @@ -380,7 +382,7 @@ class ManagedIndexCoordinator( } /** - * Find a policy that has highest priority ism template with matching index pattern to the index and is created before index creation date. If + * Find a policy that has the highest priority ism template with matching index pattern to the index and is created before index creation date. If * the policy has user, ensure that the user can manage the index if not find the one that can. * */ private suspend fun findMatchingPolicy(indexName: String, creationDate: Long, policies: List): Policy? { @@ -422,7 +424,7 @@ class ManagedIndexCoordinator( try { val request = ManagedIndexRequest().indices(indexName) withClosableContext(IndexManagementSecurityContext("ApplyPolicyOnIndexCreation", settings, threadPool.threadContext, policy.user)) { - val response: AcknowledgedResponse = client.suspendUntil { execute(ManagedIndexAction.INSTANCE, request, it) } + client.suspendUntil { execute(ManagedIndexAction.INSTANCE, request, it) } } } catch (e: OpenSearchSecurityException) { logger.debug("Skipping applying policy ${policy.id} on $indexName as the policy user is missing permissions", e) @@ -473,13 +475,13 @@ class ManagedIndexCoordinator( // If ISM is disabled return early if (!isIndexStateManagementEnabled()) return - // Do not setup background sweep if we're not the elected cluster manager node - if (!clusterService.state().nodes().isLocalNodeElectedMaster) return + // Do not set up background sweep if we're not the elected cluster manager node + if (!clusterService.state().nodes().isLocalNodeElectedClusterManager) return // Cancel existing background sweep scheduledFullSweep?.cancel() - // Setup an anti-entropy/self-healing background sweep, in case we fail to create a ManagedIndexConfig job + // Set up an anti-entropy/self-healing background sweep, in case we fail to create a ManagedIndexConfig job val scheduledSweep = Runnable { val elapsedTime = getFullSweepElapsedTime() @@ -505,7 +507,7 @@ class ManagedIndexCoordinator( fun initMoveMetadata() { if (!metadataServiceEnabled) return if (!isIndexStateManagementEnabled()) return - if (!clusterService.state().nodes().isLocalNodeElectedMaster) return + if (!clusterService.state().nodes().isLocalNodeElectedClusterManager) return scheduledMoveMetadata?.cancel() if (metadataService.finishFlag) { @@ -535,7 +537,7 @@ class ManagedIndexCoordinator( fun initTemplateMigration(enableSetting: Long) { if (!templateMigrationEnabled) return if (!isIndexStateManagementEnabled()) return - if (!clusterService.state().nodes().isLocalNodeElectedMaster) return + if (!clusterService.state().nodes().isLocalNodeElectedClusterManager) return scheduledTemplateMigration?.cancel() // if service has finished, re-enable it @@ -657,8 +659,7 @@ class ManagedIndexCoordinator( if (scrollIDsToClear.isNotEmpty()) { val clearScrollRequest = ClearScrollRequest() clearScrollRequest.scrollIds(scrollIDsToClear.toList()) - val clearScrollResponse: ClearScrollResponse = - client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) } + client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) } } } return managedIndexUuids @@ -693,7 +694,7 @@ class ManagedIndexCoordinator( val mRes: MultiGetResponse = client.suspendUntil { multiGet(mReq, it) } val responses = mRes.responses if (responses.first().isFailed) { - // config index may not initialised yet + // config index may not initialise yet logger.error("get managed-index failed: ${responses.first().failure.failure}") return result } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 0b0f58d64..668012a25 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -269,7 +269,6 @@ object ManagedIndexRunner : // the cluster state index uuid differs from the one in the managed index config then the config is referring // to a different index which does not exist in the cluster. We need to check all of the extensions to confirm an index exists if (clusterStateIndexMetadata == null || clusterStateIndexUUID != managedIndexConfig.indexUuid) { - clusterStateIndexMetadata = null // If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE } val multiTypeIndexNameToMetaData = @@ -387,7 +386,7 @@ object ManagedIndexRunner : // If this action is not allowed and the step to be executed is the first step in the action then we will fail // as this action has been removed from the AllowList, but if its not the first step we will let it finish as it's already inflight if (action?.isAllowed(allowList) == false && step != null && action.isFirstStep(step.name) && action.type != TransitionsAction.name) { - val info = mapOf("message" to "Attempted to execute action=${action?.type} which is not allowed.") + val info = mapOf("message" to "Attempted to execute action=${action.type} which is not allowed.") val updated = updateManagedIndexMetaData( managedIndexMetaData.copy( policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info @@ -404,7 +403,13 @@ object ManagedIndexRunner : @Suppress("ComplexCondition", "MaxLineLength") if (updateResult.metadataSaved && state != null && action != null && step != null && currentActionMetaData != null) { if (validationServiceEnabled) { - val validationResult = actionValidation.validate(action.type, stepContext.metadata.index) + val validationResult = withClosableContext( + IndexManagementSecurityContext( + managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user + ) + ) { + actionValidation.validate(action.type, stepContext.metadata.index) + } if (validationResult.validationStatus == Validate.ValidationStatus.RE_VALIDATING) { logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.\", action.type, state.name, step.name") publishErrorNotification(policy, managedIndexMetaData) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt index b69fcac5c..c42682f71 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataService.kt @@ -191,12 +191,12 @@ class MetadataService( override fun onResponse(response: ClusterUpdateSettingsResponse) { if (!response.isAcknowledged) { - logger.error("Update template migration setting to $status is not acknowledged") + logger.error("Update metadata migration setting to $status is not acknowledged") throw IndexManagementException.wrap( - Exception("Update template migration setting to $status is not acknowledged") + Exception("Update metadata migration setting to $status is not acknowledged") ) } else { - logger.info("Successfully update template migration setting to $status") + logger.info("Successfully metadata template migration setting to $status") } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt index acaabc864..9a87bc1d0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt @@ -178,7 +178,7 @@ data class ManagedIndexConfig( policySeqNo = policySeqNo, policyPrimaryTerm = policyPrimaryTerm, policy = policy?.copy( - id = policyID ?: NO_ID, + id = policyID, seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM ), diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt index a148d5fde..80fed45d1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt @@ -31,7 +31,7 @@ data class SweptManagedIndexConfig( ) { companion object { - @Suppress("ComplexMethod", "UnusedPrivateMember") + @Suppress("ComplexMethod", "UNUSED_PARAMETER") @JvmStatic @Throws(IOException::class) fun parse(xcp: XContentParser, id: String = NO_ID, seqNo: Long, primaryTerm: Long): SweptManagedIndexConfig { @@ -60,11 +60,11 @@ data class SweptManagedIndexConfig( } return SweptManagedIndexConfig( - requireNotNull(index) { "SweptManagedIndexConfig index is null" }, + index, seqNo, primaryTerm, - requireNotNull(uuid) { "SweptManagedIndexConfig uuid is null" }, - requireNotNull(policyID) { "SweptManagedIndexConfig policy id is null" }, + uuid, + policyID, policy, changePolicy ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/close/AttemptCloseStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/close/AttemptCloseStep.kt index bab8423e8..3e02ecb91 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/close/AttemptCloseStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/close/AttemptCloseStep.kt @@ -44,7 +44,7 @@ class AttemptCloseStep : Step(name) { } catch (e: RemoteTransportException) { val cause = ExceptionsHelper.unwrapCause(e) if (cause is SnapshotInProgressException) { - handleSnapshotException(indexName, cause as SnapshotInProgressException) + handleSnapshotException(indexName, cause) } else { handleException(indexName, cause as Exception) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt index f86d0a3b5..a5e767d64 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt @@ -10,6 +10,7 @@ import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchException import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.Client import org.opensearch.index.engine.VersionConflictEngineException import org.opensearch.indexmanagement.indexstatemanagement.action.RollupAction import org.opensearch.indexmanagement.opensearchapi.suspendUntil @@ -85,7 +86,7 @@ class AttemptCreateRollupJobStep(private val action: RollupAction) : Step(name) logger.info("Attempting to re-start the job $rollupId") try { val startRollupRequest = StartRollupRequest(rollupId) - val response: AcknowledgedResponse = client.suspendUntil { execute(StartRollupAction.INSTANCE, startRollupRequest, it) } + client.suspendUntil { execute(StartRollupAction.INSTANCE, startRollupRequest, it) } stepStatus = StepStatus.COMPLETED info = mapOf("message" to getSuccessRestartMessage(rollupId, indexName)) } catch (e: Exception) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt index fed9cf828..4fcdf487c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt @@ -288,7 +288,7 @@ class TransportChangePolicyAction @Inject constructor( ) ) // if there exists a transitionTo on the ManagedIndexMetaData then we will - // fail as they might not of meant to add a ChangePolicy when its on the next state + // fail as they might not of meant to add a ChangePolicy when it's on the next state managedIndexMetadata?.transitionTo != null -> failedIndices.add( FailedIndex( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt index 11a1585b3..a0871db8d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt @@ -57,7 +57,7 @@ open class ExplainResponse : ActionResponse, ToXContentObject { indexPolicyIDs = sin.readStringList(), indexMetadatas = sin.readList { ManagedIndexMetaData.fromStreamInput(it) }, totalManagedIndices = sin.readInt(), - enabledState = sin.readMap() as Map, + enabledState = sin.readMap(StreamInput::readString, StreamInput::readBoolean), policies = sin.readMap(StreamInput::readString, ::Policy), validationResults = sin.readList { ValidationResult.fromStreamInput(it) } ) @@ -68,7 +68,11 @@ open class ExplainResponse : ActionResponse, ToXContentObject { out.writeStringCollection(indexPolicyIDs) out.writeCollection(indexMetadatas) out.writeInt(totalManagedIndices) - out.writeMap(enabledState) + out.writeMap( + enabledState, + { _out, key -> _out.writeString(key) }, + { _out, enable -> _out.writeBoolean(enable) } + ) out.writeMap( policies, { _out, key -> _out.writeString(key) }, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index ac7caab1f..f3d079a53 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -383,7 +383,7 @@ class TransportExplainAction @Inject constructor( filteredIndices.add(indexNames[i]) filteredMetadata.add(indexMetadatas[i]) filteredPolicies.add(indexPolicyIDs[i]) - validationResults[i]?.let { filteredValidationResult.add(it) } + validationResults[i].let { filteredValidationResult.add(it) } enabledState[indexNames[i]]?.let { enabledStatus[indexNames[i]] = it } appliedPolicies[indexNames[i]]?.let { filteredAppliedPolicies[indexNames[i]] = it } } catch (e: OpenSearchSecurityException) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index bea3ed57c..a400bf65b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -517,6 +517,7 @@ fun checkMetadata( val t2 = when (configIndexMetadata) { is ManagedIndexMetaData -> configIndexMetadata.stepMetaData?.startTime is Map<*, *> -> { + @Suppress("UNCHECKED_CAST") val stepMetadata = configIndexMetadata["step"] as Map? stepMetadata?.get("start_time") } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt index fe48a7a70..151f52fa4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt @@ -136,7 +136,8 @@ suspend fun removeClusterStateMetadatas(client: Client, logger: Logger, indices: } const val MASTER_TIMEOUT_DEPRECATED_MESSAGE = - "Parameter [master_timeout] is deprecated and will be removed in 3.0. To support inclusive language, please use [cluster_manager_timeout] instead." + "Parameter [master_timeout] is deprecated and will be removed in 3.0. " + + "To support inclusive language, please use [cluster_manager_timeout] instead." const val DUPLICATE_PARAMETER_ERROR_MESSAGE = "Please only use one of the request parameters [master_timeout, cluster_manager_timeout]." fun parseClusterManagerTimeout(request: RestRequest, deprecationLogger: DeprecationLogger, restActionName: String): TimeValue { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDelete.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDelete.kt index 34d8d7c3f..70ce78d58 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDelete.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDelete.kt @@ -30,7 +30,7 @@ class ValidateDelete( if (!deleteIndexExists(indexName) || !validIndex(indexName)) { return this } - val (rolloverTarget, isDataStream) = getRolloverTargetOrUpdateInfo(indexName) + val (rolloverTarget, _) = getRolloverTargetOrUpdateInfo(indexName) if (rolloverTarget != null && !notWriteIndexForDataStream(rolloverTarget, indexName)) { return this // can't be deleted if it's write index } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt index 0da24de23..2f44f4389 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt @@ -106,7 +106,8 @@ class RollupMapperService( if (rollupJobs != null && (rollupJobs.size > 1 || rollupJobs[0].id != rollup.id) ) { - errorMessage = "More than one rollup jobs present on the backing index of the target alias, cannot perform rollup to this target alias [${rollup.targetIndex}]." + errorMessage = "More than one rollup jobs present on the backing index of the target alias, " + + "cannot perform rollup to this target alias [${rollup.targetIndex}]." logger.error(errorMessage) return RollupJobValidationResult.Failure(errorMessage) } @@ -152,6 +153,7 @@ class RollupMapperService( when (validationResult) { is RollupJobValidationResult.Failure -> logger.error(validationResult.message) is RollupJobValidationResult.Invalid -> logger.error(validationResult.reason) + else -> {} } return validationResult } else { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt index 251ab7767..baad1f24b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt @@ -155,7 +155,6 @@ class TransportIndexRollupAction @Inject constructor( if (rollup.metrics != newRollup.metrics) modified.add(Rollup.METRICS_FIELD) if (rollup.sourceIndex != newRollup.sourceIndex) modified.add(Rollup.SOURCE_INDEX_FIELD) if (rollup.targetIndex != newRollup.targetIndex) modified.add(Rollup.TARGET_INDEX_FIELD) - if (rollup.roles != newRollup.roles) modified.add(Rollup.ROLES_FIELD) return modified.toList() } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt index 021f80f1a..fa8c51d93 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt @@ -195,7 +195,7 @@ data class Rollup( out.writeString(sourceIndex) out.writeString(targetIndex) out.writeOptionalString(metadataID) - out.writeStringArray(roles.toTypedArray()) + out.writeStringArray(emptyList().toTypedArray()) out.writeInt(pageSize) out.writeOptionalLong(delay) out.writeBoolean(continuous) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt index 5fb708719..25442c7d0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt @@ -295,7 +295,8 @@ fun RollupMetadata.incrementStats(response: SearchResponse, internalComposite: I return this.copy( stats = this.stats.copy( pagesProcessed = stats.pagesProcessed + 1L, - documentsProcessed = stats.documentsProcessed + internalComposite.buckets.fold(0L) { acc, it -> acc + it.docCount }, + documentsProcessed = stats.documentsProcessed + + internalComposite.buckets.fold(0L) { acc, internalBucket -> acc + internalBucket.docCount }, searchTimeInMillis = stats.searchTimeInMillis + response.took.millis ) ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Average.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Average.kt index b739016f9..a3a491206 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Average.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Average.kt @@ -15,7 +15,7 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken class Average() : Metric(Type.AVERAGE) { - @Suppress("UnusedPrivateMember") + @Suppress("UNUSED_PARAMETER") constructor(sin: StreamInput) : this() override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Max.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Max.kt index 34fe2b2a9..ea9b54105 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Max.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Max.kt @@ -14,7 +14,7 @@ import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken class Max() : Metric(Type.MAX) { - @Suppress("UnusedPrivateMember") + @Suppress("UNUSED_PARAMETER") constructor(sin: StreamInput) : this() override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Min.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Min.kt index 8e14e87de..5d03ab18b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Min.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Min.kt @@ -14,7 +14,7 @@ import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken class Min() : Metric(Type.MIN) { - @Suppress("UnusedPrivateMember") + @Suppress("UNUSED_PARAMETER") constructor(sin: StreamInput) : this() override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Sum.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Sum.kt index b4bc5ca7c..baf2e3f2d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Sum.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/Sum.kt @@ -14,7 +14,7 @@ import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken class Sum() : Metric(Type.SUM) { - @Suppress("UnusedPrivateMember") + @Suppress("UNUSED_PARAMETER") constructor(sin: StreamInput) : this() override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/ValueCount.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/ValueCount.kt index daa9465e1..278b95d35 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/ValueCount.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/metric/ValueCount.kt @@ -14,7 +14,7 @@ import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken class ValueCount() : Metric(Type.VALUE_COUNT) { - @Suppress("UnusedPrivateMember") + @Suppress("UNUSED_PARAMETER") constructor(sin: StreamInput) : this() override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt index 95ae45b7b..c7384eefc 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt @@ -190,6 +190,7 @@ fun preFixTimeStamp(msg: String?): String { } fun addSMPolicyInSnapshotMetadata(snapshotConfig: Map, policyName: String): Map { + @Suppress("UNCHECKED_CAST") var snapshotMetadata = snapshotConfig["metadata"] as MutableMap? if (snapshotMetadata != null) { snapshotMetadata[SM_TYPE] = policyName @@ -351,7 +352,7 @@ fun timeLimitExceeded( } fun getTimeLimitExceededMessage(timeLimit: TimeValue, workflow: WorkflowType): String { - val workflow = when (workflow) { + val workflowStr = when (workflow) { WorkflowType.CREATION -> { "creation" } @@ -359,5 +360,5 @@ fun getTimeLimitExceededMessage(timeLimit: TimeValue, workflow: WorkflowType): S "deletion" } } - return "Time limit $timeLimit exceeded during snapshot $workflow step" + return "Time limit $timeLimit exceeded during snapshot $workflowStr step" } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt index 4556030e5..51f4d35a2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt @@ -123,10 +123,11 @@ class SMStateMachine( } catch (ex: Exception) { val message = "There was an exception at ${now()} while executing Snapshot Management policy ${job.policyName}, please check logs." job.notificationConfig?.sendFailureNotification(client, job.policyName, message, job.user, log) + @Suppress("InstanceOfCheckForException") if (ex is SnapshotManagementException && ex.exKey == ExceptionKey.METADATA_INDEXING_FAILURE ) { - // update metadata exception is special, we don't want to retry update metadata here + // update metadata exception is special, we have logged out the error in updateMetadata return this } log.error("Uncaught snapshot management runtime exception.", ex) @@ -219,7 +220,9 @@ class SMStateMachine( // TODO SM save a copy to history } - private val updateMetaDataRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(EXPONENTIAL_BACKOFF_MILLIS), MAX_NUMBER_OF_RETRIES) + private val updateMetaDataRetryPolicy = BackoffPolicy.exponentialBackoff( + TimeValue.timeValueMillis(EXPONENTIAL_BACKOFF_MILLIS), MAX_NUMBER_OF_RETRIES + ) /** * Handle the policy change before job running diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/SMState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/SMState.kt index 991a9a174..00630bd5e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/SMState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/SMState.kt @@ -45,7 +45,7 @@ enum class WorkflowType { * resetRetry is performed in Next and Stay. * except the last one. Refer to [DeletingState] */ -sealed class SMResult : State.Result() { +sealed class SMResult : State.Result { data class Next(val metadataToSave: SMMetadata.Builder) : SMResult() data class Stay(val metadataToSave: SMMetadata.Builder) : SMResult() data class Fail( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/State.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/State.kt index e6c272ab0..98cf1b08e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/State.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/State.kt @@ -22,5 +22,5 @@ interface State { suspend fun execute(context: SMStateMachine): Result - abstract class Result + interface Result } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt index bc4b296bd..bc591a486 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger import org.opensearch.ExceptionsHelper import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse +import org.opensearch.client.ClusterAdminClient import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.snapshotmanagement.engine.SMStateMachine import org.opensearch.indexmanagement.snapshotmanagement.generateSnapshotName @@ -68,7 +69,7 @@ object CreatingState : State { val req = CreateSnapshotRequest(job.snapshotConfig["repository"] as String, snapshotName) .source(addSMPolicyInSnapshotMetadata(job.snapshotConfig, job.policyName)) .waitForCompletion(false) - val res: CreateSnapshotResponse = client.admin().cluster().suspendUntil { createSnapshot(req, it) } + client.admin().cluster().suspendUntil { createSnapshot(req, it) } metadataBuilder.setLatestExecution( status = SMMetadata.LatestExecution.Status.IN_PROGRESS, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingState.kt index 0aa9cfdd8..69ed735b6 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingState.kt @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger import org.opensearch.ExceptionsHelper import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.ClusterAdminClient import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.snapshotmanagement.engine.SMStateMachine import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMResult @@ -68,7 +69,7 @@ object DeletingState : State { job.snapshotConfig["repository"] as String, *snapshotsToDelete.toTypedArray() ) - val res: AcknowledgedResponse = client.admin().cluster().suspendUntil { deleteSnapshot(req, it) } + client.admin().cluster().suspendUntil { deleteSnapshot(req, it) } metadataBuilder.setLatestExecution( status = SMMetadata.LatestExecution.Status.IN_PROGRESS, @@ -102,7 +103,8 @@ object DeletingState : State { return SMResult.Fail(metadataBuilder, WorkflowType.CREATION) } - private fun getSnapshotDeletionStartedMessage(snapshotNames: List) = "Snapshots $snapshotNames deletion has been started and waiting for completion." + private fun getSnapshotDeletionStartedMessage(snapshotNames: List) = + "Snapshots $snapshotNames deletion has been started and waiting for completion." private fun getSnapshotsMissingMessage() = "No snapshots found under policy while getting snapshots to decide which snapshots to delete." private fun getSnapshotsErrorMessage() = "Caught exception while getting snapshots to decide which snapshots to delete." private fun getDeleteSnapshotErrorMessage(snapshotNames: List) = "Caught exception while deleting snapshot $snapshotNames." diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt index 6ba6232b2..adc2489ef 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt @@ -23,26 +23,24 @@ class TransformProcessedBucketLog { } } - fun addBucket(bucket: Map) { + private fun addBucket(bucket: Map) { if (processedBuckets.size >= MAX_SIZE) return processedBuckets.add(computeBucketHash(bucket)) } - fun isProcessed(bucket: Map): Boolean { + private fun isProcessed(bucket: Map): Boolean { return processedBuckets.contains(computeBucketHash(bucket)) } fun isNotProcessed(bucket: Map) = !isProcessed(bucket) - fun computeBucketHash(bucket: Map): String { + private fun computeBucketHash(bucket: Map): String { val md5Crypt = MessageDigest.getInstance("MD5") - bucket.entries.sortedBy { it.key }.also { - it.forEach { entry -> - md5Crypt.update( - if (entry.value == null) "null".toByteArray() - else entry.value.toString().toByteArray() - ) - } + bucket.entries.sortedBy { it.key }.onEach { entry -> + md5Crypt.update( + if (entry.value == null) "null".toByteArray() + else entry.value.toString().toByteArray() + ) } return BigInteger(1, md5Crypt.digest()).toString(HEX_RADIX) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index bd8915089..0330aa952 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -337,7 +337,7 @@ class TransformSearchService( ): TransformSearchResult { val aggs = searchResponse.aggregations.get(transform.id) as CompositeAggregation val buckets = if (modifiedBuckets != null) aggs.buckets.filter { modifiedBuckets.contains(it.key) } else aggs.buckets - val documentsProcessed = buckets.fold(0L) { sum, it -> sum + it.docCount } + val documentsProcessed = buckets.fold(0L) { sum, bucket -> sum + bucket.docCount } val pagesProcessed = 1L val searchTime = searchResponse.took.millis val stats = TransformStats(pagesProcessed, documentsProcessed, 0, 0, searchTime) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt index 913229246..5da2fc554 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt @@ -108,7 +108,7 @@ class TransportDeleteTransformsAction @Inject constructor( } } - @Suppress("LongMethod") + @Suppress("LongMethod", "NestedBlockDepth") private fun bulkDelete(response: MultiGetResponse, ids: List, forceDelete: Boolean, actionListener: ActionListener) { val enabledIDs = mutableListOf() val notTransform = mutableListOf() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformsAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformsAction.kt index 96ad989d0..a4801eafe 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformsAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformsAction.kt @@ -76,6 +76,7 @@ class TransportGetTransformsAction @Inject constructor( .sort(sortField, SortOrder.fromString(sortDirection)) client.threadPool().threadContext.stashContext().use { + @Suppress("UNCHECKED_CAST") getJobs( client, searchSourceBuilder, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt index 373530386..fc8f632f8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt @@ -146,7 +146,6 @@ class TransportIndexTransformAction @Inject constructor( if (transform.dataSelectionQuery != newTransform.dataSelectionQuery) modified.add(Transform.DATA_SELECTION_QUERY_FIELD) if (transform.groups != newTransform.groups) modified.add(Transform.GROUPS_FIELD) if (transform.aggregations != newTransform.aggregations) modified.add(Transform.AGGREGATIONS_FIELD) - if (transform.roles != newTransform.roles) modified.add(Transform.ROLES_FIELD) if (transform.continuous != newTransform.continuous) modified.add(Transform.CONTINUOUS_FIELD) return modified.toList() } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt index fc382c630..f87085757 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt @@ -149,7 +149,7 @@ data class Transform( out.writeString(sourceIndex) out.writeOptionalNamedWriteable(dataSelectionQuery) out.writeString(targetIndex) - out.writeStringArray(roles.toTypedArray()) + out.writeStringArray(emptyList().toTypedArray()) out.writeInt(pageSize) out.writeVInt(groups.size) for (group in groups) { @@ -170,12 +170,12 @@ data class Transform( return if (includeId) { mutableMapOf( TRANSFORM_DOC_ID_FIELD to this.id, - _DOC_COUNT to docCount, + DOC_COUNT to docCount, TRANSFORM_DOC_COUNT_FIELD to docCount ) } else { mutableMapOf( - _DOC_COUNT to docCount, + DOC_COUNT to docCount, TRANSFORM_DOC_COUNT_FIELD to docCount ) } @@ -288,7 +288,7 @@ data class Transform( const val MAXIMUM_PAGE_SIZE_CONTINUOUS = 1_000 const val MINIMUM_JOB_INTERVAL = 1 const val TRANSFORM_DOC_ID_FIELD = "$TRANSFORM_TYPE._id" - const val _DOC_COUNT = "_doc_count" + const val DOC_COUNT = "_doc_count" // Keeping the field in order to be backward compatible const val TRANSFORM_DOC_COUNT_FIELD = "$TRANSFORM_TYPE._doc_count" const val CONTINUOUS_FIELD = "continuous" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/opensearchapi/OpenSearchExtensions.kt index c9b062a7a..3df66f16f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/opensearchapi/OpenSearchExtensions.kt @@ -70,6 +70,7 @@ fun isRetryable( * Retries on 408 or on TaskCancelledException once the message matches the given pattern. * In that case, retry request with reduced size param and timeout param is set based on the lock expiration */ +@Suppress("ReturnCount") fun isTransformOperationTimedOut(ex: OpenSearchException): Boolean { if (RestStatus.REQUEST_TIMEOUT == ex.status()) { return true diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/JobSchedulerUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/JobSchedulerUtils.kt index ad6023f2d..3157f7631 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/util/JobSchedulerUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/JobSchedulerUtils.kt @@ -13,7 +13,7 @@ import org.opensearch.jobscheduler.spi.JobExecutionContext import org.opensearch.jobscheduler.spi.LockModel import org.opensearch.jobscheduler.spi.ScheduledJobParameter -private val logger = LogManager.getLogger("JobSchedulerUtils") +private val logger = LogManager.getLogger("o.o.i.u.JobSchedulerUtils") /** * Util method to attempt to get the lock on the requested scheduled job using the backoff policy. diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/ScheduledJobUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/ScheduledJobUtils.kt index 7eb414e76..70e849bfb 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/util/ScheduledJobUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/ScheduledJobUtils.kt @@ -66,6 +66,7 @@ fun getJobs( ) } +@Suppress("UNCHECKED_CAST") private fun populateResponse( jobType: String, jobs: List, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt index 34e68cfa9..123c89007 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt @@ -71,8 +71,7 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() { } fun `test update management index mapping with new schema version`() { - wipeAllODFEIndices() - waitForPendingTasks(adminClient()) + wipeAllIndices() assertIndexDoesNotExist(INDEX_MANAGEMENT_INDEX) val mapping = indexManagementMappings.trim().trimStart('{').trimEnd('}') diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index db7ce6a7b..eba7db734 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -10,20 +10,31 @@ import org.apache.http.entity.StringEntity import org.junit.AfterClass import org.junit.Before import org.junit.rules.DisableOnDebug +import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction import org.opensearch.client.Request -import org.opensearch.client.RequestOptions import org.opensearch.client.Response import org.opensearch.client.RestClient +import org.opensearch.client.RequestOptions +import org.opensearch.client.WarningsHandler +import org.opensearch.client.ResponseException import org.opensearch.common.Strings +import org.opensearch.common.collect.Set import org.opensearch.common.io.PathUtils import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.DeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN import org.opensearch.rest.RestStatus +import java.io.IOException import java.nio.file.Files +import java.util.* import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL +import kotlin.collections.ArrayList +import kotlin.collections.HashSet abstract class IndexManagementRestTestCase : ODFERestTestCase() { @@ -57,7 +68,6 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { protected val isDebuggingTest = DisableOnDebug(null).isDebugging protected val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean() - protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 protected val isLocalTest = clusterName() == "integTest" private fun clusterName(): String { @@ -138,6 +148,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { insertSampleBulkData(index, javaClass.classLoader.getResource("data/nyc_5000.ndjson").readText()) } + @Suppress("UNCHECKED_CAST") protected fun extractFailuresFromSearchResponse(searchResponse: Response): List?>? { val shards = searchResponse.asMap()["_shards"] as Map>> assertNotNull(shards) @@ -152,7 +163,115 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { } } + override fun preserveIndicesUponCompletion(): Boolean = true companion object { + @JvmStatic + protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 + protected val defaultKeepIndexSet = setOf(".opendistro_security") + /** + * We override preserveIndicesUponCompletion to true and use this function to clean up indices + * Meant to be used in @After or @AfterClass of your feature test suite + */ + fun wipeAllIndices(client: RestClient = adminClient(), keepIndex: kotlin.collections.Set = defaultKeepIndexSet) { + try { + client.performRequest(Request("DELETE", "_data_stream/*")) + } catch (e: ResponseException) { + // We hit a version of ES that doesn't serialize DeleteDataStreamAction.Request#wildcardExpressionsOriginallySpecified field or + // that doesn't support data streams so it's safe to ignore + val statusCode = e.response.statusLine.statusCode + if (!Set.of(404, 405, 500).contains(statusCode)) { + throw e + } + } + + val response = client.performRequest(Request("GET", "/_cat/indices?format=json&expand_wildcards=all")) + val xContentType = XContentType.fromMediaType(response.entity.contentType.value) + xContentType.xContent().createParser( + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + response.entity.content + ).use { parser -> + for (index in parser.list()) { + val jsonObject: Map<*, *> = index as java.util.HashMap<*, *> + val indexName: String = jsonObject["index"] as String + // .opendistro_security isn't allowed to delete from cluster + if (!keepIndex.contains(indexName)) { + val request = Request("DELETE", "/$indexName") + // TODO: remove PERMISSIVE option after moving system index access to REST API call + val options = RequestOptions.DEFAULT.toBuilder() + options.setWarningsHandler(WarningsHandler.PERMISSIVE) + request.options = options.build() + client.performRequest(request) + } + } + } + + waitFor { + if (!isMultiNode) { + waitForRunningTasks(client) + waitForPendingTasks(client) + waitForThreadPools(client) + } else { + // Multi node test is not suitable to waitFor + // We have seen long-running write task that fails the waitFor + // probably because of cluster manager - data node task not in sync + // So instead we just sleep 1s after wiping indices + Thread.sleep(1_000) + } + } + } + + @JvmStatic + @Throws(IOException::class) + protected fun waitForRunningTasks(client: RestClient) { + val runningTasks: MutableSet = runningTasks(client.performRequest(Request("GET", "/_tasks?detailed"))) + if (runningTasks.isEmpty()) { + return + } + val stillRunning = ArrayList(runningTasks) + fail("${Date()}: There are still tasks running after this test that might break subsequent tests: \n${stillRunning.joinToString("\n")}.") + } + + @Suppress("UNCHECKED_CAST") + @Throws(IOException::class) + private fun runningTasks(response: Response): MutableSet { + val runningTasks: MutableSet = HashSet() + val nodes = entityAsMap(response)["nodes"] as Map? + for ((_, value) in nodes!!) { + val nodeInfo = value as Map + val nodeTasks = nodeInfo["tasks"] as Map? + for ((_, value1) in nodeTasks!!) { + val task = value1 as Map + // Ignore the task list API - it doesn't count against us + if (task["action"] == ListTasksAction.NAME || task["action"] == ListTasksAction.NAME + "[n]") continue + // runningTasks.add(task["action"].toString() + " | " + task["description"].toString()) + runningTasks.add(task.toString()) + } + } + return runningTasks + } + + @JvmStatic + protected fun waitForThreadPools(client: RestClient) { + val response = client.performRequest(Request("GET", "/_cat/thread_pool?format=json")) + + val xContentType = XContentType.fromMediaType(response.entity.contentType.value) + xContentType.xContent().createParser( + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + response.entity.content + ).use { parser -> + for (index in parser.list()) { + val jsonObject: Map<*, *> = index as java.util.HashMap<*, *> + val active = (jsonObject["active"] as String).toInt() + val queue = (jsonObject["queue"] as String).toInt() + val name = jsonObject["name"] + val trueActive = if (name == "management") active - 1 else active + if (trueActive > 0 || queue > 0) { + fail("Still active threadpools in cluster: $jsonObject") + } + } + } + } + internal interface IProxy { val version: String? var sessionId: String? diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt index 9fc774551..523dfe0e4 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt @@ -52,10 +52,6 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() { private val testRole = "test_role" var testClient: RestClient? = null - override fun preserveIndicesUponCompletion(): Boolean { - return true - } - @Before fun setupUsersAndRoles() { updateClusterSetting(ManagedIndexSettings.JITTER.key, "0.0", false) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/ODFERestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/ODFERestTestCase.kt index 64b1a232e..fa1a77a92 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/ODFERestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/ODFERestTestCase.kt @@ -6,18 +6,9 @@ package org.opensearch.indexmanagement import org.apache.http.HttpHost -import org.junit.After -import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction -import org.opensearch.client.Request -import org.opensearch.client.RequestOptions -import org.opensearch.client.Response import org.opensearch.client.RestClient -import org.opensearch.client.WarningsHandler import org.opensearch.common.io.PathUtils import org.opensearch.common.settings.Settings -import org.opensearch.common.xcontent.DeprecationHandler -import org.opensearch.common.xcontent.NamedXContentRegistry -import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_ENABLED import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_KEYPASSWORD @@ -35,99 +26,6 @@ abstract class ODFERestTestCase : OpenSearchRestTestCase() { override fun getProtocol(): String = if (isHttps()) "https" else "http" - @Suppress("UNCHECKED_CAST") - @Throws(IOException::class) - private fun runningTasks(response: Response): MutableSet { - val runningTasks: MutableSet = HashSet() - val nodes = entityAsMap(response)["nodes"] as Map? - for ((_, value) in nodes!!) { - val nodeInfo = value as Map - val nodeTasks = nodeInfo["tasks"] as Map? - for ((_, value1) in nodeTasks!!) { - val task = value1 as Map - runningTasks.add(task["action"].toString()) - } - } - return runningTasks - } - - @After - fun waitForCleanup() { - waitFor { - waitForRunningTasks() - waitForThreadPools() - waitForPendingTasks(adminClient()) - } - } - - @Throws(IOException::class) - private fun waitForRunningTasks() { - val runningTasks: MutableSet = runningTasks(adminClient().performRequest(Request("GET", "/_tasks"))) - // Ignore the task list API - it doesn't count against us - runningTasks.remove(ListTasksAction.NAME) - runningTasks.remove(ListTasksAction.NAME + "[n]") - if (runningTasks.isEmpty()) { - return - } - val stillRunning = ArrayList(runningTasks) - fail("There are still tasks running after this test that might break subsequent tests $stillRunning.") - } - - private fun waitForThreadPools() { - waitFor { - val response = client().performRequest(Request("GET", "/_cat/thread_pool?format=json")) - - val xContentType = XContentType.fromMediaType(response.entity.contentType.value) - xContentType.xContent().createParser( - NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - response.entity.content - ).use { parser -> - for (index in parser.list()) { - val jsonObject: Map<*, *> = index as java.util.HashMap<*, *> - val active = (jsonObject["active"] as String).toInt() - val queue = (jsonObject["queue"] as String).toInt() - val name = jsonObject["name"] - val trueActive = if (name == "management") active - 1 else active - if (trueActive > 0 || queue > 0) { - fail("Still active threadpools in cluster: $jsonObject") - } - } - } - } - } - - open fun preserveODFEIndicesAfterTest(): Boolean = false - - @Throws(IOException::class) - open fun wipeAllODFEIndices() { - if (preserveODFEIndicesAfterTest()) return - - // Delete all data stream indices - client().performRequest(Request("DELETE", "/_data_stream/*")) - - // Delete all indices - val response = client().performRequest(Request("GET", "/_cat/indices?format=json&expand_wildcards=all")) - - val xContentType = XContentType.fromMediaType(response.entity.contentType.value) - xContentType.xContent().createParser( - NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - response.entity.content - ).use { parser -> - for (index in parser.list()) { - val jsonObject: Map<*, *> = index as java.util.HashMap<*, *> - val indexName: String = jsonObject["index"] as String - // .opendistro_security isn't allowed to delete from cluster - if (".opendistro_security" != indexName) { - val request = Request("DELETE", "/$indexName") - // TODO: remove PERMISSIVE option after moving system index access to REST API call - val options = RequestOptions.DEFAULT.toBuilder() - options.setWarningsHandler(WarningsHandler.PERMISSIVE) - request.options = options.build() - adminClient().performRequest(request) - } - } - } - } /** * Returns the REST client settings used for super-admin actions like cleaning up after the test has completed. */ diff --git a/src/test/kotlin/org/opensearch/indexmanagement/RollupSecurityBehaviorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/RollupSecurityBehaviorIT.kt index 79d66cd3c..edcff6b18 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/RollupSecurityBehaviorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/RollupSecurityBehaviorIT.kt @@ -43,10 +43,6 @@ class RollupSecurityBehaviorIT : SecurityRestTestCase() { private val testRole = "test_role" var testUserClient: RestClient? = null - override fun preserveIndicesUponCompletion(): Boolean { - return true - } - @Before fun setupUsersAndRoles() { updateClusterSetting(ManagedIndexSettings.JITTER.key, "0.0", false) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/SecurityBehaviorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/SecurityBehaviorIT.kt index 8beff5aad..fc203442a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/SecurityBehaviorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/SecurityBehaviorIT.kt @@ -26,10 +26,6 @@ class SecurityBehaviorIT : SecurityRestTestCase() { private val john = "john" private var johnClient: RestClient? = null - override fun preserveIndicesUponCompletion(): Boolean { - return true - } - @Before fun setupUsersAndRoles() { updateClusterSetting(ManagedIndexSettings.JITTER.key, "0.0", false) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/TransformSecurityBehaviorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/TransformSecurityBehaviorIT.kt index e46add04d..77d84323e 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/TransformSecurityBehaviorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/TransformSecurityBehaviorIT.kt @@ -37,10 +37,6 @@ class TransformSecurityBehaviorIT : SecurityRestTestCase() { private val testRole = "test_role" var testUserClient: RestClient? = null - override fun preserveIndicesUponCompletion(): Boolean { - return true - } - @Before fun setupUsersAndRoles() { updateClusterSetting(ManagedIndexSettings.JITTER.key, "0.0", false) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt index 630fcca42..84344d181 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt @@ -36,8 +36,6 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() { override fun preserveTemplatesUponCompletion(): Boolean = true - override fun preserveODFEIndicesAfterTest(): Boolean = true - override fun restClientSettings(): Settings { return Settings.builder() .put(super.restClientSettings()) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt index 540049fe1..798c8eb1a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt @@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.indexstatemanagement import org.apache.http.entity.ContentType import org.apache.http.entity.StringEntity +import org.junit.After import org.junit.Before import org.opensearch.OpenSearchParseException import org.opensearch.action.ActionRequest @@ -28,6 +29,7 @@ import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.IndexManagementRestTestCase.Companion.wipeAllIndices import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestExplainAction @@ -55,6 +57,12 @@ import java.time.Duration import java.time.Instant abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() { + + @After + fun clearIndicesAfterEachTest() { + wipeAllIndices(getRestClient()) + } + @Before fun disableIndexStateManagementJitter() { // jitter would add a test-breaking delay to the integration tests @@ -275,11 +283,10 @@ abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() { xcp.nextToken(), xcp ) - var totalManagedIndices = 0 while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { xcp.currentName() xcp.nextToken() - if (xcp.currentName() == TOTAL_MANAGED_INDICES) totalManagedIndices = xcp.intValue() + if (xcp.currentName() == TOTAL_MANAGED_INDICES) xcp.intValue() else metadata = ManagedIndexMetaData.parse(xcp) } return metadata diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index 4a582f130..17d0c9cce 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -10,6 +10,7 @@ import org.apache.http.HttpHeaders import org.apache.http.entity.ContentType.APPLICATION_JSON import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader +import org.junit.After import org.junit.Before import org.opensearch.OpenSearchParseException import org.opensearch.action.get.GetResponse @@ -74,6 +75,11 @@ import java.util.Locale abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() { + @After + fun clearIndicesAfterEachTest() { + wipeAllIndices() + } + val explainResponseOpendistroPolicyIdSetting = "index.opendistro.index_state_management.policy_id" val explainResponseOpenSearchPolicyIdSetting = "index.plugins.index_state_management.policy_id" @@ -476,7 +482,7 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() // Validate segment count per shard by specifying the min and max it should be @Suppress("UNCHECKED_CAST", "ReturnCount") protected fun validateSegmentCount(index: String, min: Int? = null, max: Int? = null): Boolean { - if (min == null && max == null) throw IllegalArgumentException("Must provide at least a min or max") + require(min != null || max != null) { "Must provide at least a min or max" } val statsResponse: Map = getShardSegmentStats(index) val indicesStats = statsResponse["indices"] as Map>>>>> @@ -942,6 +948,7 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() return true } + @Suppress("UNCHECKED_CAST") protected fun assertPredicatesOnISMTemplatesMap( templatePredicates: List Boolean>>>>, // response map name: predicate response: Map @@ -957,6 +964,7 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() } } + @Suppress("UNCHECKED_CAST") protected fun assertISMTemplateEquals(expected: ISMTemplate, actualISMTemplateMap: Any?): Boolean { actualISMTemplateMap as Map assertEquals(expected.indexPatterns, actualISMTemplateMap[ISMTemplate.INDEX_PATTERN]) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.kt index c7e83c874..e18aa6c33 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataRegressionIT.kt @@ -88,7 +88,9 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() { indexMetadata = getIndexMetadata(indexName) logger.info("check if metadata is saved in cluster state: ${indexMetadata.getCustomData("managed_index_metadata")}") - waitFor { + // TODO increase wait time since flaky seeing here. After looking through the log + // it's more likely a test framework execution lag. + waitFor(Instant.ofEpochSecond(60)) { assertEquals( METADATA_MOVING_WARNING, getExplainManagedIndexMetaData(indexName).info?.get("message") diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt index a924dc637..551132009 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt @@ -5,7 +5,6 @@ package org.opensearch.indexmanagement.indexstatemanagement.action -import org.hamcrest.collection.IsMapContaining import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase import org.opensearch.indexmanagement.indexstatemanagement.step.open.AttemptOpenStep import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep @@ -41,10 +40,10 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { // the second execution we move into rollover action, we won't hit the timeout as this is the execution that sets the startTime updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { - assertThat( + assertEquals( "Should be attempting to rollover", - getExplainManagedIndexMetaData(indexName).info, - IsMapContaining.hasEntry("message", AttemptRolloverStep.getPendingMessage(indexName) as Any?) + getExplainManagedIndexMetaData(indexName).info?.get("message"), + AttemptRolloverStep.getPendingMessage(indexName) ) } @@ -72,8 +71,8 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { // https://github.com/opendistro-for-elasticsearch/index-management/issues/130 fun `test action timeout doesn't bleed over into next action`() { - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" + val indexName = "${testIndexName}_index_2" + val policyID = "${testIndexName}_testPolicyName_2" val testPolicy = """ {"policy":{"description":"Default policy","default_state":"rolloverstate","states":[ {"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}], @@ -110,10 +109,10 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { // but there was a bug before where it would use the startTime from the previous actions metadata and immediately fail updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { - assertThat( + assertEquals( "Should be attempting to rollover", - getExplainManagedIndexMetaData(indexName).info, - IsMapContaining.hasEntry("message", AttemptRolloverStep.getPendingMessage(indexName) as Any?) + getExplainManagedIndexMetaData(indexName).info?.get("message"), + AttemptRolloverStep.getPendingMessage(indexName) ) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt index cc3cc057d..59d408f74 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt @@ -7,7 +7,6 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.apache.http.entity.ContentType import org.apache.http.entity.StringEntity -import org.hamcrest.core.Is.isA import org.junit.Assert import org.opensearch.cluster.metadata.DataStream import org.opensearch.common.settings.Settings @@ -172,7 +171,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val minSize = conditions[RolloverAction.MIN_SIZE_FIELD] as Map val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map assertEquals("Did not have min size condition", "10b", minSize["condition"]) - assertThat("Did not have min size current", minSize["current"], isA(String::class.java)) + assertTrue("Did not have min size current", minSize["current"] is String) assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"]) assertEquals("Did not have min doc count current", 0, minDocCount["current"]) } @@ -192,7 +191,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val minSize = conditions[RolloverAction.MIN_SIZE_FIELD] as Map val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map assertEquals("Did not have min size condition", "10b", minSize["condition"]) - assertThat("Did not have min size current", minSize["current"], isA(String::class.java)) + assertTrue("Did not have min size current", minSize["current"] is String) assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"]) assertEquals("Did not have min doc count current", 5, minDocCount["current"]) } @@ -268,7 +267,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { ) val minPrimarySize = conditions[RolloverAction.MIN_PRIMARY_SHARD_SIZE_FIELD] as Map assertEquals("Did not have min size condition", "100kb", minPrimarySize["condition"]) - assertThat("Did not have min size current", minPrimarySize["current"], isA(String::class.java)) + assertTrue("Did not have min size current", minPrimarySize["current"] is String) } val kb150 = 150_000 @@ -283,6 +282,8 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { flush(firstIndex, true) forceMerge(firstIndex, "1") val primaryShards = (cat("shards/$firstIndex?format=json&bytes=b") as List>).filter { it["prirep"] == "p" } + // TODO seeing flakyness of multiple shards over 100kb, log out shards to further debug + logger.info("cat shards result: $primaryShards") val primaryShardsOver100KB = primaryShards.filter { (it["store"] as String).toInt() > 100000 } assertTrue("Found multiple shards over 100kb", primaryShardsOver100KB.size == 1) primaryShardSizeBytes = primaryShards.maxOf { (it["store"] as String).toInt() } @@ -300,7 +301,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { ) val minPrimaryShardSize = conditions[RolloverAction.MIN_PRIMARY_SHARD_SIZE_FIELD] as Map assertEquals("Did not have min primary shard size condition", "100kb", minPrimaryShardSize["condition"]) - assertThat("Did not have min primary shard size current", minPrimaryShardSize["current"], isA(String::class.java)) + assertTrue("Did not have min primary shard size current", minPrimaryShardSize["current"] is String) } assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002")) } @@ -349,7 +350,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val minAge = conditions[RolloverAction.MIN_INDEX_AGE_FIELD] as Map val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map assertEquals("Did not have min age condition", "2d", minAge["condition"]) - assertThat("Did not have min age current", minAge["current"], isA(String::class.java)) + assertTrue("Did not have min age current", minAge["current"] is String) assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) assertEquals("Did not have min doc count current", 0, minDocCount["current"]) } @@ -369,7 +370,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val minAge = conditions[RolloverAction.MIN_INDEX_AGE_FIELD] as Map val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map assertEquals("Did not have min age condition", "2d", minAge["condition"]) - assertThat("Did not have min age current", minAge["current"], isA(String::class.java)) + assertTrue("Did not have min age current", minAge["current"] is String) assertEquals("Did not have min doc count condition", 3, minDocCount["condition"]) assertEquals("Did not have min doc count current", 5, minDocCount["current"]) } @@ -551,7 +552,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map assertEquals("Incorrect min age condition", "2d", minAge["condition"]) assertEquals("Incorrect min docs condition", 3, minDocCount["condition"]) - assertThat("Missing min age current", minAge["current"], isA(String::class.java)) + assertTrue("Missing min age current", minAge["current"] is String) assertEquals("Incorrect min docs current", 0, minDocCount["current"]) } @@ -579,7 +580,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { val minDocCount = conditions[RolloverAction.MIN_DOC_COUNT_FIELD] as Map assertEquals("Incorrect min age condition", "2d", minAge["condition"]) assertEquals("Incorrect min docs condition", 3, minDocCount["condition"]) - assertThat("Missing min age current", minAge["current"], isA(String::class.java)) + assertTrue("Missing min age current", minAge["current"] is String) assertEquals("Incorrect min docs current", 5, minDocCount["current"]) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServicesIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServicesIT.kt index bf727630c..b082352fe 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServicesIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/migration/MigrationServicesIT.kt @@ -131,6 +131,7 @@ class MigrationServicesIT : IndexStateManagementRestTestCase() { } } + @Suppress("UNCHECKED_CAST") private fun getTemplatesOrder(): List { val order = catIndexTemplates().map { val row = it as Map diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt index 207ad3845..e21e3d355 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -5,7 +5,6 @@ package org.opensearch.indexmanagement.indexstatemanagement.util -import org.opensearch.action.delete.DeleteRequest import org.opensearch.common.bytes.BytesReference import org.opensearch.common.unit.ByteSizeValue import org.opensearch.common.unit.TimeValue @@ -28,6 +27,7 @@ import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.test.OpenSearchTestCase import java.time.Instant +@Suppress("UnusedPrivateMember") class ManagedIndexUtilsTests : OpenSearchTestCase() { fun `test create managed index request`() { @@ -112,7 +112,6 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { assertEquals("Too many requests", 1, requests.size) val request = requests.first() assertEquals("Incorrect uuid used as document id on request", sweptConfigToDelete.uuid, request.id()) - assertTrue("Incorrect request type", request is DeleteRequest) } fun `test get swept managed index search request`() { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDeleteIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDeleteIT.kt index 696989d18..16e063270 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDeleteIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDeleteIT.kt @@ -59,7 +59,7 @@ class ValidateDeleteIT : IndexStateManagementRestTestCase() { assertEquals( "Index delete action validation status is RE_VALIDATING.", Validate.ValidationStatus.RE_VALIDATING, - data?.validationStatus + data.validationStatus ) } waitFor { @@ -67,7 +67,7 @@ class ValidateDeleteIT : IndexStateManagementRestTestCase() { assertEquals( "Index delete action validation message is index is write index.", ValidateDelete.getFailedIsWriteIndexMessage(index1), - data?.validationMessage + data.validationMessage ) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateForceMergeIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateForceMergeIT.kt index a9cfd2940..b44ad9ac3 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateForceMergeIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateForceMergeIT.kt @@ -68,12 +68,12 @@ class ValidateForceMergeIT : IndexStateManagementRestTestCase() { assertEquals( "Index force_merge action validation status is RE_VALIDATING.", Validate.ValidationStatus.PASSED, - data?.validationStatus + data.validationStatus ) assertEquals( "Index force_merge action validation status is RE_VALIDATING.", ValidateForceMerge.getValidationPassedMessage(indexName), - data?.validationMessage + data.validationMessage ) } waitFor { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateOpenIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateOpenIT.kt index f52cbb2ac..0bb00610a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateOpenIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateOpenIT.kt @@ -66,7 +66,7 @@ class ValidateOpenIT : IndexStateManagementRestTestCase() { assertEquals( "Index open action validation status is PASSED.", Validate.ValidationStatus.PASSED, - data?.validationStatus + data.validationStatus ) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadOnlyIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadOnlyIT.kt index 4d9033609..0bbcf9d81 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadOnlyIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadOnlyIT.kt @@ -64,7 +64,7 @@ class ValidateReadOnlyIT : IndexStateManagementRestTestCase() { assertEquals( "Index read cation validation status is PASSED.", Validate.ValidationStatus.PASSED, - data?.validationStatus + data.validationStatus ) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadWriteIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadWriteIT.kt index 25b6aee1c..1b530fb6b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadWriteIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadWriteIT.kt @@ -73,7 +73,7 @@ class ValidateReadWriteIT : IndexStateManagementRestTestCase() { assertEquals( "Index read_write action validation status is PASSED.", Validate.ValidationStatus.PASSED, - data?.validationStatus + data.validationStatus ) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReplicaCountIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReplicaCountIT.kt index 90c838f80..60f941588 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReplicaCountIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReplicaCountIT.kt @@ -63,7 +63,7 @@ class ValidateReplicaCountIT : IndexStateManagementRestTestCase() { assertEquals( "Index replica_count action validation status is PASSED.", Validate.ValidationStatus.PASSED, - data?.validationStatus + data.validationStatus ) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt index 199a9fc15..f1b20c14f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt @@ -68,11 +68,11 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val data = getExplainValidationResult(index1) assertEquals( "Index rollover validation status is pass.", - Validate.ValidationStatus.PASSED, data?.validationStatus + Validate.ValidationStatus.PASSED, data.validationStatus ) assertEquals( "Index rollover validation message is skipped rollover", - ValidateRollover.getSkipRolloverMessage(index1), data?.validationMessage + ValidateRollover.getSkipRolloverMessage(index1), data.validationMessage ) } } @@ -115,11 +115,11 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val data = getExplainValidationResult(index1) assertEquals( "Index rollover validation status is PASSED.", - Validate.ValidationStatus.PASSED, data?.validationStatus + Validate.ValidationStatus.PASSED, data.validationStatus ) assertEquals( "Index rollover validation message is already rolled over", - ValidateRollover.getAlreadyRolledOverMessage(index1, aliasName), data?.validationMessage + ValidateRollover.getAlreadyRolledOverMessage(index1, aliasName), data.validationMessage ) } assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002")) @@ -159,11 +159,11 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val data = getExplainValidationResult(index1) assertEquals( "Index rollover validation status is RE_VALIDATING", - Validate.ValidationStatus.RE_VALIDATING, data?.validationStatus + Validate.ValidationStatus.RE_VALIDATING, data.validationStatus ) assertEquals( "Index rollover validation message is no alias index setting", - ValidateRollover.getFailedNoValidAliasMessage(index1), data?.validationMessage + ValidateRollover.getFailedNoValidAliasMessage(index1), data.validationMessage ) } } @@ -207,11 +207,11 @@ class ValidateRolloverIT : IndexStateManagementRestTestCase() { val data = getExplainValidationResult(index1) assertEquals( "Index rollover validation status is RE_VALIDATING.", - Validate.ValidationStatus.RE_VALIDATING, data?.validationStatus + Validate.ValidationStatus.RE_VALIDATING, data.validationStatus ) assertEquals( "Index rollover validation message is not write index", - ValidateRollover.getFailedWriteIndexMessage(index1), data?.validationMessage + ValidateRollover.getFailedWriteIndexMessage(index1), data.validationMessage ) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerActionIT.kt index f3d8c243c..a16dbc48f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerActionIT.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.refreshanalyzer +import org.junit.After import org.junit.Assume import org.junit.Before import org.opensearch.client.Request @@ -20,6 +21,11 @@ import java.nio.file.Files class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() { + @After + fun clearIndicesAfterEachTest() { + wipeAllIndices() + } + @Before fun checkIfLocalCluster() { Assume.assumeTrue(isLocalTest) @@ -165,7 +171,6 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() { } companion object { - fun writeToFile(filePath: String, contents: String) { val path = org.opensearch.common.io.PathUtils.get(filePath) Files.newBufferedWriter(path, Charset.forName("UTF-8")).use { writer -> writer.write(contents) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RestRefreshSearchAnalyzerActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RestRefreshSearchAnalyzerActionIT.kt index c12a3df5a..f58ff4d43 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RestRefreshSearchAnalyzerActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/refreshanalyzer/RestRefreshSearchAnalyzerActionIT.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.refreshanalyzer +import org.junit.AfterClass import org.opensearch.client.ResponseException import org.opensearch.common.settings.Settings import org.opensearch.indexmanagement.IndexManagementRestTestCase @@ -15,6 +16,13 @@ import org.opensearch.rest.RestStatus class RestRefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() { + companion object { + @AfterClass + @JvmStatic fun clearIndicesAfterClass() { + wipeAllIndices() + } + } + fun `test missing indices`() { try { client().makeRequest(POST.toString(), REFRESH_SEARCH_ANALYZER_BASE_URI) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataServiceTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataServiceTests.kt index 265b69b01..66a940187 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataServiceTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataServiceTests.kt @@ -150,7 +150,7 @@ class RollupMetadataServiceTests : OpenSearchTestCase() { val metadataService = RollupMetadataService(client, xContentRegistry) val expectedWindowStartTime = localDateAtTimezone("2020-03-08T01:00:00", ZoneId.of("America/Los_Angeles")) - // Should jump to March 3, 2020 at 3AM PST for end time due to DST + // Should jump to March 3, 2020, at 3AM PST for end time due to DST val expectedWindowEndTime = localDateAtTimezone("2020-03-08T03:00:00", ZoneId.of("America/Los_Angeles")) runBlocking { @@ -255,7 +255,7 @@ class RollupMetadataServiceTests : OpenSearchTestCase() { dimensions = dimensions ) - val firstDocTimestamp = "2020-03-22T08:40:15Z" // March 22, 2020 Sunday + val firstDocTimestamp = "2020-03-22T08:40:15Z" // March 22, 2020, Sunday val client = getClient( searchResponse = getSearchResponseForTimestamp(rollup, firstDocTimestamp), searchException = null, @@ -664,7 +664,7 @@ class RollupMetadataServiceTests : OpenSearchTestCase() { listener.onResponse(getResponse) }.whenever(this.mock).get(any(), any()) } - val metadataService = RollupMetadataService(client, xContentRegistry) + RollupMetadataService(client, xContentRegistry) // runBlocking { // val getExistingMetadataResult = metadataService.getExistingMetadata(metadata.id) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 3bd41b433..150d935fb 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -11,6 +11,7 @@ import org.apache.http.entity.ContentType.APPLICATION_JSON import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader import org.junit.AfterClass +import org.junit.Before import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.RestClient @@ -43,12 +44,19 @@ import java.time.Instant abstract class RollupRestTestCase : IndexManagementRestTestCase() { companion object { - @AfterClass @JvmStatic fun clearIndicesAfterClassCompletion() { + @AfterClass + @JvmStatic fun clearIndicesAfterClass() { wipeAllIndices() } } - override fun preserveIndicesUponCompletion(): Boolean = true + @Before + fun setDebugLogLevel() { + client().makeRequest( + "PUT", "_cluster/settings", + StringEntity("""{"transient":{"logger.org.opensearch.indexmanagement.rollup":"DEBUG"}}""", APPLICATION_JSON) + ) + } protected fun createRollup( rollup: Rollup, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt index 303336922..42bf72953 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt @@ -108,7 +108,7 @@ fun randomRollup(): Rollup { sourceIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), metadataID = if (OpenSearchRestTestCase.randomBoolean()) null else OpenSearchRestTestCase.randomAlphaOfLength(10), - roles = OpenSearchRestTestCase.randomList(10) { OpenSearchRestTestCase.randomAlphaOfLength(10) }, + roles = emptyList(), pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000), delay = 0, continuous = OpenSearchRestTestCase.randomBoolean(), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt index d0b35ecff..0a2c15ee5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt @@ -86,6 +86,7 @@ class ISMRollupTests : OpenSearchTestCase() { assertNotNull(rollup.jobEnabledTime) assertFalse(rollup.continuous) assertTrue(rollup.enabled) + @Suppress("DEPRECATION") assertTrue(rollup.roles.isEmpty()) assertTrue(rollup.isEnabled) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt index 324b6a5ed..feb76c98f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt @@ -114,6 +114,18 @@ class WriteableTests : OpenSearchTestCase() { assertEquals("Round tripping Rollup stream doesn't work", rollup, streamedRollup) } + fun `test rollup roles field deprecation`() { + val rollup = randomRollup().copy( + delay = randomLongBetween(0, 60000000), + roles = listOf("I am deprecated") + ) + val out = BytesStreamOutput().also { rollup.writeTo(it) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedRollup = Rollup(sin) + @Suppress("DEPRECATION") + assertTrue("roles field in rollup model is deprecated and should be parsed to empty list.", streamedRollup.roles.isEmpty()) + } + fun `test explain rollup as stream`() { val explainRollup = randomExplainRollup() val out = BytesStreamOutput().also { explainRollup.writeTo(it) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index 3dd8eee1b..72727fffe 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -72,6 +72,7 @@ class RollupRunnerIT : RollupRestTestCase() { } } + @Suppress("UNCHECKED_CAST") fun `test rollup with avg metric`() { val sourceIdxTestName = "source_idx_test" val targetIdxTestName = "target_idx_test" @@ -112,7 +113,7 @@ class RollupRunnerIT : RollupRestTestCase() { assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) // Term query - var req = """ + val req = """ { "size": 0, "query": { @@ -801,7 +802,8 @@ class RollupRunnerIT : RollupRestTestCase() { assertEquals("Did not have 2 rollups indexed", 2, rollupMetadata.stats.rollupsIndexed) // These are hard to test.. just assert they are more than 0 assertTrue("Did not spend time indexing", rollupMetadata.stats.indexTimeInMillis > 0L) - assertTrue("Did not spend time searching", rollupMetadata.stats.searchTimeInMillis > 0L) + // In some cases it seems that these times are less than 1ms - which causes fails on ubuntu instances (at least that was detected) + assertTrue("Did not spend time searching", rollupMetadata.stats.searchTimeInMillis >= 0L) } fun `test rollup action with alias as target_index successfully`() { @@ -865,6 +867,10 @@ class RollupRunnerIT : RollupRestTestCase() { var rollupMetadata = getRollupMetadata(rollupMetadataID) assertTrue("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0) + // TODO Flaky: version conflict could happen here + // From log diving, it seems to be a race condition coming from RollupRunner + // (need more dive to understand rollup business logic) + // There are indexRollup happened between get and enable // restart job client().makeRequest( "PUT", @@ -877,8 +883,8 @@ class RollupRunnerIT : RollupRestTestCase() { startedRollup = waitFor { val rollupJob = getRollup(rollupId = rollup.id) assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) - assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + val rollupMetadata1 = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata1.status) rollupJob } @@ -968,8 +974,8 @@ class RollupRunnerIT : RollupRestTestCase() { startedRollup = waitFor { val rollupJob = getRollup(rollupId = rollup.id) assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) - assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + val rollupMetadata1 = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata1.status) rollupJob } @@ -1056,8 +1062,8 @@ class RollupRunnerIT : RollupRestTestCase() { var startedRollup2 = waitFor { val rollupJob = getRollup(rollupId = job2.id) assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) - assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + val rollupMetadata1 = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata1.status) assertTrue("Rollup is not disabled", !rollupJob.enabled) rollupJob } @@ -1084,8 +1090,8 @@ class RollupRunnerIT : RollupRestTestCase() { startedRollup1 = waitFor { val rollupJob = getRollup(rollupId = job1.id) assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) - assertEquals("Rollup is not finished", RollupMetadata.Status.FAILED, rollupMetadata.status) + val rollupMetadata1 = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FAILED, rollupMetadata1.status) rollupJob } @@ -1167,8 +1173,8 @@ class RollupRunnerIT : RollupRestTestCase() { val startedRollup2 = waitFor { val rollupJob = getRollup(rollupId = job2.id) assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) - assertEquals("Rollup is not finished", RollupMetadata.Status.FAILED, rollupMetadata.status) + val rollupMetadata1 = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FAILED, rollupMetadata1.status) assertTrue("Rollup is not disabled", !rollupJob.enabled) rollupJob } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt index c460c2439..8b694b1d0 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt @@ -10,6 +10,7 @@ import org.apache.http.HttpHeaders import org.apache.http.entity.ContentType.APPLICATION_JSON import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader +import org.junit.After import org.junit.Before import org.opensearch.client.Response import org.opensearch.client.ResponseException @@ -36,6 +37,11 @@ import java.time.Instant.now abstract class SnapshotManagementRestTestCase : IndexManagementRestTestCase() { + @After + fun clearIndicesAfterEachTest() { + wipeAllIndices() + } + var timeout: Instant = Instant.ofEpochSecond(20) /** diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestIndexSnapshotManagementIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestIndexSnapshotManagementIT.kt index c622584be..5c1b213d2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestIndexSnapshotManagementIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestIndexSnapshotManagementIT.kt @@ -44,6 +44,7 @@ class RestIndexSnapshotManagementIT : SnapshotManagementRestTestCase() { assertEquals("Created and returned snapshot management policies differ", smPolicy.toMap(XCONTENT_WITHOUT_TYPE_AND_USER), responseSMPolicy) } + @Suppress("UNCHECKED_CAST") fun `test updating a snapshot management policy with correct seq_no and primary_term`() { val smPolicy = createSMPolicy(randomSMPolicy()) val updateResponse = client().makeRequest( diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt index baa8b982e..bc07e0f0d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt @@ -86,7 +86,7 @@ fun randomTransform(): Transform { description = OpenSearchRestTestCase.randomAlphaOfLength(10), sourceIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), - roles = OpenSearchRestTestCase.randomList(10) { OpenSearchRestTestCase.randomAlphaOfLength(10) }, + roles = emptyList(), pageSize = if (isContinuous) OpenSearchRestTestCase.randomIntBetween(1, 1000) else OpenSearchRestTestCase.randomIntBetween(1, 10000), groups = randomGroups(), aggregations = randomAggregationFactories(), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt index 286a3cd19..7e26fa263 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt @@ -40,13 +40,12 @@ import java.time.Instant abstract class TransformRestTestCase : IndexManagementRestTestCase() { companion object { - @AfterClass @JvmStatic fun clearIndicesAfterClassCompletion() { + @AfterClass + @JvmStatic fun clearIndicesAfterClass() { wipeAllIndices() } } - override fun preserveIndicesUponCompletion(): Boolean = true - protected fun createTransform( transform: Transform, transformId: String = randomAlphaOfLength(10), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 6b7dbfa00..73cca0ef1 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -68,8 +68,9 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("More than expected pages processed", 3L, metadata.stats.pagesProcessed) assertEquals("More than expected documents indexed", 2L, metadata.stats.documentsIndexed) assertEquals("More than expected documents processed", 5000L, metadata.stats.documentsProcessed) - assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis > 0) - assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis > 0) + // In some cases it seems that these times are less than 1ms - which causes fails on ubuntu instances (at least that was detected) + assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis >= 0) + assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis >= 0) } fun `test transform with data filter`() { @@ -107,9 +108,9 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("More than expected pages processed", 2L, metadata.stats.pagesProcessed) assertEquals("More than expected documents indexed", 1L, metadata.stats.documentsIndexed) assertEquals("More than expected documents processed", 4977L, metadata.stats.documentsProcessed) - assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis > 0) - // In some cases it seems that the search time is less than 1ms - which causes fails on ubuntu instances (at least that was detected) - // assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis > 0) + // In some cases it seems that these times are less than 1ms - which causes fails on ubuntu instances (at least that was detected) + assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis >= 0) + assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis >= 0) } fun `test invalid transform`() { @@ -206,6 +207,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis > 0) } + @Suppress("UNCHECKED_CAST") fun `test transform target index _doc_count against the source index _doc_count`() { val sourceIdxTestName = "source_idx_test" val targetIdxTestName = "target_idx_test" @@ -247,7 +249,7 @@ class TransformRunnerIT : TransformRestTestCase() { val transformMetadata = getTransformMetadata(transformJob.metadataId!!) assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) - var req = """ + val req = """ { "size": 0, "aggs": { @@ -451,8 +453,9 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("More than expected pages processed", 2L, metadata.stats.pagesProcessed) assertEquals("More than expected documents indexed", 2L, metadata.stats.documentsIndexed) assertEquals("More than expected documents processed", 10000L, metadata.stats.documentsProcessed) - assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis > 0) - assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis > 0) + // In some cases it seems that these times are less than 1ms - which causes fails on ubuntu instances (at least that was detected) + assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis >= 0) + assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis >= 0) } fun `test no-op execution when no buckets have been modified`() { @@ -912,7 +915,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertTrue("Doesn't capture indexed time", firstIterationMetadata.stats.indexTimeInMillis > 0) assertTrue("Didn't capture search time", firstIterationMetadata.stats.searchTimeInMillis > 0) - // Get all of the buckets + // Get all the buckets var hits = waitFor { val response = client().makeRequest( "GET", "${transform.targetIndex}/_search", diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt index 1cd5a59ef..5072bd126 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt @@ -28,4 +28,12 @@ class WriteableTests : OpenSearchTestCase() { val streamedTransform = Transform(buildStreamInputForTransforms(out)) assertEquals("Round tripping Transform stream doesn't work", transform, streamedTransform) } + + fun `test transform roles field deprecation`() { + val transform = randomTransform().copy(roles = listOf("I am deprecated")) + val out = BytesStreamOutput().also { transform.writeTo(it) } + val streamedTransform = Transform(buildStreamInputForTransforms(out)) + @Suppress("DEPRECATION") + assertTrue("roles field in transform model is deprecated and should be parsed to empty list.", streamedTransform.roles.isEmpty()) + } }