Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continuous Execution + Execute API #462

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.MetadataService
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestExecutePolicyAction
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestAddPolicyAction
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestDeletePolicyAction
Expand All @@ -56,6 +57,8 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.chan
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.TransportChangePolicyAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.deletepolicy.DeletePolicyAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.deletepolicy.TransportDeletePolicyAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy.ExecutePolicyAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.executepolicy.TransportExecutePolicyAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPoliciesAction
Expand Down Expand Up @@ -330,13 +333,14 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RestExplainTransformAction(),
RestStartTransformAction(),
RestStopTransformAction(),
RestExecutePolicyAction(),
RestGetSMPolicyHandler(),
RestStartSMPolicyHandler(),
RestStopSMPolicyHandler(),
RestExplainSMPolicyHandler(),
RestDeleteSMPolicyHandler(),
RestCreateSMPolicyHandler(),
RestUpdateSMPolicyHandler()
RestUpdateSMPolicyHandler(),
)
}

Expand Down Expand Up @@ -522,6 +526,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ActionPlugin.ActionHandler(RemovePolicyAction.INSTANCE, TransportRemovePolicyAction::class.java),
ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java),
ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java),
ActionPlugin.ActionHandler(ExecutePolicyAction.INSTANCE, TransportExecutePolicyAction::class.java),
ActionPlugin.ActionHandler(RetryFailedManagedIndexAction.INSTANCE, TransportRetryFailedManagedIndexAction::class.java),
ActionPlugin.ActionHandler(ChangePolicyAction.INSTANCE, TransportChangePolicyAction::class.java),
ActionPlugin.ActionHandler(IndexPolicyAction.INSTANCE, TransportIndexPolicyAction::class.java),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,14 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.util.releaseLockForScheduledJob
import org.opensearch.indexmanagement.util.renewLockForScheduledJob
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.LockModel
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.jobscheduler.spi.utils.LockService
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
Expand Down Expand Up @@ -219,33 +222,49 @@ object ManagedIndexRunner :
}

// Attempt to acquire lock
val lock: LockModel? = context.lockService.suspendUntil { acquireLock(job, context, it) }
var lock: LockModel? = context.lockService.suspendUntil { acquireLock(job, context, it) }
if (lock == null) {
logger.debug("Could not acquire lock [${lock?.lockId}] for ${job.index}")
} else {
runManagedIndexConfig(job, context)
if (job.continuous) {
var keepExecuting: Boolean = true
// Need to execute at least once for policy to initialize
do {
// Need to renew lock for current step execution
val renewedLock = renewLockForScheduledJob(context, lock as LockModel, errorNotificationRetryPolicy)
// Failed to renew lock
if (renewedLock == null) {
logger.error("Could not renew lock [${lock.lockId}] for ${job.index}")
break
} else {
lock = renewedLock
keepExecuting = runManagedIndexConfig(job, context.lockService)
}
} while ((job.continuous && keepExecuting)) // Runs until job is no longer continuous or execution should stop
} else { // If job is not continuous run once
runManagedIndexConfig(job, context.lockService)
}
// Release lock
val released: Boolean = context.lockService.suspendUntil { release(lock, it) }
if (!released) {
logger.debug("Could not release lock [${lock.lockId}] for ${job.index}")
if (lock == null || !releaseLockForScheduledJob(context, lock)) {
logger.debug("Could not release lock [${lock?.lockId}] for ${job.index}")
}
}
}
}

@Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth")
private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext) {
private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, lock: LockService): Boolean {
logger.debug("Run job for index ${managedIndexConfig.index}")
// doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls
if (clusterIsRed()) {
logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health")
return
return false
}

val (managedIndexMetaData, getMetadataSuccess) = client.getManagedIndexMetadata(managedIndexConfig.indexUuid)
if (!getMetadataSuccess) {
logger.info("Failed to retrieve managed index metadata of index [${managedIndexConfig.index}] from config index, abort this run.")
return
return false
}

// Check the cluster state for the index metadata
Expand All @@ -267,28 +286,28 @@ object ManagedIndexRunner :
// If no index types had an index with a matching name and uuid combination, return
if (!someTypeMatchedUuid) {
logger.warn("Failed to find IndexMetadata for ${managedIndexConfig.index}.")
return
return false
}
} else {
val clusterStateMetadata = clusterStateIndexMetadata.getManagedIndexMetadata()
val metadataCheck = checkMetadata(clusterStateMetadata, managedIndexMetaData, managedIndexConfig.indexUuid, logger)
if (metadataCheck != MetadataCheck.SUCCESS) {
logger.info("Skipping execution while metadata status is $metadataCheck")
return
return false
}
}

// If policy or managedIndexMetaData is null then initialize
val policy = managedIndexConfig.policy
if (policy == null || managedIndexMetaData == null) {
initManagedIndex(managedIndexConfig, managedIndexMetaData)
return
return true
}

// If the policy was completed or failed then return early and disable job so it stops scheduling work
if (managedIndexMetaData.policyCompleted == true || managedIndexMetaData.isFailed) {
disableManagedIndexConfig(managedIndexConfig)
return
return false
}

if (managedIndexMetaData.hasVersionConflict(managedIndexConfig)) {
Expand All @@ -302,13 +321,13 @@ object ManagedIndexRunner :
if (result.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
}
return
return false
}

