Skip to content

Commit

Permalink
Adds continuous field to managed index metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Ronnak Saxena <[email protected]>
  • Loading branch information
downsrob authored and ronnaksaxena committed Jul 26, 2022
1 parent 76de976 commit f9a1f6b
Show file tree
Hide file tree
Showing 45 changed files with 558 additions and 154 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import java.util.function.Predicate

buildscript {
ext {
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
opensearch_version = System.getProperty("opensearch.version", "2.1.0-SNAPSHOT")
isSnapshot = "true" == System.getProperty("build.snapshot", "false")
opensearch_version = System.getProperty("opensearch.version", "2.1.0")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
// 2.0.0-SNAPSHOT -> 2.0.0.0-SNAPSHOT
version_tokens = opensearch_version.tokenize('-')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.util.releaseLockForScheduledJob
import org.opensearch.indexmanagement.util.renewLockForScheduledJob
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.LockModel
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
Expand Down Expand Up @@ -219,33 +221,49 @@ object ManagedIndexRunner :
}

// Attempt to acquire lock
val lock: LockModel? = context.lockService.suspendUntil { acquireLock(job, context, it) }
var lock: LockModel? = context.lockService.suspendUntil { acquireLock(job, context, it) }
if (lock == null) {
logger.debug("Could not acquire lock [${lock?.lockId}] for ${job.index}")
} else {
runManagedIndexConfig(job, context)
if (job?.continuous) {
var keepExecuting: Boolean = true
// Need to execute at least once for policy to initialize
do {
// Need to renew lock for current step execution
val renewedLock = renewLockForScheduledJob(context, lock as LockModel, errorNotificationRetryPolicy)
// Failed to renew lock
if (renewedLock == null) {
logger.error("Could not renew lock [${lock?.lockId}] for ${job.index}")
break
} else {
lock = renewedLock as LockModel
keepExecuting = runManagedIndexConfig(job, context)
}
} while ((job.continuous && keepExecuting)) // Runs until job is no longer continuous or execution should stop
} else { // If job is not continuous run once
runManagedIndexConfig(job, context)
}
// Release lock
val released: Boolean = context.lockService.suspendUntil { release(lock, it) }
if (!released) {
logger.debug("Could not release lock [${lock.lockId}] for ${job.index}")
if (lock == null || !releaseLockForScheduledJob(context, lock as LockModel)) {
logger.debug("Could not release lock [${lock?.lockId}] for ${job.index}")
}
}
}
}

@Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth")
private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext) {
private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext): Boolean {
logger.debug("Run job for index ${managedIndexConfig.index}")
// doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls
if (clusterIsRed()) {
logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health")
return
return false
}

val (managedIndexMetaData, getMetadataSuccess) = client.getManagedIndexMetadata(managedIndexConfig.indexUuid)
if (!getMetadataSuccess) {
logger.info("Failed to retrieve managed index metadata of index [${managedIndexConfig.index}] from config index, abort this run.")
return
return false
}

// Check the cluster state for the index metadata
Expand All @@ -267,28 +285,28 @@ object ManagedIndexRunner :
// If no index types had an index with a matching name and uuid combination, return
if (!someTypeMatchedUuid) {
logger.warn("Failed to find IndexMetadata for ${managedIndexConfig.index}.")
return
return false
}
} else {
val clusterStateMetadata = clusterStateIndexMetadata.getManagedIndexMetadata()
val metadataCheck = checkMetadata(clusterStateMetadata, managedIndexMetaData, managedIndexConfig.indexUuid, logger)
if (metadataCheck != MetadataCheck.SUCCESS) {
logger.info("Skipping execution while metadata status is $metadataCheck")
return
return false
}
}

// If policy or managedIndexMetaData is null then initialize
val policy = managedIndexConfig.policy
if (policy == null || managedIndexMetaData == null) {
initManagedIndex(managedIndexConfig, managedIndexMetaData)
return
return true
}

// If the policy was completed or failed then return early and disable job so it stops scheduling work
if (managedIndexMetaData.policyCompleted == true || managedIndexMetaData.isFailed) {
disableManagedIndexConfig(managedIndexConfig)
return
return false
}

if (managedIndexMetaData.hasVersionConflict(managedIndexConfig)) {
Expand All @@ -302,7 +320,7 @@ object ManagedIndexRunner :
if (result.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
}
return
return false
}

val state = policy.getStateToExecute(managedIndexMetaData)
Expand All @@ -317,7 +335,7 @@ object ManagedIndexRunner :
// then disable the job and return early
if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) {
disableManagedIndexConfig(managedIndexConfig)
return
return false
}

if (action?.hasTimedOut(currentActionMetaData) == true) {
Expand All @@ -328,19 +346,19 @@ object ManagedIndexRunner :
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
return
return false
}

if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) {
initChangePolicy(managedIndexConfig, managedIndexMetaData, action)
return
return true
}

val shouldBackOff = action?.shouldBackoff(currentActionMetaData, action.configRetry)
if (shouldBackOff?.first == true) {
// If we should back off then exit early.
logger.info("Backoff for retrying. Remaining time ${shouldBackOff.second}")
return
return false
}

if (managedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.STARTING) {
Expand All @@ -353,8 +371,10 @@ object ManagedIndexRunner :
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
return
if (updated.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
return false
}
}
}

Expand All @@ -368,7 +388,7 @@ object ManagedIndexRunner :
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
return
return false
}

// If this action is not allowed and the step to be executed is the first step in the action then we will fail
Expand All @@ -381,7 +401,7 @@ object ManagedIndexRunner :
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
return
return false
}

// If any of State, Action, Step components come back as null then we are moving to error in ManagedIndexMetaData
Expand Down Expand Up @@ -412,12 +432,11 @@ object ManagedIndexRunner :
executedManagedIndexMetaData = executedManagedIndexMetaData.copy(info = mutableInfo.toMap())
}
}

if (executedManagedIndexMetaData.isSuccessfulDelete) {
GlobalScope.launch(Dispatchers.IO + CoroutineName("ManagedIndexMetaData-AddHistory")) {
ismHistory.addManagedIndexMetaDataHistory(listOf(executedManagedIndexMetaData))
}
return
return false
}

// If a custom action deletes some off-cluster index and has deleteIndexMetadataAfterFinish set to true,
Expand All @@ -426,18 +445,25 @@ object ManagedIndexRunner :
if (action.isFinishedSuccessfully(executedManagedIndexMetaData)) {
if (action.deleteIndexMetadataAfterFinish()) {
deleteFromManagedIndex(managedIndexConfig, action.type)
return
}
}

if (!updateManagedIndexMetaData(executedManagedIndexMetaData, updateResult).metadataSaved) {
logger.error("Failed to update ManagedIndexMetaData after executing the Step : ${step.name}")
return false
}

if (managedIndexConfig.hasDifferentJobInterval(jobInterval)) {
updateJobInterval(managedIndexConfig, jobInterval)
}
// Return false when the step status is CONDITION_NOT_MET, i.e. the transition condition evaluated to false
if (executedManagedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.CONDITION_NOT_MET) {
return false
}
// Made it to end of successful execution block
return true
}
return false
}

private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig, managedIndexMetaData: ManagedIndexMetaData?) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ data class ChangePolicy(
val state: String?,
val include: List<StateFilter>,
val isSafe: Boolean,
val user: User? = null
val user: User? = null,
var continuous: Boolean? = null

) : Writeable, ToXContentObject {

@Throws(IOException::class)
Expand All @@ -46,16 +48,19 @@ data class ChangePolicy(
isSafe = sin.readBoolean(),
user = if (sin.readBoolean()) {
User(sin)
} else null
} else null,
continuous = if (sin.readBoolean()) { sin.readBoolean() } else null
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder
.startObject()
builder.startObject()
.field(ManagedIndexConfig.POLICY_ID_FIELD, policyID)
.field(StateMetaData.STATE, state)
.field(IS_SAFE_FIELD, isSafe)
if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user)
if (continuous != null) {
builder.field(ManagedIndexConfig.CONTINUOUS, continuous)
}
return builder.endObject()
}

Expand All @@ -67,6 +72,7 @@ data class ChangePolicy(
out.writeBoolean(isSafe)
out.writeBoolean(user != null)
user?.writeTo(out)
if (continuous != null) { out.writeBoolean(continuous as Boolean) }
}

companion object {
Expand All @@ -75,6 +81,7 @@ data class ChangePolicy(
const val INCLUDE_FIELD = "include"
const val IS_SAFE_FIELD = "is_safe"
const val USER_FIELD = "user"
const val CONTINUOUS_FIELD = "continuous"

@JvmStatic
@Throws(IOException::class)
Expand All @@ -83,6 +90,7 @@ data class ChangePolicy(
var state: String? = null
var isSafe: Boolean = false
var user: User? = null
var continuous: Boolean? = null
val include = mutableListOf<StateFilter>()

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
Expand All @@ -103,6 +111,7 @@ data class ChangePolicy(
USER_FIELD -> {
user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp)
}
CONTINUOUS_FIELD -> continuous = xcp.booleanValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ChangePolicy.")
}
}
Expand All @@ -112,7 +121,8 @@ data class ChangePolicy(
state,
include.toList(),
isSafe,
user
user,
continuous
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ data class ManagedIndexConfig(
val policyPrimaryTerm: Long?,
val policy: Policy?,
val changePolicy: ChangePolicy?,
val jobJitter: Double?
val jobJitter: Double?,
val continuous: Boolean
) : ScheduledJobParameter {

init {
Expand Down Expand Up @@ -81,6 +82,7 @@ data class ManagedIndexConfig(
.field(POLICY_FIELD, policy, XCONTENT_WITHOUT_TYPE)
.field(CHANGE_POLICY_FIELD, changePolicy)
.field(JITTER, jobJitter)
.field(CONTINUOUS, continuous)
builder.endObject()
return builder.endObject()
}
Expand All @@ -100,6 +102,7 @@ data class ManagedIndexConfig(
const val POLICY_PRIMARY_TERM_FIELD = "policy_primary_term"
const val CHANGE_POLICY_FIELD = "change_policy"
const val JITTER = "jitter"
const val CONTINUOUS = "continuous"

@Suppress("ComplexMethod", "LongMethod")
@JvmStatic
Expand All @@ -124,6 +127,7 @@ data class ManagedIndexConfig(
var policyPrimaryTerm: Long? = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
var policySeqNo: Long? = SequenceNumbers.UNASSIGNED_SEQ_NO
var jitter: Double? = null
var continuous: Boolean = false

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand Down Expand Up @@ -154,6 +158,7 @@ data class ManagedIndexConfig(
JITTER -> {
jitter = if (xcp.currentToken() == Token.VALUE_NULL) null else xcp.doubleValue()
}
CONTINUOUS -> continuous = xcp.booleanValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ManagedIndexConfig.")
}
}
Expand Down Expand Up @@ -183,7 +188,8 @@ data class ManagedIndexConfig(
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
),
changePolicy = changePolicy,
jobJitter = jitter
jobJitter = jitter,
continuous = continuous
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ class RestAddPolicyAction : BaseRestHandler() {

val policyID = requireNotNull(body.getOrDefault("policy_id", null)) { "Missing policy_id" }

val addPolicyRequest = AddPolicyRequest(indices.toList(), policyID as String, indexType)
val continuous = body?.get("continuous") as Boolean

val addPolicyRequest = AddPolicyRequest(indices.toList(), policyID as String, indexType, continuous)

return RestChannelConsumer { channel ->
client.execute(AddPolicyAction.INSTANCE, addPolicyRequest, RestToXContentListener(channel))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ class RestChangePolicyAction : BaseRestHandler() {
val xcp = request.contentParser()
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
val changePolicy = ChangePolicy.parse(xcp)

val changePolicyRequest = ChangePolicyRequest(indices.toList(), changePolicy, indexType)
val continuous = if (changePolicy.continuous != null) changePolicy.continuous as Boolean else false
val changePolicyRequest = ChangePolicyRequest(indices.toList(), changePolicy.copy(continuous = null), indexType, continuous = continuous)

return RestChannelConsumer { channel ->
client.execute(ChangePolicyAction.INSTANCE, changePolicyRequest, RestToXContentListener(channel))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
shardStats: Array<ShardStats>,
indexSizeInBytes: Long
): List<String> {
val nodesStatsReq = NodesStatsRequest().addMetric(OS_METRIC)
val nodesStatsReq = NodesStatsRequest().addMetric(FS_METRIC)
val nodeStatsResponse: NodesStatsResponse = stepContext.client.admin().cluster().suspendUntil { nodesStats(nodesStatsReq, it) }
val nodesList = nodeStatsResponse.nodes.filter { it.node.isDataNode }
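// Sort in increasing order of keys; this comment used to say "memory remaining", but the request above now asks for the fs metric, so the key is presumably free disk space (TODO confirm)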
Expand Down Expand Up @@ -393,7 +393,7 @@ class AttemptMoveShardsStep(private val action: ShrinkAction) : ShrinkStep(name,
override fun isIdempotent() = true

companion object {
const val OS_METRIC = "os"
const val FS_METRIC = "fs"
const val ROUTING_SETTING = "index.routing.allocation.require._name"
const val DEFAULT_TARGET_SUFFIX = "_shrunken"
const val name = "attempt_move_shards_step"
Expand Down
Loading

0 comments on commit f9a1f6b

Please sign in to comment.