Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alerting Enhancements: Alerting Comments (Experimental) #1561

Merged
merged 24 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4cebaa3
initial commit, functional but needs refactoring
toepkerd-zz May 27, 2024
6eb761a
refactored QueryLevelTriggerExecutionContext to not need both Alert a…
toepkerd-zz May 27, 2024
745cb62
misc additions and fixes
toepkerd-zz May 27, 2024
85ba751
misc cleanup
toepkerd-zz May 27, 2024
1569cca
misc changes and basic ITs
toepkerd-zz May 29, 2024
23b4b06
cleanup
toepkerd-zz May 29, 2024
2f4752b
renaming notes to comments
toepkerd-zz Jun 5, 2024
715d2c4
Merge branch 'main' into alerting-enhancements
toepkerd-zz Jun 5, 2024
889ecf2
misc changes
toepkerd-zz Jun 6, 2024
c141956
changed API endpoints
toepkerd-zz Jun 6, 2024
97b75a6
more misc changes and fixes
toepkerd-zz Jun 6, 2024
19669dc
misc cleanup
toepkerd-zz Jun 6, 2024
656fc22
updated a comment
toepkerd-zz Jun 6, 2024
3e3a4ed
misc cleanup and setting refresh policy to immediate
toepkerd-zz Jun 7, 2024
a08756b
fixed lint issues and other restructuring
toepkerd-zz Jun 7, 2024
49e5bfc
review-based changes
toepkerd-zz Jun 10, 2024
71dce54
update after adding entityType to Comment model
toepkerd-zz Jun 10, 2024
ef31e28
misc fixes
toepkerd-zz Jun 11, 2024
acc876d
removing comments history enabled setting
toepkerd-zz Jun 11, 2024
6f7f3c1
Merge branch 'opensearch-project:main' into alerting-enhancements
toepkerd Jun 11, 2024
612d0fc
adding release notes
toepkerd-zz Jun 11, 2024
933474d
IT fixes
toepkerd-zz Jun 11, 2024
5a9e9ce
removing dev code vestiges
toepkerd-zz Jun 11, 2024
2556058
changing logger calls
toepkerd-zz Jun 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.MAX_SEARCH_SIZE
import org.opensearch.alerting.util.getBucketKeysHash
Expand Down Expand Up @@ -157,7 +158,7 @@ class AlertService(
workflorwRunContext: WorkflowRunContext?
): Alert? {
val currentTime = Instant.now()
val currentAlert = ctx.alert
val currentAlert = ctx.alertContext?.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
Expand Down Expand Up @@ -684,6 +685,8 @@ class AlertService(
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex

val commentIdsToDelete = mutableListOf<String>()

var requestsToRetry = alerts.flatMap { alert ->
// We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
// In the rare event that a user acknowledges an alert between when it's read and when it's written
Expand Down Expand Up @@ -730,13 +733,22 @@ class AlertService(
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(alertsIndex, alert.id)
.routing(routingId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled()) {
// Only add completed alert to history index if history is enabled
IndexRequest(alertsHistoryIndex)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
} else null
} else {
// Otherwise, prepare the Alert's comments for deletion, and don't include
// a request to index the Alert to an Alert history index.
// The delete request can't be added to the list of DocWriteRequests because
// Comments are stored in aliased history indices, not a concrete Comments
// index like Alerts. A DeleteBy request will be used to delete Comments, instead
// of a regular Delete request
commentIdsToDelete.addAll(CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id)))
null
}
)
}
}
Expand All @@ -756,6 +768,9 @@ class AlertService(
throw ExceptionsHelper.convertToOpenSearchException(retryCause)
}
}

// delete all the comments of any Alerts that were deleted
CommentsUtils.deleteComments(client, commentIdsToDelete)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.comments.CommentsIndices
import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
Expand All @@ -27,6 +28,7 @@ import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteAlertingCommentAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
Expand All @@ -40,8 +42,10 @@ import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction
import org.opensearch.alerting.resthandler.RestIndexAlertingCommentAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
import org.opensearch.alerting.resthandler.RestIndexWorkflowAction
import org.opensearch.alerting.resthandler.RestSearchAlertingCommentAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
Expand All @@ -55,6 +59,7 @@ import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.spi.RemoteMonitorRunnerExtension
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteAlertingCommentAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction
Expand All @@ -69,8 +74,10 @@ import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction
import org.opensearch.alerting.transport.TransportIndexAlertingCommentAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportIndexWorkflowAction
import org.opensearch.alerting.transport.TransportSearchAlertingCommentAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
Expand Down Expand Up @@ -153,6 +160,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val LEGACY_OPENDISTRO_EMAIL_ACCOUNT_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_accounts"
@JvmField val LEGACY_OPENDISTRO_EMAIL_GROUP_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_groups"
@JvmField val FINDING_BASE_URI = "/_plugins/_alerting/findings"
@JvmField val COMMENTS_BASE_URI = "/_plugins/_alerting/comments"

@JvmField val ALERTING_JOB_TYPES = listOf("monitor", "workflow")
}
Expand All @@ -161,6 +169,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var scheduler: JobScheduler
lateinit var sweeper: JobSweeper
lateinit var scheduledJobIndices: ScheduledJobIndices
lateinit var commentsIndices: CommentsIndices
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
lateinit var threadPool: ThreadPool
lateinit var alertIndices: AlertIndices
Expand Down Expand Up @@ -199,6 +208,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetWorkflowAction(),
RestDeleteWorkflowAction(),
RestGetRemoteIndexesAction(),
RestIndexAlertingCommentAction(),
RestSearchAlertingCommentAction(),
RestDeleteAlertingCommentAction(),
)
}

