Skip to content

Commit

Permalink
Alerting Enhancements: Alerting Comments (Experimental) (opensearch-project#1561)
Browse files Browse the repository at this point in the history

* initial commit, functional but needs refactoring

Signed-off-by: Dennis Toepker <[email protected]>

* refactored QueryLevelTriggerExecutionContext to need only an AlertContext field instead of both Alert and AlertContext fields

Signed-off-by: Dennis Toepker <[email protected]>

* misc additions and fixes

Signed-off-by: Dennis Toepker <[email protected]>

* misc cleanup

Signed-off-by: Dennis Toepker <[email protected]>

* misc changes and basic ITs

Signed-off-by: Dennis Toepker <[email protected]>

* cleanup

Signed-off-by: Dennis Toepker <[email protected]>

* renaming notes to comments

Signed-off-by: Dennis Toepker <[email protected]>

* misc changes

Signed-off-by: Dennis Toepker <[email protected]>

* changed API endpoints

Signed-off-by: Dennis Toepker <[email protected]>

* more misc changes and fixes

Signed-off-by: Dennis Toepker <[email protected]>

* misc cleanup

Signed-off-by: Dennis Toepker <[email protected]>

* updated a comment

Signed-off-by: Dennis Toepker <[email protected]>

* misc cleanup and setting refresh policy to immediate

Signed-off-by: Dennis Toepker <[email protected]>

* fixed lint issues and other restructuring

Signed-off-by: Dennis Toepker <[email protected]>

* review-based changes

Signed-off-by: Dennis Toepker <[email protected]>

* update after adding entityType to Comment model

Signed-off-by: Dennis Toepker <[email protected]>

* misc fixes

Signed-off-by: Dennis Toepker <[email protected]>

* removing comments history enabled setting

Signed-off-by: Dennis Toepker <[email protected]>

* adding release notes

Signed-off-by: Dennis Toepker <[email protected]>

* IT fixes

Signed-off-by: Dennis Toepker <[email protected]>

* removing dev code vestiges

Signed-off-by: Dennis Toepker <[email protected]>

* changing logger calls

Signed-off-by: Dennis Toepker <[email protected]>

---------

Signed-off-by: Dennis Toepker <[email protected]>
Co-authored-by: Dennis Toepker <[email protected]>
  • Loading branch information
toepkerd and toepkerd-zz authored Jun 12, 2024
1 parent 1d72fae commit d808474
Show file tree
Hide file tree
Showing 29 changed files with 2,066 additions and 34 deletions.
21 changes: 18 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.MAX_SEARCH_SIZE
import org.opensearch.alerting.util.getBucketKeysHash
Expand Down Expand Up @@ -157,7 +158,7 @@ class AlertService(
workflorwRunContext: WorkflowRunContext?
): Alert? {
val currentTime = Instant.now()
val currentAlert = ctx.alert
val currentAlert = ctx.alertContext?.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
Expand Down Expand Up @@ -684,6 +685,8 @@ class AlertService(
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex

val commentIdsToDelete = mutableListOf<String>()

var requestsToRetry = alerts.flatMap { alert ->
// We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
// In the rare event that a user acknowledges an alert between when it's read and when it's written
Expand Down Expand Up @@ -730,13 +733,22 @@ class AlertService(
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(alertsIndex, alert.id)
.routing(routingId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled()) {
// Only add completed alert to history index if history is enabled
IndexRequest(alertsHistoryIndex)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
} else null
} else {
// Otherwise, prepare the Alert's comments for deletion, and don't include
// a request to index the Alert to an Alert history index.
// The delete request can't be added to the list of DocWriteRequests because
// Comments are stored in aliased history indices, not a concrete Comments
// index like Alerts. A DeleteBy request will be used to delete Comments, instead
// of a regular Delete request
commentIdsToDelete.addAll(CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id)))
null
}
)
}
}
Expand All @@ -756,6 +768,9 @@ class AlertService(
throw ExceptionsHelper.convertToOpenSearchException(retryCause)
}
}

