From 920afc8670d4ceb657f1dadf37c9db20a88da7cb Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Fri, 9 Feb 2024 04:42:39 -0800 Subject: [PATCH] test changes on document level monitor runner wip Signed-off-by: Surya Sashank Nistala --- .../alerting/DocumentLevelMonitorRunner.kt | 210 +++----- .../alerting/MonitorRunnerExecutionContext.kt | 2 + .../action/DocLevelMonitorFanOutRequest.kt | 16 +- .../TransportDocLevelMonitorFanOutAction.kt | 465 +++++++++++++++++- .../alerting/workflow/WorkflowRunContext.kt | 44 +- 5 files changed, 594 insertions(+), 143 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index fbcb9a2d9..bb74b7972 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -16,6 +16,7 @@ import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.IndexExecutionContext import org.opensearch.alerting.model.InputRunResults @@ -53,6 +54,7 @@ import org.opensearch.commons.alerting.model.action.PerAlertActionScope import org.opensearch.commons.alerting.util.string import org.opensearch.core.action.ActionListener import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.index.shard.ShardId import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.XContentBuilder @@ -84,7 +86,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { periodEnd: Instant, dryrun: Boolean, workflowRunContext: WorkflowRunContext?, - executionId: String + executionId: String, ): MonitorRunResult { logger.debug("Document-level-monitor is running ...") val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID @@ -132,9 +134,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { try { // Resolve all passed indices to concrete indices + val clusterService = monitorCtx.clusterService!! val allConcreteIndices = IndexUtils.resolveAllIndices( docLevelMonitorInput.indices, - monitorCtx.clusterService!!, + clusterService, monitorCtx.indexNameExpressionResolver!! ) if (allConcreteIndices.isEmpty()) { @@ -142,8 +145,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(",")) } - monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources) - monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries( + val docLevelMonitorQueries = monitorCtx.docLevelMonitorQueries!! + docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources) + docLevelMonitorQueries.indexDocLevelQueries( monitor = monitor, monitorId = monitor.id, monitorMetadata, @@ -172,20 +176,20 @@ object DocumentLevelMonitorRunner : MonitorRunner() { var concreteIndices = IndexUtils.resolveAllIndices( listOf(indexName), - monitorCtx.clusterService!!, + clusterService, monitorCtx.indexNameExpressionResolver!! ) var lastWriteIndex: String? 
= null - if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || - IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + if (IndexUtils.isAlias(indexName, clusterService.state()) || + IndexUtils.isDataStream(indexName, clusterService.state()) ) { lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) } if (lastWriteIndex != null) { val lastWriteIndexCreationDate = - IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state()) + IndexUtils.getCreationDateForIndex(lastWriteIndex, clusterService.state()) concreteIndices = IndexUtils.getNewestIndicesByCreationDate( concreteIndices, - monitorCtx.clusterService!!.state(), + clusterService.state(), lastWriteIndexCreationDate ) } @@ -193,8 +197,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndicesSeenSoFar.addAll(concreteIndices) val updatedIndexName = indexName.replace("*", "_") updatedIndexNames.add(updatedIndexName) - val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( - monitorCtx.clusterService!!.state(), + val conflictingFields = docLevelMonitorQueries.getAllConflictingFields( + clusterService.state(), concreteIndices ) @@ -205,7 +209,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitor, periodStart, periodEnd, - monitorCtx.clusterService!!.state().metadata.index(concreteIndexName) + clusterService.state().metadata.index(concreteIndexName) ) MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently) } @@ -216,10 +220,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx, concreteIndexName ) as MutableMap - if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || - IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + if (IndexUtils.isAlias(indexName, clusterService.state()) || + IndexUtils.isDataStream(indexName, clusterService.state()) ) { - if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) { + if (concreteIndexName == IndexUtils.getWriteIndex(indexName, clusterService.state())) { updatedLastRunContext.remove(lastWriteIndex) updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext } @@ -264,117 +268,29 @@ object DocumentLevelMonitorRunner : MonitorRunner() { conflictingFields.toList(), matchingDocIdsPerIndex?.get(concreteIndexName), ) - // map - // build DocLevelMonitorFanOutRequest - // groupedlistener - // monitorCtx.client.send request parallel calls - - fetchShardDataAndMaybeExecutePercolateQueries( - monitor, - monitorCtx, - indexExecutionContext, - monitorMetadata, - inputRunResults, - docsToQueries, - transformedDocs, - docsSizeInBytes, - updatedIndexNames, - concreteIndicesSeenSoFar, - ArrayList(fieldsToBeQueried), - nonPercolateSearchesTimeTaken, - percolateQueriesTimeTaken, - totalDocsQueried, - docTransformTimeTaken - ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number - indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo - } - } - } - /* if all indices are covered still in-memory docs size limit is not breached we would need to submit - the percolate query at the end */ - if (transformedDocs.isNotEmpty()) { - performPercolateQueryAndResetCounters( - monitorCtx, - transformedDocs, - docsSizeInBytes, - monitor, - monitorMetadata, - updatedIndexNames, - concreteIndicesSeenSoFar, - inputRunResults, - docsToQueries, - percolateQueriesTimeTaken, - totalDocsQueried - ) - } - val took = 
System.currentTimeMillis() - queryingStartTimeMillis
-            logger.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in $executionId")
-            monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
-
-            /*
-             populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
-             list of matched docId from inputRunResults>
-             this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
-             */
-            queries.forEach {
-                if (inputRunResults.containsKey(it.id)) {
-                    queryToDocIds[it] = inputRunResults[it.id]!!
-                }
-            }
-
-            val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }
-
-            val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
-            // If there are no triggers defined, we still want to generate findings
-            if (monitor.triggers.isEmpty()) {
-                if (dryrun == false && monitor.id != Monitor.NO_ID) {
-                    logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}")
-                    createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
-                }
-            } else {
-                monitor.triggers.forEach {
-                    triggerResults[it.id] = runForEachDocTrigger(
-                        monitorCtx,
-                        monitorResult,
-                        it as DocumentLevelTrigger,
-                        monitor,
-                        idQueryMap,
-                        docsToQueries,
-                        queryToDocIds,
-                        dryrun,
-                        executionId = executionId,
-                        workflowRunContext = workflowRunContext
-                    )
-                }
-            }
-            // Don't update monitor if this is a test monitor
-            if (!isTempMonitor) {
-                // If any error happened during trigger execution, upsert monitor error alert
-                val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
-                if (errorMessage.isNotEmpty()) {
-                    monitorCtx.alertService!!.upsertMonitorErrorAlert(
+                    val docLevelMonitorFanOutRequest1 = DocLevelMonitorFanOutRequest(
+                        nodeId = clusterService.localNode().id,
                         monitor = monitor,
-                        errorMessage = errorMessage,
+                        monitorMetadata = monitorMetadata,
                         executionId = executionId,
+                        indexExecutionContexts = listOf(indexExecutionContext),
+                        listOf(
+                            ShardId(
+                                concreteIndexName,
+                                clusterService.state().metadata.index(concreteIndexName).indexUUID,
+                                0
+                            )
+                        ),
                         workflowRunContext
                     )
-                } else {
-                    onSuccessfulMonitorRun(monitorCtx, monitor)
                 }
-                logger.error(
-                    "Calling upsertMetadata function from ${monitorCtx.clusterService!!.localNode().id} in " +
-                        "execution $executionId"
-                )
-                // construct metadata from all nodes' fanout
-                // response
-                MonitorMetadataService.upsertMetadata(
-                    monitorMetadata.copy(lastRunContext = updatedLastRunContext),
-                    true
-                )
             }
-
+            MonitorMetadataService.upsertMetadata(
+                monitorMetadata.copy(lastRunContext = updatedLastRunContext),
+                true
+            )
             // TODO: Update the Document as part of the Trigger and return back the trigger action result
-            return monitorResult.copy(triggerResults = triggerResults)
+            return monitorResult.copy(triggerResults = emptyMap())
         } catch (e: Exception) {
             val errorMessage = ExceptionsHelper.detailedMessage(e)
             monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext)