val state = policy.getStateToExecute(managedIndexMetaData)
val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider)
val stepContext = StepContext(
managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings, jobContext.lockService
managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings, lock
)
val step: Step? = action?.getStepToExecute(stepContext)
val currentActionMetaData = action?.getUpdatedActionMetadata(managedIndexMetaData, state.name)
Expand All @@ -317,7 +336,7 @@ object ManagedIndexRunner :
// then disable the job and return early
if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) {
disableManagedIndexConfig(managedIndexConfig)
return
return false
}

if (action?.hasTimedOut(currentActionMetaData) == true) {
Expand All @@ -328,19 +347,24 @@ object ManagedIndexRunner :
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
return
return false
}

if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) {
initChangePolicy(managedIndexConfig, managedIndexMetaData, action)
return
return if (initChangePolicy(managedIndexConfig, managedIndexMetaData, action)) {
// Don't want to continue execution on old Managed Index Config
false
} else {
// Managed index config was not updated, so it is safe to continue execution of this job
true
}
}

val shouldBackOff = action?.shouldBackoff(currentActionMetaData, action.configRetry)
if (shouldBackOff?.first == true) {
// If we should back off then exit early.
logger.info("Backoff for retrying. Remaining time ${shouldBackOff.second}")
return
return false
}

if (managedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.STARTING) {
Expand All @@ -353,8 +377,10 @@ object ManagedIndexRunner :
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
return
if (updated.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
return false
}
}
}

Expand All @@ -368,20 +394,20 @@ object ManagedIndexRunner :
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
return
return false
}

// If this action is not allowed and the step to be executed is the first step in the action then we will fail
// as this action has been removed from the AllowList, but if it's not the first step we will let it finish as it's already in flight
if (action?.isAllowed(allowList) == false && step != null && action.isFirstStep(step.name) && action.type != TransitionsAction.name) {
val info = mapOf("message" to "Attempted to execute action=${action?.type} which is not allowed.")
val info = mapOf("message" to "Attempted to execute action=${action.type} which is not allowed.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
return
return false
}

// If any of State, Action, Step components come back as null then we are moving to error in ManagedIndexMetaData
Expand Down Expand Up @@ -412,12 +438,11 @@ object ManagedIndexRunner :
executedManagedIndexMetaData = executedManagedIndexMetaData.copy(info = mutableInfo.toMap())
}
}

if (executedManagedIndexMetaData.isSuccessfulDelete) {
GlobalScope.launch(Dispatchers.IO + CoroutineName("ManagedIndexMetaData-AddHistory")) {
ismHistory.addManagedIndexMetaDataHistory(listOf(executedManagedIndexMetaData))
}
return
return false
}

// If a custom action deletes some off-cluster index and has deleteIndexMetadataAfterFinish set to true,
Expand All @@ -426,18 +451,25 @@ object ManagedIndexRunner :
if (action.isFinishedSuccessfully(executedManagedIndexMetaData)) {
if (action.deleteIndexMetadataAfterFinish()) {
deleteFromManagedIndex(managedIndexConfig, action.type)
return
}
}

if (!updateManagedIndexMetaData(executedManagedIndexMetaData, updateResult).metadataSaved) {
logger.error("Failed to update ManagedIndexMetaData after executing the Step : ${step.name}")
return false
}

if (managedIndexConfig.hasDifferentJobInterval(jobInterval)) {
updateJobInterval(managedIndexConfig, jobInterval)
}
// Check that transition condition evaluated to false
if (executedManagedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.CONDITION_NOT_MET) {
return false
}
// Made it to end of successful execution block
return true
}
return false
}

private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig, managedIndexMetaData: ManagedIndexMetaData?) {
Expand Down Expand Up @@ -688,18 +720,19 @@ object ManagedIndexRunner :
* Initializes the change policy process where we will get the policy using the change policy's policyID, update the [ManagedIndexMetaData]
* to reflect the new policy, and save the new policy to the [ManagedIndexConfig] while resetting the change policy to null
*/
// Returns true if the Managed Index Config was updated, so the caller can avoid continuing execution on an outdated Managed Index Config
@Suppress("ReturnCount", "ComplexMethod")
private suspend fun initChangePolicy(
managedIndexConfig: ManagedIndexConfig,
managedIndexMetaData: ManagedIndexMetaData,
actionToExecute: Action?
) {
): Boolean {

// should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null
val changePolicy = managedIndexConfig.changePolicy
if (changePolicy == null) {
logger.debug("initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: $managedIndexConfig")
return
return false
}

// get the policy we'll attempt to change to
Expand Down Expand Up @@ -744,7 +777,7 @@ object ManagedIndexRunner :
// if it is unsafe to change then we set safe back to false so we don't keep doing this check every execution
if (managedIndexConfig.policy?.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy) != true) {
updateManagedIndexConfig(managedIndexConfig.copy(changePolicy = managedIndexConfig.changePolicy.copy(isSafe = false)))
return
return true
}
}
}
Expand All @@ -757,10 +790,13 @@ object ManagedIndexRunner :
* */
val updated = updateManagedIndexMetaData(updatedManagedIndexMetaData)

if (!updated.metadataSaved || policy == null) return
if (!updated.metadataSaved || policy == null) {
return false
}

// Change the policy and user stored on the job from changePolicy, this will also set the changePolicy to null on the job
savePolicyToManagedIndexConfig(managedIndexConfig, policy.copy(user = changePolicy.user))
// Return the result of the save so the caller knows whether the Managed Index Config was updated
return savePolicyToManagedIndexConfig(managedIndexConfig, policy.copy(user = changePolicy.user))
}

@Suppress("TooGenericExceptionCaught")
Expand Down
Loading
Loading