Skip to content

Commit

Permalink
Backport from main to 2.x (opensearch-project#614)
Browse files Browse the repository at this point in the history
* Fix all the compile warnings and detekt issues (opensearch-project#603)

* Fix all the compile warnings and detekt issues

Signed-off-by: bowenlan-amzn <[email protected]>

* Fix time capture is 0

Signed-off-by: bowenlan-amzn <[email protected]>

Signed-off-by: bowenlan-amzn <[email protected]>

* Unify test clean logic (opensearch-project#609)

* Unify wipe indices logic after tests

Signed-off-by: bowenlan-amzn <[email protected]>

* Enhance wipeAllIndices function

Signed-off-by: bowenlan-amzn <[email protected]>

* Customize cleanup for multi node test

Signed-off-by: bowenlan-amzn <[email protected]>

Signed-off-by: bowenlan-amzn <[email protected]>

Signed-off-by: bowenlan-amzn <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
  • Loading branch information
bowenlan-amzn authored and ronnaksaxena committed Jul 19, 2023
1 parent bc20e63 commit 8989339
Show file tree
Hide file tree
Showing 82 changed files with 419 additions and 319 deletions.
32 changes: 32 additions & 0 deletions .github/workflows/bwc-test-workflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: Backward compatibility test workflow
on:
pull_request:
branches:
- "*"
push:
branches:
- "*"

jobs:
test:
# This job runs on Linux
runs-on: ubuntu-latest
steps:
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK
uses: actions/setup-java@v1
with:
java-version: 17
# index-management
- name: Checkout Branch
uses: actions/checkout@v2
- name: Run IM Backwards Compatibility Tests
run: |
echo "Running backwards compatibility tests..."
./gradlew bwcTestSuite
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
with:
name: logs
path: build/testclusters/indexmanagementBwcCluster*/logs/*
1 change: 1 addition & 0 deletions .github/workflows/create-documentation-issue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ name: Create Documentation Issue
on:
pull_request:
types:
- closed
- labeled
env:
PR_NUMBER: ${{ github.event.number }}
Expand Down
18 changes: 0 additions & 18 deletions .github/workflows/dco.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/links.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
branches: [ main ]

jobs:
linkchecker:
check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand Down
25 changes: 3 additions & 22 deletions .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,15 @@ on:
- "*"

jobs:
build:
# Job name
name: Build Index Management
test:
# This job runs on Linux
runs-on: ubuntu-latest
steps:
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK 11
- name: Set Up JDK
uses: actions/setup-java@v1
with:
java-version: 11
java-version: 17
# index-management
- name: Checkout Branch
uses: actions/checkout@v2
Expand All @@ -31,20 +29,3 @@ jobs:
with:
name: logs
path: build/testclusters/integTest-*/logs/*
bwc:
name: Run Index Management Backwards Compatibility Tests
# This job runs on Linux
runs-on: ubuntu-latest
steps:
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK 11
uses: actions/setup-java@v1
with:
java-version: 11
# index-management
- name: Checkout Branch
uses: actions/checkout@v2
- name: Run IM Backwards Compatibility Tests
run: |
echo "Running backwards compatibility tests..."
./gradlew bwcTestSuite
2 changes: 0 additions & 2 deletions .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ on:

jobs:
build:
# Job name
name: Build Index Management
env:
BUILD_ARGS: ${{ matrix.os_build_args }}
WORKING_DIR: ${{ matrix.working_directory }}.
Expand Down
15 changes: 13 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,24 @@ task ktlint(type: JavaExec, group: "verification") {
classpath = configurations.ktlint
args "src/**/*.kt", "spi/src/main/**/*.kt"
}

check.dependsOn ktlint

task ktlintFormat(type: JavaExec, group: "formatting") {
description = "Fix Kotlin code style deviations."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
args "-F", "src/**/*.kt", "spi/src/main/**/*.kt"
// https://github.com/pinterest/ktlint/issues/1391
jvmArgs "--add-opens=java.base/java.lang=ALL-UNNAMED"
}

detekt {
config = files("detekt.yml")
buildUponDefaultConfig = true
}
// When writing detekt Gradle first finds the extension with this name,
// but with a string it should look for a task with that name instead
check.dependsOn "detekt"

configurations.testImplementation {
exclude module: "securemock"
Expand Down Expand Up @@ -656,7 +660,14 @@ run {
}
}

compileKotlin { kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] }
compileKotlin {
kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict']
kotlinOptions.allWarningsAsErrors = true
}

compileTestKotlin {
kotlinOptions.allWarningsAsErrors = true
}

apply from: 'build-tools/pkgbuild.gradle'

Expand Down
5 changes: 3 additions & 2 deletions detekt.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# TODO: Remove this before initial release, only for developmental purposes
build:
maxIssues: 20
maxIssues: 0

exceptions:
TooGenericExceptionCaught:
Expand All @@ -14,6 +13,8 @@ style:
MaxLineLength:
maxLineLength: 150
excludes: ['**/test/**']
FunctionOnlyReturningConstant:
active: false

complexity:
LargeClass:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.LocalNodeMasterListener
import org.opensearch.cluster.LocalNodeClusterManagerListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.ToXContent
Expand Down Expand Up @@ -47,7 +47,7 @@ class IndexStateManagementHistory(
private val threadPool: ThreadPool,
private val clusterService: ClusterService,
private val indexManagementIndices: IndexManagementIndices
) : LocalNodeMasterListener {
) : LocalNodeClusterManagerListener {

private val logger = LogManager.getLogger(javaClass)
private var scheduledRollover: Scheduler.Cancellable? = null
Expand All @@ -61,7 +61,7 @@ class IndexStateManagementHistory(
@Volatile private var historyNumberOfReplicas = ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS.get(settings)

init {
clusterService.addLocalNodeMasterListener(this)
clusterService.addLocalNodeClusterManagerListener(this)
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_ENABLED) {
historyEnabled = it
}
Expand All @@ -82,7 +82,7 @@ class IndexStateManagementHistory(
}
}

override fun onMaster() {
override fun onClusterManager() {
try {
// try to rollover immediately as we might be restarting the cluster
if (historyEnabled) rolloverHistoryIndex()
Expand All @@ -97,12 +97,12 @@ class IndexStateManagementHistory(
}
}

override fun offMaster() {
override fun offClusterManager() {
scheduledRollover?.cancel()
}

private fun rescheduleRollover() {
if (clusterService.state().nodes.isLocalNodeElectedMaster) {
if (clusterService.state().nodes.isLocalNodeElectedClusterManager) {
scheduledRollover?.cancel()
scheduledRollover = threadPool.scheduleWithFixedDelay(
{ rolloverAndDeleteHistoryIndex() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,10 @@ class ManagedIndexCoordinator(
}
clusterService.clusterSettings.addSettingsUpdateConsumer(METADATA_SERVICE_STATUS) {
metadataServiceEnabled = it == 0
if (!metadataServiceEnabled) scheduledMoveMetadata?.cancel()
else initMoveMetadata()
if (!metadataServiceEnabled) {
logger.info("Canceling metadata moving job because of cluster setting update.")
scheduledMoveMetadata?.cancel()
} else initMoveMetadata()
}
clusterService.clusterSettings.addSettingsUpdateConsumer(TEMPLATE_MIGRATION_CONTROL) {
templateMigrationEnabled = it >= 0L
Expand Down Expand Up @@ -202,8 +204,8 @@ class ManagedIndexCoordinator(
// Instead of using a LocalNodeMasterListener to track cluster manager changes, this service will
// track them here to avoid conditions where cluster manager listener events run after other
// listeners that depend on what happened in the cluster manager listener
if (this.isClusterManager != event.localNodeMaster()) {
this.isClusterManager = event.localNodeMaster()
if (this.isClusterManager != event.localNodeClusterManager()) {
this.isClusterManager = event.localNodeClusterManager()
if (this.isClusterManager) {
onClusterManager()
} else {
Expand All @@ -215,7 +217,7 @@ class ManagedIndexCoordinator(

if (event.isNewCluster) return

if (!event.localNodeMaster()) return
if (!event.localNodeClusterManager()) return

if (!event.metadataChanged()) return

Expand Down Expand Up @@ -380,7 +382,7 @@ class ManagedIndexCoordinator(
}

/**
* Find a policy that has highest priority ism template with matching index pattern to the index and is created before index creation date. If
* Find a policy that has the highest priority ism template with matching index pattern to the index and is created before index creation date. If
* the policy has user, ensure that the user can manage the index if not find the one that can.
* */
private suspend fun findMatchingPolicy(indexName: String, creationDate: Long, policies: List<Policy>): Policy? {
Expand Down Expand Up @@ -422,7 +424,7 @@ class ManagedIndexCoordinator(
try {
val request = ManagedIndexRequest().indices(indexName)
withClosableContext(IndexManagementSecurityContext("ApplyPolicyOnIndexCreation", settings, threadPool.threadContext, policy.user)) {
val response: AcknowledgedResponse = client.suspendUntil { execute(ManagedIndexAction.INSTANCE, request, it) }
client.suspendUntil<Client, AcknowledgedResponse> { execute(ManagedIndexAction.INSTANCE, request, it) }
}
} catch (e: OpenSearchSecurityException) {
logger.debug("Skipping applying policy ${policy.id} on $indexName as the policy user is missing permissions", e)
Expand Down Expand Up @@ -473,13 +475,13 @@ class ManagedIndexCoordinator(
// If ISM is disabled return early
if (!isIndexStateManagementEnabled()) return

// Do not setup background sweep if we're not the elected cluster manager node
if (!clusterService.state().nodes().isLocalNodeElectedMaster) return
// Do not set up background sweep if we're not the elected cluster manager node
if (!clusterService.state().nodes().isLocalNodeElectedClusterManager) return

// Cancel existing background sweep
scheduledFullSweep?.cancel()

// Setup an anti-entropy/self-healing background sweep, in case we fail to create a ManagedIndexConfig job
// Set up an anti-entropy/self-healing background sweep, in case we fail to create a ManagedIndexConfig job
val scheduledSweep = Runnable {
val elapsedTime = getFullSweepElapsedTime()

Expand All @@ -505,7 +507,7 @@ class ManagedIndexCoordinator(
fun initMoveMetadata() {
if (!metadataServiceEnabled) return
if (!isIndexStateManagementEnabled()) return
if (!clusterService.state().nodes().isLocalNodeElectedMaster) return
if (!clusterService.state().nodes().isLocalNodeElectedClusterManager) return
scheduledMoveMetadata?.cancel()

if (metadataService.finishFlag) {
Expand Down Expand Up @@ -535,7 +537,7 @@ class ManagedIndexCoordinator(
fun initTemplateMigration(enableSetting: Long) {
if (!templateMigrationEnabled) return
if (!isIndexStateManagementEnabled()) return
if (!clusterService.state().nodes().isLocalNodeElectedMaster) return
if (!clusterService.state().nodes().isLocalNodeElectedClusterManager) return
scheduledTemplateMigration?.cancel()

// if service has finished, re-enable it
Expand Down Expand Up @@ -657,8 +659,7 @@ class ManagedIndexCoordinator(
if (scrollIDsToClear.isNotEmpty()) {
val clearScrollRequest = ClearScrollRequest()
clearScrollRequest.scrollIds(scrollIDsToClear.toList())
val clearScrollResponse: ClearScrollResponse =
client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
client.suspendUntil<Client, ClearScrollResponse> { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
}
}
return managedIndexUuids
Expand Down Expand Up @@ -693,7 +694,7 @@ class ManagedIndexCoordinator(
val mRes: MultiGetResponse = client.suspendUntil { multiGet(mReq, it) }
val responses = mRes.responses
if (responses.first().isFailed) {
// config index may not initialised yet
// config index may not initialise yet
logger.error("get managed-index failed: ${responses.first().failure.failure}")
return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ object ManagedIndexRunner :
// the cluster state index uuid differs from the one in the managed index config then the config is referring
// to a different index which does not exist in the cluster. We need to check all of the extensions to confirm an index exists
if (clusterStateIndexMetadata == null || clusterStateIndexUUID != managedIndexConfig.indexUuid) {
clusterStateIndexMetadata = null
// If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types
val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE }
val multiTypeIndexNameToMetaData =
Expand Down Expand Up @@ -387,7 +386,7 @@ object ManagedIndexRunner :
// If this action is not allowed and the step to be executed is the first step in the action then we will fail
// as this action has been removed from the AllowList, but if its not the first step we will let it finish as it's already inflight
if (action?.isAllowed(allowList) == false && step != null && action.isFirstStep(step.name) && action.type != TransitionsAction.name) {
val info = mapOf("message" to "Attempted to execute action=${action?.type} which is not allowed.")
val info = mapOf("message" to "Attempted to execute action=${action.type} which is not allowed.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
Expand All @@ -404,7 +403,13 @@ object ManagedIndexRunner :
@Suppress("ComplexCondition", "MaxLineLength")
if (updateResult.metadataSaved && state != null && action != null && step != null && currentActionMetaData != null) {
if (validationServiceEnabled) {
val validationResult = actionValidation.validate(action.type, stepContext.metadata.index)
val validationResult = withClosableContext(
IndexManagementSecurityContext(
managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user
)
) {
actionValidation.validate(action.type, stepContext.metadata.index)
}
if (validationResult.validationStatus == Validate.ValidationStatus.RE_VALIDATING) {
logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.\", action.type, state.name, step.name")
publishErrorNotification(policy, managedIndexMetaData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ class MetadataService(

override fun onResponse(response: ClusterUpdateSettingsResponse) {
if (!response.isAcknowledged) {
logger.error("Update template migration setting to $status is not acknowledged")
logger.error("Update metadata migration setting to $status is not acknowledged")
throw IndexManagementException.wrap(
Exception("Update template migration setting to $status is not acknowledged")
Exception("Update metadata migration setting to $status is not acknowledged")
)
} else {
logger.info("Successfully update template migration setting to $status")
logger.info("Successfully metadata template migration setting to $status")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ data class ManagedIndexConfig(
policySeqNo = policySeqNo,
policyPrimaryTerm = policyPrimaryTerm,
policy = policy?.copy(
id = policyID ?: NO_ID,
id = policyID,
seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
),
Expand Down
Loading

0 comments on commit 8989339

Please sign in to comment.