@@ -384,7 +300,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                 RestStatus.INTERNAL_SERVER_ERROR,
                 e
             )
-            return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
+            return monitorResult.copy(
+                error = alertingException,
+                inputResults = InputRunResults(emptyList(), alertingException)
+            )
         } finally {
             logger.error(
                 "PERF_DEBUG_STATS: Monitor ${monitor.id} " +
@@ -408,7 +327,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
     }

     private fun constructErrorMessageFromTriggerResults(
-        triggerResults: MutableMap<String, DocumentLevelTriggerRunResult>? = null
+        triggerResults: MutableMap<String, DocumentLevelTriggerRunResult>? = null,
     ): String {
         var errorMessage = ""
         if (triggerResults != null) {
@@ -435,7 +354,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         queryToDocIds: Map<DocLevelQuery, Set<String>>,
         dryrun: Boolean,
         workflowRunContext: WorkflowRunContext?,
-        executionId: String
+        executionId: String,
     ): DocumentLevelTriggerRunResult {
         val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
         val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)
@@ -490,7 +409,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope
             if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
                 for (alert in alerts) {
-                    val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun)
+                    val actionResults =
+                        this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun)
                     triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() }
                     triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults)
                 }
@@ -508,7 +428,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             val updatedAlerts = alerts.map { alert ->
                 val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap())
                 val actionExecutionResults = actionResults.values.map { actionRunResult ->
-                    ActionExecutionResult(actionRunResult.actionId, actionRunResult.executionTime, if (actionRunResult.throttled) 1 else 0)
+                    ActionExecutionResult(
+                        actionRunResult.actionId,
+                        actionRunResult.executionTime,
+                        if (actionRunResult.throttled) 1 else 0
+                    )
                 }
                 alert.copy(actionExecutionResults = actionExecutionResults)
             }
