Skip to content

Commit

Permalink
Resolve default for ActionExecutionPolicy at runtime (opensearch-proj…
Browse files Browse the repository at this point in the history
…ect#165)

Signed-off-by: Mohammad Qureshi <[email protected]>
  • Loading branch information
qreshi authored Sep 1, 2021
1 parent 8024b8b commit d5dbdd6
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 94 deletions.
36 changes: 11 additions & 25 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import org.opensearch.alerting.model.action.Action.Companion.SUBJECT
import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.model.action.PerAlertActionScope
import org.opensearch.alerting.model.action.PerExecutionActionScope
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
Expand All @@ -74,7 +75,7 @@ import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings.Companion.HOST_DENY_LIST_NONE
import org.opensearch.alerting.util.getActionScope
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.isADMonitor
Expand Down Expand Up @@ -486,9 +487,10 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
monitorOrTriggerError = monitorOrTriggerError
)
for (action in trigger.actions) {
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && !shouldDefaultToPerExecution) {
val perAlertActionScope = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope
for (alertCategory in perAlertActionScope.actionableAlerts) {
// ActionExecutionPolicy should not be null for Bucket-Level Monitors since it has a default config when not set explicitly
val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope
if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
for (alertCategory in actionExecutionScope.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
for (alert in alertsToExecuteActionsFor) {
val actionCtx = getActionContextForAlertCategory(
Expand All @@ -502,19 +504,20 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

// Keeping the throttled response separate from runAction for now since
// throttling is not supported for PER_EXECUTION
val actionResult = if (isBucketLevelTriggerActionThrottled(action, alert)) {
ActionRunResult(action.id, action.name, mapOf(), true, null, null)
} else {
val actionResult = if (isActionActionable(action, alert)) {
runAction(action, actionCtx, dryrun)
} else {
ActionRunResult(action.id, action.name, mapOf(), true, null, null)
}

triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult)
alertsToUpdate.add(alert)
// Remove the alert from completedAlertsToUpdate in case it is present there since
// its update will be handled in the alertsToUpdate batch
completedAlertsToUpdate.remove(alert)
}
}
} else if (action.getActionScope() == ActionExecutionScope.Type.PER_EXECUTION || shouldDefaultToPerExecution) {
} else if (actionExecutionScope is PerExecutionActionScope || shouldDefaultToPerExecution) {
// If all categories of Alerts are empty, there is nothing to message on and we can skip the Action.
// If the error is not null, this is disregarded and the Action is executed anyway so the user can be notified.
if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty())
Expand Down Expand Up @@ -639,23 +642,6 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
return true
}

// TODO: Add unit test for this method (or at least cover it in MonitorRunnerIT)
// Bucket-Level Monitors use the throttle configurations defined in ActionExecutionPolicy, this method evaluates that configuration.
private fun isBucketLevelTriggerActionThrottled(action: Action, alert: Alert): Boolean {
if (action.actionExecutionPolicy.throttle == null) return false
// TODO: This will need to be updated if throttleEnabled is moved to ActionExecutionPolicy
if (action.throttleEnabled) {
val result = alert.actionExecutionResults.firstOrNull { r -> r.actionId == action.id }
val lastExecutionTime: Instant? = result?.lastExecutionTime
val throttledTimeBound = currentTime().minus(
action.actionExecutionPolicy.throttle.value.toLong(),
action.actionExecutionPolicy.throttle.unit
)
return !(lastExecutionTime == null || lastExecutionTime.isBefore(throttledTimeBound))
}
return false
}

private fun getActionContextForAlertCategory(
alertCategory: AlertCategory,
alert: Alert,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,18 @@ data class Action(
val throttleEnabled: Boolean,
val throttle: Throttle?,
val id: String = UUIDs.base64UUID(),
val actionExecutionPolicy: ActionExecutionPolicy = ActionExecutionPolicy.getDefaultConfiguration()
val actionExecutionPolicy: ActionExecutionPolicy? = null
) : Writeable, ToXContentObject {

init {
if (subjectTemplate != null) {
require(subjectTemplate.lang == MUSTACHE) { "subject_template must be a mustache script" }
}
require(messageTemplate.lang == MUSTACHE) { "message_template must be a mustache script" }

if (actionExecutionPolicy?.actionExecutionScope is PerExecutionActionScope) {
require(throttle == null) { "Throttle is currently not supported for per execution action scope" }
}
}

@Throws(IOException::class)
Expand All @@ -68,7 +72,7 @@ data class Action(
sin.readBoolean(), // throttleEnabled
sin.readOptionalWriteable(::Throttle), // throttle
sin.readString(), // id
ActionExecutionPolicy(sin) // actionExecutionPolicy
sin.readOptionalWriteable(::ActionExecutionPolicy) // actionExecutionPolicy
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -78,13 +82,15 @@ data class Action(
.field(DESTINATION_ID_FIELD, destinationId)
.field(MESSAGE_TEMPLATE_FIELD, messageTemplate)
.field(THROTTLE_ENABLED_FIELD, throttleEnabled)
.field(ACTION_EXECUTION_POLICY_FIELD, actionExecutionPolicy)
if (subjectTemplate != null) {
xContentBuilder.field(SUBJECT_TEMPLATE_FIELD, subjectTemplate)
}
if (throttle != null) {
xContentBuilder.field(THROTTLE_FIELD, throttle)
}
if (actionExecutionPolicy != null) {
xContentBuilder.field(ACTION_EXECUTION_POLICY_FIELD, actionExecutionPolicy)
}
return xContentBuilder.endObject()
}

Expand All @@ -111,7 +117,12 @@ data class Action(
out.writeBoolean(false)
}
out.writeString(id)
actionExecutionPolicy.writeTo(out)
if (actionExecutionPolicy != null) {
out.writeBoolean(true)
actionExecutionPolicy.writeTo(out)
} else {
out.writeBoolean(false)
}
}

companion object {
Expand All @@ -138,7 +149,7 @@ data class Action(
lateinit var messageTemplate: Script
var throttleEnabled = false
var throttle: Throttle? = null
var actionExecutionPolicy: ActionExecutionPolicy = ActionExecutionPolicy.getDefaultConfiguration()
var actionExecutionPolicy: ActionExecutionPolicy? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -160,7 +171,11 @@ data class Action(
throttleEnabled = xcp.booleanValue()
}
ACTION_EXECUTION_POLICY_FIELD -> {
actionExecutionPolicy = ActionExecutionPolicy.parse(xcp)
actionExecutionPolicy = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) {
null
} else {
ActionExecutionPolicy.parse(xcp)
}
}
else -> {
throw IllegalStateException("Unexpected field: $fieldName, while parsing action")
Expand All @@ -180,7 +195,7 @@ data class Action(
throttleEnabled,
throttle,
id = requireNotNull(id),
actionExecutionPolicy = requireNotNull(actionExecutionPolicy) { "Action execution policy is null" }
actionExecutionPolicy = actionExecutionPolicy
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,41 +25,23 @@ import java.io.IOException
/**
* This class represents the container for various configurations which control Action behavior.
*/
// TODO: Should throttleEnabled be included in here as well?
data class ActionExecutionPolicy(
val throttle: Throttle? = null,
val actionExecutionScope: ActionExecutionScope
) : Writeable, ToXContentObject {

init {
if (actionExecutionScope is PerExecutionActionScope) {
require(throttle == null) { "Throttle is currently not supported for per execution action scope" }
}
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this (
sin.readOptionalWriteable(::Throttle), // throttle
ActionExecutionScope.readFrom(sin) // actionExecutionScope
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
val xContentBuilder = builder.startObject()
if (throttle != null) {
xContentBuilder.field(THROTTLE_FIELD, throttle)
}
xContentBuilder.field(ACTION_EXECUTION_SCOPE, actionExecutionScope)
return xContentBuilder.endObject()
builder.startObject()
.field(ACTION_EXECUTION_SCOPE, actionExecutionScope)
return builder.endObject()
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
if (throttle != null) {
out.writeBoolean(true)
throttle.writeTo(out)
} else {
out.writeBoolean(false)
}
if (actionExecutionScope is PerAlertActionScope) {
out.writeEnum(ActionExecutionScope.Type.PER_ALERT)
} else {
Expand All @@ -69,13 +51,11 @@ data class ActionExecutionPolicy(
}

companion object {
const val THROTTLE_FIELD = "throttle"
const val ACTION_EXECUTION_SCOPE = "action_execution_scope"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ActionExecutionPolicy {
var throttle: Throttle? = null
lateinit var actionExecutionScope: ActionExecutionScope

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
Expand All @@ -84,15 +64,11 @@ data class ActionExecutionPolicy(
xcp.nextToken()

when (fieldName) {
THROTTLE_FIELD -> {
throttle = if (xcp.currentToken() == Token.VALUE_NULL) null else Throttle.parse(xcp)
}
ACTION_EXECUTION_SCOPE -> actionExecutionScope = ActionExecutionScope.parse(xcp)
}
}

return ActionExecutionPolicy(
throttle,
requireNotNull(actionExecutionScope) { "Action execution scope is null" }
)
}
Expand All @@ -104,17 +80,16 @@ data class ActionExecutionPolicy(
}

/**
* The default [ActionExecutionPolicy] configuration.
* The default [ActionExecutionPolicy] configuration for Bucket-Level Monitors.
*
* This is currently only used by Bucket-Level Monitors and was configured with that in mind.
* If Query-Level Monitors integrate the use of [ActionExecutionPolicy] then a separate default configuration
* might need to be made depending on the desired behavior.
* will need to be made depending on the desired behavior.
*/
fun getDefaultConfiguration(): ActionExecutionPolicy {
fun getDefaultConfigurationForBucketLevelMonitor(): ActionExecutionPolicy {
val defaultActionExecutionScope = PerAlertActionScope(
actionableAlerts = setOf(AlertCategory.DEDUPED, AlertCategory.NEW)
)
return ActionExecutionPolicy(throttle = null, actionExecutionScope = defaultActionExecutionScope)
return ActionExecutionPolicy(actionExecutionScope = defaultActionExecutionScope)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.action.Action
import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.ActionExecutionPolicy
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.commons.authuser.User
Expand Down Expand Up @@ -153,8 +153,17 @@ fun Monitor.isBucketLevelMonitor(): Boolean = this.monitorType == Monitor.Monito
*/
fun AggregationResultBucket.getBucketKeysHash(): String = this.bucketKeys.joinToString(separator = "#")

fun Action.getActionScope(): ActionExecutionScope.Type =
this.actionExecutionPolicy.actionExecutionScope.getExecutionScope()
fun Action.getActionExecutionPolicy(monitor: Monitor): ActionExecutionPolicy? {
// When the ActionExecutionPolicy is null for an Action, the default is resolved at runtime
// so it can be chosen based on the Monitor type at that time.
// The Action config is not aware of the Monitor type which is why the default was not stored during
// the parse.
return this.actionExecutionPolicy ?: if (monitor.isBucketLevelMonitor()) {
ActionExecutionPolicy.getDefaultConfigurationForBucketLevelMonitor()
} else {
null
}
}

fun BucketLevelTriggerRunResult.getCombinedTriggerRunResult(
prevTriggerRunResult: BucketLevelTriggerRunResult?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1285,10 +1285,10 @@ class MonitorRunnerIT : AlertingRestTestCase() {
params.docCount > 1
""".trimIndent()

val action = randomAction(
val action = randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
actionExecutionPolicy = ActionExecutionPolicy(null, PerExecutionActionScope())
actionExecutionPolicy = ActionExecutionPolicy(PerExecutionActionScope())
)
var trigger = randomBucketLevelTrigger(actions = listOf(action))
trigger = trigger.copy(
Expand Down Expand Up @@ -1352,10 +1352,10 @@ class MonitorRunnerIT : AlertingRestTestCase() {
params.docCount > 1
""".trimIndent()

val action = randomAction(
val action = randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
actionExecutionPolicy = ActionExecutionPolicy(null, PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW)))
actionExecutionPolicy = ActionExecutionPolicy(PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW)))
)
var trigger = randomBucketLevelTrigger(actions = listOf(action))
trigger = trigger.copy(
Expand Down Expand Up @@ -1419,21 +1419,21 @@ class MonitorRunnerIT : AlertingRestTestCase() {
params.docCount > 0
""".trimIndent()

val actionThrottleEnabled = randomAction(
val actionThrottleEnabled = randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
throttleEnabled = true,
throttle = Throttle(value = 5, unit = MINUTES),
actionExecutionPolicy = ActionExecutionPolicy(
throttle = Throttle(value = 5, unit = MINUTES),
actionExecutionScope = PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW))
)
)
val actionThrottleNotEnabled = randomAction(
val actionThrottleNotEnabled = randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
throttleEnabled = false,
throttle = Throttle(value = 5, unit = MINUTES),
actionExecutionPolicy = ActionExecutionPolicy(
throttle = Throttle(value = 5, unit = MINUTES),
actionExecutionScope = PerAlertActionScope(setOf(AlertCategory.DEDUPED, AlertCategory.NEW))
)
)
Expand Down
Loading

0 comments on commit d5dbdd6

Please sign in to comment.