// delete all the comments of any Alerts that were deleted
CommentsUtils.deleteComments(client, commentIdsToDelete)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.comments.CommentsIndices
import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
Expand All @@ -27,6 +28,7 @@ import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteAlertingCommentAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
Expand All @@ -40,8 +42,10 @@ import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction
import org.opensearch.alerting.resthandler.RestIndexAlertingCommentAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
import org.opensearch.alerting.resthandler.RestIndexWorkflowAction
import org.opensearch.alerting.resthandler.RestSearchAlertingCommentAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
Expand All @@ -55,6 +59,7 @@ import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.spi.RemoteMonitorRunnerExtension
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteAlertingCommentAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction
Expand All @@ -69,8 +74,10 @@ import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction
import org.opensearch.alerting.transport.TransportIndexAlertingCommentAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportIndexWorkflowAction
import org.opensearch.alerting.transport.TransportSearchAlertingCommentAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
Expand Down Expand Up @@ -153,6 +160,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val LEGACY_OPENDISTRO_EMAIL_ACCOUNT_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_accounts"
@JvmField val LEGACY_OPENDISTRO_EMAIL_GROUP_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_groups"
@JvmField val FINDING_BASE_URI = "/_plugins/_alerting/findings"
@JvmField val COMMENTS_BASE_URI = "/_plugins/_alerting/comments"

@JvmField val ALERTING_JOB_TYPES = listOf("monitor", "workflow")
}
Expand All @@ -161,6 +169,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var scheduler: JobScheduler
lateinit var sweeper: JobSweeper
lateinit var scheduledJobIndices: ScheduledJobIndices
lateinit var commentsIndices: CommentsIndices
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
lateinit var threadPool: ThreadPool
lateinit var alertIndices: AlertIndices
Expand Down Expand Up @@ -199,6 +208,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetWorkflowAction(),
RestDeleteWorkflowAction(),
RestGetRemoteIndexesAction(),
RestIndexAlertingCommentAction(),
RestSearchAlertingCommentAction(),
RestDeleteAlertingCommentAction(),
)
}

Expand All @@ -225,6 +237,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_COMMENT_ACTION_TYPE, TransportIndexAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.SEARCH_COMMENTS_ACTION_TYPE, TransportSearchAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_COMMENT_ACTION_TYPE, TransportDeleteAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
Expand Down Expand Up @@ -285,6 +300,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerDestinationSettings()
.registerRemoteMonitors(monitorTypeToMonitorRunners)
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
commentsIndices = CommentsIndices(environment.settings(), client, threadPool, clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
Expand Down Expand Up @@ -313,6 +329,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
scheduler,
runner,
scheduledJobIndices,
commentsIndices,
docLevelMonitorQueries,
destinationMigrationCoordinator,
lockService,
Expand Down Expand Up @@ -387,7 +404,15 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED,
AlertingSettings.ALERTING_COMMENTS_ENABLED,
AlertingSettings.COMMENTS_HISTORY_MAX_DOCS,
AlertingSettings.COMMENTS_HISTORY_INDEX_MAX_AGE,
AlertingSettings.COMMENTS_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.COMMENTS_HISTORY_RETENTION_PERIOD,
AlertingSettings.COMMENTS_MAX_CONTENT_SIZE,
AlertingSettings.MAX_COMMENTS_PER_ALERT,
AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
Expand All @@ -33,6 +35,7 @@ import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.commons.alerting.model.Comment
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
Expand Down Expand Up @@ -273,6 +276,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
// to alertsToUpdate to ensure the Alert doc is updated at the end in either case
completedAlertsToUpdate.addAll(completedAlerts)

// retrieve max Comments per Alert notification setting
val maxComments = monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION)