@@ -592,7 +516,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
     private fun publishFinding(
         monitor: Monitor,
         monitorCtx: MonitorRunnerExecutionContext,
-        finding: Finding
+        finding: Finding,
     ) {
         val publishFindingsRequest = PublishFindingsRequest(monitor.id, finding)
         AlertingPluginInterface.publishFinding(
@@ -640,7 +564,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         monitor: Monitor,
         periodStart: Instant,
         periodEnd: Instant,
-        indexMetadata: IndexMetadata
+        indexMetadata: IndexMetadata,
     ): Boolean {
         val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart
         val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
@@ -651,7 +575,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
      * Get the current max seq number of the shard. We find it by searching the last document
      *  in the primary shard.
*/ - private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: AtomicLong): Long { + private suspend fun getMaxSeqNo( + client: Client, + index: String, + shard: String, + nonPercolateSearchesTimeTaken: AtomicLong, + ): Long { val request: SearchRequest = SearchRequest() .indices(index) .preference("_shards:$shard") @@ -699,7 +628,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { percolateQueriesTimeTaken: AtomicLong, totalDocsQueried: AtomicLong, docTransformTimeTake: AtomicLong, - updateLastRunContext: (String, String) -> Unit + updateLastRunContext: (String, String) -> Unit, ) { val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { @@ -743,7 +672,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) if ( transformedDocs.isNotEmpty() && - shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx) + shouldPerformPercolateQueryAndFlushInMemoryDocs( + docsSizeInBytes, + transformedDocs.size, + monitorCtx + ) ) { performPercolateQueryAndResetCounters( monitorCtx, @@ -795,7 +728,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { inputRunResults: MutableMap>, docsToQueries: MutableMap>, percolateQueriesTimeTaken: AtomicLong, - totalDocsQueried: AtomicLong + totalDocsQueried: AtomicLong, ) { try { val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( @@ -883,7 +816,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata: MonitorMetadata, concreteIndices: List, monitorInputIndices: List, - percolateQueriesTimeTaken: AtomicLong + percolateQueriesTimeTaken: AtomicLong, ): SearchHits { val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices)) @@ -940,6 +873,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { percolateQueriesTimeTaken.getAndAdd(response.took.millis) return response.hits } + /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ private fun buildShouldClausesOverPerIndexMatchQueries(indices: List): BoolQueryBuilder { val boolQueryBuilder = QueryBuilders.boolQuery() @@ -1015,7 +949,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { conflictingFields: List, fieldNameSuffixPattern: String, fieldNameSuffixIndex: String, - fieldNamePrefix: String + fieldNamePrefix: String, ) { val tempMap = mutableMapOf() val it: MutableIterator> = jsonAsMap.entries.iterator() @@ -1052,7 +986,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { * amount of percentage (default:10. setting is dynamic and configurable) of the total heap size or not. 
* */ - private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean { + private fun isInMemoryDocsSizeExceedingMemoryLimit( + docsBytesSize: Long, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes @@ -1060,7 +997,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return docsBytesSize > thresholdBytes } - private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean { + private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit( + numDocs: Int, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) return numDocs >= maxNumDocsThreshold } @@ -1073,6 +1013,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { var indexName: String, var concreteIndexName: String, var docId: String, - var docSource: BytesReference + var docSource: BytesReference, ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 3b17ceebe..68e9e59f3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -21,6 +21,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.ScriptService import org.opensearch.threadpool.ThreadPool +import org.opensearch.transport.TransportService data class MonitorRunnerExecutionContext( @@ -38,6 +39,7 @@ data class MonitorRunnerExecutionContext( var docLevelMonitorQueries: DocLevelMonitorQueries? = null, var workflowService: WorkflowService? = null, var jvmStats: JvmStats? = null, + val transportService: TransportService, @Volatile var retryPolicy: BackoffPolicy? = null, @Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt index 33ef42564..60da3b385 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -4,6 +4,7 @@ import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException import org.opensearch.alerting.model.IndexExecutionContext import org.opensearch.alerting.model.MonitorMetadata +import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.commons.alerting.model.Monitor import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput @@ -21,6 +22,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { val executionId: String val indexExecutionContexts: List val shardIds: List + val workflowRunContext: WorkflowRunContext? 
     constructor(
         nodeId: String,
@@ -29,6 +31,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
         executionId: String,
         indexExecutionContexts: List<IndexExecutionContext>,
         shardIds: List<ShardId>,
+        workflowRunContext: WorkflowRunContext?,
     ) : super() {
         this.nodeId = nodeId
         this.monitor = monitor
@@ -36,6 +39,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
         this.executionId = executionId
         this.indexExecutionContexts = indexExecutionContexts
         this.shardIds = shardIds
+        this.workflowRunContext = workflowRunContext
         require(shardIds.isNotEmpty()) { "shard ids cannot be empty" }
         require(indexExecutionContexts.isNotEmpty()) { "index execution contexts cannot be empty" }
     }
@@ -47,7 +51,10 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
         monitorMetadata = MonitorMetadata.readFrom(sin),
         executionId = sin.readString(),
         indexExecutionContexts = sin.readList { IndexExecutionContext(sin) },
-        shardIds = sin.readList(::ShardId)
+        shardIds = sin.readList(::ShardId),
+        workflowRunContext = if (sin.readBoolean()) {
+            WorkflowRunContext(sin)
+        } else null,
     )

     @Throws(IOException::class)
@@ -58,6 +65,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
         out.writeString(executionId)
         out.writeCollection(indexExecutionContexts)
         out.writeCollection(shardIds)
+        out.writeBoolean(workflowRunContext != null)
+        workflowRunContext?.writeTo(out)
     }

     override fun validate(): ActionRequestValidationException {
@@ -77,10 +86,11 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
     override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
         builder.startObject()
             .field("node_id", nodeId)
-            .field("monitor_id", nodeId)
-            .field("execution_id", nodeId)
+            .field("monitor", monitor)
+            .field("execution_id", executionId)
             .field("index_execution_contexts", indexExecutionContexts)
             .field("shard_ids", shardIds)
+            .field("workflow_run_context", workflowRunContext)
         return builder.endObject()
     }
 }
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt
index d0b92d22c..1f5e70276 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt
@@ -4,37 +4,79 @@ import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.launch
 import org.apache.logging.log4j.LogManager
+import org.opensearch.OpenSearchSecurityException
 import org.opensearch.OpenSearchStatusException
+import org.opensearch.action.DocWriteRequest
+import org.opensearch.action.bulk.BulkRequest
+import org.opensearch.action.bulk.BulkResponse
+import org.opensearch.action.index.IndexRequest
 import org.opensearch.action.search.SearchAction
 import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.search.SearchResponse
 import org.opensearch.action.support.ActionFilters
 import org.opensearch.action.support.HandledTransportAction
+import org.opensearch.action.support.WriteRequest
 import org.opensearch.alerting.MonitorRunnerExecutionContext
+import org.opensearch.alerting.MonitorRunnerService
 import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest
 import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse
+import org.opensearch.alerting.action.GetDestinationsAction
+import org.opensearch.alerting.action.GetDestinationsRequest
+import org.opensearch.alerting.action.GetDestinationsResponse
+import org.opensearch.alerting.model.ActionRunResult
 import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
 import org.opensearch.alerting.model.IndexExecutionContext
 import org.opensearch.alerting.model.InputRunResults
 import org.opensearch.alerting.model.MonitorMetadata
 import org.opensearch.alerting.model.MonitorRunResult
+import org.opensearch.alerting.model.destination.Destination
+import org.opensearch.alerting.model.userErrorMessage
+import org.opensearch.alerting.opensearchapi.InjectorContextElement
 import org.opensearch.alerting.opensearchapi.suspendUntil
+import org.opensearch.alerting.opensearchapi.withClosableContext
+import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
+import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
+import org.opensearch.alerting.script.TriggerExecutionContext
 import org.opensearch.alerting.settings.AlertingSettings
 import org.opensearch.alerting.util.AlertingException
+import org.opensearch.alerting.util.defaultToPerExecutionAction
+import org.opensearch.alerting.util.destinationmigration.NotificationActionConfigs
+import org.opensearch.alerting.util.destinationmigration.NotificationApiUtils
+import org.opensearch.alerting.util.destinationmigration.getTitle
+import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification
+import org.opensearch.alerting.util.destinationmigration.sendNotification
+import org.opensearch.alerting.util.getActionExecutionPolicy
+import org.opensearch.alerting.util.isAllowed
+import org.opensearch.alerting.util.isTestAction
+import org.opensearch.alerting.util.use
+import org.opensearch.alerting.workflow.WorkflowRunContext
 import org.opensearch.client.Client
+import org.opensearch.client.node.NodeClient
 import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.inject.Inject
 import org.opensearch.common.settings.Settings
 import org.opensearch.common.xcontent.XContentFactory
 import org.opensearch.common.xcontent.XContentType
 import org.opensearch.commons.alerting.action.AlertingActions
+import org.opensearch.commons.alerting.model.ActionExecutionResult
+import org.opensearch.commons.alerting.model.Alert
 import org.opensearch.commons.alerting.model.DocLevelMonitorInput
 import org.opensearch.commons.alerting.model.DocLevelQuery
+import org.opensearch.commons.alerting.model.DocumentLevelTrigger
+import org.opensearch.commons.alerting.model.Finding
 import org.opensearch.commons.alerting.model.Monitor
+import org.opensearch.commons.alerting.model.Table
+import org.opensearch.commons.alerting.model.action.Action
+import org.opensearch.commons.alerting.model.action.PerAlertActionScope
+import org.opensearch.commons.alerting.util.string
+import org.opensearch.commons.notifications.model.NotificationConfigInfo
 import org.opensearch.core.action.ActionListener
+import org.opensearch.core.common.Strings
 import org.opensearch.core.common.bytes.BytesReference
 import org.opensearch.core.rest.RestStatus
 import org.opensearch.core.xcontent.NamedXContentRegistry
+import org.opensearch.core.xcontent.ToXContent
+import org.opensearch.core.xcontent.XContentBuilder
 import org.opensearch.index.query.BoolQueryBuilder
 import org.opensearch.index.query.Operator
 import org.opensearch.index.query.QueryBuilders
@@ -49,6 +91,7 @@ import org.opensearch.tasks.Task
 import org.opensearch.transport.TransportService
 import java.io.IOException
 import java.time.Instant
+import java.util.UUID
 import java.util.concurrent.atomic.AtomicLong
 import java.util.stream.Collectors
@@ -111,6 +154,7 @@ class TransportDocLevelMonitorFanOutAction
                 indexShardsMap[shardId.indexName] = mutableListOf(shardId.id)
             }
         }
+        // Resolve the doc-level monitor input and its queries for this fan-out slice.
         val docLevelMonitorInput = request.monitor.inputs[0] as DocLevelMonitorInput
         val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
         val fieldsToBeQueried = mutableSetOf<String>()
@@ -152,9 +196,6 @@ class TransportDocLevelMonitorFanOutAction
                 indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
             }
         }
-        val took = System.currentTimeMillis() - queryingStartTimeMillis
-        logger.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in ${request.executionId}")
-        monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
         /* if all indices are covered still in-memory docs size limit is not breached we would need to submit
            the percolate query at the end */
         if (transformedDocs.isNotEmpty()) {
@@ -172,6 +213,165 @@ class TransportDocLevelMonitorFanOutAction
                 totalDocsQueried
             )
         }
+        val took = System.currentTimeMillis() - queryingStartTimeMillis
+        logger.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in ${request.executionId}")
+        monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
+
+        /*
+         populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
+         list of matched docId from inputRunResults>
+         this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
+         */
+        queries.forEach {
+            if (inputRunResults.containsKey(it.id)) {
+                queryToDocIds[it] = inputRunResults[it.id]!!
+            }
+        }
+
+        val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }
+
+        val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
+        // If there are no triggers defined, we still want to generate findings
+        if (monitor.triggers.isEmpty()) {
+            if (monitor.id != Monitor.NO_ID) {
+                logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}")
+                createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
+            }
+        } else {
+            monitor.triggers.forEach {
+                triggerResults[it.id] = runForEachDocTrigger(
+                    monitorCtx,
+                    monitorResult,
+                    it as DocumentLevelTrigger,
+                    monitor,
+                    idQueryMap,
+                    docsToQueries,
+                    queryToDocIds,
+                    false,
+                    executionId = request.executionId,
+                    workflowRunContext = request.workflowRunContext
+                )
+            }
+        }
+
+        // If any error happened during trigger execution, upsert monitor error alert
+        val errorMessage =
+            constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
+        if (errorMessage.isNotEmpty()) {
+            monitorCtx.alertService!!.upsertMonitorErrorAlert(
+                monitor = monitor,
+                errorMessage = errorMessage,
+                executionId = request.executionId,
+                request.workflowRunContext
+            )
+        } else {
+            onSuccessfulMonitorRun(monitorCtx, monitor)
+        }
+        listener.onResponse(
+            DocLevelMonitorFanOutResponse(
+                nodeId = clusterService.localNode().id,
+                executionId = request.executionId,
+                monitorId = monitor.id,
+                shardIdFailureMap = emptyMap(),
+                findingIds = emptyList(),
+                request.indexExecutionContexts[0].updatedLastRunContext,
+                InputRunResults(listOf(inputRunResults)),
+                triggerResults
+            )
+        )
+    }
+
+    private suspend fun onSuccessfulMonitorRun(monitorCtx: MonitorRunnerExecutionContext, monitor: Monitor) {
+        monitorCtx.alertService!!.clearMonitorErrorAlert(monitor)
+        if (monitor.dataSources.alertsHistoryIndex != null) {
+            monitorCtx.alertService!!.moveClearedErrorAlertsToHistory(
+                monitor.id,
+                monitor.dataSources.alertsIndex,
+                monitor.dataSources.alertsHistoryIndex!!
+            )
+        }
+    }
+
+    private fun constructErrorMessageFromTriggerResults(
+        triggerResults: MutableMap<String, DocumentLevelTriggerRunResult>? = null,
+    ): String {
+        var errorMessage = ""
+        if (triggerResults != null) {
+            val triggersErrorBuilder = StringBuilder()
+            triggerResults.forEach {
+                if (it.value.error != null) {
+                    triggersErrorBuilder.append("[${it.key}]: [${it.value.error!!.userErrorMessage()}]").append(" | ")
+                }
+            }
+            if (triggersErrorBuilder.isNotEmpty()) {
+                errorMessage = "Trigger errors: $triggersErrorBuilder"
+            }
+        }
+        return errorMessage
+    }
+
+    private suspend fun createFindings(
+        monitor: Monitor,
+        monitorCtx: MonitorRunnerExecutionContext,
+        docsToQueries: MutableMap<String, MutableSet<String>>,
+        idQueryMap: Map<String, DocLevelQuery>,
+        shouldCreateFinding: Boolean,
+        workflowExecutionId: String? = null,
+    ): List<Pair<String, String>> {
+
+        val findingDocPairs = mutableListOf<Pair<String, String>>()
+        val findings = mutableListOf<Finding>()
+        val indexRequests = mutableListOf<IndexRequest>()
+
+        docsToQueries.forEach {
+            val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
+
+            // Before the "|" is the doc id and after the "|" is the index
+            val docIndex = it.key.split("|")
+
+            val finding = Finding(
+                id = UUID.randomUUID().toString(),
+                relatedDocIds = listOf(docIndex[0]),
+                correlatedDocIds = listOf(docIndex[0]),
+                monitorId = monitor.id,
+                monitorName = monitor.name,
+                index = docIndex[1],
+                docLevelQueries = triggeredQueries,
+                timestamp = Instant.now(),
+                executionId = workflowExecutionId
+            )
+            findingDocPairs.add(Pair(finding.id, it.key))
+            findings.add(finding)
+
+            val findingStr =
+                finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
+                    .string()
+            logger.debug("Findings: $findingStr")
+
+            if (shouldCreateFinding) {
+                indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
+                    .source(findingStr, XContentType.JSON)
+                    .id(finding.id)
+                    .routing(finding.id)
+                    .opType(DocWriteRequest.OpType.CREATE)
+            }
+        }
+
+        if (indexRequests.isNotEmpty()) {
+            val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
+                bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
+            }
+            if (bulkResponse.hasFailures()) {
+                bulkResponse.items.forEach { item ->
+                    if (item.isFailed) {
+                        logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
+                    }
+                }
+            } else {
+                logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
+            }
+        }
+        return findingDocPairs
     }

     /** 1. Fetch data per shard for given index. (only 10000 docs are fetched.
@@ -589,4 +789,263 @@ class TransportDocLevelMonitorFanOutAction
         return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx) ||
             isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs, monitorCtx)
     }
+
+    private suspend fun runForEachDocTrigger(
+        monitorCtx: MonitorRunnerExecutionContext,
+        monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
+        trigger: DocumentLevelTrigger,
+        monitor: Monitor,
+        idQueryMap: Map<String, DocLevelQuery>,
+        docsToQueries: MutableMap<String, MutableSet<String>>,
+        queryToDocIds: Map<DocLevelQuery, Set<String>>,
+        dryrun: Boolean,
+        workflowRunContext: WorkflowRunContext?,
+        executionId: String,
+    ): DocumentLevelTriggerRunResult {
+        val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
+        val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)
+
+        val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()
+
+        // TODO: Implement throttling for findings
+        val findingToDocPairs = createFindings(
+            monitor,
+            monitorCtx,
+            docsToQueries,
+            idQueryMap,
+            !dryrun && monitor.id != Monitor.NO_ID,
+            executionId
+        )
+
+        findingToDocPairs.forEach {
+            // Only pick those entries whose docs have triggers associated with them
+            if (triggerResult.triggeredDocs.contains(it.second)) {
+                triggerFindingDocPairs.add(Pair(it.first, it.second))
+            }
+        }
+
+        val actionCtx = triggerCtx.copy(
+            triggeredDocs = triggerResult.triggeredDocs,
+            relatedFindings = findingToDocPairs.map { it.first },
+            error = monitorResult.error ?: triggerResult.error
+        )
+
+        val alerts = mutableListOf<Alert>()
+        triggerFindingDocPairs.forEach {
+            val alert = monitorCtx.alertService!!.composeDocLevelAlert(
+                listOf(it.first),
+                listOf(it.second),
+                triggerCtx,
+                monitorResult.alertError() ?: triggerResult.alertError(),
+                executionId = executionId,
+                workflorwRunContext = workflowRunContext
+            )
+            alerts.add(alert)
+        }
+
+        val shouldDefaultToPerExecution = defaultToPerExecutionAction(
+            monitorCtx.maxActionableAlertCount,
+            monitorId = monitor.id,
+            triggerId = trigger.id,
+            totalActionableAlertCount = alerts.size,
+            monitorOrTriggerError = actionCtx.error
+        )
+
+        for (action in trigger.actions) {
+            val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope
+            if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
+                for (alert in alerts) {
+                    val actionResults =
+                        this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun)
+                    triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() }
+                    triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults)
+                }
+            } else if (alerts.isNotEmpty()) {
+                val actionResults = this.runAction(action, actionCtx.copy(alerts = alerts), monitorCtx, monitor, dryrun)
+                for (alert in alerts) {
+                    triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() }
+                    triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults)
+                }
+            }
+        }
+
+        // Alerts are saved after the actions since if there are failures in the actions, they can be stated in the alert
+        if (!dryrun && monitor.id != Monitor.NO_ID) {
+            val updatedAlerts = alerts.map { alert ->
+                val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap())
+                val actionExecutionResults = actionResults.values.map { actionRunResult ->
+                    ActionExecutionResult(
+                        actionRunResult.actionId,
+                        actionRunResult.executionTime,
+                        if (actionRunResult.throttled) 1 else 0
+                    )
+                }
+                alert.copy(actionExecutionResults = actionExecutionResults)
+            }
+
+            monitorCtx.retryPolicy?.let {
+                monitorCtx.alertService!!.saveAlerts(
+                    monitor.dataSources,
+                    updatedAlerts,
+                    it,
+                    routingId = monitor.id
+                )
+            }
+        }
+        return triggerResult
+    }
+
+    suspend fun runAction(
+        action: Action,
+        ctx: TriggerExecutionContext,
+        monitorCtx: MonitorRunnerExecutionContext,
+        monitor: Monitor,
+        dryrun: Boolean,
+    ): ActionRunResult {
+        return try {
+            if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(
+                    action,
+                    ctx.alert
+                )
+            ) {
+                return ActionRunResult(action.id, action.name, mapOf(), true, null, null)
+            }
+            val actionOutput = mutableMapOf<String, String>()
+            actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null)
+                MonitorRunnerService.compileTemplate(action.subjectTemplate!!, ctx)
+            else ""
+            actionOutput[Action.MESSAGE] = MonitorRunnerService.compileTemplate(action.messageTemplate, ctx)
+            if (Strings.isNullOrEmpty(actionOutput[Action.MESSAGE])) {
+                throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}")
+            }
+            if (!dryrun) {
+                val client = monitorCtx.client
+                client!!.threadPool().threadContext.stashContext().use {
+                    withClosableContext(
+                        InjectorContextElement(
+                            monitor.id,
+                            monitorCtx.settings!!,
+                            monitorCtx.threadPool!!.threadContext,
+                            monitor.user?.roles,
+                            monitor.user
+                        )
+                    ) {
+                        actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification(
+                            action,
+                            monitorCtx,
+                            actionOutput[Action.SUBJECT],
+                            actionOutput[Action.MESSAGE]!!
+                        )
+                    }
+                }
+            }
+            ActionRunResult(action.id, action.name, actionOutput, false, MonitorRunnerService.currentTime(), null)
+        } catch (e: Exception) {
+            ActionRunResult(action.id, action.name, mapOf(), false, MonitorRunnerService.currentTime(), e)
+        }
+    }
+
+    protected suspend fun getConfigAndSendNotification(
+        action: Action,
+        monitorCtx: MonitorRunnerExecutionContext,
+        subject: String?,
+        message: String,
+    ): String {
+        val config = getConfigForNotificationAction(action, monitorCtx)
+        if (config.destination == null && config.channel == null) {
+            throw IllegalStateException("Unable to find a Notification Channel or Destination config with id [${action.destinationId}]")
+        }
+
+        // Adding a check on TEST_ACTION Destination type here to avoid supporting it as a LegacyBaseMessage type
+        // just for Alerting integration tests
+        if (config.destination?.isTestAction() == true) {
+            return "test action"
+        }
+
+        if (config.destination?.isAllowed(monitorCtx.allowList) == false) {
+            throw IllegalStateException(
+                "Monitor contains a Destination type that is not allowed: ${config.destination.type}"
+            )
+        }
+
+        var actionResponseContent = ""
+        actionResponseContent = config.channel
+            ?.sendNotification(
+                monitorCtx.client!!,
+                config.channel.getTitle(subject),
+                message
+            ) ?: actionResponseContent
+
+        actionResponseContent = config.destination
+            ?.buildLegacyBaseMessage(
+                subject,
+                message,
+                monitorCtx.destinationContextFactory!!.getDestinationContext(config.destination)
+            )
+            ?.publishLegacyNotification(monitorCtx.client!!)
+            ?: actionResponseContent
+
+        return actionResponseContent
+    }
+
+    /**
+     * The "destination" ID referenced in a Monitor Action could either be a Notification config or a Destination config
+     * depending on whether the background migration process has already migrated it from a Destination to a Notification config.
+     *
+     * To cover both of these cases, the Notification config will take precedence and if it is not found, the Destination will be retrieved.
+ */ + private suspend fun getConfigForNotificationAction( + action: Action, + monitorCtx: MonitorRunnerExecutionContext, + ): NotificationActionConfigs { + var destination: Destination? = null + var notificationPermissionException: Exception? = null + + var channel: NotificationConfigInfo? = null + try { + channel = + NotificationApiUtils.getNotificationConfigInfo(monitorCtx.client as NodeClient, action.destinationId) + } catch (e: OpenSearchSecurityException) { + notificationPermissionException = e + } + + // If the channel was not found, try to retrieve the Destination + if (channel == null) { + destination = try { + val table = Table( + "asc", + "destination.name.keyword", + null, + 1, + 0, + null + ) + val getDestinationsRequest = GetDestinationsRequest( + action.destinationId, + 0L, + null, + table, + "ALL" + ) + + val getDestinationsResponse: GetDestinationsResponse = monitorCtx.client!!.suspendUntil { + monitorCtx.client!!.execute(GetDestinationsAction.INSTANCE, getDestinationsRequest, it) + } + getDestinationsResponse.destinations.firstOrNull() + } catch (e: IllegalStateException) { + // Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned + null + } catch (e: OpenSearchSecurityException) { + if (notificationPermissionException != null) + throw notificationPermissionException + else + throw e + } + + if (destination == null && notificationPermissionException != null) + throw notificationPermissionException + } + + return NotificationActionConfigs(destination, channel) + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt index 14488a16a..8ebb2be9c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt @@ -5,11 +5,51 @@ package org.opensearch.alerting.workflow +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder + data class WorkflowRunContext( // In case of dry run it's random generated id, while in other cases it's workflowId val workflowId: String, val workflowMetadataId: String, val chainedMonitorId: String?, val matchingDocIdsPerIndex: Map>, - val auditDelegateMonitorAlerts: Boolean -) + val auditDelegateMonitorAlerts: Boolean, +) : Writeable, ToXContentObject { + companion object { + fun readFrom(sin: StreamInput): WorkflowRunContext { + return WorkflowRunContext(sin) + } + } + + constructor(sin: StreamInput) : this( + sin.readString(), + sin.readString(), + sin.readOptionalString(), + sin.readMap() as Map>, + sin.readBoolean() + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeString(workflowMetadataId) + out.writeOptionalString(chainedMonitorId) + out.writeMap(matchingDocIdsPerIndex) + out.writeBoolean(auditDelegateMonitorAlerts) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + .field("workflow_id", workflowId) + .field("workflow_metadata_id", workflowMetadataId) + .field("chained_monitor_id", chainedMonitorId) + .field("matching_doc_ids_per_index", matchingDocIdsPerIndex) + 
.field("audit_delegate_monitor_alerts", auditDelegateMonitorAlerts) + .endObject() + return builder + } +}