Expand All @@ -225,6 +237,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_COMMENT_ACTION_TYPE, TransportIndexAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.SEARCH_COMMENTS_ACTION_TYPE, TransportSearchAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_COMMENT_ACTION_TYPE, TransportDeleteAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
Expand Down Expand Up @@ -285,6 +300,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerDestinationSettings()
.registerRemoteMonitors(monitorTypeToMonitorRunners)
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
commentsIndices = CommentsIndices(environment.settings(), client, threadPool, clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
Expand Down Expand Up @@ -313,6 +329,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
scheduler,
runner,
scheduledJobIndices,
commentsIndices,
docLevelMonitorQueries,
destinationMigrationCoordinator,
lockService,
Expand Down Expand Up @@ -387,7 +404,15 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED,
AlertingSettings.ALERTING_COMMENTS_ENABLED,
AlertingSettings.COMMENTS_HISTORY_MAX_DOCS,
AlertingSettings.COMMENTS_HISTORY_INDEX_MAX_AGE,
AlertingSettings.COMMENTS_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.COMMENTS_HISTORY_RETENTION_PERIOD,
AlertingSettings.COMMENTS_MAX_CONTENT_SIZE,
AlertingSettings.MAX_COMMENTS_PER_ALERT,
AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
Expand All @@ -33,6 +35,7 @@ import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.commons.alerting.model.Comment
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
Expand Down Expand Up @@ -273,6 +276,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
// to alertsToUpdate to ensure the Alert doc is updated at the end in either case
completedAlertsToUpdate.addAll(completedAlerts)

// retrieve max Comments per Alert notification setting
val maxComments = monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION)

// All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop
val triggerCtx = triggerContexts[trigger.id]!!
val triggerResult = triggerResults[trigger.id]!!
Expand All @@ -290,9 +296,18 @@ object BucketLevelMonitorRunner : MonitorRunner() {
if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
for (alertCategory in actionExecutionScope.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
val alertsToExecuteActionsForIds = alertsToExecuteActionsFor.map { it.id }
val allAlertsComments = CommentsUtils.getCommentsForAlertNotification(
monitorCtx.client!!,
alertsToExecuteActionsForIds,
maxComments
)
for (alert in alertsToExecuteActionsFor) {
val alertContext = if (alertCategory != AlertCategory.NEW) AlertContext(alert = alert)
else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs)
val alertContext = if (alertCategory != AlertCategory.NEW) {
AlertContext(alert = alert, comments = allAlertsComments[alert.id])
} else {
getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs, allAlertsComments[alert.id])
}

val actionCtx = getActionContextForAlertCategory(
alertCategory, alertContext, triggerCtx, monitorOrTriggerError
Expand Down Expand Up @@ -324,12 +339,28 @@ object BucketLevelMonitorRunner : MonitorRunner() {
if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty())
continue

val alertsToExecuteActionsForIds = dedupedAlerts.map { it.id }
.plus(newAlerts.map { it.id })
.plus(completedAlerts.map { it.id })
val allAlertsComments = CommentsUtils.getCommentsForAlertNotification(
monitorCtx.client!!,
alertsToExecuteActionsForIds,
maxComments
)
val actionCtx = triggerCtx.copy(
dedupedAlerts = dedupedAlerts,
dedupedAlerts = dedupedAlerts.map {
AlertContext(alert = it, comments = allAlertsComments[it.id])
},
newAlerts = newAlerts.map {
getAlertContext(alert = it, alertSampleDocs = alertSampleDocs)
getAlertContext(
alert = it,
alertSampleDocs = alertSampleDocs,
alertComments = allAlertsComments[it.id]
)
},
completedAlerts = completedAlerts.map {
AlertContext(alert = it, comments = allAlertsComments[it.id])
},
completedAlerts = completedAlerts,
error = monitorResult.error ?: triggerResult.error
)
val actionResult = this.runAction(action, actionCtx, monitorCtx, monitor, dryrun)
Expand Down Expand Up @@ -530,17 +561,18 @@ object BucketLevelMonitorRunner : MonitorRunner() {
): BucketLevelTriggerExecutionContext {
return when (alertCategory) {
AlertCategory.DEDUPED ->
ctx.copy(dedupedAlerts = listOf(alertContext.alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
ctx.copy(dedupedAlerts = listOf(alertContext), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
AlertCategory.NEW ->
ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alertContext), completedAlerts = emptyList(), error = error)
AlertCategory.COMPLETED ->
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext.alert), error = error)
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext), error = error)
}
}

private fun getAlertContext(
alert: Alert,
alertSampleDocs: Map<String, Map<String, List<Map<String, Any>>>>
alertSampleDocs: Map<String, Map<String, List<Map<String, Any>>>>,
alertComments: List<Comment>?
): AlertContext {
val bucketKey = alert.aggregationResultBucket?.getBucketKeysHash()
val sampleDocs = alertSampleDocs[alert.triggerId]?.get(bucketKey)
Expand All @@ -554,7 +586,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
alert.monitorId,
alert.executionId
)
AlertContext(alert = alert, sampleDocs = listOf())
AlertContext(alert = alert, sampleDocs = listOf(), comments = alertComments)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ abstract class MonitorRunner {
dryrun: Boolean
): ActionRunResult {
return try {
if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alert)) {
if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alertContext?.alert)) {
return ActionRunResult(action.id, action.name, mapOf(), true, null, null)
}
val actionOutput = mutableMapOf<String, String>()
Expand Down
Loading
Loading