// All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop
val triggerCtx = triggerContexts[trigger.id]!!
val triggerResult = triggerResults[trigger.id]!!
Expand All @@ -290,9 +296,18 @@ object BucketLevelMonitorRunner : MonitorRunner() {
if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
for (alertCategory in actionExecutionScope.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
val alertsToExecuteActionsForIds = alertsToExecuteActionsFor.map { it.id }
val allAlertsComments = CommentsUtils.getCommentsForAlertNotification(
monitorCtx.client!!,
alertsToExecuteActionsForIds,
maxComments
)
for (alert in alertsToExecuteActionsFor) {
val alertContext = if (alertCategory != AlertCategory.NEW) AlertContext(alert = alert)
else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs)
val alertContext = if (alertCategory != AlertCategory.NEW) {
AlertContext(alert = alert, comments = allAlertsComments[alert.id])
} else {
getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs, allAlertsComments[alert.id])
}

val actionCtx = getActionContextForAlertCategory(
alertCategory, alertContext, triggerCtx, monitorOrTriggerError
Expand Down Expand Up @@ -324,12 +339,28 @@ object BucketLevelMonitorRunner : MonitorRunner() {
if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty())
continue

val alertsToExecuteActionsForIds = dedupedAlerts.map { it.id }
.plus(newAlerts.map { it.id })
.plus(completedAlerts.map { it.id })
val allAlertsComments = CommentsUtils.getCommentsForAlertNotification(
monitorCtx.client!!,
alertsToExecuteActionsForIds,
maxComments
)
val actionCtx = triggerCtx.copy(
dedupedAlerts = dedupedAlerts,
dedupedAlerts = dedupedAlerts.map {
AlertContext(alert = it, comments = allAlertsComments[it.id])
},
newAlerts = newAlerts.map {
getAlertContext(alert = it, alertSampleDocs = alertSampleDocs)
getAlertContext(
alert = it,
alertSampleDocs = alertSampleDocs,
alertComments = allAlertsComments[it.id]
)
},
completedAlerts = completedAlerts.map {
AlertContext(alert = it, comments = allAlertsComments[it.id])
},
completedAlerts = completedAlerts,
error = monitorResult.error ?: triggerResult.error
)
val actionResult = this.runAction(action, actionCtx, monitorCtx, monitor, dryrun)
Expand Down Expand Up @@ -530,17 +561,18 @@ object BucketLevelMonitorRunner : MonitorRunner() {
): BucketLevelTriggerExecutionContext {
return when (alertCategory) {
AlertCategory.DEDUPED ->
ctx.copy(dedupedAlerts = listOf(alertContext.alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
ctx.copy(dedupedAlerts = listOf(alertContext), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
AlertCategory.NEW ->
ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alertContext), completedAlerts = emptyList(), error = error)
AlertCategory.COMPLETED ->
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext.alert), error = error)
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext), error = error)
}
}

private fun getAlertContext(
alert: Alert,
alertSampleDocs: Map<String, Map<String, List<Map<String, Any>>>>
alertSampleDocs: Map<String, Map<String, List<Map<String, Any>>>>,
alertComments: List<Comment>?
): AlertContext {
val bucketKey = alert.aggregationResultBucket?.getBucketKeysHash()
val sampleDocs = alertSampleDocs[alert.triggerId]?.get(bucketKey)
Expand All @@ -554,7 +586,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
alert.monitorId,
alert.executionId
)
AlertContext(alert = alert, sampleDocs = listOf())
AlertContext(alert = alert, sampleDocs = listOf(), comments = alertComments)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ abstract class MonitorRunner {
dryrun: Boolean
): ActionRunResult {
return try {
if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alert)) {
if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alertContext?.alert)) {
return ActionRunResult(action.id, action.name, mapOf(), true, null, null)
}
val actionOutput = mutableMapOf<String, String>()
Expand Down
Loading

0 comments on commit d808474

Please sign in to comment.